一、轻量级:django-apscheduler(无额外服务,适合单机)
安装
pip install django-apscheduler
注册并建表
settings.py INSTALLED_APPS = [ ... 'django_apscheduler', # 新增 ] python manage.py migrate # 会生成任务表与执行记录表
写任务文件(任意 app 下新建 scheduled_task.py)
python
import os
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
import datetime
# 1. 单例调度器
executors = {'default': ThreadPoolExecutor(max_workers=1)}
scheduler = BackgroundScheduler(
executors=executors,
jobstores={'default': DjangoJobStore()},
daemon=True, # 随主线程退出
)
def print_hello():
print(f'[{datetime.datetime.now()}] hello, 定时任务已执行')
def start_scheduler():
# 2. 只在主进程启动(关键判断)
if os.environ.get('RUN_MAIN') == 'true':
scheduler.add_job(
print_hello,
'interval',
seconds=2,
id='hello_job',
replace_existing=True,
max_instances=1,
coalesce=True
)
scheduler.start()让 Django 启动时自动加载调度器
对应 app 的 apps.py
python
from django.apps import AppConfig
class BackendConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'backend'
def ready(self):
from .scheduled_task import start_scheduler
start_scheduler() # 项目启动即开始调度运行
python manage.py runserver 控制台每 10 秒会看到一次打印,数据库里也会写入执行记录 。
二、企业级:Celery + Celery Beat(支持分布式、动态增删改)
安装
pip install celery redis django-celery-beat
项目根目录新建 celery.py
python
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')
app = Celery('your_project', broker='redis://127.0.0.1:6379/0')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()init.py 确保启动
python
from .celery import app as celery_app
__all__ = ('celery_app',)在任意 app 建 tasks.py
python
from celery import shared_task
from datetime import datetime
@shared_task
def send_report():
print(f"[{datetime.now()}] 日报已生成并发送")把定时策略写进 settings.py
python
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
'send-report-every-day': {
'task': 'your_app.tasks.send_report',
'schedule': crontab(hour=9, minute=0), # 每天 9:00
},
}启动两个进程(开两个终端)
终端 1:celery -A your_project worker -l info
终端 2:celery -A your_project beat -l info
Beat 负责把任务按时投到 Redis,Worker 负责取出来执行;支持横向扩容、失败重试、后台管理动态增删改 。
把触发器从 'interval' 换成 'cron',然后写“每周几、几点”即可。
下面给出 3 种最常见写法,直接替换 add_job 那一行即可。
- 每周一 9:00 执行
python
scheduler.add_job(
print_hello,
'cron', # ← 关键
day_of_week='0', # 0=周一 … 6=周日
hour=9,
minute=0,
id='hello_job',
replace_existing=True,
max_instances=1,
coalesce=True
)- 每周日 23:30 执行
python
scheduler.add_job(
print_hello,
'cron',
day_of_week='6', # 周日
hour=23,
minute=30,
id='hello_job',
replace_existing=True
)- 每周一、三、五 上午 8:15 执行
python
scheduler.add_job(
print_hello,
'cron',
day_of_week='0,2,4', # 周一、三、五
hour=8,
minute=15,
id='hello_job',
replace_existing=True
)字段速查
day_of_week:'0'周一 …'6'周日,可多值'0,2,4'hour:0-23minute:0-59- 可选再追加
second=0让秒级对齐,不写默认 0 秒
改完重启一次,python manage.py runserver,日志会提示
Adding job tentatively -- it will be properly scheduled when the scheduler starts等到你设定的“周几 几点”第一次到来时就会打印,其余时间安静等待,确认无误即可。