欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 新车 > Celery注册装饰器@app.task和@shared_task

Celery注册装饰器@app.task和@shared_task

2024/10/25 6:33:27 来源:https://blog.csdn.net/qq_46137324/article/details/140911407  浏览:    关键词:Celery注册装饰器@app.task和@shared_task

注册装饰器

@app.task和@shared_task是Celery中用于定义任务的两种不同装饰器, 它们之间存在明显的区别.
from celery import Celery  app = Celery('my_app', broker='amqp://guest@localhost//')  @app.task  
def my_task():  # 任务逻辑  passfrom celery import shared_task  @shared_task  
def add(x, y):  return x + y
以下是这两者的详细比较:
* 1. 定义与用途.- @app.task: 这是Celery库提供的装饰器, 用于在具体的Celery应用程序中定义任务. 当创建一个Celery应用程序对象(通常命名为 app), 并使用@app.task装饰器来定义任务函数时,这些任务函数仅在该特定的Celery应用程序中可用.- @shared_task: 这是Celery提供的另一个装饰器, 用于定义共享任务(shared task).共享任务是指可以在多个Celery应用程序之间共享的任务.通过使用@shared_task装饰器, 可以在一个Celery应用程序中定义任务, 并将其标记为共享任务,以便其他使用相同配置的Celery应用程序可以直接导入和使用该任务.* 2. 依赖性与可移植性.- @app.task: 任务依赖于特定的Celery应用程序实例.因此, 如果在不同的Celery应用程序中想要使用相同的任务, 需要在每个应用程序中都重新定义它.这在单个项目或单个Celery应用程序的上下文中很有用, 但不利于跨项目的代码复用.- @shared_task: 不依赖于特定的Celery应用程序实例.它加载到内存后会自动添加到Celery对象中, 因此可以在多个Celery应用程序之间共享.这使得任务的可移植性和可重用性更强, 特别是在多个项目或组件需要共享通用任务时.* 3. 使用场景.- 如果任务仅在特定的Celery应用程序中使用, 那么使用@app.task就足够了.- 如果任务需要在多个Celery应用程序之间共享, 或者想要提高任务的代码复用性和可维护性, 那么使用@shared_task会更合适.* 4. 注意事项.- 当使用@shared_task时, 请确保所有相关的Celery应用程序都已正确配置, 并且它们使用相同的消息代理(: RabbitMQ, Redis等)和结果后端(如果需要的话).- 无论是@app.task还是@shared_task, 定义的任务都可以异步执行, 并可以返回结果.可以使用delay()方法或apply_async()方法来异步调用这些任务.
通过一个具体的例子来说明任务在多个Celery应用程序之间共享. 
假设你有两个Python项目, 它们都需要执行一些通用的后台任务, 比如发送电子邮件, 处理图片上传等.
这些任务在逻辑上是相似的, 因此希望避免在两个项目中重复编写相同的代码.
使用@app.task的情况需要会在每个Django项目的Celery配置中分别定义这些任务. 
例如, 在第一个项目中:
# Project 1  
from celery import Celery  app = Celery('project1', broker='amqp://guest@localhost//')  @app.task  
def send_email(recipient, subject, body):  # 发送电子邮件的逻辑  pass
然后在第二个项目中, 需要再次定义相同的任务:
# Project 2  
from celery import Celery  app = Celery('project2', broker='amqp://guest@localhost//')  @app.task  
def send_email(recipient, subject, body):  # 发送电子邮件的逻辑(与Project 1中的完全相同)  pass
这种方法的问题在于, 如果发送电子邮件的逻辑需要更新, 需要在两个项目中都进行更改, 这增加了维护的复杂性和出错的风险.
使用@shared_task, 可以在一个独立的Python模块或包中定义这些共享任务, 并在需要时从两个项目中导入它们.
注意: 使用@shared_task, 多个Celery应用程序必须连接到同一个消息代理(Redis等中间件), 或者至少能够互相通信的消息代理.
只要它们都能连接到同一个消息代理实例或兼容的实例集, 任务就可以被正确发送和接收.
先说明一下@shared_task的绑定任务特性:
from celery import Celery, shared_task# 创建共享任务
@shared_task
def add(x, y):return x + yprint(add.app)  # <Celery default at 0x148b1a83e50># 创建app1实例
app1 = Celery(broker='amqp://')
print(add.app is app1)  # True
print(add.app)  # <Celery __main__ at 0x148b4281700># 创建app2实例
app2 = Celery(broker='redis://')
print(add.app is app2)  # True 
print(add.app)  # <Celery __main__ at 0x148b4295d30>
调用@shared_task饰后的add任务, 它会尝试找到一个已经存在的Celery应用实例.
(通常是通过查找当前模块或父模块中的celery应用实例, '如果有多个则使用离调用最近的').
如果没有找到, 它会创建一个新的默认Celery应用实例. 这个新创建的实例将绑定到被装饰的任务上, 并通过task.app属性访问.代码中, 首先定义了一个add任务, 然后创建了app1和app2两个Celery应用实例.
add任务在app1和app2创建之前就已经被定义, 并且由于此时还没有显式创建Celery应用实例,
@shared_task装饰器会创建一个默认的Celery应用实例并将其绑定到add任务上.在定义Celery实例之后, 调用饰后的add任务, 那么它会自动将任务重新绑定到最近Celery实例上.
了解情况后可以继续进行实验了.
首先, 创建一个包含共享任务的模块(例如, shared_tasks.py):
# shared_tasks.py  
from celery import shared_task@shared_task
def add(x, y):return x + y

image-20240804202251739

然后, 在第一个Python项目中导入共享任务:
# p1.py  
from celery import Celery
from shared_task import add  # 导入共享任务app = Celery('project1',broker='redis://localhost:6379/0')  # Redis作为Broker# 可以通过app.tasks来访问任务, 比如检查它是否已经注册
print(add.name in app.tasks)  # 输出True

image-20240804202351424

启动Celery Worker进程: celery -A p1  worker --loglevel=info  -P eventlet
在第二个Django目中导入这个共享任务:
# p2.py  
from celery import Celery
from shared_task import add  # 导入共享任务app = Celery('project2',broker='redis://localhost:6379/1')  # Redis作为Broker# 可以通过app.tasks来访问任务, 比如检查它是否已经注册
print(add.name in app.tasks)  # 输出True

image-20240804202439161

并启动Celery Worker进程: celery -A p2  worker --loglevel=info  -P eventlet
定义一个脚本使用提交任务, 使用标识符为project1的Celery实例作为配置:
# p3.py
from p1 import add  # 调用add任务时, 会自动绑定p1文件中的celery实例result = add.delay(4, 4)  # 使用.delay()方法异步触发任务
print(f'任务ID:: {result.id}')

image-20240804202503043

再定义一个脚本使用提交任务, 使用标识符为project1的Celery实例作为配置:
# p4.py
from p2 import add  # 调用add任务时, 会自动绑定p2文件中的celery实例result = add.delay(5, 5)  # 使用.delay()方法异步触发任务
print(f'使用结果ID触发的任务: {result.id}')

image-20240804202525754

分别运行脚本p3, p4两次, 查看消息日志:

image-20240804200819352

这样, 如果发送电子邮件的逻辑需要更新, 只需在shared_tasks.py中进行更改, 
然后两个项目都会自动使用更新后的逻辑, 而无需在每个项目中分别进行更改.
这大大提高了代码的复用性和可维护性.
如果将两个Celery实例配置为使用同一个消息中间件, 
这些实例可以共享任务队列, worker就是随机的了(一个累死累活, 另外一个不干活).
# p1
# ...
app = Celery('project1',
BROKER_CONNECTION_RETRY_ON_STARTUP=True,
broker='redis://localhost:6379/0') # p2
# ...
app = Celery('project2',
BROKER_CONNECTION_RETRY_ON_STARTUP=True,
broker='redis://localhost:6379/0') 

image-20240804201454073

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com