demo
即使任务有延时,也不会影响其他任务执行
from datetime import datetime
import time
from apscheduler.schedulers.blocking import BlockingScheduler
class St:
def func(self):
now = datetime.now()
ts = now.strftime('%Y-%m-%d %H:%M:%S')
print('do func time :',ts)
def func2(self):
#耗时2S
now = datetime.now()
ts = now.strftime('%Y-%m-%d %H:%M:%S')
print('do func2 time:',ts)
time.sleep(2)
def func3(self):
self.scheduler.shutdown()
def dojob(self):
#创建调度器:BlockingScheduler
self.scheduler = BlockingScheduler()
#添加任务,时间间隔2S
self.scheduler.add_job(self.func, 'interval', seconds=2, id='test_job1')
#添加任务,时间间隔5S
self.scheduler.add_job(self.func2, 'interval', seconds=3, id='test_job2')
self.scheduler.add_job(self.func3, "interval", seconds=10, id="stop")
self.scheduler.start()
st = St()
st.dojob()
补充
文档
https://apscheduler.readthedocs.io/en/master/userguide.html
github:https://github.com/agronholm/apscheduler
安装
pip install apscheduler
date 定时调度(作业只会执行一次)
# 任务将于2022年11.6执行
sched.add_job(my_job, 'date', run_date=date(2022, 11, 6), args=['text'])
# 任务将于2022年, 11月6日 16:30:05
sched.add_job(my_job, 'date', run_date=datetime(2022, 11, 6, 16, 30, 5), args=['text'])
interval 间隔调度
weeks (int) – number of weeks to wait
days (int) – number of days to wait
hours (int) – number of hours to wait
minutes (int) – number of minutes to wait
seconds (int) – number of seconds to wait
start_date (datetime|str) – starting point for the interval calculation
end_date (datetime|str) – latest possible date/time to trigger on
timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations
#表示每隔3天18时20分10秒执行一次任务
sched.add_job(my_job, 'interval',days = 03,hours = 18,minutes = 20,seconds = 10)
crontab触发器:
在某个确切的时间周期性的触发事件。可以使用的参数如下:
year:4位数字的年份。
month:1-12月份。
day:1-31日。
week:1-53周。
day_of_week:一个礼拜中的第几天( 0-6或者 mon、 tue、 wed、 thu、 fri、 sat、 sun)。
hour: 0-23小时。
minute: 0-59分钟。
second: 0-59秒。
start_date: datetime类型或者字符串类型,起始时间。
end_date: datetime类型或者字符串类型,结束时间。
timezone:时区。
jitter:任务触发的误差时间。
scheduler.add_job(tick,"cron",day="4thsun",hour=20,minute=1)