Rocketry

Miksus/rocketry: Modern scheduling library for Python (github.com)

Why Rocketry? — Rocketry


Rocketry is a modern statement-based scheduling framework for Python. It is simple, clean and extensive. It is suitable for small and big projects.


Quick Start

pip install rocketry

from rocketry import Rocketry
from rocketry.conds import secondly

app = Rocketry()

# Runs once every second
@app.task(secondly)
def do_secondly():
    print('Doing secondly task')

if __name__ == '__main__':
    app.run()

Beyond this basic usage, you can also attach custom conditions to a task, for example:

from rocketry import Rocketry
from rocketry.conds import daily, time_of_week
from pathlib import Path

app = Rocketry()

@app.cond()
def file_exists(file):
    return Path(file).exists()

@app.task(daily.after("08:00") & file_exists("start.py"))
def do_work():
    print('Doing work')

app.run()
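To make the `&` in the task decorator less magical, here is a toy model of how composable conditions can be built with operator overloading. It is only a sketch: the `Cond` class and its `check` attribute are hypothetical names made up for illustration and are not Rocketry's actual implementation.

```python
from pathlib import Path


class Cond:
    """Toy condition: wraps a zero-argument function that returns a bool."""

    def __init__(self, check):
        self.check = check

    def __and__(self, other):
        # Both conditions must hold at the moment of evaluation.
        return Cond(lambda: self.check() and other.check())

    def __or__(self, other):
        # At least one of the conditions must hold.
        return Cond(lambda: self.check() or other.check())

    def __invert__(self):
        # Negates the wrapped condition.
        return Cond(lambda: not self.check())


def file_exists(file):
    # The path is tested lazily, every time the condition is evaluated.
    return Cond(lambda: Path(file).exists())


always = Cond(lambda: True)
combined = always & file_exists("start.py")
print(combined.check())
```

The point of the sketch is that a combined condition is re-evaluated each time it is checked, so the file can appear or disappear between checks and the result follows.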


More examples from the official site:

# Pipelining: run a task after another one and pass along its return value
from rocketry.conds import daily, after_success
from rocketry.args import Return

@app.task(daily.after("07:00"))
def do_first():
    ...
    return 'Hello World'

@app.task(after_success(do_first))
def do_second(arg=Return('do_first')):
    # arg contains the value of the task do_first's return
    ...
    return 'Hello Python'

# Execution options: choose where each task runs
from rocketry.conds import daily

@app.task(daily, execution="main")
def do_unparallel():
    ...

@app.task(daily, execution="async")
async def do_async():
    ...

@app.task(daily, execution="thread")
def do_on_separate_thread():
    ...

@app.task(daily, execution="process")
def do_on_separate_process():
    ...

# Fixed intervals using the string syntax
@app.task('every 10 seconds')
def do_constantly():
    ...

@app.task('every 1 minute')
def do_minutely():
    ...

@app.task('every 1 hour')
def do_hourly():
    ...

@app.task('every 1 day')
def do_daily():
    ...

@app.task('every 2 days 2 hours 20 seconds')
def do_custom():
    ...

# Once per time period
@app.task('minutely')
def do_minutely():
    ...

@app.task('hourly')
def do_hourly():
    ...

@app.task('daily')
def do_daily():
    ...

@app.task('weekly')
def do_weekly():
    ...

@app.task('monthly')
def do_monthly():
    ...

# Once per time period, restricted to part of that period
@app.task("minutely before 45")
def do_minutely():
    ...

@app.task("hourly after 45:00")
def do_hourly():
    ...

@app.task("daily between 08:00 and 14:00")
def do_daily():
    ...

@app.task("weekly on Monday")
def do_weekly():
    ...

@app.task("monthly starting 3rd")
def do_monthly():
    ...

# Repeatedly while inside a time window
@app.task('time of day between 10:00 and 18:00')
def do_constantly_during_day():
    ...

@app.task('time of week between Saturday and Sunday')
def do_constantly_during_weekend():
    ...

# Default execution method for every task in the app
app = Rocketry(execution="async")

@app.task("daily")
def do_main():
    ...

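PS: These examples are explicit enough to show the general usage, so I pasted them here as they are. I will add notes tied to concrete use cases later.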
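The 'every ...' strings above describe a fixed interval between runs. As a rough way to see what interval a given string stands for, the sketch below converts it to a timedelta. The helper `parse_every` is hypothetical and written only for this note; it is not how Rocketry parses its condition strings, and it only handles the units used in the examples above.

```python
import re
from datetime import timedelta

# Maps the singular unit in the string to the timedelta keyword argument.
_UNITS = {"second": "seconds", "minute": "minutes", "hour": "hours", "day": "days"}


def parse_every(expr):
    """Hypothetical helper: turn 'every 2 days 2 hours' into a timedelta."""
    if not expr.startswith("every "):
        raise ValueError(f"not an 'every ...' expression: {expr!r}")
    parts = re.findall(r"(\d+)\s+(second|minute|hour|day)s?", expr)
    if not parts:
        raise ValueError(f"no interval found in: {expr!r}")
    kwargs = {}
    for amount, unit in parts:
        key = _UNITS[unit]
        # Repeated units are summed rather than overwritten.
        kwargs[key] = kwargs.get(key, 0) + int(amount)
    return timedelta(**kwargs)


print(parse_every("every 2 days 2 hours 20 seconds"))
```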


Condition expressions

API

Cron

Cron Scheduling — Rocketry

from rocketry.conds import cron

@app.task(cron('* * * * *'))
def do_minutely():
    ...

@app.task(cron('*/2 12-18 * Oct Fri'))
def do_complex():
    "Run at every 2nd minute past every hour from 12 through 18 on Friday in October."
    ...
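A standard cron expression has five whitespace-separated fields: minute, hour, day of month, month, and day of week. The sketch below labels the fields of the expression used in do_complex so it is easier to read. `split_cron` and `CRON_FIELDS` are hypothetical names for illustration only; the helper does no validation of the field values and is not part of Rocketry.

```python
# Field order of a standard five-field cron expression.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def split_cron(expr):
    """Hypothetical helper: map each cron field name to its raw value."""
    parts = expr.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected {len(CRON_FIELDS)} fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


for name, value in split_cron("*/2 12-18 * Oct Fri").items():
    print(f"{name:>12}: {value}")
```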

Troubleshooting

pydantic.errors.PydanticUserError: const is removed, use Literal instead

Pydantic v2 support · Issue #210 · Miksus/rocketry (github.com)

from rocketry import Rocketry

The error pydantic.errors.PydanticUserError: const is removed, use Literal instead is probably raised because the installed Rocketry release is not compatible with Pydantic v2. Pinning Pydantic to 1.10.10 works around it:

pip install pydantic==1.10.10
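Before importing Rocketry, it can help to confirm which Pydantic major version is actually installed in the environment. The following is a minimal sketch using only the standard library; the function names `major_version` and `installed_pydantic_major` are made up for this note, and the messages reflect the assumption above that Pydantic v2 is the cause of the error.

```python
from importlib import metadata


def major_version(version):
    """Return the leading integer of a version string such as '1.10.10'."""
    return int(version.split(".")[0])


def installed_pydantic_major():
    """Return the installed Pydantic major version, or None if it is absent."""
    try:
        return major_version(metadata.version("pydantic"))
    except metadata.PackageNotFoundError:
        return None


major = installed_pydantic_major()
if major is None:
    print("pydantic is not installed")
elif major >= 2:
    print("pydantic v2 or newer detected; importing rocketry may fail")
else:
    print("pydantic v1 detected")
```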