芹菜画布在异步和热切模式下的工作方式存在一些差异。我注意到,在替换自身的动态任务中,后面跟着链的组不会将结果发送到链上的下一个。
嗯,这似乎很复杂,让我举个例子:
鉴于以下任务:
@shared_task(bind=True)
def grouped(self, val):
task = (
group(asum.s(val, n) for n in range(val)) | asum.s(val)
)
raise self.replace(task)
当它在另一个画布中分组时,如下所示:
@shared_task(bind=True)
def flow(self, val):
workflow = (asum.s(1, val) |
asum.s(2) |
grouped.s() |
amul.s(3))
return self.replace(workflow)
任务amul在急切模式下不会接收分组结果。
为了真正解决这个问题,我在github上创建了一个示例项目,在那里你可以深入到问题中并帮助我解决一些快速的解决方案,可能的话,一些公关人员也会参与芹菜项目。
编辑--编辑
在这个项目中,我描述了两种使用芹菜的不同行为。在异步模式下,mode任务按预期工作。
>>> from equation.main import *
>>> from equation.tasks import *
>>> flow.delay(1).get()
78
>>> flow.delay(2).get()
120
>>> flow.delay(100).get()
47895
发布于 2020-02-04 08:40:14
在一个测试用例中,我一直在努力解决这个问题。对于未来读者来说,至少从芹菜4.4.0开始,以下成语将适用于所有上下文,包括同步的进程内执行:
return self.replace(...)
使用raise
或简单地让函数在Task.replace
之后结束,只会在异步模式下工作。相关代码是就在Task.replace的末尾
if self.request.is_eager:
return sig.apply().get()
else:
sig.delay()
raise Ignore('Replaced by new task')
发布于 2019-07-12 09:43:14
可悲的是,渴望的模式将永远不会与运行一个实际的员工相同。在运行一个实际的工作人员时,有太多复杂的事情需要急切的模式,而不是完全相同的事情。我同意这种情况在使用急迫模式时应该属于特殊情况,但预计会有一些差异。如果您知道如何解决这个问题,请提交一个PR,我们可以在那里检查修复。谢谢!
发布于 2019-07-12 03:46:16
grouped()没有返回任何内容,那么您希望amul如何得到结果?
https://stackoverflow.com/questions/56994569
复制