J'utilise MySqlHook pour établir une connexion à partir de airflow_db et j'effectue une requête, mais j'ai besoin de voir le résultat de la requête quelque part (disons log) , comment puis-je voir?
Voici l'exemple de code
t1 = MySqlOperator( task_id='basic_mysql', mysql_conn_id='airflow_db', sql="select * from xcom", dag=dag)
3 Réponses :
AFAIK, MySqlOperator
sert à exécuter une requête UPDATE
/ DELETE
etc. en d'autres termes interroge qui:
Pour obtenir le résultat réel, vous devez exploiter MySqlHook
. Voici un petit extrait de code ( Python 3.6+
) pour commencer (non testé, mais juste pour des conseils)
from typing import List, Optional, Any from airflow.hooks.mysql_hook import MySqlHook # instantiate a MySqlHook mysql_hook: MySqlHook = MySqlHook(mysql_conn_id="airflow_db") # get records (this method comes from airflow.hooks.db_api_hook.DbApiHook) records: List[List[Optional[Any]]] = mysql_hook.get_records(sql="select * from xcom") # print records print(records) # alternatively, you can write records to task's logger # note that here 'operator' = reference to your Operator # operator.log.info("\n".join(records))
La sortie de print ()
/ log.info ()
apparaîtra dans le journal des tâches sur l'interface utilisateur
vous avez raison. Nous devons exploiter notre MySqlHook, mais kaxil l'a montré sous une forme très claire et a donc marqué sa réponse comme correcte. Quoi qu'il en soit, merci pour votre contribution.
juste en notant que quelque chose comme MERGE
/ UPSERT
fonctionne parfaitement via SQLOperator mais renvoie toujours des informations précieuses
En général, avec Airflow, votre requête doit être écrite de telle sorte que les résultats soient placés dans une table temporaire (peut-être en incluant le results_name _ {{ds_nodash}}
). Vous pouvez ensuite utiliser MySqlTo
SomethingElse Operator
pour déplacer les résultats de la table temporaire. Ensuite, nettoyez en laissant tomber la table.
Je ne vois aucune raison pour laquelle la journalisation des résultats dans les journaux Airflow serait un travail suffisant pour un DAG.
L'opérateur MySQL actuellement (airflow 1.10.1 au moment de l'écriture) ne prend pas en charge le retour de quoi que ce soit dans XCom, donc le correctif pour vous, pour le moment, est d'écrire un petit opérateur vous-même. Vous pouvez le faire directement dans votre fichier DAG:
from airflow.operators.python_operator import PythonOperator from airflow.operators.mysql_operator import MySqlOperator from airflow.hooks.mysql_hook import MySqlHook class ReturningMySqlOperator(MySqlOperator): def execute(self, context): self.log.info('Executing: %s', self.sql) hook = MySqlHook(mysql_conn_id=self.mysql_conn_id, schema=self.database) return hook.get_records( self.sql, parameters=self.parameters) t1 = ReturningMySqlOperator( task_id='basic_mysql', mysql_conn_id='airflow_db', sql="select * from xcom", dag=dag) def get_records(**kwargs): ti = kwargs['ti'] xcom = ti.xcom_pull(task_ids='basic_mysql') string_to_print = 'Value in xcom is: {}'.format(xcom) # Get data in your logs logging.info(string_to_print) t2 = PythonOperator( task_id='records', provide_context=True, python_callable=get_records, dag=dag) t1 >> t2
Merci @kaxil, ne pouvons-nous pas tirer la valeur de xcom sans utiliser d'opérateur? Comme j'ai créé une variable dans mon DAG et que je veux attribuer xcom à cette variable.
Pour ce cas d'utilisation, vous pouvez utiliser la variable Airflow ( airflow.apache.org/concepts.html#variables ). Et vous pouvez définir / obtenir la valeur dans le DAG et l'utiliser également dans une variable modèle dans cet opérateur.
vous voulez dire, je peux stocker la valeur xcom dans Airflow Variable?
@Ashish Kumar Variable.set (..)
peut être utilisé pour conserver une Variable
dans le backend de Airflow
( meta < / i>) db (même à l'exécution, c'est-à-dire depuis Operator
). Consultez son utilisation ici