3
votes

Airflow exécute des tâches à des moments différents dans le même jour?

J'ai 30 tâches individuelles dans un dag, elles n'ont aucune dépendance entre elles. Les tâches exécutent le même code. La seule différence est le volume de données, certaines tâches se termineront en quelques secondes, certaines tâches prendront 2 heures ou plus.

Le problème est pendant le rattrapage, les tâches qui se terminent en secondes sont bloquées par des tâches qui prennent des heures avant d'être terminées. passer à la prochaine date d'exécution.

Je peux les diviser en points individuels, mais cela semble idiot et 30 tâches deviendront un plus grand nombre à l'avenir.

Y en a-t-il moyen d'exécuter des tâches dans le même temps à des moments d'exécution différents? Comme dès qu'une tâche est terminée, prenez la prochaine date d'exécution, quelle que soit la façon dont les autres tâches se déroulent.

Ajout d'une image à titre d'illustration. En gros, j'aimerais voir deux autres boîtes vertes pleines sur la première ligne alors que la troisième ligne est toujours en retard.

 airflow_dag_ideal

Modifier :

Après l ' explication de y2k-shubham, j'ai essayé de l'implémenter. Mais ça ne marche toujours pas. La tâche rapide commence le 2019-01-30 00 , se termine dans une seconde et ne démarre pas le 2019-01-30 01 car la tâche lente est toujours en cours d'exécution. Si possible, il serait idéal d'exécuter 2019-01-30 01 , 2019-01-30 02 , 2019-01-30 03 code > ... en parallèle si possible

Ajout d'un exemple de code

import time
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

default_args = {
    'owner': 'test',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 30, 0, 0, 0),
    'trigger_rule': TriggerRule.DUMMY
}

dag = DAG(dag_id='test_dag', default_args=default_args, schedule_interval='@hourly')


def fast(**kwargs):
    return 1


def slow(**kwargs):
    time.sleep(600)
    return 1


fast_task = PythonOperator(
    task_id='fast',
    python_callable=fast,
    provide_context=True,
    priority_weight=10000,
    pool='fast_pool',
    # weight_rule='upstream', # using 1.9, this param doesn't exist
    dag=dag
)

slow_task = PythonOperator(
    task_id='slow',
    python_callable=slow,
    provide_context=True,
    priority_weight=500,
    pool='slow_pool',
    # weight_rule='upstream', # using 1.9, this param doesn't exist
    dag=dag
)

fast_task >> slow_task # not working


1 commentaires

Clarifications [1] Parlez-vous de l'ordre relatif d'exécution des tâches dans des DagRun s distincts? [2] Quel exécuteur utilisez-vous (mentionné dans votre fichier airflow.cfg )?


3 Réponses :


2
votes

Cela est probablement dû au fait que vous disposez de moins d'emplacements d'exécution que de tâches lentes. Le planificateur ne se soucie pas particulièrement de l'ordre dans lequel il exécute les tâches, car vous avez dit que vous ne vous en souciez pas non plus.

Si cela compte vraiment pour vous, ceux-ci devraient probablement être divisés en différents points ou vous devriez déclarer les dépendances que vous voulez que les tâches moins chères terminent en premier. Il y a plusieurs façons d'exprimer ce que vous voulez, il vous suffit de comprendre ce que c'est.


2 commentaires

Merci pour la réponse. Je ne sais pas si je vous comprends bien. J'ai ajouté une photo pour illustrer ce que je veux.


La réponse de y2k-shubham est essentiellement la même que la mienne, mais il donne de nombreux exemples spécifiques de choses que vous pourriez vouloir faire en fonction de ce dont vous avez besoin.



3
votes

Je peux penser à 3 solutions possibles à vos problèmes (j'ajouterai d'autres alternatives quand elles me viendront à l'esprit)

  1. Définissez start_date sur des tâches s individuelles dans le DAG (à l'exception d'un start_date du DAG lui-même) comme indiqué ici . Cependant, je ne serais jamais en faveur de cette approche car ce serait comme un retour sur les mêmes crons basés sur le temps que Airflow essaie de remplacer.

  2. Utilisez les pool s pour séparer les tâches s par exécution / priorité . Voici une idée (vous devrez peut-être retravailler selon vos besoins): Mettez toutes les petites tâches s dans tiny_task_pool et toutes les grandes < / em> ceux dans big_task_pool . Laissez le tiny_task_pool avoir un nombre significativement plus élevé de slots que big_task_pool . Cela rendrait la famine de votre minuscule -tâches beaucoup moins probables. Vous pouvez faire preuve de créativité avec encore plus de niveaux de pool .

  3. Même si vos tâches n'ont pas de vraies dépendances entre elles, cela ne devrait pas faire de mal à introduire délibérément des dépendances afin que toutes (ou la plupart) des grandes tâches soient effectuées en aval de petites tâches (et donc changer la structure de votre DAG ). Cela reviendrait à une approche du type travail le plus court d'abord . Vous pouvez également explorer priority_weight / priority_rule pour obtenir un contrôle encore plus précis.

Toutes les alternatives ci-dessus supposent que les longueurs (durée d'exécution) de la tâche sont connues à l'avance. Dans le monde réel, cela pourrait ne pas être vrai; ou même si c'est le cas, il pourrait changer progressivement les heures supplémentaires. Pour cela, je vous suggère de peaufiner votre script de définition de dag pour prendre en compte le temps d'exécution moyen (ou médian) de votre tâche s sur les n derniers passages pour décider de leur priorité.

  • Pour la méthode start_date , il suffit de fournir une start_date ultérieure (en fait la même date, l'heure ultérieure) aux tâches s qui ont duré plus longtemps lors des exécutions précédentes
  • Pour la méthode de pool , déplacez les tâches dans différents pools en fonction de leurs durées d'exécution précédentes
  • Pour la méthode dépendance de tâche , augmentez l'exécution des tâches s en aval . Cela peut sembler difficile mais vous pouvez le visualiser comme ceci: Créez 3 DummyOperator s et liez-les (l'un après l'autre). Vous devez maintenant remplir toutes les petites tâches entre les 2 premiers DummyOperator et les grandes entre les deux suivants.

3 commentaires

Personnellement, je préférerais l'approche dépendance des tâches car (1) pas de travail supplémentaire / caché en dehors du câblage des tâches (2) Excellent repère visuel (depuis DAG < La structure de / code> refléterait cela sur l'interface utilisateur)


wow ... merci pour l'explication détaillée. J'essaierai vos suggestions.


J'ai essayé les n ° 2 et 3 ... cela ne semble pas fonctionner. J'ai joint le code. Pourriez-vous jeter un œil? Merci beaucoup!



4
votes

Il s'avère que deux variables peuvent être définies pour résoudre mon problème très facilement.

concurrence et max_active_runs

Dans l'exemple ci-dessous, vous pouvez avoir 4 dags en cours d'exécution et chaque dag peut avoir 4 tâches en même temps . D'autres combinaisons sont également possibles.

dag = DAG(
    dag_id='sample_dag',
    default_args=default_args,
    schedule_interval='@daily',
    # this will allow up to 16 tasks to be run at the same time
    concurrency=16,
    # this will allow up to 4 dags to be run at the same time
    max_active_runs=4,
)


0 commentaires