设置计划作业?
- 2024-12-05 08:37:00
- admin 原创
- 170
问题描述:
我一直在使用 Django 开发一个 Web 应用程序,我很好奇是否有办法安排一项作业定期运行。
基本上,我只是想运行数据库并定期自动进行一些计算/更新,但我似乎找不到有关执行此操作的任何文档。
有人知道如何设置吗?
澄清一下:我知道我可以设置一个cron
作业来执行此操作,但我很好奇 Django 中是否有某些功能可以提供此功能。我希望人们能够自己部署此应用程序,而无需进行太多配置(最好是零配置)。
我曾考虑通过简单地检查自上次向站点发送请求以来是否应该运行某项作业来“追溯”触发这些操作,但我希望得到一些更清晰的操作。
解决方案 1:
我采用的一个解决方案是这样做:
1)创建自定义管理命令,例如
python manage.py my_cool_command
2)使用cron
(在 Linux 上)或at
(在 Windows 上)在需要的时间运行我的命令。
这是一个简单的解决方案,不需要安装繁重的 AMQP 堆栈。但是,使用 Celery 之类的东西有很多好处,其他答案中提到过。特别是,使用 Celery 时,您不必将应用程序逻辑分散到 crontab 文件中,这很好。但是,对于小型到中型应用程序以及您不需要大量外部依赖项的应用程序,cron 解决方案非常有效。
编辑:
在 Windows 的更高版本中,该at
命令在 Windows 8、Server 2012 及更高版本中已弃用。您可以将其schtasks.exe
用于相同用途。
更新 这是用于编写自定义管理命令的 django 文档的新链接
解决方案 2:
Celery是一个基于 AMQP (RabbitMQ) 的分布式任务队列。它还以类似 cron 的方式处理定期任务(请参阅定期任务)。根据您的应用,它可能值得一看。
使用 django ( docs )设置 Celery 非常容易,如果发生停机,定期任务实际上会跳过错过的任务。Celery 还具有内置重试机制,以防任务失败。
解决方案 3:
我们已经开源了我认为是结构化的应用程序。Brian 的解决方案也提到了这一点。我们欢迎任何/所有反馈!
https://github.com/tivix/django-cron
它附带一个管理命令:
./manage.py runcrons
这样就完成了。每个 cron 都被建模为一个类(因此都是 OO),并且每个 cron 以不同的频率运行,我们确保相同的 cron 类型不会并行运行(以防 cron 本身的运行时间比其频率更长!)
解决方案 4:
如果您使用的是标准 POSIX 操作系统,则可以使用cron。
如果您使用的是 Windows,则可以使用at。
编写 Django 管理命令来
弄清楚他们所处的平台。
为您的用户执行适当的“AT”命令,或者为您的用户更新 crontab。
解决方案 5:
有趣的新可插入 Django 应用程序:django-chronograph
您只需添加一个充当计时器的 cron 条目,然后您就会在要运行的脚本中获得一个非常好的 Django 管理界面。
解决方案 6:
看看 Django Poor Man's Cron,这是一个 Django 应用程序,它利用垃圾邮件机器人、搜索引擎索引机器人等以大约固定的时间间隔运行计划任务
参见:http://code.google.com/p/django-poormanscron/
解决方案 7:
我之前有完全相同的要求,最终使用APScheduler解决了它(用户指南)
它使调度作业变得非常简单,并使其与基于请求的某些代码的执行无关。以下是一个简单的示例。
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
job = None
def tick():
print('One tick!')\n
def start_job():
global job
job = scheduler.add_job(tick, 'interval', seconds=3600)
try:
scheduler.start()
except:
pass
希望这对某人有帮助!
解决方案 8:
Brian Neal 关于通过 cron 运行管理命令的建议效果很好,但如果你正在寻找一些更强大的东西(但不像 Celery 那样复杂),我建议你研究一下像Kronos这样的库:
# app/cron.py
import kronos
@kronos.register('0 * * * *')
def task():
pass
解决方案 9:
Django APScheduler 用于调度作业。高级 Python 调度程序 (APScheduler) 是一个 Python 库,可让您安排 Python 代码稍后执行,一次或定期执行。您可以根据需要随时添加新作业或删除旧作业。
注意:我是这个库的作者
安装 APScheduler
pip install apscheduler
查看要调用的文件函数
文件名:scheduler_jobs.py
def FirstCronTest():
print("")
print("I am executed..!")
配置调度程序
创建execute.py文件并添加以下代码
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
你编写的函数这里,调度程序函数写在scheduler_jobs中
import scheduler_jobs
scheduler.add_job(scheduler_jobs.FirstCronTest, 'interval', seconds=10)
scheduler.start()
链接文件以供执行
现在,在 Url 文件底部添加以下行
import execute
您可以通过执行[点击此处] https://github.com/devchandansh/django-apscheduler查看完整代码
解决方案 10:
RabbitMQ 和 Celery 具有比 Cron 更多的功能和任务处理能力。如果任务失败不是问题,并且您认为将在下一次调用中处理失败的任务,那么 Cron 就足够了。
Celery 和AMQP可让您处理中断的任务,该任务将由另一个工作程序再次执行(Celery 工作程序会监听下一个要执行的任务),直到达到任务的属性。您甚至可以在失败时调用任务,例如记录失败,或在达到属性max_retries
后向管理员发送电子邮件。max_retries
当您需要扩展应用程序时,您可以分发 Celery 和 AMQP 服务器。
解决方案 11:
我个人使用 cron,但django-extensions的作业调度部分看起来很有趣。
解决方案 12:
虽然Airflow不是 Django 的一部分,但它是一个较新的项目(截至 2016 年),对于任务管理很有用。
Airflow 是一个工作流自动化和调度系统,可用于编写和管理数据管道。基于 Web 的 UI 为开发人员提供了一系列用于管理和查看这些管道的选项。
Airflow 是用 Python 编写的,并使用 Flask 构建。
Airflow 由 Airbnb 的 Maxime Beauchemin 创建,并于 2015 年春季开源。它于 2016 年冬季加入了 Apache 软件基金会的孵化计划。这是Git 项目页面和一些额外的背景信息。
解决方案 13:
将以下内容放在 cron.py 文件的顶部:
#!/usr/bin/python
import os, sys
sys.path.append('/path/to/') # the parent directory of the project
sys.path.append('/path/to/project') # these lines only needed if not on path
os.environ['DJANGO_SETTINGS_MODULE'] = 'myproj.settings'
# imports and code below
解决方案 14:
我刚刚想到了这个相当简单的解决方案:
定义一个视图函数do_work(req, param),就像定义其他视图一样,使用 URL 映射,返回 HttpResponse 等等。
根据您的时间偏好设置一个 cron 作业(或使用 Windows 中的 AT 或计划任务),运行curl http://localhost/your/mapped/url?param=value。
您可以添加参数,但只需将参数添加到 URL 即可。
告诉我你们的想法。
[更新]我现在使用django-extensions中的 runjob 命令,而不是 curl。
我的 cron 看起来像这样:
@hourly python /path/to/project/manage.py runjobs hourly
... 以此类推,每日、每月等等。您还可以将其设置为运行特定作业。
我发现它更易于管理,也更简洁。不需要将 URL 映射到视图。只需定义您的作业类和 crontab 即可。
解决方案 15:
在完成这部分代码之后,我可以编写任何内容,就像我的 views.py 一样:)
#######################################
import os,sys
sys.path.append('/home/administrator/development/store')
os.environ['DJANGO_SETTINGS_MODULE']='store.settings'
from django.core.management impor setup_environ
from store import settings
setup_environ(settings)
#######################################
来自
http://www.cotellese.net/2007/09/27/running-external-scripts-against-django-models/
解决方案 16:
您绝对应该看看 django-q!它不需要额外的配置,而且很可能具备处理商业项目上任何生产问题所需的一切。
它正在积极开发,并且与 django、django ORM、mongo、redis 集成得很好。这是我的配置:
# django-q
# -------------------------------------------------------------------------
# See: http://django-q.readthedocs.io/en/latest/configure.html
Q_CLUSTER = {
# Match recommended settings from docs.
'name': 'DjangoORM',
'workers': 4,
'queue_limit': 50,
'bulk': 10,
'orm': 'default',
# Custom Settings
# ---------------
# Limit the amount of successful tasks saved to Django.
'save_limit': 10000,
# See https://github.com/Koed00/django-q/issues/110.
'catch_up': False,
# Number of seconds a worker can spend on a task before it's terminated.
'timeout': 60 * 5,
# Number of seconds a broker will wait for a cluster to finish a task before presenting it again. This needs to be
# longer than `timeout`, otherwise the same task will be processed multiple times.
'retry': 60 * 6,
# Whether to force all async() calls to be run with sync=True (making them synchronous).
'sync': False,
# Redirect worker exceptions directly to Sentry error reporter.
'error_reporter': {
'sentry': RAVEN_CONFIG,
},
}
解决方案 17:
是的,上面的方法太棒了。我也试了几个。最后,我找到了一个这样的方法:
from threading import Timer
def sync():
do something...
sync_timer = Timer(self.interval, sync, ())
sync_timer.start()
就像递归一样。
好的,我希望这个方法可以满足你的要求。:)
解决方案 18:
与 Celery 相比,更现代的解决方案是 Django Q:
https: //django-q.readthedocs.io/en/latest/index.html
它有很好的文档,并且易于理解。缺乏 Windows 支持,因为 Windows 不支持进程分叉。但是,如果您使用 Windows for Linux 子系统创建开发环境,它就可以正常工作。
解决方案 19:
我今天遇到了和你类似的问题。
我不想让服务器通过 cron 来处理它(并且大多数库最终只是 cron 助手)。
因此我创建了一个调度模块并将其附加到init。
这不是最好的方法,但是它可以帮助我将所有代码集中在一个地方,并将其执行与主应用程序相关。
解决方案 20:
我使用 celery 来创建我的定期任务。首先,您需要按如下方式安装它:
pip install django-celery
不要忘记django-celery
在您的设置中注册,然后您可以执行以下操作:
from celery import task
from celery.decorators import periodic_task
from celery.task.schedules import crontab
from celery.utils.log import get_task_logger
@periodic_task(run_every=crontab(minute="0", hour="23"))
def do_every_midnight():
#your code
解决方案 21:
我不确定这对任何人来说是否有用,因为我必须为系统的其他用户提供安排作业的功能,而又不允许他们访问实际的服务器(Windows)任务计划程序,所以我创建了这个可重复使用的应用程序。
请注意,用户可以访问服务器上的一个共享文件夹,他们可以在其中创建所需的命令/任务/.bat 文件。然后可以使用此应用程序安排此任务。
应用程序名称是Django_Windows_Scheduler
截屏:
解决方案 22:
如果您想要比Celery更可靠的东西,请尝试基于AWS SQS/SNS构建的TaskHawk**。
参考: http: //taskhawk.readthedocs.io
解决方案 23:
对于简单的 dockerized 项目,我实在看不到任何现有的合适的答案。
所以我写了一个非常简单的解决方案,不需要外部库或触发器,它可以独立运行。不需要外部 os-cron,应该可以在任何环境中工作。
它通过添加中间件来工作:middleware.py
import threading
def should_run(name, seconds_interval):
from application.models import CronJob
from django.utils.timezone import now
try:
c = CronJob.objects.get(name=name)
except CronJob.DoesNotExist:
CronJob(name=name, last_ran=now()).save()
return True
if (now() - c.last_ran).total_seconds() >= seconds_interval:
c.last_ran = now()
c.save()
return True
return False
class CronTask:
def __init__(self, name, seconds_interval, function):
self.name = name
self.seconds_interval = seconds_interval
self.function = function
def cron_worker(*_):
if not should_run("main", 60):
return
# customize this part:
from application.models import Event
tasks = [
CronTask("events", 60 * 30, Event.clean_stale_objects),
# ...
]
for task in tasks:
if should_run(task.name, task.seconds_interval):
task.function()
def cron_middleware(get_response):
def middleware(request):
response = get_response(request)
threading.Thread(target=cron_worker).start()
return response
return middleware
models/cron.py
:
from django.db import models
class CronJob(models.Model):
name = models.CharField(max_length=10, primary_key=True)
last_ran = models.DateTimeField()
settings.py
:
MIDDLEWARE = [
...
'application.middleware.cron_middleware',
...
]
解决方案 24:
简单的方法是编写自定义 shell 命令(参见Django 文档)并使用 Linux 上的 cronjob 执行它。不过,我强烈建议使用 RabbitMQ 等消息代理与 celery 结合使用。也许你可以看看这个教程
解决方案 25:
另一种方法是使用Rocketry:
from rocketry import Rocketry
from rocketry.conds import daily, after_success
app = Rocketry()
@app.task(daily.at("10:00"))
def do_daily():
...
@app.task(after_success(do_daily))
def do_after_another():
...
if __name__ == "__main__":
app.run()
它还支持自定义条件:
from pathlib import Path
@app.cond()
def file_exists(file):
return Path(file).exists()
@app.task(daily & file_exists("myfile.csv"))
def do_custom():
...
并且它还支持 Cron:
from rocketry.conds import cron
@app.task(cron('*/2 12-18 * Oct Fri'))
def do_cron():
...
它可以很好地与 FastAPI 集成,我认为它可以与 Django 集成,因为 Rocketry 本质上只是一个可以生成异步任务、线程和进程的复杂循环。
免责声明:我是作者。
解决方案 26:
另一个选择,类似于 Brian Neal 的答案,使用RunScripts
那么你就不需要设置命令了。这样做的好处是文件夹结构更加灵活或更加简洁。
此文件必须实现 run() 函数。这是运行脚本时调用的函数。您可以导入任何模型或 Django 项目的其他部分以在这些脚本中使用。
然后,就
python manage.py runscript path.to.script