当前位置: 动力学知识库 > 问答 > 编程问答 >

python - How to track the progress of individual tasks inside a group which forms the header to a chord in celery?

问题描述:

import celery

def temptask(n):

header=list(tempsubtask.si(i) for i in range(n))

callback=templink.si('printed at last?')

r = celery.chord(celery.group(header))(callback)

return r

@task()

def tempsubtask(i):

print i

for x in range(i):

time.sleep(2)

current_task.update_state(

state='PROGRESS', meta={'completed': x, 'total': i })

@task()

def templink(x):

print 'this should be run at last %s'%x

#executing temptask

r = temptask(100)

I want acccess to the progress status updated by tempsubtask. How can I go about achieving it?

网友答案:

After hours of googling I stumbled upon www.manasupo.com/2012/03/chord-progress-in-celery.html . Though the solution there didn't work for me out of the box, it did inspire me to try something similar.

from celery.utils import uuid
from celery import chord

class ProgressChord(chord):

    def __call__(self, body=None, **kwargs):
        _chord = self.Chord
        body = (body or self.kwargs['body']).clone()
        kwargs = dict(self.kwargs, body=body, **kwargs)
        if _chord.app.conf.CELERY_ALWAYS_EAGER:
            return self.apply((), kwargs)
        callback_id = body.options.setdefault('task_id', uuid())
        r= _chord(**kwargs)
        return _chord.AsyncResult(callback_id), r

and instead of executing celery.chord I use ProgressChord as follows:

def temptask(n):
    header=list(tempsubtask.si(i) for i in range(n))
    callback=templink.si('printed at last?')
    r = celery.Progresschord(celery.group(header))(callback)
    return r

returned value of r contained a tuple having both, callback's asyncresult and a group result. So success looked something like this:

In [3]: r
Out[3]: 
(<AsyncResult: bf87507c-14cb-4ac4-8070-d32e4ff326a6>,
 <GroupResult: af69e131-5a93-492d-b985-267484651d95 [4672cbbb-8ec3-4a9e-971a-275807124fae, a236e55f-b312-485c-a816-499d39d7de41, e825a072-b23c-43f2-b920-350413fd5c9e, e3f8378d-fd02-4a34-934b-39a5a735871d, c4f7093b-9f1a-4e5e-b90d-66f83b9c97c4, d5c7dc2c-4e10-4e71-ba2b-055a33e15f02, 07b1c6f7-fe95-4c1f-b0ba-6bc82bceaa4e, 00966cb8-41c2-4e95-b5e7-d8604c000927, e039c78e-6647-4c8d-b59b-e9baf73171a0, 6cfdef0a-25a2-4905-a40e-fea9c7940044]>)

I inherited and overrode [celery.chord][1] instead of [celery.task.chords.Chord][2] because I couldn't find it's source anywhere.

分享给朋友:
您可能感兴趣的文章:
随机阅读: