как предоставить цепочку в качестве обратного вызова для аккорда Celery, когда эта цепочка должна выполняться при успехе или неудаче аккорда

Вслед за этим ответом на Stack Overflow, вот игрушечный пример, который выполняет обратный вызов для аккорда Celery независимо от того, был ли заголовок успешным.

tasks.py:

from celery import group, chord, Celery

cel = Celery(__name__,
                broker='redis://localhost:6379',
                backend='redis://localhost:6379')

@cel.task
def add(a, b):
    print("adding {} + {}".format(a, b))
    return a + b

@cel.task
def mult(a, b):
    print("multiplying {} * {}".format(a, b))
    return a * b

@cel.task
def div(a, b):
    print("dividing {} by {}".format(a, b))
    return a / b

@cel.task
def subtract(a, b):
    print("subtracting {} from {}".format(b, a))
    return a - b

@cel.task
def div_with_err(a, b):
    print("This task was called with arguments {} and {} and should raise an error.".format(a, b))
    return 1/0

@cel.task(name='tasks.callback')
def callback(*args, **kwargs):
    print("Callback is executing")
    print("args are ", args)
    print("kwargs are ", kwargs)


group_foo = group([subtract.s(1, 3), add.s(2, 4)])

chain_bar = mult.s(1, 3) | div.s(4)

chain_with_err = mult.s(1, 3) | div_with_err.s(4)

header = group([group_foo, chain_with_err])

callback_baz = callback.s()

callback_baz.set(link_error=['tasks.callback'])

job = chord(header, callback_baz)

job.apply_async()

Но я застрял на том, чего я действительно хочу, а именно, чтобы обратный вызов был цепочкой, которая выполняется независимо от того, был ли аккорд успешным или неудачным. Вот что я пробовал очень наивно, не ожидая, что это сработает:

@cel.task(name='tasks.callback')
def callback_task_1(*args, **kwargs):
    print("Callback is executing")
    print("args are ", args)
    print("kwargs are ", kwargs)
    return 1 + 1

@cel.task
def callback_task_2(a):
    print("Callback task 2 is executing; received {} from callback task 1".format(a))

callback_chain = chain(callback_task_1.s(), callback_task_2.s(), name='callback_chain')

group_foo = group([subtract.s(1, 3), add.s(2, 4)])

chain_bar = mult.s(1, 3) | div.s(4)

chain_with_err = mult.s(1, 3) | div_with_err.s(4)

header = group([group_foo, chain_with_err])

callback_baz = callback_task_1.s()

callback_chain.set(link_error=['callback_chain'])

job = chord(header, callback_chain)

job.apply_async()

и это не так. Это терпит неудачу с

[2020-05-08 13:21:15,480: ERROR/MainProcess] Received unregistered task of type 'callback_chain'.
The message has been ignored and discarded.

Может ли кто-нибудь предложить другой подход? Если в моем реальном приложении все пойдет еще хуже, я мог бы сделать функции, представленные callback_chain, одной большой задачей, но это действительно было бы обломом. Например, последняя задача в цепочке — это задача send_email, которая использует много мест в приложении, и я не хочу дублировать все ее функции в отдельной задаче только для этого случая.


person Katie    schedule 08.05.2020    source источник
comment
Вы пытались запустить его без callback_chain.set(link_error=['callback_chain'])?   -  person DejanLekic    schedule 08.05.2020
comment
@DejanLekic, когда я комментирую эту строку (а также строку chain_with_err), я получаю интересную ошибку: send_task() got multiple values for argument 'name'. И затем, во время обработки этого исключения, TypeError: sequence item 1: expected a bytes-like object, NoneType found.   -  person Katie    schedule 08.05.2020