6
votes

Airflow: ExternalTaskSensor ne déclenche pas la tâche

J'ai déjà vu ceci et ce questions sur SO et apporté les modifications en conséquence. Cependant, mon DAG dépendant reste bloqué en état de poussée. Voici mon DAG maître:

[2019-01-10 11:45:20,215] {{jdbc_operator.py:56}} INFO - Executing: CALL AIRFLOW.TEST_INSERT (20)
[2019-01-10 11:45:21,477] {{logging_mixin.py:95}} INFO - [2019-01-10 11:45:21,476] {{dbapi_hook.py:166}} INFO - CALL AIRFLOW.TEST_INSERT (20)
[2019-01-10 11:45:24,139] {{logging_mixin.py:95}} INFO - [2019-01-10 11:45:24,137] {{jobs.py:2627}} INFO - Task exited with return code 0

Voici mon DAG dépendant:

[2019-01-10 11:43:52,951] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:44:52,955] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:45:52,961] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:46:52,949] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:47:52,928] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:48:52,928] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:49:52,905] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 

Voici les journaux du DAG dépendant une fois que le DAG maître obtient exécuté:

from airflow import DAG
from airflow.operators.jdbc_operator import JdbcOperator
from datetime import datetime, timedelta
from airflow.sensors.external_task_sensor import ExternalTaskSensor

today = datetime.today()

default_args = {
    'depends_on_past': False,
    'retries': 0,
    'start_date': datetime(today.year, today.month, today.day),
    'schedule_interval': '@once'
}

dag = DAG('external-dag-upstream', default_args=default_args)

task_sensor = ExternalTaskSensor(
    task_id='link_upstream',
    external_dag_id='call-procedure-and-bash',
    external_task_id='call_procedure',
    execution_delta=timedelta(minutes=-2),
    dag=dag
)

count_rows = JdbcOperator(
    task_id='count_rows',
    jdbc_conn_id='airflow_db2',
    sql='SELECT COUNT(*) FROM AIRFLOW.TEST',
    dag=dag
)

count_rows.set_upstream(task_sensor)

Voici les journaux de l'exécution du DAG maître:

from airflow import DAG
from airflow.operators.jdbc_operator import JdbcOperator
from datetime import datetime
from airflow.operators.bash_operator import BashOperator

today = datetime.today()

default_args = {
    'depends_on_past': False,
    'retries': 0,
    'start_date': datetime(today.year, today.month, today.day),
    'schedule_interval': '@once'
}

dag = DAG('call-procedure-and-bash', default_args=default_args)

call_procedure = JdbcOperator(
    task_id='call_procedure',
    jdbc_conn_id='airflow_db2',
    sql='CALL AIRFLOW.TEST_INSERT (20)',
    dag=dag
)

call_procedure

Mon hypothèse est, Airflow devrait déclencher la dépendance DAG si le maître fonctionne bien? J'ai essayé de jouer avec execution_delta mais cela ne semble pas fonctionner.

De plus, schedule_interval et start_date sont idem pour les deux DAG, donc ne pensez pas que cela devrait causer des problèmes.

Suis-je absent de quelque chose?


0 commentaires

4 Réponses :


0
votes

Il se peut que vous deviez utiliser une temporisation positive: https://airflow.readthedocs.io/en/stable/_modules/airflow/sensors/external_task_sensor.html car en soustrayant le delta d'exécution, il va finir par rechercher une tâche qui s'est exécutée 2 minutes après elle-même. < / p>

Cependant, le delta n'est pas vraiment une plage, le TI doit avoir un ID Dag, un ID de tâche, un résultat réussi et une date d'exécution dans la liste des datetimes. Qui lorsque vous donnez execution_delta comme un delta, est une liste d'une date-heure prenant la date d'exécution actuelle et soustrayant le delta temporel.

Cela revient probablement à vous soit de supprimer le chronomètre afin que les deux dates d'exécution correspondent et que le capteur attende que l'autre tâche réussisse, OU votre date de début et votre intervalle de planification étant définis comme fondamentalement aujourd'hui et @once obtiennent des dates d'exécution qui ne sont pas prévisibles les unes avec les autres. Vous pouvez essayer de définir par exemple datetime (2019,1,10) et 0 1 * * * pour les faire fonctionner tous les deux tous les jours à 1h du matin (encore une fois sans execution_delta ).


1 commentaires

J'ai supprimé execution_delta et défini schedule_interval sur 0 1 * * * . Pourtant, cela n'a pas déclenché le DAG lorsque celui en amont était terminé. Cependant, lorsque je change la date de début à la volée (lorsque le capteur est en cours d'exécution), il termine en quelque sorte le DAG en aval. On dirait que cela a probablement quelque chose à voir avec la date de début des deux DAG mais je ne suis pas encore en mesure de le comprendre.



1
votes

Assurez-vous que les deux DAG démarrent en même temps et que vous ne démarrez aucun des DAG manuellement.


0 commentaires

1
votes

J'espère que vous ne déclenchez pas le DAG manuellement. Si vous voulez le tester, laissez le DAG s'exécuter selon le calendrier, puis surveillez les exécutions du DAG.


0 commentaires

0
votes

J'ai eu ce problème à cause d'un changement d'heure été / hiver: "1 jour avant" signifie "exactement 24 heures avant" donc si le fuseau horaire a un changement d'heure d'été entre les deux, le DAG est bloqué.

Une solution consiste à le définir manuellement comme réussi.

L'autre façon serait d'utiliser l'argument execution_date_fn et de calculer manuellement le décalage horaire correctement dans ce cas.


0 commentaires