0%

Python 实现定时任务的九种方案 - Sitin - SegmentFault 思否

Excerpt

time.sleep()是Python标准库中的函数,它可以帮助你暂停程序的执行一段指定的时间。通过组合time.sleep()和循环,可以实现简单的定时任务。


定时任务是编程中常见的需求,它可以按照预定的时间表执行特定的任务或操作。在Python中,有多种方法可以实现定时任务。

本文将介绍九种常见的Python定时任务解决方案,包括使用标准库、第三方库和操作系统工具。

方案一:使用time.sleep()

time.sleep()是Python标准库中的函数,它可以帮助你暂停程序的执行一段指定的时间。通过组合time.sleep()和循环,可以实现简单的定时任务。

示例代码:

1
2
3
4
5
6
7
8
import time

def my_task():
print("定时任务执行中...")

while True:
my_task()
time.sleep(3600)

Python的sched模块提供了一个事件调度器,可以在指定的时间执行函数。这是一个更灵活的解决方案,可以安排多个任务。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
import sched
import time

s = sched.scheduler(time.time, time.sleep)

def my_task(sc):
print("定时任务执行中...")
sc.enter(3600, 1, my_task, (sc,))

s.enter(3600, 1, my_task, (s,))
s.run()

APScheduler是一个功能强大的Python库,可用于调度各种类型的任务,包括定时任务。它支持多种调度方式,如间隔调度、定时调度等。

示例代码:

1
2
3
4
5
6
7
8
from apscheduler.schedulers.blocking import BlockingScheduler

def my_task():
print("定时任务执行中...")

scheduler = BlockingScheduler()
scheduler.add_job(my_task, 'interval', hours=1)
scheduler.start()

Celery是一个强大的分布式任务队列框架,可以用于管理和调度定时任务。它支持任务的异步执行和分布式部署。

示例代码:

1
2
3
4
5
6
7
8
9
from celery import Celery

app = Celery('myapp', broker='pyamqp://guest@localhost//')

@app.task
def my_task():
print("定时任务执行中...")

my_task.apply_async(countdown=3600)

如果正在构建基于Flask的Web应用程序,可以结合使用APScheduler和Flask-APScheduler扩展来执行定时任务。

示例代码:

1
2
3
4
5
6
7
from flask import Flask
from apscheduler.schedulers.background import BackgroundScheduler

app = Flask(__name__)
scheduler = BackgroundScheduler()
scheduler.add_job(my_task, 'interval', hours=1)
scheduler.start()

在Linux和Unix系统上,可以使用cron任务调度器来执行定时任务。通过编写一个包含Python命令的脚本,并将其添加到cron作业中,可以在指定的时间执行Python脚本。

示例代码:

创建一个Python脚本(例如my_task.py):

1
2
3
4
5
def my_task():
print("定时任务执行中...")

if __name__ == "__main__":
my_task()
1
crontab -e
1
0 * * * * /usr/bin/python /path/to/my_task.py

如果正在构建Django Web应用程序,可以使用APScheduler和Django-APScheduler扩展来执行定时任务。

示例代码:

1
2
3
4
5
6
7
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import register_events, register_job

scheduler = BackgroundScheduler()
scheduler.add_job(my_task, 'interval', hours=1)
register_events(scheduler)
scheduler.start()

在Windows操作系统上,可以使用任务计划程序来执行定时任务。通过创建批处理文件(.bat)并在其中运行Python脚本,然后将批处理文件添加到Windows任务计划程序中,可以实现定时任务。

示例代码:

创建一个批处理文件(例如my_task.bat):

1
2
@echo off
C:\Python\python.exe C:\path\to\my_task.py

方案九:使用第三方库schedule

schedule是一个Python库,它提供了一种非常简单的方式来执行定时任务。它的语法直观易懂,适用于各种定时任务需求。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
import schedule
import time

def my_task():
print("定时任务执行中...")

schedule.every(1).hour.do(my_task)

while True:
schedule.run_pending()
time.sleep(1)