运行更多任务的芹菜任务

| 我正在使用celerybeat来启动一项主要任务,该任务要执行许多次要任务。我已经写了两个任务。 有没有一种方法可以轻松地做到这一点? Celery是否允许从任务内部运行任务? 我的例子:
@task
def compute(users=None):
    if users is None:
        users = User.objects.all()

    tasks = []
    for user in users:
        tasks.append(compute_for_user.subtask((user.id,)))

    job = TaskSet(tasks)
    job.apply_async() # raises a IOError: Socket closed

@task
def compute_for_user(user_id):
    #do some stuff
从celerybeat调用ѭ1called,但尝试运行
apply_async
时会导致IOError。有任何想法吗?     
已邀请:
        回答开放性问题:从2.0版开始,Celery提供了一种从其他任务开始任务的简便方法。您所说的“次级任务”就是所谓的“子任务”。请参阅“任务集,子任务和回调”的文档,@ Paperino足以链接到该文档。 对于3.0版,Celery更改为对此行为及其他行为类型使用组。 您的代码表明您已经熟悉此界面。您的实际问题似乎是,“当我尝试运行我的子任务集时,为什么我会得到一个'Socket Closed'?3ѭ?”有关您的程序的信息。您的摘录无法按原样运行,因此我们无法自行检查您遇到的问题。请发布ѭ3随附的stacktrace,如果有任何运气,会帮助您解决崩溃问题的人会来。     
        您可以使用类似以下内容(在3.0中支持)
g = group(compute_for_user.s(user.id) for user in users)
g.apply_async()
    
        而且由于版本3.0 \'TaskSet \'不再是术语...作为特殊子任务类型的组,链和和弦是新事物,请参见http://docs.celeryproject.org/zh/3.1/ whatsnew-3.0.html#group-chord-chain-are-now-subtasks     
        对于提到的IOError,尽管此处的信息不足以说明是什么原因造成的,但我的猜测是您试图在task函数内部建立连接,因此无论何时调用任务,都会建立一个新连接。如果要将该任务调用数千次,则将有数千个连接。这将淹没系统套接字管理器,而IOError是它的抱怨。     

要回复问题请先登录注册