J'ai déjà vu ceci et ce questions sur SO et apporté les modifications en conséquence. Cependant, mon DAG dépendant reste bloqué en état de poussée. Voici mon DAG maître:
[2019-01-10 11:45:20,215] {{jdbc_operator.py:56}} INFO - Executing: CALL AIRFLOW.TEST_INSERT (20) [2019-01-10 11:45:21,477] {{logging_mixin.py:95}} INFO - [2019-01-10 11:45:21,476] {{dbapi_hook.py:166}} INFO - CALL AIRFLOW.TEST_INSERT (20) [2019-01-10 11:45:24,139] {{logging_mixin.py:95}} INFO - [2019-01-10 11:45:24,137] {{jobs.py:2627}} INFO - Task exited with return code 0
Voici mon DAG dépendant:
[2019-01-10 11:43:52,951] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... [2019-01-10 11:44:52,955] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... [2019-01-10 11:45:52,961] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... [2019-01-10 11:46:52,949] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... [2019-01-10 11:47:52,928] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... [2019-01-10 11:48:52,928] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... [2019-01-10 11:49:52,905] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ...
Voici les journaux du DAG dépendant une fois que le DAG maître obtient exécuté:
from airflow import DAG from airflow.operators.jdbc_operator import JdbcOperator from datetime import datetime, timedelta from airflow.sensors.external_task_sensor import ExternalTaskSensor today = datetime.today() default_args = { 'depends_on_past': False, 'retries': 0, 'start_date': datetime(today.year, today.month, today.day), 'schedule_interval': '@once' } dag = DAG('external-dag-upstream', default_args=default_args) task_sensor = ExternalTaskSensor( task_id='link_upstream', external_dag_id='call-procedure-and-bash', external_task_id='call_procedure', execution_delta=timedelta(minutes=-2), dag=dag ) count_rows = JdbcOperator( task_id='count_rows', jdbc_conn_id='airflow_db2', sql='SELECT COUNT(*) FROM AIRFLOW.TEST', dag=dag ) count_rows.set_upstream(task_sensor)
Voici les journaux de l'exécution du DAG maître:
from airflow import DAG from airflow.operators.jdbc_operator import JdbcOperator from datetime import datetime from airflow.operators.bash_operator import BashOperator today = datetime.today() default_args = { 'depends_on_past': False, 'retries': 0, 'start_date': datetime(today.year, today.month, today.day), 'schedule_interval': '@once' } dag = DAG('call-procedure-and-bash', default_args=default_args) call_procedure = JdbcOperator( task_id='call_procedure', jdbc_conn_id='airflow_db2', sql='CALL AIRFLOW.TEST_INSERT (20)', dag=dag ) call_procedure
Mon hypothèse est, Airflow devrait déclencher la dépendance DAG si le maître fonctionne bien? J'ai essayé de jouer avec execution_delta
mais cela ne semble pas fonctionner.
De plus, schedule_interval
et start_date
sont idem pour les deux DAG, donc ne pensez pas que cela devrait causer des problèmes.
Suis-je absent de quelque chose?
4 Réponses :
Il se peut que vous deviez utiliser une temporisation positive: https://airflow.readthedocs.io/en/stable/_modules/airflow/sensors/external_task_sensor.html car en soustrayant le delta d'exécution, il va finir par rechercher une tâche qui s'est exécutée 2 minutes après elle-même. < / p>
Cependant, le delta n'est pas vraiment une plage, le TI doit avoir un ID Dag, un ID de tâche, un résultat réussi et une date d'exécution dans la liste des datetimes. Qui lorsque vous donnez execution_delta
comme un delta, est une liste d'une date-heure prenant la date d'exécution actuelle et soustrayant le delta temporel.
Cela revient probablement à vous soit de supprimer le chronomètre afin que les deux dates d'exécution correspondent et que le capteur attende que l'autre tâche réussisse, OU votre date de début et votre intervalle de planification étant définis comme fondamentalement aujourd'hui et @once
obtiennent des dates d'exécution qui ne sont pas prévisibles les unes avec les autres. Vous pouvez essayer de définir par exemple datetime (2019,1,10)
et 0 1 * * *
pour les faire fonctionner tous les deux tous les jours à 1h du matin (encore une fois sans execution_delta
).
J'ai supprimé execution_delta
et défini schedule_interval
sur 0 1 * * *
. Pourtant, cela n'a pas déclenché le DAG lorsque celui en amont était terminé. Cependant, lorsque je change la date de début à la volée (lorsque le capteur est en cours d'exécution), il termine en quelque sorte le DAG
en aval. On dirait que cela a probablement quelque chose à voir avec la date de début des deux DAG
mais je ne suis pas encore en mesure de le comprendre.
Assurez-vous que les deux DAG démarrent en même temps et que vous ne démarrez aucun des DAG manuellement.
J'espère que vous ne déclenchez pas le DAG manuellement. Si vous voulez le tester, laissez le DAG s'exécuter selon le calendrier, puis surveillez les exécutions du DAG.
J'ai eu ce problème à cause d'un changement d'heure été / hiver: "1 jour avant" signifie "exactement 24 heures avant" donc si le fuseau horaire a un changement d'heure d'été entre les deux, le DAG est bloqué.
Une solution consiste à le définir manuellement comme réussi.
L'autre façon serait d'utiliser l'argument execution_date_fn
et de calculer manuellement le décalage horaire correctement dans ce cas.