由于路易吉(2.6.1)工作分配不均,

我们试图运行一个简单的pipe道分布在docker群集群上。 luigi工作人员被部署为复制泊坞服务。 他们成功启动,并在luigi-server工作几秒钟后,他们开始死亡,因为没有工作分配给他们,所有的任务最终分配给一个工人。

我们必须在工作人员的luigi.cfg中设置keep_alive = True来强制他们不要死亡,但是在pipe道工作完成之后保持工作人员似乎不是个好主意。 有没有办法控制工作分配?

我们的testingpipe道:

class RunAllTasks(luigi.Task): tasks = luigi.IntParameter() sleep_time = luigi.IntParameter() def requires(self): for i in range(self.tasks): yield RunExampleTask(i, self.sleep_time) def run(self): with self.output().open('w') as f: f.write('All done!') def output(self): return LocalTarget('/data/RunAllTasks.txt') class RunExampleTask(luigi.Task): number = luigi.IntParameter() sleep_time = luigi.IntParameter() @property def cmd(self): return """ docker run --rm --name example_{number} hello-world """.format(number=self.number) def run(self): time.sleep(self.sleep_time) logger.debug(self.cmd) out = subprocess.check_output(self.cmd, stderr=subprocess.STDOUT, shell=True) logger.debug(out) with self.output().open('w') as f: f.write(str(out)) def output(self): return LocalTarget('/data/{number}.txt'.format(number=self.number)) if __name__ == "__main__": luigi.run() 

你的问题是一次yield一个单一的需求的结果,而不是你想一次完成所有的需求,如下所示:

 def requires(self): reqs = [] for i in range(self.tasks): reqs.append(RunExampleTask(i, self.sleep_time)) yield reqs