J'ai créé un BranchPythonOperator qui appelle 2 tâches en fonction de la condition comme:
class CheckTable:
"""
DAG task to check if table exists or not.
"""
def __call__(self, **kwargs) -> None:
pg_hook = PostgresHook(postgres_conn_id="postgres_docker")
query = "SELECT EXISTS ( \
SELECT 1 FROM information_schema.tables \
WHERE table_schema = 'public' \
AND table_name = 'users');"
table_exists = pg_hook.get_records(query)[0][0]
if table_exists:
return "typicon_load_data"
return "typicon_create_table"
Ceci est la classe appelable CheckTable :
typicon_check_table = BranchPythonOperator(
task_id='typicon_check_table',
python_callable=CheckTable(),
provide_context=True,
dag=typicon_task_dag)
typicon_create_table = PythonOperator(
task_id='typicon_create_table',
python_callable=CreateTable(),
provide_context=True,
dag=typicon_task_dag)
typicon_load_data = PythonOperator(
task_id='typicon_load_data',
python_callable=LoadData(),
provide_context=True,
dag=typicon_task_dag)
typicon_check_table.set_downstream([typicon_load_data, typicon_create_table])
typicon_create_table.set_downstream(typicon_load_data)
Le problème est à la fois que les tâches sont ignorées lorsque la tâche typicon_check_table est exécutée.
Comment résoudre ce problème?
3 Réponses :
La tâche typicon_load_data a typicon_create_table comme parent et la règle de déclenchement par défaut est all_success , donc je ne suis pas surpris par ce comportement. p>
Deux cas possibles ici:
CheckTable () renvoie typicon_load_data , puis typicon_create_table est ignoré, mais typicon_load_data en aval est également ignoré. CheckTable () renvoie typicon_create_table , qui est exécuté et il déclenche typicon_load_data qui est ignoré car il s'agissait de la branche exclue. Je suppose que votre capture d'écran provient du cas 1.?
Ajoutez une règle trigger_rule = "all_done" à la table typicon_check_table comme ci-dessous
typicon_check_table = BranchPythonOperator(
task_id='typicon_check_table',
python_callable=CheckTable(),
provide_context=True,
trigger_rule="all_done",
dag=typicon_task_dag)
J'ai travaillé avec le même scénario, cela fonctionne bien avec moi pour le code ci-dessous
BranchPythonOperator(task_id='slot_population_on_is_y_or_n', python_callable=DAGConditionalValidation('Y'),
trigger_rule='one_success')
slot_population_on_is_y = DummyOperator(task_id='slot_population_on_is_y')
slot_population_on_is_n = DummyOperator(task_id='slot_population_on_is_n')
slot_population_on_is_y_or_n >> [slot_population_on_is_y, slot_population_on_is_n]
class DAGConditionalValidation:
def __init__(self, conditional_param_key):
self.conditional_param_key = conditional_param_key
def __call__(self, **kwargs):
if (conditional_param_key == 'Y'):
return slot_population_on_is_y
return slot_population_on_is_n
Tout votre code semble correct, mais vous n'avez pas la règle de déclenchement, veuillez définir la règle de déclenchement comme trigger_rule = 'one_success' .
Cela devrait également fonctionner pour vous.
oui c'est vrai, je suis un nouveau contributeur donc j'ai besoin d'apprendre les techniques de publication :)