J'ai un DAG décrit comme ceci:
bucket_name |- airflow.cfg |- dags |_ pipeline.py |- ... |_ sql_requests |_ create_table.sql
La tâche create_table
appelle un script SQL create_table.sql
. Ce script SQL n'est pas dans le même dossier que le dossier DAG: il se trouve dans un dossier sql_requests
au même niveau que le dossier DAG.
Voici l'architecture à l'intérieur du bucket de GCP Composer (qui est Google Airflow):
tmpl_search_path = '/home/airflow/gcs/sql_requests/' with DAG(dag_id='pipeline', default_args=default_args, template_searchpath = [tmpl_search_path]) as dag: create_table = bigquery_operator.BigQueryOperator( task_id = 'create_table', sql = 'create_table.sql', use_legacy_sql = False, destination_dataset_table = some_table) )
Quel chemin dois-je définir pour template_searchpath pour référencer le dossier sql_requests
dans le bucket Airflow sur GCP?
J'ai essayé template_searchpath = ['/ home / airflow / gcs / sql_requests']
, template_searchpath = ['../sql_requests']
, template_searchpath = ['/ sql_requests']
mais aucun n'a fonctionné.
Le message d'erreur que je reçois est 'jinja2.exceptions.TemplateNotFound'
4 Réponses :
Je crois que, par défaut, l'opérateur recherche les fichiers sql dans le dossier DAG, vous pouvez donc mettre votre SQL dans le dossier
sql = '../sql/create_table.sql'
Et puis le référencer comme
sql = '/sql/create_table.sql'
Si cela ne fonctionne pas, essayez-le sans le début /
(dont je ne suis pas sûr que vous ayez besoin)
Modifier
Si vous souhaitez les placer dans un dossier à la racine du bucket, essayez
gs://composer-bucket-name/dags/sql/create_table.sql
Merci pour votre réponse. J'ai fait cela et cela a fonctionné. Mais j'aimerais savoir s'il est possible de mettre mes requêtes sql dans un dossier autre que dans le dossier dag, comme gs: //composer-bucket-name/sql/create_table.sql
. J'ai essayé de suivre le même schéma mais cela n'a pas fonctionné et je ne sais pas pourquoi.
Merci pour votre mise à jour, mais malheureusement j'ai essayé et cela n'a pas fonctionné, je ne sais pas pourquoi
Selon https://cloud.google.com/composer/docs / concepts / cloud-storage il n'est pas possible de stocker les fichiers nécessaires à l'exécution de dags ailleurs que dans les dossiers dags
ou plugins
:
Pour éviter un échec du workflow, stockez vos DAG, plugins et modules Python dans les dags / ou plugins / dossiers, même si vos modules Python ne contiennent pas de DAG ou de plugins.
C'est la raison pour laquelle j'ai eu l'erreur TemplateNotFound.
Vous pouvez stocker dans des chemins montés / connus qui sont des dags / plugins OU des données
Le dossier de données n'a pas de limite de capacité, mais il est facile de ne pas l'utiliser pour stocker tout ce que le serveur Web aurait besoin de lire, car le serveur Web ne peut pas accéder à ce dossier (par exemple, si vous mettez des fichiers SQL dans / data dossier, vous ne pourrez pas analyser le modèle rendu dans l'interface utilisateur, mais toutes les tâches qui doivent accéder au fichier pendant l'exécution fonctionneront très bien)
Remplacez le dossier " sql_requests " par le dossier " dag " afin que votre code ressemble à ceci:
tmpl_search_path = '/home/airflow/dags/sql_requests/' with DAG(dag_id='pipeline', default_args=default_args, template_searchpath = [tmpl_search_path]) as dag: create_table = bigquery_operator.BigQueryOperator( task_id = 'create_table', sql = 'create_table.sql', use_legacy_sql = False, destination_dataset_table = some_table ) )
Pour moi , ça marche!