我如何才能查看 Django 正在运行的原始 SQL 查询?
- 2025-01-08 08:50:00
- admin 原创
- 111
问题描述:
有没有办法在执行查询时显示 Django 正在运行的 SQL?
解决方案 1:
请参阅文档常见问题解答:“如何查看 Django 正在运行的原始 SQL 查询? ”
django.db.connection.queries
包含 SQL 查询列表:
from django.db import connection
print(connection.queries)
查询集还具有一个包含要执行的查询的query
属性:
print(MyModel.objects.filter(name="my name").query)
请注意,查询的输出不是有效的 SQL,因为:
“Django 实际上从未插入参数:它将查询和参数分别发送到数据库适配器,然后执行适当的操作。”
来自 Django 错误报告#17741。
因此,您不应将查询输出直接发送到数据库。
如果您需要重置查询,例如,查看给定时间段内运行了多少个查询,则可以使用reset_queries
from django.db
:
from django.db import reset_queries
from django.db import connection
reset_queries()
# Run your query here
print(connection.queries)
>>> []
解决方案 2:
Django-extensions有一个带参数的命令shell_plusprint-sql
./manage.py shell_plus --print-sql
在 django-shell 中,所有执行的查询都将被打印
例子:
User.objects.get(pk=1)
SELECT "auth_user"."id",
"auth_user"."password",
"auth_user"."last_login",
"auth_user"."is_superuser",
"auth_user"."username",
"auth_user"."first_name",
"auth_user"."last_name",
"auth_user"."email",
"auth_user"."is_staff",
"auth_user"."is_active",
"auth_user"."date_joined"
FROM "auth_user"
WHERE "auth_user"."id" = 1
Execution time: 0.002466s [Database: default]
<User: username>
解决方案 3:
看一下debug_toolbar,它对于调试非常有用。
文档和源代码可在http://django-debug-toolbar.readthedocs.io/上找到。
解决方案 4:
该查询实际上嵌入在模型 API 中:
q = Query.objects.values('val1','val2','val_etc')
print(q.query)
解决方案 5:
我发现迄今为止最有用、最简单、最可靠的方法是询问您的数据库。例如,在 Linux 上对于 Postgres,您可以执行以下操作:
sudo su postgres
tail -f /var/log/postgresql/postgresql-8.4-main.log
每个数据库的程序略有不同。在数据库日志中,您不仅会看到原始 SQL,还会看到 django 在系统上设置的任何连接或事务开销。
解决方案 6:
我想介绍一种日志记录方法,它非常简单;django.db.backends
在 settings.py 中添加 logger
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'handlers': {
'console': {
'class': 'logging.StreamHandler',
},
},
'loggers': {
'django.db.backends': {
'handlers': ['console'],
'level': 'DEBUG',
},
},
}
我还使用环境变量来设置级别。因此,当我想查看 SQL 查询时,我只需设置环境变量,调试日志就会显示实际查询。
解决方案 7:
另一个选项,请参阅此帖子描述的 settings.py 中的日志记录选项
http://dabapps.com/blog/logging-sql-queries-django-13/
debug_toolbar 会减慢开发服务器上每个页面的加载速度,而日志记录不会减慢速度,因此速度更快。输出可以转储到控制台或文件,因此 UI 不太好。但对于包含大量 SQL 的视图,通过 debug_toolbar 调试和优化 SQL 可能需要很长时间,因为每个页面的加载速度都很慢。
解决方案 8:
虽然您可以使用提供的代码来完成此操作,但我发现使用调试工具栏应用程序是显示查询的绝佳工具。您可以从此处的github 下载它。
这为您提供了显示给定页面上运行的所有查询以及查询所用时间的选项。它还会汇总页面上的查询数量以及总时间,以便快速查看。当您想查看 Django ORM 在幕后做什么时,这是一个很棒的工具。它还有很多其他不错的功能,您可以根据需要使用它们。
解决方案 9:
为此,我开发了一个扩展,因此您可以轻松地在视图函数上放置一个装饰器并查看执行了多少个查询。
安装:
pip install django-print-sql
用作上下文管理器:
from django_print_sql import print_sql
# Set `count_only` to `True` will print the number of executed SQL statements only
with print_sql(count_only=False):
# Write the code you want to analyze in here,
# e.g., some complex foreign key lookup,
# or analyzing a DRF serializer's performance
for user in User.objects.all()[:10]:
user.groups.first()
用作装饰器:
from django_print_sql import print_sql_decorator
@print_sql_decorator(count_only=False) # This works on class-based views as well
def get(request):
# Your view code here
GitHub:django-print-sql
解决方案 10:
补充一下,在 Django 中,如果你有如下查询:
MyModel.objects.all()
做:
MyModel.objects.all().query.sql_with_params()
或者:
str(MyModel.objects.all().query)
获取 SQL 字符串。
解决方案 11:
如果你确定你的 settings.py 文件包含:
django.core.context_processors.debug
列出CONTEXT_PROCESSORS
DEBUG=True
你
IP
在INTERNAL_IPS
元组中
然后,您应该可以访问该sql_queries
变量。我在每个页面后面附加了一个页脚,如下所示:
{%if sql_queries %}
<div class="footNav">
<h2>Queries</h2>
<p>
{{ sql_queries|length }} Quer{{ sql_queries|pluralize:"y,ies" }}, {{sql_time_sum}} Time
{% ifnotequal sql_queries|length 0 %}
(<span style="cursor: pointer;" onclick="var s=document.getElementById('debugQueryTable').style;s.disp\nlay=s.display=='none'?'':'none';this.innerHTML=this.innerHTML=='Show'?'Hide':'Show';">Show</span>)
{% endifnotequal %}
</p>
<table id="debugQueryTable" style="display: none;">
<col width="1"></col>
<col></col>
<col width="1"></col>
<thead>
<tr>
<th scope="col">#</th>
<th scope="col">SQL</th>
<th scope="col">Time</th>
</tr>
</thead>
<tbody>
{% for query in sql_queries %}
<tr class="{% cycle odd,even %}">
<td>{{ forloop.counter }}</td>
<td>{{ query.sql|escape }}</td>
<td>{{ query.time }}</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% endif %}
sql_time_sum
我通过添加行来获取变量
context_extras['sql_time_sum'] = sum([float(q['time']) for q in connection.queries])
到 django_src/django/core/context_processors.py 中的调试函数。
解决方案 12:
Django SQL Sniffer 是查看(并查看统计信息)使用 Django ORM 的任何进程产生的原始执行查询的另一种选择。我构建它是为了满足我的一个特定用例,我从未在任何地方见过这种用例,即:
目标进程正在执行的源代码无需更改(无需在 django 设置中注册新应用程序、在各处导入装饰器等)
日志配置没有变化(例如,因为我对一个特定的流程感兴趣,而不是该配置适用的整个流程)
不需要重新启动目标进程(例如因为它是一个重要组件,重新启动可能会导致一些停机时间)
因此,Django SQL Sniffer 可以临时使用,并附加到已运行的进程。然后,该工具会“嗅探”已执行的查询,并在执行时将其打印到控制台。当该工具停止时,会显示一个统计摘要,其中包含基于某些可能指标(计数、最大持续时间和总持续时间)的异常查询。
下面是我附加到 Python shell 的示例截图
您可以在github 页面上查看现场演示和更多详细信息。
解决方案 13:
根据https://code.djangoproject.com/ticket/17741,以下查询将返回为有效 SQL:
def str_query(qs):
"""
qs.query returns something that isn't valid SQL, this returns the actual
valid SQL that's executed: https://code.djangoproject.com/ticket/17741
"""
cursor = connections[qs.db].cursor()
query, params = qs.query.sql_with_params()
cursor.execute('EXPLAIN ' + query, params)
res = str(cursor.db.ops.last_executed_query(cursor, query, params))
assert res.startswith('EXPLAIN ')
return res[len('EXPLAIN '):]
解决方案 14:
我将此功能放在我的项目中的一个应用程序的实用程序文件中:
import logging
import re
from django.db import connection
logger = logging.getLogger(__name__)
def sql_logger():
logger.debug('TOTAL QUERIES: ' + str(len(connection.queries)))
logger.debug('TOTAL TIME: ' + str(sum([float(q['time']) for q in connection.queries])))
logger.debug('INDIVIDUAL QUERIES:')
for i, query in enumerate(connection.queries):
sql = re.split(r'(SELECT|FROM|WHERE|GROUP BY|ORDER BY|INNER JOIN|LIMIT)', query['sql'])
if not sql[0]: sql = sql[1:]
sql = [(' ' if i % 2 else '') + x for i, x in enumerate(sql)]
logger.debug('
### {} ({} seconds)
{};
'.format(i, query['time'], '
'.join(sql)))
然后,当需要时,我只需导入它并从必要的上下文(通常是视图)中调用它,例如:
# ... other imports
from .utils import sql_logger
class IngredientListApiView(generics.ListAPIView):
# ... class variables and such
# Main function that gets called when view is accessed
def list(self, request, *args, **kwargs):
response = super(IngredientListApiView, self).list(request, *args, **kwargs)
# Call our function
sql_logger()
return response
在模板之外执行此操作很好,因为如果您有 API 视图(通常是 Django Rest Framework),它也可以在那里应用。
解决方案 15:
我相信如果你使用 PostgreSQL,这应该有效:
from django.db import connections
from app_name import models
from django.utils import timezone
# Generate a queryset, use your favorite filter, QS objects, and whatnot.
qs=models.ThisDataModel.objects.filter(user='bob',date__lte=timezone.now())
# Get a cursor tied to the default database
cursor=connections['default'].cursor()
# Get the query SQL and parameters to be passed into psycopg2, then pass
# those into mogrify to get the query that would have been sent to the backend
# and print it out. Note F-strings require python 3.6 or later.
print(f'{cursor.mogrify(*qs.query.sql_with_params())}')
解决方案 16:
如果您需要重复使用查询来处理某些自定义 SQL,那么还有另一种非常有用的方法。我曾在一个分析应用中使用过这种方法,它远远超出了 Django ORM 可以轻松完成的范围,因此我将 ORM 生成的 SQL 作为子查询包含在内。
from django.db import connection
from myapp.models import SomeModel
queryset = SomeModel.objects.filter(foo='bar')
sql_query, params = queryset.query.as_sql(None, connection)
这将为您提供带有占位符的 SQL,以及要使用的带有查询参数的元组。您可以将其直接传递给数据库:
with connection.connection.cursor(cursor_factory=DictCursor) as cursor:
cursor.execute(sql_query, params)
data = cursor.fetchall()
解决方案 17:
要将结果查询从 django 获取到数据库(使用正确的参数替换),
您可以使用此函数:
from django.db import connection
def print_database_query_formatted(query):
sql, params = query.sql_with_params()
cursor = connection.cursor()
cursor.execute('EXPLAIN ' + sql, params)
db_query = cursor.db.ops.last_executed_query(cursor, sql, params).replace('EXPLAIN ', '')
parts = '{}'.format(db_query).split('FROM')
print(parts[0])
if len(parts) > 1:
parts = parts[1].split('WHERE')
print('FROM{}'.format(parts[0]))
if len(parts) > 1:
parts = parts[1].split('ORDER BY')
print('WHERE{}'.format(parts[0]))
if len(parts) > 1:
print('ORDER BY{}'.format(parts[1]))
# USAGE
users = User.objects.filter(email='admin@admin.com').order_by('-id')
print_database_query_formatted(users.query)
输出示例
SELECT "users_user"."password", "users_user"."last_login", "users_user"."is_superuser", "users_user"."deleted", "users_user"."id", "users_user"."phone", "users_user"."username", "users_user"."userlastname", "users_user"."email", "users_user"."is_staff", "users_user"."is_active", "users_user"."date_joined", "users_user"."latitude", "users_user"."longitude", "users_user"."point"::bytea, "users_user"."default_search_radius", "users_user"."notifications", "users_user"."admin_theme", "users_user"."address", "users_user"."is_notify_when_buildings_in_radius", "users_user"."active_campaign_id", "users_user"."is_unsubscribed", "users_user"."sf_contact_id", "users_user"."is_agree_terms_of_service", "users_user"."is_facebook_signup", "users_user"."type_signup"
FROM "users_user"
WHERE "users_user"."email" = 'admin@admin.com'
ORDER BY "users_user"."id" DESC
它基于此票证评论: https://code.djangoproject.com/ticket/17741#comment:4
解决方案 18:
生成Django 中立即可用的CREATE / UPDATE / DELETE / 命令的SQL
from django.db.models import sql
def generate_update_sql(queryset, update_kwargs):
"""Converts queryset with update_kwargs
like : queryset.update(**update_kwargs) to UPDATE SQL"""
query = queryset.query.clone(sql.UpdateQuery)
query.add_update_values(update_kwargs)
compiler = query.get_compiler(queryset.db)
sql, params = compiler.as_sql()
return sql % params
from django.db.models import sql
def generate_delete_sql(queryset):
"""Converts select queryset to DELETE SQL """
query = queryset.query.chain(sql.DeleteQuery)
compiler = query.get_compiler(queryset.db)
sql, params = compiler.as_sql()
return sql % params
from django.db.models import sql
def generate_create_sql(model, model_data):
"""Converts queryset with create_kwargs
like if was: queryset.create(**create_kwargs) to SQL CREATE"""
not_saved_instance = model(**model_data)
not_saved_instance._for_write = True
query = sql.InsertQuery(model)
fields = [f for f in model._meta.local_concrete_fields if not isinstance(f, AutoField)]
query.insert_values(fields, [not_saved_instance], raw=False)
compiler = query.get_compiler(model.objects.db)
sql, params = compiler.as_sql()[0]
return sql % params
测试和使用
def test_generate_update_sql_with_F(self):
qs = Event.objects.all()
update_kwargs = dict(description=F('slug'))
result = generate_update_sql(qs, update_kwargs)
sql = "UPDATE `api_event` SET `description` = `api_event`.`slug`"
self.assertEqual(sql, result)
def test_generate_create_sql(self):
result = generate_create_sql(Event, dict(slug='a', app='b', model='c', action='e'))
sql = "INSERT INTO `api_event` (`slug`, `app`, `model`, `action`, `action_type`, `description`) VALUES (a, b, c, e, , )"
self.assertEqual(sql, result)
解决方案 19:
我制作了一个小片段供您使用:
from django.conf import settings
from django.db import connection
def sql_echo(method, *args, **kwargs):
settings.DEBUG = True
result = method(*args, **kwargs)
for query in connection.queries:
print(query)
return result
# HOW TO USE EXAMPLE:
#
# result = sql_echo(my_method, 'whatever', show=True)
它以要检查的函数(包含 SQL 查询)和调用该函数所需的 args、kwargs 作为参数。结果,它返回函数返回的内容并在控制台中打印 SQL 查询。
解决方案 20:
from django.db import reset_queries, connection
class ShowSQL(object):
def __enter__(self):
reset_queries()
return self
def __exit__(self, *args):
for sql in connection.queries:
print('Time: %s
SQL: %s' % (sql['time'], sql['sql']))
然后您可以使用:
with ShowSQL() as t:
some queries <select>|<annotate>|<update> or other
它打印
时间:%s
SQL:%s
解决方案 21:
您可以使用它connection.queries
来获取在 Django 中运行的原始 SQL 查询,如下所示:
# "store/views.py"
from django.db import transaction
from .models import Person
from django.db import connection
from django.http import HttpResponse
@transaction.atomic
def test(request):
Person.objects.create(name="John") # INSERT
qs = Person.objects.select_for_update().get(name="John") # SELECT FOR UPDATE
qs.name = "Tom"
qs.save() # UPDATE
qs.delete() # DELETE
for query in connection.queries: # Here
print(query)
return HttpResponse("Test")
然后,原始查询将打印在控制台上,如下所示:
{'sql': 'INSERT INTO "store_person" ("name") VALUES (\'John\') RETURNING "store_person"."id"', 'time': '0.000'}
{'sql': 'SELECT "store_person"."id", "store_person"."name" FROM "store_person" WHERE "store_person"."name" = \'John\' LIMIT 21 FOR UPDATE', 'time': '0.000'}
{'sql': 'UPDATE "store_person" SET "name" = \'Tom\' WHERE "store_person"."id" = 179', 'time': '0.000'}
{'sql': 'DELETE FROM "store_person" WHERE "store_person"."id" IN (179)', 'time': '0.000'}
[24/Dec/2022 06:29:32] "GET /store/test/ HTTP/1.1" 200 9
然后,如果您只想获取和查询而不获取和查询,reset_queries()
请将其放在后面,如下所示:Person.objects.select_for_update()
UPDATE
`DELETE****
INSERT`SELECT FOR UPDATE
# "store/views.py"
from django.db import transaction
from .models import Person
from django.db import reset_queries
from django.db import connection
from django.http import HttpResponse
@transaction.atomic
def test(request):
Person.objects.create(name="John") # INSERT
qs = Person.objects.select_for_update().get(name="John") # SELECT FOR UPDATE
reset_queries() # Here
qs.name = "Tom"
qs.save() # UPDATE
qs.delete() # DELETE
for query in connection.queries: # Here
print(query)
return HttpResponse("Test")
然后,只打印UPDATE
和DELETE
查询,而不打印INSERT
和SELECT FOR UPDATE
查询,如下所示:
{'sql': 'UPDATE "store_person" SET "name" = \'Tom\' WHERE "store_person"."id" = 190', 'time': '0.000'}
{'sql': 'DELETE FROM "store_person" WHERE "store_person"."id" IN (190)', 'time': '0.000'}
[24/Dec/2022 07:00:01] "GET /store/test/ HTTP/1.1" 200 9
解决方案 22:
修改您的settings.py
文件:
# settings.py
LOGGING = {
'version': 1,
'formatters': {
'verbose': {
'format': '{levelname} {asctime} {module} {message}',
'style': '{',
},
},
'handlers': {
'console': {
'level': 'DEBUG',
'class': 'logging.StreamHandler',
'formatter': 'verbose',
},
},
'loggers': {
'django.db.backends': {
'handlers': ['console'],
'level': 'DEBUG',
'propagate': True,
},
},
}
然后确保您的应用程序在 DEBUG 模式下运行,
# settings.py
DEBUG = True
立即运行您的应用程序,您将看到打印到控制台的 SQL 查询。
解决方案 23:
对于那些只想从查询本身寻找结果的人来说,有一种最简单的方法:
假设我们有一个名为的模型Musico
:
class Musico(models.Model):
INSTRUMENTOS = [
('violao', 'Violão'),
('piano', 'Piano'),
('cavaquinho', 'Cavaquinho'),
]
usuario = models.OneToOneField(User, on_delete=models.DO_NOTHING, null=True)
primeiro_nome = models.CharField(max_length=120)
sobrenome = models.CharField(max_length=120, null=True, blank=True)
tipo_instrumento = models.CharField(choices=INSTRUMENTOS, max_length=200)
idade = models.IntegerField(null=True, blank=True)
def __str__(self):
return f"Musico: {self.primeiro_nome}"
检查原始 SQL 查询如下:
>>> str(Musico.objects.all().query)
'SELECT "model_lesson_app_musico"."id", "model_lesson_app_musico"."usuario_id", "model_lesson_app_musico"."primeiro_nome", "model_lesson_app_musico"."sobrenome", "model_lesson_app_musico"."tipo_instrumento", "model_lesson_app_musico"."idade" FROM "model_lesson_app_musico"'
解决方案 24:
使用 django.db.connection.queries查看查询
from django.db import connection
print(connection.queries)
访问 QuerySet 对象上的原始 SQL 查询
qs = MyModel.objects.all()
print(qs.query)
解决方案 25:
对于 Django 2.2:
因为大多数答案在使用时对我没什么帮助./manage.py shell
。最后我找到了答案。希望这对某些人有帮助。
查看所有查询:
from django.db import connection
connection.queries
要查看单个查询的查询:
q=Query.objects.all()
q.query.__str__()
q.query
只为我显示对象。使用__str__()
(字符串表示)显示完整查询。
解决方案 26:
这里已经有几个很好的答案。
还有一种方法。
在测试中,执行以下操作:
with self.assertNumQueries(3):
response = self.client.post(reverse('payments:pay_list'))
# or whatever
如果查询数量错误,测试失败并在控制台中打印所有原始 SQL 查询。
此外,这样的测试有助于控制 SQL 查询的数量不会随着代码的变化而增加,并且数据库负载不会过大。