
python - Celery "Received unregistered task of type"

Question:

I have read quite a few posts that are similar to this but none seem to make sense to me.

I am trying to configure a Celery PeriodicTask to fire every 5 seconds, but I'm getting hung up on what I think is a Celery configuration issue.

comm/tasks.py

import datetime

from celery.decorators import periodic_task

@periodic_task
def send_queued_messages():
    # do something...

myapp/settings.py

...

from comm.tasks import send_queued_messages
from datetime import timedelta

CELERYBEAT_SCHEDULE = {
    'send_queued_messages_every_5_seconds': {
        'task': 'comm.tasks.send_queued_messages',  # Is the issue here? I've tried a dozen variations!!
        'schedule': timedelta(seconds=5),
    },
}

The relevant output from my error logs:

23:41:00 worker.1 | [2015-06-10 03:41:00,657: ERROR/MainProcess] Received unregistered task of type 'send_queued_messages'.
23:41:00 worker.1 | The message has been ignored and discarded.
23:41:00 worker.1 |
23:41:00 worker.1 | Did you remember to import the module containing this task?
23:41:00 worker.1 | Or maybe you are using relative imports?
23:41:00 worker.1 | Please see http://bit.ly/gLye1c for more information.
23:41:00 worker.1 |
23:41:00 worker.1 | The full contents of the message body was:
23:41:00 worker.1 | {'utc': True, 'chord': None, 'args': [], 'retries': 0, 'expires': None, 'task': 'send_queued_messages', 'callbacks': None, 'errbacks': None, 'timelimit': (None, None), 'taskset': None, 'kwargs': {}, 'eta': None, 'id': 'a8ca18...227a56'} (216b)

Answer:

See the celery docs for an explanation on task naming.

In this case you need to provide celerybeat with a task name that it can find.

Try this:

CELERYBEAT_SCHEDULE = {
    'send_queued_messages_every_5_seconds': {
        'task': 'myapp.tasks.send_queued_messages',
        'schedule': timedelta(seconds=5),
    },
}

Answer:

I ran into this exact problem, and it turns out the issue is not with the name of the task, but that the Celery worker isn't aware of your task module.

In other words, you have the name of the task correct ('comm.tasks.send_queued_messages'); that name was generated by the task decorator. You just haven't told Celery where to look for the task.

The quickest solution is to add the following to myapp/settings.py:

CELERY_IMPORTS = ['comm.tasks']

According to the docs, this determines the "sequence of modules to import when the worker starts."
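As a rough sketch of how the two settings fit together (the setting names and values are the ones from this thread; the loop at the end is only a hypothetical sanity check I added, not something Celery provides, and it assumes tasks use the automatic module.function naming):

```python
from datetime import timedelta

# Modules the worker imports at startup so that their tasks get registered.
CELERY_IMPORTS = ['comm.tasks']

CELERYBEAT_SCHEDULE = {
    'send_queued_messages_every_5_seconds': {
        # A plain string name; the worker looks it up among registered tasks.
        'task': 'comm.tasks.send_queued_messages',
        'schedule': timedelta(seconds=5),
    },
}

# Hypothetical sanity check (not part of Celery): the module part of every
# scheduled task name should be listed in CELERY_IMPORTS.
for entry in CELERYBEAT_SCHEDULE.values():
    module_name = entry['task'].rpartition('.')[0]
    assert module_name in CELERY_IMPORTS, module_name
```

If the check fails, the schedule points at a module the worker was never told to import.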

Alternatively, you can configure your settings to auto discover tasks (see docs here), but then you would have to namespace your task module(s), moving comm/tasks.py to comm/comm/tasks.py.

For me, the confusion came from Celery's automatic naming convention. The generated name looks like an import path, which led me to believe I was using CELERYBEAT_SCHEDULE['task'] to tell Celery where to look for the task. Instead, the scheduler just takes the name as a string.
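To illustrate the difference between a name and an import, here is a toy model in plain Python. It is not Celery's implementation: `registry`, `register`, `load_comm_tasks`, and `dispatch` are all made-up names, and a dict stands in for the worker's task registry. The point is only that a string name resolves after the module defining the task has been loaded, and not before.

```python
# Toy model only: a dict stands in for the worker's task registry.
registry = {}


def register(name):
    """Return a decorator that stores a function under a string name."""
    def decorator(func):
        registry[name] = func
        return func
    return decorator


def load_comm_tasks():
    """Stand-in for importing comm/tasks.py; registering is a side effect."""
    @register('comm.tasks.send_queued_messages')
    def send_queued_messages():
        return 'sent'


def dispatch(name):
    """Look up a task by the string carried in an incoming message."""
    try:
        func = registry[name]
    except KeyError:
        raise LookupError(
            'Received unregistered task of type %r' % name) from None
    return func()


if __name__ == '__main__':
    try:
        dispatch('comm.tasks.send_queued_messages')
    except LookupError as exc:
        print(exc)  # the name is correct, but nothing has registered it yet
    load_comm_tasks()  # roughly what importing the task module achieves
    print(dispatch('comm.tasks.send_queued_messages'))
```

The name passed to `dispatch` never changes between the two calls; only whether the defining module has been loaded does.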

Answer:

I just ran into the same problem. My mistake was that I hadn't properly terminated one of my Celery processes (the one sending 'send_queued_messages' in your case), so it kept running in the background and kept sending tasks named 'send_queued_messages'. That is not a big problem as long as a task with that name still exists in your code.

But in my case I had since renamed the task to something different (like 'comm.tasks.send_queued_messages'), which I think made the task named 'send_queued_messages' an unregistered task.

Here is what I did to solve it:

  1. Kill all Celery processes (grep for 'celery' instead of the 'celery worker' pattern shown in the Celery documentation):

ps auxww | grep 'celery' | awk '{print $2}' | xargs kill -9

  2. Restart RabbitMQ (or whatever broker you use):

sudo -u rabbitmq rabbitmqctl stop
sudo rabbitmq-server

Then the error was gone.
