我有一个我想要修改的气流工作流程(参见底部的插图)。但是,我无法在文档中找到这样做的方法。
我没有运气就看过子网,分支和xcoms。 …
我也遇到过这种情况,并没有找到一个干净的方法来解决它。如果您知道要传递给每个子标记的所有不同可能参数...那么您可以做的是将其硬编码到DAG文件中,并始终使用每个可能的子标记创建DAG。然后你有一个运算符(类似你的“获取每一个n”),它获取你想要运行的子标记列表,并让它标记不在列表中的任何下游子标记 skipped 。像这样的东西:
skipped
SUBDAGS = { 'a': {'id': 'foo'}, 'b': {'id': 'bar'}, 'c': {'id': 'test'}, 'd': {'id': 'hi'}, } def _select_subdags(**context): names = fetch_list() # returns ["a", "c", "d"] tasks_to_skip = ['my_subdag_' + name for name in set(SUBDAGS) - set(names)] session = Session() tis = session.query(TaskInstance).filter( TaskInstance.dag_id == context['dag'].dag_id, TaskInstance.execution_date == context['ti'].execution_date, TaskInstance.task_id.in_(tasks_to_skip), ) for ti in tis: now = datetime.utcnow() ti.state = State.SKIPPED ti.start_date = now ti.end_date = now session.merge(ti) session.commit() session.close() select_subdags = PythonOperator( task_id='select_subdags', dag=dag, provide_context=True, python_callable=_select_subdags, ) for name, params in SUBDAGS.iteritems(): child_dag_id = 'my_subdag_' + name subdag_op = SubDagOperator( task_id=child_dag_id, dag=dag, subdag=my_subdag(dag.dag_id, child_dag_id, params), ) select_subdags >> subdag_op
显然不理想,特别是当你最终只想运行数百个子网格时。我们还在单个DAG中遇到了数千个子标记的一些性能问题,因为它可能导致大量的任务实例,其中大部分都被简单地跳过。