我如何才能查看 Django 正在运行的原始 SQL 查询?

2025-01-08 08:50:00
admin
原创
111
摘要:问题描述:有没有办法在执行查询时显示 Django 正在运行的 SQL?解决方案 1:请参阅文档常见问题解答:“如何查看 Django 正在运行的原始 SQL 查询? ”django.db.connection.queries包含 SQL 查询列表:from django.db import connecti...

问题描述:

有没有办法在执行查询时显示 Django 正在运行的 SQL?


解决方案 1:

请参阅文档常见问题解答:“如何查看 Django 正在运行的原始 SQL 查询? ”

django.db.connection.queries包含 SQL 查询列表:

from django.db import connection
print(connection.queries)

查询集还具有一个包含要执行的查询的query属性:

print(MyModel.objects.filter(name="my name").query)

请注意,查询的输出不是有效的 SQL,因为:

“Django 实际上从未插入参数:它将查询和参数分别发送到数据库适配器,然后执行适当的操作。”

来自 Django 错误报告#17741。

因此,您不应将查询输出直接发送到数据库。

如果您需要重置查询,例如,查看给定时间段内运行了多少个查询,则可以使用reset_queriesfrom django.db

from django.db import reset_queries
from django.db import connection

reset_queries()
# Run your query here
print(connection.queries)
>>> []

解决方案 2:

Django-extensions有一个带参数的命令shell_plusprint-sql

./manage.py shell_plus --print-sql

在 django-shell 中,所有执行的查询都将被打印

例子:

User.objects.get(pk=1)
SELECT "auth_user"."id",
       "auth_user"."password",
       "auth_user"."last_login",
       "auth_user"."is_superuser",
       "auth_user"."username",
       "auth_user"."first_name",
       "auth_user"."last_name",
       "auth_user"."email",
       "auth_user"."is_staff",
       "auth_user"."is_active",
       "auth_user"."date_joined"
FROM "auth_user"
WHERE "auth_user"."id" = 1

Execution time: 0.002466s [Database: default]

<User: username>

解决方案 3:

看一下debug_toolbar,它对于调试非常有用。

文档和源代码可在http://django-debug-toolbar.readthedocs.io/上找到。

调试工具栏的屏幕截图

解决方案 4:

该查询实际上嵌入在模型 API 中:

q = Query.objects.values('val1','val2','val_etc')

print(q.query)

解决方案 5:

我发现迄今为止最有用、最简单、最可靠的方法是询问您的数据库。例如,在 Linux 上对于 Postgres,您可以执行以下操作:

sudo su postgres
tail -f /var/log/postgresql/postgresql-8.4-main.log

每个数据库的程序略有不同。在数据库日志中,您不仅会看到原始 SQL,还会看到 django 在系统上设置的任何连接或事务开销。

解决方案 6:

我想介绍一种日志记录方法,它非常简单;django.db.backends在 settings.py 中添加 logger

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
        },
    },
    'loggers': {
        'django.db.backends': {
            'handlers': ['console'],
            'level': 'DEBUG',
        },
    },
}

我还使用环境变量来设置级别。因此,当我想查看 SQL 查询时,我只需设置环境变量,调试日志就会显示实际查询。

解决方案 7:

另一个选项,请参阅此帖子描述的 settings.py 中的日志记录选项

http://dabapps.com/blog/logging-sql-queries-django-13/

debug_toolbar 会减慢开发服务器上每个页面的加载速度,而日志记录不会减慢速度,因此速度更快。输出可以转储到控制台或文件,因此 UI 不太好。但对于包含大量 SQL 的视图,通过 debug_toolbar 调试和优化 SQL 可能需要很长时间,因为每个页面的加载速度都很慢。

解决方案 8:

虽然您可以使用提供的代码来完成此操作,但我发现使用调试工具栏应用程序是显示查询的绝佳工具。您可以从此处的github 下载它。

这为您提供了显示给定页面上运行的所有查询以及查询所用时间的选项。它还会汇总页面上的查询数量以及总时间,以便快速查看。当您想查看 Django ORM 在幕后做什么时,这是一个很棒的工具。它还有很多其他不错的功能,您可以根据需要使用它们。

解决方案 9:

为此,我开发了一个扩展,因此您可以轻松地在视图函数上放置一个装饰器并查看执行了多少个查询。

安装:

pip install django-print-sql

用作上下文管理器:

from django_print_sql import print_sql

# Set `count_only` to `True` will print the number of executed SQL statements only
with print_sql(count_only=False):

  # Write the code you want to analyze in here,
  # e.g., some complex foreign key lookup,
  # or analyzing a DRF serializer's performance

  for user in User.objects.all()[:10]:
      user.groups.first()

用作装饰器:

from django_print_sql import print_sql_decorator


@print_sql_decorator(count_only=False)  # This works on class-based views as well
def get(request):
    # Your view code here

GitHub:django-print-sql

解决方案 10:

补充一下,在 Django 中,如果你有如下查询:

MyModel.objects.all()

做:

MyModel.objects.all().query.sql_with_params()

或者:

str(MyModel.objects.all().query)

获取 SQL 字符串。

解决方案 11:

如果你确定你的 settings.py 文件包含:

  1. django.core.context_processors.debug列出CONTEXT_PROCESSORS

  2. DEBUG=True

  3. IPINTERNAL_IPS元组中

然后,您应该可以访问该sql_queries变量。我在每个页面后面附加了一个页脚,如下所示:

{%if sql_queries %}
  <div class="footNav">
    <h2>Queries</h2>
    <p>
      {{ sql_queries|length }} Quer{{ sql_queries|pluralize:"y,ies" }}, {{sql_time_sum}} Time
    {% ifnotequal sql_queries|length 0 %}
      (<span style="cursor: pointer;" onclick="var s=document.getElementById('debugQueryTable').style;s.disp\nlay=s.display=='none'?'':'none';this.innerHTML=this.innerHTML=='Show'?'Hide':'Show';">Show</span>)
    {% endifnotequal %}
    </p>
    <table id="debugQueryTable" style="display: none;">
      <col width="1"></col>
      <col></col>
      <col width="1"></col>
      <thead>
        <tr>
          <th scope="col">#</th>
          <th scope="col">SQL</th>
          <th scope="col">Time</th>
        </tr>
      </thead>
      <tbody>
        {% for query in sql_queries %}
          <tr class="{% cycle odd,even %}">
            <td>{{ forloop.counter }}</td>
            <td>{{ query.sql|escape }}</td>
            <td>{{ query.time }}</td>
          </tr>
        {% endfor %}
      </tbody>
    </table>
  </div>
{% endif %}

sql_time_sum我通过添加行来获取变量

context_extras['sql_time_sum'] = sum([float(q['time']) for q in connection.queries])

到 django_src/django/core/context_processors.py 中的调试函数。

解决方案 12:

Django SQL Sniffer 是查看(并查看统计信息)使用 Django ORM 的任何进程产生的原始执行查询的另一种选择。我构建它是为了满足我的一个特定用例,我从未在任何地方见过这种用例,即:

  • 目标进程正在执行的源代码无需更改(无需在 django 设置中注册新应用程序、在各处导入装饰器等)

  • 日志配置没有变化(例如,因为我对一个特定的流程感兴趣,而不是该配置适用的整个流程)

  • 不需要重新启动目标进程(例如因为它是一个重要组件,重新启动可能会导致一些停机时间)

因此,Django SQL Sniffer 可以临时使用,并附加到已运行的进程。然后,该工具会“嗅探”已执行的查询,并在执行时将其打印到控制台。当该工具停止时,会显示一个统计摘要,其中包含基于某些可能指标(计数、最大持续时间和总持续时间)的异常查询。

下面是我附加到 Python shell 的示例截图
在此处输入图片描述

您可以在github 页面上查看现场演示和更多详细信息。

解决方案 13:

根据https://code.djangoproject.com/ticket/17741,以下查询将返回为有效 SQL:

def str_query(qs):
    """
    qs.query returns something that isn't valid SQL, this returns the actual
    valid SQL that's executed: https://code.djangoproject.com/ticket/17741
    """
    cursor = connections[qs.db].cursor()
    query, params = qs.query.sql_with_params()
    cursor.execute('EXPLAIN ' + query, params)
    res = str(cursor.db.ops.last_executed_query(cursor, query, params))
    assert res.startswith('EXPLAIN ')
    return res[len('EXPLAIN '):]

解决方案 14:

我将此功能放在我的项目中的一个应用程序的实用程序文件中:

import logging
import re

from django.db import connection

logger = logging.getLogger(__name__)

def sql_logger():
    logger.debug('TOTAL QUERIES: ' + str(len(connection.queries)))
    logger.debug('TOTAL TIME: ' + str(sum([float(q['time']) for q in connection.queries])))

    logger.debug('INDIVIDUAL QUERIES:')
    for i, query in enumerate(connection.queries):
        sql = re.split(r'(SELECT|FROM|WHERE|GROUP BY|ORDER BY|INNER JOIN|LIMIT)', query['sql'])
        if not sql[0]: sql = sql[1:]
        sql = [(' ' if i % 2 else '') + x for i, x in enumerate(sql)]
        logger.debug('
### {} ({} seconds)

{};
'.format(i, query['time'], '
'.join(sql)))

然后,当需要时,我只需导入它并从必要的上下文(通常是视图)中调用它,例如:

# ... other imports
from .utils import sql_logger

class IngredientListApiView(generics.ListAPIView):
    # ... class variables and such

    # Main function that gets called when view is accessed
    def list(self, request, *args, **kwargs):
        response = super(IngredientListApiView, self).list(request, *args, **kwargs)

        # Call our function
        sql_logger()

        return response

在模板之外执行此操作很好,因为如果您有 API 视图(通常是 Django Rest Framework),它也可以在那里应用。

解决方案 15:

我相信如果你使用 PostgreSQL,这应该有效:

from django.db import connections
from app_name import models
from django.utils import timezone

# Generate a queryset, use your favorite filter, QS objects, and whatnot.
qs=models.ThisDataModel.objects.filter(user='bob',date__lte=timezone.now())

# Get a cursor tied to the default database
cursor=connections['default'].cursor()

# Get the query SQL and parameters to be passed into psycopg2, then pass
# those into mogrify to get the query that would have been sent to the backend
# and print it out. Note F-strings require python 3.6 or later.
print(f'{cursor.mogrify(*qs.query.sql_with_params())}')

解决方案 16:

如果您需要重复使用查询来处理某些自定义 SQL,那么还有另一种非常有用的方法。我曾在一个分析应用中使用过这种方法,它远远超出了 Django ORM 可以轻松完成的范围,因此我将 ORM 生成的 SQL 作为子查询包含在内。

from django.db import connection
from myapp.models import SomeModel

queryset = SomeModel.objects.filter(foo='bar')

sql_query, params = queryset.query.as_sql(None, connection)

这将为您提供带有占位符的 SQL,以及要使用的带有查询参数的元组。您可以将其直接传递给数据库:

with connection.connection.cursor(cursor_factory=DictCursor) as cursor:
    cursor.execute(sql_query, params)
    data = cursor.fetchall()

解决方案 17:

要将结果查询从 django 获取到数据库(使用正确的参数替换),
您可以使用此函数:

from django.db import connection

def print_database_query_formatted(query):
    sql, params = query.sql_with_params()
    cursor = connection.cursor()
    cursor.execute('EXPLAIN ' + sql, params)
    db_query = cursor.db.ops.last_executed_query(cursor, sql, params).replace('EXPLAIN ', '')

    parts = '{}'.format(db_query).split('FROM')
    print(parts[0])
    if len(parts) > 1:
        parts = parts[1].split('WHERE')
        print('FROM{}'.format(parts[0]))
        if len(parts) > 1:
            parts = parts[1].split('ORDER BY')
            print('WHERE{}'.format(parts[0]))
            if len(parts) > 1:
                print('ORDER BY{}'.format(parts[1]))

# USAGE
users = User.objects.filter(email='admin@admin.com').order_by('-id')
print_database_query_formatted(users.query)

输出示例

SELECT "users_user"."password", "users_user"."last_login", "users_user"."is_superuser", "users_user"."deleted", "users_user"."id", "users_user"."phone", "users_user"."username", "users_user"."userlastname", "users_user"."email", "users_user"."is_staff", "users_user"."is_active", "users_user"."date_joined", "users_user"."latitude", "users_user"."longitude", "users_user"."point"::bytea, "users_user"."default_search_radius", "users_user"."notifications", "users_user"."admin_theme", "users_user"."address", "users_user"."is_notify_when_buildings_in_radius", "users_user"."active_campaign_id", "users_user"."is_unsubscribed", "users_user"."sf_contact_id", "users_user"."is_agree_terms_of_service", "users_user"."is_facebook_signup", "users_user"."type_signup" 
FROM "users_user" 
WHERE "users_user"."email" = 'admin@admin.com' 
ORDER BY "users_user"."id" DESC

它基于此票证评论: https://code.djangoproject.com/ticket/17741#comment:4

解决方案 18:

生成Django 中立即可用的CREATE / UPDATE / DELETE / 命令的SQL

from django.db.models import sql

def generate_update_sql(queryset, update_kwargs):
    """Converts queryset with update_kwargs
    like : queryset.update(**update_kwargs) to UPDATE SQL"""

    query = queryset.query.clone(sql.UpdateQuery)
    query.add_update_values(update_kwargs)
    compiler = query.get_compiler(queryset.db)
    sql, params = compiler.as_sql()
    return sql % params
from django.db.models import sql

def generate_delete_sql(queryset):
    """Converts select queryset to DELETE SQL """
    query = queryset.query.chain(sql.DeleteQuery)
    compiler = query.get_compiler(queryset.db)
    sql, params = compiler.as_sql()
    return sql % params
from django.db.models import sql

def generate_create_sql(model, model_data):
    """Converts queryset with create_kwargs
    like if was: queryset.create(**create_kwargs) to SQL CREATE"""
    
    not_saved_instance = model(**model_data)
    not_saved_instance._for_write = True

    query = sql.InsertQuery(model)

    fields = [f for f in model._meta.local_concrete_fields if not isinstance(f, AutoField)]
    query.insert_values(fields, [not_saved_instance], raw=False)

    compiler = query.get_compiler(model.objects.db)
    sql, params = compiler.as_sql()[0]
    return sql % params

测试和使用

    def test_generate_update_sql_with_F(self):
        qs = Event.objects.all()
        update_kwargs = dict(description=F('slug'))
        result = generate_update_sql(qs, update_kwargs)
        sql = "UPDATE `api_event` SET `description` = `api_event`.`slug`"
        self.assertEqual(sql, result)

    def test_generate_create_sql(self):
        result = generate_create_sql(Event, dict(slug='a', app='b', model='c', action='e'))
        sql = "INSERT INTO `api_event` (`slug`, `app`, `model`, `action`, `action_type`, `description`) VALUES (a, b, c, e, , )"
        self.assertEqual(sql, result)

解决方案 19:

我制作了一个小片段供您使用:

from django.conf import settings
from django.db import connection


def sql_echo(method, *args, **kwargs):
    settings.DEBUG = True
    result = method(*args, **kwargs)
    for query in connection.queries:
        print(query)
    return result


# HOW TO USE EXAMPLE:
#
# result = sql_echo(my_method, 'whatever', show=True)

它以要检查的函数(包含 SQL 查询)和调用该函数所需的 args、kwargs 作为参数。结果,它返回函数返回的内容并在控制台中打印 SQL 查询。

解决方案 20:

from django.db import reset_queries, connection
class ShowSQL(object):
    def __enter__(self):
        reset_queries()
        return self

    def __exit__(self, *args):
        for sql in connection.queries:
            print('Time: %s
SQL: %s' % (sql['time'], sql['sql']))

然后您可以使用:

with ShowSQL() as t:
    some queries <select>|<annotate>|<update> or other 

它打印

  • 时间:%s

  • SQL:%s

解决方案 21:

您可以使用它connection.queries来获取在 Django 中运行的原始 SQL 查询,如下所示:

# "store/views.py"

from django.db import transaction
from .models import Person
from django.db import connection
from django.http import HttpResponse

@transaction.atomic
def test(request):
    Person.objects.create(name="John") # INSERT
    
    qs = Person.objects.select_for_update().get(name="John") # SELECT FOR UPDATE
    qs.name = "Tom"
    qs.save() # UPDATE
    qs.delete() # DELETE
                 
    for query in connection.queries: # Here
        print(query)

    return HttpResponse("Test")

然后,原始查询将打印在控制台上,如下所示:

{'sql': 'INSERT INTO "store_person" ("name") VALUES (\'John\') RETURNING "store_person"."id"', 'time': '0.000'}
{'sql': 'SELECT "store_person"."id", "store_person"."name" FROM "store_person" WHERE "store_person"."name" = \'John\' LIMIT 21 FOR UPDATE', 'time': '0.000'}      
{'sql': 'UPDATE "store_person" SET "name" = \'Tom\' WHERE "store_person"."id" = 179', 'time': '0.000'}
{'sql': 'DELETE FROM "store_person" WHERE "store_person"."id" IN (179)', 'time': '0.000'}
[24/Dec/2022 06:29:32] "GET /store/test/ HTTP/1.1" 200 9

然后,如果您只想获取查询而不获取查询,reset_queries()请将其放在后面,如下所示:Person.objects.select_for_update()UPDATE`DELETE****INSERT`SELECT FOR UPDATE

# "store/views.py"

from django.db import transaction
from .models import Person
from django.db import reset_queries
from django.db import connection
from django.http import HttpResponse

@transaction.atomic
def test(request):
    Person.objects.create(name="John") # INSERT
    
    qs = Person.objects.select_for_update().get(name="John") # SELECT FOR UPDATE
    reset_queries() # Here
    qs.name = "Tom"
    qs.save() # UPDATE
    qs.delete() # DELETE
                 
    for query in connection.queries: # Here
        print(query)

    return HttpResponse("Test")

然后,只打印UPDATEDELETE查询,而不打印INSERTSELECT FOR UPDATE查询,如下所示:

{'sql': 'UPDATE "store_person" SET "name" = \'Tom\' WHERE "store_person"."id" = 190', 'time': '0.000'}
{'sql': 'DELETE FROM "store_person" WHERE "store_person"."id" IN (190)', 'time': '0.000'}
[24/Dec/2022 07:00:01] "GET /store/test/ HTTP/1.1" 200 9

解决方案 22:

修改您的settings.py文件:

# settings.py

LOGGING = {
    'version': 1,
    'formatters': {
        'verbose': {
            'format': '{levelname} {asctime} {module} {message}',
            'style': '{',
        },
    },
    'handlers': {
        'console': {
            'level': 'DEBUG',
            'class': 'logging.StreamHandler',
            'formatter': 'verbose',
        },
    },
    'loggers': {
        'django.db.backends': {
            'handlers': ['console'],
            'level': 'DEBUG',
            'propagate': True,
        },
    },
}

然后确保您的应用程序在 DEBUG 模式下运行,


# settings.py

DEBUG = True

立即运行您的应用程序,您将看到打印到控制台的 SQL 查询。

解决方案 23:

对于那些只想从查询本身寻找结果的人来说,有一种最简单的方法:

假设我们有一个名为的模型Musico

class Musico(models.Model):
    INSTRUMENTOS = [
        ('violao', 'Violão'),
        ('piano', 'Piano'),
        ('cavaquinho', 'Cavaquinho'),
    ]
    usuario = models.OneToOneField(User, on_delete=models.DO_NOTHING, null=True)
    primeiro_nome = models.CharField(max_length=120)
    sobrenome = models.CharField(max_length=120, null=True, blank=True)
    tipo_instrumento = models.CharField(choices=INSTRUMENTOS, max_length=200)
    idade = models.IntegerField(null=True, blank=True)

    def __str__(self):
        return f"Musico: {self.primeiro_nome}"

检查原始 SQL 查询如下:

>>> str(Musico.objects.all().query) 
'SELECT "model_lesson_app_musico"."id", "model_lesson_app_musico"."usuario_id", "model_lesson_app_musico"."primeiro_nome", "model_lesson_app_musico"."sobrenome", "model_lesson_app_musico"."tipo_instrumento", "model_lesson_app_musico"."idade" FROM "model_lesson_app_musico"'

解决方案 24:

使用 django.db.connection.queries查看查询

from django.db import connection
print(connection.queries)

访问 QuerySet 对象上的原始 SQL 查询

 qs = MyModel.objects.all()
 print(qs.query)

解决方案 25:

对于 Django 2.2:

因为大多数答案在使用时对我没什么帮助./manage.py shell。最后我找到了答案。希望这对某些人有帮助。

查看所有查询:

from django.db import connection
connection.queries

要查看单个查询的查询:

q=Query.objects.all()
q.query.__str__()

q.query只为我显示对象。使用__str__()(字符串表示)显示完整查询。

解决方案 26:

这里已经有几个很好的答案。

还有一种方法。

在测试中,执行以下操作:

with self.assertNumQueries(3):
    response = self.client.post(reverse('payments:pay_list'))
    # or whatever

如果查询数量错误,测试失败并在控制台中打印所有原始 SQL 查询。

此外,这样的测试有助于控制 SQL 查询的数量不会随着代码的变化而增加,并且数据库负载不会过大。

相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   1565  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1354  
  信创国产芯片作为信息技术创新的核心领域,对于推动国家自主可控生态建设具有至关重要的意义。在全球科技竞争日益激烈的背景下,实现信息技术的自主可控,摆脱对国外技术的依赖,已成为保障国家信息安全和产业可持续发展的关键。国产芯片作为信创产业的基石,其发展水平直接影响着整个信创生态的构建与完善。通过不断提升国产芯片的技术实力、产...
国产信创系统   21  
  信创生态建设旨在实现信息技术领域的自主创新和安全可控,涵盖了从硬件到软件的全产业链。随着数字化转型的加速,信创生态建设的重要性日益凸显,它不仅关乎国家的信息安全,更是推动产业升级和经济高质量发展的关键力量。然而,在推进信创生态建设的过程中,面临着诸多复杂且严峻的挑战,需要深入剖析并寻找切实可行的解决方案。技术创新难题技...
信创操作系统   27  
  信创产业作为国家信息技术创新发展的重要领域,对于保障国家信息安全、推动产业升级具有关键意义。而国产芯片作为信创产业的核心基石,其研发进展备受关注。在信创国产芯片的研发征程中,面临着诸多复杂且艰巨的难点,这些难点犹如一道道关卡,阻碍着国产芯片的快速发展。然而,科研人员和相关企业并未退缩,积极探索并提出了一系列切实可行的解...
国产化替代产品目录   28  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用