Mon DAG est censé s'exécuter toutes les heures. Je tire chaque heure de données d'une source s3 et je les traite. Parfois, la tâche prend plus d'une heure. À ce moment-là, il me manque une heure de données.
Exemple: 13 h 00 Le DAG a démarré et a fonctionné pendant 2 heures. Donc, ma prochaine exécution de DAG prend le paramètre comme 3 (15h) données manquantes de 14h. En d'autres termes, comment appeler la tâche et m'assurer qu'elle s'exécute toutes les heures i., 24 fois par jour
3 Réponses :
Cela semble être un scénario parfait pour utiliser TimeDeltaSensor
Remarque: l'extrait de code suivant est juste pour référence et n'a PAS été testé
import datetime
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.time_delta_sensor import TimeDeltaSensor
from airflow.utils.trigger_rule import TriggerRule
# create DAG object
my_dag: DAG = DAG(dag_id="my_dag",
start_date=datetime.datetime(year=2019, month=3, day=11),
schedule_interval="0 0 0 * * *")
# create dummy begin & end tasks
my_begin_task: DummyOperator = DummyOperator(dag=my_dag,
task_id="my_begin_task")
my_end_task: DummyOperator = DummyOperator(dag=my_dag,
task_id="my_end_task",
trigger_rule=TriggerRule.ALL_DONE)
# populate the DAG
for i in range(1, 24, 1):
# create sensors and actual tasks for all hours of the day
my_time_delta_sensor: TimeDeltaSensor = TimeDeltaSensor(dag=my_dag,
task_id=f"my_time_delta_sensor_task_{i}_hours",
delta=datetime.timedelta(hours=i))
my_actual_task: PythonOperator = PythonOperator(dag=my_dag,
task_id=f"my_actual_task_{i}_hours",
python_callable=my_callable
..)
# wire-up tasks together
my_begin_task >> my_time_delta_sensor >> my_actual_task >> my_end_task
Références
Voici mon DAG
HOUR_PACIFIC = arrow.utcnow().shift(hours=-3).to('US/Pacific').format("HH")
dag = DAG(
DAG_ID,
catchup=False,
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(hours=5),
schedule_interval='0 * * * *')
start = DummyOperator(
task_id='Start',
dag=dag)
my_task = EMRStep(emr,
'stg',
HOUR_PACIFIC)
end = DummyOperator(
task_id='End',
dag=dag
)
start >> my_task >> end
Vous devez passer le catchup = True pour l'objet DAG.
Pouvez-vous publier un exemple de votre DAG? Les DAG simultanés dans Airflow ne posent aucun problème, c'est-à-dire que votre DAG à 14 heures devrait fonctionner parfaitement, même si le DAG à 13 heures est toujours en cours d'exécution ...
@dorvak Vous avez raison. C'était ma logique. Je veux exécuter dag chaque heure et je passe une heure en fonction de current_time. Mon environnement de flux d'air ne prend que 4 instances dans la file d'attente. Ainsi, parfois, mon instance de tâche où je passe une heure en tant que paramètre est retardée (en raison d'autres travaux de longue durée). Exemple: