Вслед за этим ответом на Stack Overflow, вот игрушечный пример, который выполняет обратный вызов для аккорда Celery независимо от того, был ли заголовок успешным.
tasks.py
:
from celery import group, chord, Celery
cel = Celery(__name__,
broker='redis://localhost:6379',
backend='redis://localhost:6379')
@cel.task
def add(a, b):
print("adding {} + {}".format(a, b))
return a + b
@cel.task
def mult(a, b):
print("multiplying {} * {}".format(a, b))
return a * b
@cel.task
def div(a, b):
print("dividing {} by {}".format(a, b))
return a / b
@cel.task
def subtract(a, b):
print("subtracting {} from {}".format(b, a))
return a - b
@cel.task
def div_with_err(a, b):
print("This task was called with arguments {} and {} and should raise an error.".format(a, b))
return 1/0
@cel.task(name='tasks.callback')
def callback(*args, **kwargs):
print("Callback is executing")
print("args are ", args)
print("kwargs are ", kwargs)
group_foo = group([subtract.s(1, 3), add.s(2, 4)])
chain_bar = mult.s(1, 3) | div.s(4)
chain_with_err = mult.s(1, 3) | div_with_err.s(4)
header = group([group_foo, chain_with_err])
callback_baz = callback.s()
callback_baz.set(link_error=['tasks.callback'])
job = chord(header, callback_baz)
job.apply_async()
Но я застрял на том, чего я действительно хочу, а именно, чтобы обратный вызов был цепочкой, которая выполняется независимо от того, был ли аккорд успешным или неудачным. Вот что я пробовал очень наивно, не ожидая, что это сработает:
@cel.task(name='tasks.callback')
def callback_task_1(*args, **kwargs):
print("Callback is executing")
print("args are ", args)
print("kwargs are ", kwargs)
return 1 + 1
@cel.task
def callback_task_2(a):
print("Callback task 2 is executing; received {} from callback task 1".format(a))
callback_chain = chain(callback_task_1.s(), callback_task_2.s(), name='callback_chain')
group_foo = group([subtract.s(1, 3), add.s(2, 4)])
chain_bar = mult.s(1, 3) | div.s(4)
chain_with_err = mult.s(1, 3) | div_with_err.s(4)
header = group([group_foo, chain_with_err])
callback_baz = callback_task_1.s()
callback_chain.set(link_error=['callback_chain'])
job = chord(header, callback_chain)
job.apply_async()
и это не так. Это терпит неудачу с
[2020-05-08 13:21:15,480: ERROR/MainProcess] Received unregistered task of type 'callback_chain'.
The message has been ignored and discarded.
Может ли кто-нибудь предложить другой подход? Если в моем реальном приложении все пойдет еще хуже, я мог бы сделать функции, представленные callback_chain
, одной большой задачей, но это действительно было бы обломом. Например, последняя задача в цепочке — это задача send_email
, которая использует много мест в приложении, и я не хочу дублировать все ее функции в отдельной задаче только для этого случая.
callback_chain.set(link_error=['callback_chain'])
? - person DejanLekic   schedule 08.05.2020chain_with_err
), я получаю интересную ошибку:send_task() got multiple values for argument 'name'
. И затем, во время обработки этого исключения,TypeError: sequence item 1: expected a bytes-like object, NoneType found
. - person Katie   schedule 08.05.2020