J'ai 30 tâches individuelles dans un dag, elles n'ont aucune dépendance entre elles. Les tâches exécutent le même code. La seule différence est le volume de données, certaines tâches se termineront en quelques secondes, certaines tâches prendront 2 heures ou plus.
Le problème est pendant le rattrapage, les tâches qui se terminent en secondes sont bloquées par des tâches qui prennent des heures avant d'être terminées. passer à la prochaine date d'exécution.
Je peux les diviser en points individuels, mais cela semble idiot et 30 tâches deviendront un plus grand nombre à l'avenir.
Y en a-t-il moyen d'exécuter des tâches dans le même temps à des moments d'exécution différents? Comme dès qu'une tâche est terminée, prenez la prochaine date d'exécution, quelle que soit la façon dont les autres tâches se déroulent.
Ajout d'une image à titre d'illustration. En gros, j'aimerais voir deux autres boîtes vertes pleines sur la première ligne alors que la troisième ligne est toujours en retard.
Modifier :
Après l ' explication de y2k-shubham, j'ai essayé de l'implémenter. Mais ça ne marche toujours pas. La tâche rapide commence le 2019-01-30 00
, se termine dans une seconde et ne démarre pas le 2019-01-30 01
car la tâche lente est toujours en cours d'exécution. Si possible, il serait idéal d'exécuter 2019-01-30 01
, 2019-01-30 02
, 2019-01-30 03 code > ... en parallèle si possible
Ajout d'un exemple de code
import time from datetime import datetime from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.utils.trigger_rule import TriggerRule default_args = { 'owner': 'test', 'depends_on_past': False, 'start_date': datetime(2019, 1, 30, 0, 0, 0), 'trigger_rule': TriggerRule.DUMMY } dag = DAG(dag_id='test_dag', default_args=default_args, schedule_interval='@hourly') def fast(**kwargs): return 1 def slow(**kwargs): time.sleep(600) return 1 fast_task = PythonOperator( task_id='fast', python_callable=fast, provide_context=True, priority_weight=10000, pool='fast_pool', # weight_rule='upstream', # using 1.9, this param doesn't exist dag=dag ) slow_task = PythonOperator( task_id='slow', python_callable=slow, provide_context=True, priority_weight=500, pool='slow_pool', # weight_rule='upstream', # using 1.9, this param doesn't exist dag=dag ) fast_task >> slow_task # not working
3 Réponses :
Cela est probablement dû au fait que vous disposez de moins d'emplacements d'exécution que de tâches lentes. Le planificateur ne se soucie pas particulièrement de l'ordre dans lequel il exécute les tâches, car vous avez dit que vous ne vous en souciez pas non plus.
Si cela compte vraiment pour vous, ceux-ci devraient probablement être divisés en différents points ou vous devriez déclarer les dépendances que vous voulez que les tâches moins chères terminent en premier. Il y a plusieurs façons d'exprimer ce que vous voulez, il vous suffit de comprendre ce que c'est.
Merci pour la réponse. Je ne sais pas si je vous comprends bien. J'ai ajouté une photo pour illustrer ce que je veux.
La réponse de y2k-shubham est essentiellement la même que la mienne, mais il donne de nombreux exemples spécifiques de choses que vous pourriez vouloir faire en fonction de ce dont vous avez besoin.
Je peux penser à 3 solutions possibles à vos problèmes (j'ajouterai d'autres alternatives quand elles me viendront à l'esprit)
Définissez start_date
sur des tâches
s individuelles dans le DAG
(à l'exception d'un start_date
du DAG
lui-même) comme indiqué ici . Cependant, je ne serais jamais en faveur de cette approche car ce serait comme un retour sur les mêmes crons basés sur le temps que Airflow code> essaie de remplacer.
Utilisez les pool
s strong > pour séparer les tâches
s par exécution / priorité . Voici une idée (vous devrez peut-être retravailler selon vos besoins): Mettez toutes les petites tâches
s dans tiny_task_pool
et toutes les grandes < / em> ceux dans big_task_pool
. Laissez le tiny_task_pool
avoir un nombre significativement plus élevé de slots
que big_task_pool
. Cela rendrait la famine de votre minuscule -tâches beaucoup moins probables. Vous pouvez faire preuve de créativité avec encore plus de niveaux de pool
.
Même si vos tâches
n'ont pas de vraies dépendances entre elles, cela ne devrait pas faire de mal à introduire délibérément des dépendances afin que toutes (ou la plupart) des grandes tâches soient effectuées en aval
de petites tâches (et donc changer la structure de votre DAG code >). Cela reviendrait à une approche du type travail le plus court d'abord . Vous pouvez également explorer
priority_weight code> /
pour obtenir un contrôle encore plus précis. priority_rule
Toutes les alternatives ci-dessus supposent que les longueurs (durée d'exécution) de la tâche
sont connues à l'avance. Dans le monde réel, cela pourrait ne pas être vrai; ou même si c'est le cas, il pourrait changer progressivement les heures supplémentaires. Pour cela, je vous suggère de peaufiner votre script de définition de dag pour prendre en compte le temps d'exécution moyen (ou médian) de votre tâche
s sur les n derniers passages pour décider de leur priorité.
start_date
, il suffit de fournir une start_date
ultérieure (en fait la même date, l'heure ultérieure) aux tâches
s qui ont duré plus longtemps lors des exécutions précédentes pool
, déplacez les tâches
dans différents pools
en fonction de leurs durées d'exécution précédentes tâches
s en aval
. Cela peut sembler difficile mais vous pouvez le visualiser comme ceci: Créez 3 DummyOperator
s et liez-les (l'un après l'autre). Vous devez maintenant remplir toutes les petites tâches entre les 2 premiers DummyOperator
et les grandes entre les deux suivants.
Personnellement, je préférerais l'approche dépendance des tâches car (1) pas de travail supplémentaire / caché en dehors du câblage des tâches (2) Excellent repère visuel (depuis DAG < La structure de / code> refléterait cela sur l'interface utilisateur)
wow ... merci pour l'explication détaillée. J'essaierai vos suggestions.
J'ai essayé les n ° 2 et 3 ... cela ne semble pas fonctionner. J'ai joint le code. Pourriez-vous jeter un œil? Merci beaucoup!
Il s'avère que deux variables peuvent être définies pour résoudre mon problème très facilement.
concurrence
et max_active_runs
Dans l'exemple ci-dessous, vous pouvez avoir 4 dags en cours d'exécution et chaque dag peut avoir 4 tâches en même temps . D'autres combinaisons sont également possibles.
dag = DAG( dag_id='sample_dag', default_args=default_args, schedule_interval='@daily', # this will allow up to 16 tasks to be run at the same time concurrency=16, # this will allow up to 4 dags to be run at the same time max_active_runs=4, )
Clarifications [1] Parlez-vous de l'ordre relatif d'exécution des tâches dans des
DagRun
s distincts? [2] Quelexécuteur
utilisez-vous (mentionné dans votrefichier airflow.cfg
)?