J'ai créé un BranchPythonOperator qui appelle 2 tâches en fonction de la condition comme:
class CheckTable: """ DAG task to check if table exists or not. """ def __call__(self, **kwargs) -> None: pg_hook = PostgresHook(postgres_conn_id="postgres_docker") query = "SELECT EXISTS ( \ SELECT 1 FROM information_schema.tables \ WHERE table_schema = 'public' \ AND table_name = 'users');" table_exists = pg_hook.get_records(query)[0][0] if table_exists: return "typicon_load_data" return "typicon_create_table"
Ceci est la classe appelable CheckTable
:
typicon_check_table = BranchPythonOperator( task_id='typicon_check_table', python_callable=CheckTable(), provide_context=True, dag=typicon_task_dag) typicon_create_table = PythonOperator( task_id='typicon_create_table', python_callable=CreateTable(), provide_context=True, dag=typicon_task_dag) typicon_load_data = PythonOperator( task_id='typicon_load_data', python_callable=LoadData(), provide_context=True, dag=typicon_task_dag) typicon_check_table.set_downstream([typicon_load_data, typicon_create_table]) typicon_create_table.set_downstream(typicon_load_data)
Le problème est à la fois que les tâches sont ignorées lorsque la tâche typicon_check_table
est exécutée.
Comment résoudre ce problème?
3 Réponses :
La tâche typicon_load_data
a typicon_create_table
comme parent et la règle de déclenchement par défaut est all_success
, donc je ne suis pas surpris par ce comportement. p>
Deux cas possibles ici:
CheckTable ()
renvoie typicon_load_data
, puis typicon_create_table
est ignoré, mais typicon_load_data
en aval est également ignoré. CheckTable ()
renvoie typicon_create_table
, qui est exécuté et il déclenche typicon_load_data
qui est ignoré car il s'agissait de la branche exclue. Je suppose que votre capture d'écran provient du cas 1.?
Ajoutez une règle trigger_rule = "all_done" à la table typicon_check_table comme ci-dessous
typicon_check_table = BranchPythonOperator( task_id='typicon_check_table', python_callable=CheckTable(), provide_context=True, trigger_rule="all_done", dag=typicon_task_dag)
J'ai travaillé avec le même scénario, cela fonctionne bien avec moi pour le code ci-dessous
BranchPythonOperator(task_id='slot_population_on_is_y_or_n', python_callable=DAGConditionalValidation('Y'), trigger_rule='one_success') slot_population_on_is_y = DummyOperator(task_id='slot_population_on_is_y') slot_population_on_is_n = DummyOperator(task_id='slot_population_on_is_n') slot_population_on_is_y_or_n >> [slot_population_on_is_y, slot_population_on_is_n] class DAGConditionalValidation: def __init__(self, conditional_param_key): self.conditional_param_key = conditional_param_key def __call__(self, **kwargs): if (conditional_param_key == 'Y'): return slot_population_on_is_y return slot_population_on_is_n
Tout votre code semble correct, mais vous n'avez pas la règle de déclenchement, veuillez définir la règle de déclenchement comme trigger_rule = 'one_success'
.
Cela devrait également fonctionner pour vous.
oui c'est vrai, je suis un nouveau contributeur donc j'ai besoin d'apprendre les techniques de publication :)