Celery: abort task on connection error

Developer https://www.devze.com 2023-04-11 08:00 Source: Internet

I have to implement a Task subclass that fails gracefully if the broker is not running (I'm currently using RabbitMQ). I could probably just use a try statement to catch the exception:

import socket

try:
    MyTask.delay(arg1, arg2)
except socket.error:
    # Send a notice to an admin
    pass

but I'd like to create a subclass of Task that can handle that. I've tried something like this:

class MyTask(Task):
    ignore_result = True

    def __call__(self, *args, **kwargs):
        try:
            return self.run(*args, **kwargs)
        except socket.error:
            # Send a notice to an admin
            return None

but the workflow is clearly wrong. I think I need to inject a backend subclass or some kind of failure policy. Do you have any suggestions?
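One way to see why the `__call__` override does not help: `__call__` runs the task body (normally inside the worker), whereas the socket error is raised on the caller's side, when `delay()` hands the message to `apply_async()` for publishing. The sketch below imitates that split with a stand-in class. `FakeTask` is hypothetical and is not Celery's `Task`; it only mirrors the `delay` -> `apply_async` call path and simulates a broker that is down.

```python
import socket


class FakeTask(object):
    """Stand-in mirroring the delay -> apply_async path (not Celery's Task)."""

    def run(self, x, y):
        return x + y

    def __call__(self, *args, **kwargs):
        # Mirrors the override from the question: it guards only the task body.
        try:
            return self.run(*args, **kwargs)
        except socket.error:
            return None

    def apply_async(self, args=None, kwargs=None):
        # Simulate publishing to a broker that is not running.
        raise socket.error("connection refused")

    def delay(self, *args, **kwargs):
        # delay() is a shortcut that forwards to apply_async().
        return self.apply_async(args, kwargs)


task = FakeTask()
direct = task(1, 2)  # runs the body directly; no broker involved

try:
    task.delay(1, 2)  # publishing fails; __call__ is never reached
    escaped = False
except socket.error:
    escaped = True
```

In this sketch the error escapes from `delay()` even though `__call__` has a handler, which suggests that the publishing step (`apply_async`) is the place to intercept it.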


A possible solution I came up with:

import socket
from celery.decorators import task
from celery.task import Task
from celery.backends.base import BaseBackend

UNDELIVERED = 'UNDELIVERED'


class DummyBackend(BaseBackend):
    """
    Dummy queue backend for undelivered messages (due to the broker being down).
    """
    def store_result(self, *args, **kwargs):
        pass

    def get_status(self, *args, **kwargs):
        return UNDELIVERED

    def _dummy(self, *args, **kwargs):
        return None

    wait_for = get_result = get_traceback = _dummy


class SafeTask(Task):
    """
    A task not raising socket errors if the broker is down.
    """
    abstract = True
    on_broker_error = None
    errbackend = DummyBackend

    @classmethod
    def apply_async(cls, *args, **kwargs):
        try:
            return super(SafeTask, cls).apply_async(*args, **kwargs)
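        except socket.error as err:
            # Broker unreachable: call the optional callback, then return a
            # result bound to the dummy backend instead of raising.
            if cls.on_broker_error is not None:
                cls.on_broker_error(err, cls, *args, **kwargs)
            return cls.app.AsyncResult(None, backend=cls.errbackend(),
                                       task_name=cls.name)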


def safetask(*args, **kwargs):
    """
    Task factory returning safe tasks handling socket errors.
    When a socket error occurs, the given callable *on_broker_error*
    is called passing the exception object, the class of the task
    and the original args and kwargs.
    """
    if 'base' not in kwargs:
        on_broker_error = kwargs.pop('on_broker_error', SafeTask.on_broker_error)
        errbackend = kwargs.pop('errbackend', SafeTask.errbackend)
        kwargs['base'] = type('SafeTask', (SafeTask,), {
            'on_broker_error': staticmethod(on_broker_error),
            'errbackend': errbackend,
            'abstract': True,
        })

    return task(*args, **kwargs)

You can either subclass SafeTask or use the @safetask decorator. If you can think of an improvement, don't hesitate to contribute.
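For illustration, a callback with the signature described in the `safetask` docstring (the exception, the task class, then the original args and kwargs) could look like the sketch below. `log_broker_error` and the logger name are made-up names, and `FakeTaskClass` exists only so the snippet runs without Celery or a broker. The commented lines show how it would presumably be wired up with the decorator above.

```python
import logging
import socket

logger = logging.getLogger("safetask.demo")  # hypothetical logger name


def log_broker_error(err, task_cls, *args, **kwargs):
    """Hypothetical on_broker_error callback: log what could not be queued."""
    message = "broker unavailable, %s not queued (args=%r, kwargs=%r): %s" % (
        task_cls.name, args, kwargs, err)
    logger.error(message)
    return message


# Presumed wiring with the decorator defined above:
#
#     @safetask(on_broker_error=log_broker_error)
#     def add(x, y):
#         return x + y


class FakeTaskClass(object):
    """Stand-in exposing only the .name attribute that the callback reads."""
    name = "tasks.add"


# Simulate the call SafeTask.apply_async makes when the broker is down.
# After delay(1, 2), apply_async receives the args tuple and the kwargs dict
# as two positional arguments, so that is what the callback gets here.
message = log_broker_error(socket.error("connection refused"),
                           FakeTaskClass, (1, 2), {})
```

Returning the message is just a convenience for this sketch; `SafeTask.apply_async` ignores the callback's return value.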
