注册装饰器
@app.task和@shared_task是Celery中用于定义任务的两种不同装饰器, 它们之间存在明显的区别.
from celery import Celery app = Celery('my_app', broker='amqp://guest@localhost//') @app.task
def my_task(): passfrom celery import shared_task @shared_task
def add(x, y): return x + y
以下是这两者的详细比较:
* 1. 定义与用途.- @app.task: 这是Celery库提供的装饰器, 用于在具体的Celery应用程序中定义任务. 当创建一个Celery应用程序对象(通常命名为 app), 并使用@app.task装饰器来定义任务函数时,这些任务函数仅在该特定的Celery应用程序中可用.- @shared_task: 这是Celery提供的另一个装饰器, 用于定义共享任务(shared task).共享任务是指可以在多个Celery应用程序之间共享的任务.通过使用@shared_task装饰器, 可以在一个Celery应用程序中定义任务, 并将其标记为共享任务,以便其他使用相同配置的Celery应用程序可以直接导入和使用该任务.* 2. 依赖性与可移植性.- @app.task: 任务依赖于特定的Celery应用程序实例.因此, 如果在不同的Celery应用程序中想要使用相同的任务, 需要在每个应用程序中都重新定义它.这在单个项目或单个Celery应用程序的上下文中很有用, 但不利于跨项目的代码复用.- @shared_task: 不依赖于特定的Celery应用程序实例.它加载到内存后会自动添加到Celery对象中, 因此可以在多个Celery应用程序之间共享.这使得任务的可移植性和可重用性更强, 特别是在多个项目或组件需要共享通用任务时.* 3. 使用场景.- 如果任务仅在特定的Celery应用程序中使用, 那么使用@app.task就足够了.- 如果任务需要在多个Celery应用程序之间共享, 或者想要提高任务的代码复用性和可维护性, 那么使用@shared_task会更合适.* 4. 注意事项.- 当使用@shared_task时, 请确保所有相关的Celery应用程序都已正确配置, 并且它们使用相同的消息代理(如: RabbitMQ, Redis等)和结果后端(如果需要的话).- 无论是@app.task还是@shared_task, 定义的任务都可以异步执行, 并可以返回结果.可以使用delay()方法或apply_async()方法来异步调用这些任务.
通过一个具体的例子来说明任务在多个Celery应用程序之间共享.
假设你有两个Python项目, 它们都需要执行一些通用的后台任务, 比如发送电子邮件, 处理图片上传等.
这些任务在逻辑上是相似的, 因此希望避免在两个项目中重复编写相同的代码.
使用@app.task的情况需要会在每个Django项目的Celery配置中分别定义这些任务.
例如, 在第一个项目中:
from celery import Celery app = Celery('project1', broker='amqp://guest@localhost//') @app.task
def send_email(recipient, subject, body): pass
然后在第二个项目中, 需要再次定义相同的任务:
from celery import Celery app = Celery('project2', broker='amqp://guest@localhost//') @app.task
def send_email(recipient, subject, body): pass
这种方法的问题在于, 如果发送电子邮件的逻辑需要更新, 需要在两个项目中都进行更改, 这增加了维护的复杂性和出错的风险.
使用@shared_task, 可以在一个独立的Python模块或包中定义这些共享任务, 并在需要时从两个项目中导入它们.
注意: 使用@shared_task, 多个Celery应用程序必须连接到同一个消息代理(Redis等中间件), 或者至少能够互相通信的消息代理.
只要它们都能连接到同一个消息代理实例或兼容的实例集, 任务就可以被正确发送和接收.
先说明一下@shared_task的绑定任务特性:
from celery import Celery, shared_task
@shared_task
def add(x, y):return x + yprint(add.app)
app1 = Celery(broker='amqp://')
print(add.app is app1)
print(add.app)
app2 = Celery(broker='redis://')
print(add.app is app2)
print(add.app)
调用@shared_task饰后的add任务, 它会尝试找到一个已经存在的Celery应用实例.
(通常是通过查找当前模块或父模块中的celery应用实例, '如果有多个则使用离调用最近的').
如果没有找到, 它会创建一个新的默认Celery应用实例. 这个新创建的实例将绑定到被装饰的任务上, 并通过task.app属性访问.代码中, 首先定义了一个add任务, 然后创建了app1和app2两个Celery应用实例.
add任务在app1和app2创建之前就已经被定义, 并且由于此时还没有显式创建Celery应用实例,
@shared_task装饰器会创建一个默认的Celery应用实例并将其绑定到add任务上.在定义Celery实例之后, 调用饰后的add任务, 那么它会自动将任务重新绑定到最近Celery实例上.
了解情况后可以继续进行实验了.
首先, 创建一个包含共享任务的模块(例如, shared_tasks.py):
from celery import shared_task@shared_task
def add(x, y):return x + y
然后, 在第一个Python项目中导入共享任务:
from celery import Celery
from shared_task import add app = Celery('project1',broker='redis://localhost:6379/0')
print(add.name in app.tasks)
启动Celery Worker进程: celery -A p1 worker --loglevel=info -P eventlet
在第二个Django目中导入这个共享任务:
from celery import Celery
from shared_task import add app = Celery('project2',broker='redis://localhost:6379/1')
print(add.name in app.tasks)
并启动Celery Worker进程: celery -A p2 worker --loglevel=info -P eventlet
定义一个脚本使用提交任务, 使用标识符为project1的Celery实例作为配置:
from p1 import add result = add.delay(4, 4)
print(f'任务ID:: {result.id}')
再定义一个脚本使用提交任务, 使用标识符为project1的Celery实例作为配置:
from p2 import add result = add.delay(5, 5)
print(f'使用结果ID触发的任务: {result.id}')
分别运行脚本p3, p4两次, 查看消息日志:
这样, 如果发送电子邮件的逻辑需要更新, 只需在shared_tasks.py中进行更改,
然后两个项目都会自动使用更新后的逻辑, 而无需在每个项目中分别进行更改.
这大大提高了代码的复用性和可维护性.
如果将两个Celery实例配置为使用同一个消息中间件,
这些实例可以共享任务队列, worker就是随机的了(一个累死累活, 另外一个不干活).
app = Celery('project1',
BROKER_CONNECTION_RETRY_ON_STARTUP=True,
broker='redis://localhost:6379/0')
app = Celery('project2',
BROKER_CONNECTION_RETRY_ON_STARTUP=True,
broker='redis://localhost:6379/0')