0
votes

Airflow longue durée de fonctionnement horaire DAG manquant quelques heures

Mon DAG est censé s'exécuter toutes les heures. Je tire chaque heure de données d'une source s3 et je les traite. Parfois, la tâche prend plus d'une heure. À ce moment-là, il me manque une heure de données.

Exemple: 13 h 00 Le DAG a démarré et a fonctionné pendant 2 heures. Donc, ma prochaine exécution de DAG prend le paramètre comme 3 (15h) données manquantes de 14h. En d'autres termes, comment appeler la tâche et m'assurer qu'elle s'exécute toutes les heures i., 24 fois par jour


2 commentaires

Pouvez-vous publier un exemple de votre DAG? Les DAG simultanés dans Airflow ne posent aucun problème, c'est-à-dire que votre DAG à 14 heures devrait fonctionner parfaitement, même si le DAG à 13 heures est toujours en cours d'exécution ...


@dorvak Vous avez raison. C'était ma logique. Je veux exécuter dag chaque heure et je passe une heure en fonction de current_time. Mon environnement de flux d'air ne prend que 4 instances dans la file d'attente. Ainsi, parfois, mon instance de tâche où je passe une heure en tant que paramètre est retardée (en raison d'autres travaux de longue durée). Exemple:


3 Réponses :


-1
votes

Cela semble être un scénario parfait pour utiliser TimeDeltaSensor


Remarque: l'extrait de code suivant est juste pour référence et n'a PAS été testé

import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.time_delta_sensor import TimeDeltaSensor
from airflow.utils.trigger_rule import TriggerRule

# create DAG object
my_dag: DAG = DAG(dag_id="my_dag",
                  start_date=datetime.datetime(year=2019, month=3, day=11),
                  schedule_interval="0 0 0 * * *")

# create dummy begin & end tasks
my_begin_task: DummyOperator = DummyOperator(dag=my_dag,
                                             task_id="my_begin_task")
my_end_task: DummyOperator = DummyOperator(dag=my_dag,
                                           task_id="my_end_task",
                                           trigger_rule=TriggerRule.ALL_DONE)

# populate the DAG
for i in range(1, 24, 1):
    # create sensors and actual tasks for all hours of the day
    my_time_delta_sensor: TimeDeltaSensor = TimeDeltaSensor(dag=my_dag,
                                                            task_id=f"my_time_delta_sensor_task_{i}_hours",
                                                            delta=datetime.timedelta(hours=i))
    my_actual_task: PythonOperator = PythonOperator(dag=my_dag,
                                                    task_id=f"my_actual_task_{i}_hours",
                                                    python_callable=my_callable
                                                    ..)
    # wire-up tasks together
    my_begin_task >> my_time_delta_sensor >> my_actual_task >> my_end_task

Références


0 commentaires

0
votes

Voici mon DAG

HOUR_PACIFIC = arrow.utcnow().shift(hours=-3).to('US/Pacific').format("HH")

dag = DAG(
    DAG_ID,
    catchup=False,
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=5),
    schedule_interval='0 * * * *')

start = DummyOperator(
    task_id='Start',
    dag=dag)

my_task = EMRStep(emr,
'stg',
HOUR_PACIFIC)

end = DummyOperator(
    task_id='End',
    dag=dag
)
start >> my_task >> end


0 commentaires

0
votes

Vous devez passer le catchup = True pour l'objet DAG.


0 commentaires