4
votes

Comment voir le résultat de MySqlHook dans le journal

J'utilise MySqlHook pour établir une connexion à partir de airflow_db et j'effectue une requête, mais j'ai besoin de voir le résultat de la requête quelque part (disons log) , comment puis-je voir?

Voici l'exemple de code

t1 = MySqlOperator(
    task_id='basic_mysql',
    mysql_conn_id='airflow_db',
    sql="select * from xcom",
    dag=dag)


0 commentaires

3 Réponses :


2
votes

AFAIK, MySqlOperator sert à exécuter une requête UPDATE / DELETE etc. en d'autres termes interroge qui:

  • ne renvoie aucun résultat
  • retourne le résultat, mais cela ne vous dérange pas

Pour obtenir le résultat réel, vous devez exploiter MySqlHook . Voici un petit extrait de code ( Python 3.6+ ) pour commencer (non testé, mais juste pour des conseils)

from typing import List, Optional, Any
from airflow.hooks.mysql_hook import MySqlHook

# instantiate a MySqlHook
mysql_hook: MySqlHook = MySqlHook(mysql_conn_id="airflow_db")

# get records (this method comes from airflow.hooks.db_api_hook.DbApiHook)
records: List[List[Optional[Any]]] = mysql_hook.get_records(sql="select * from xcom")

# print records
print(records)

# alternatively, you can write records to task's logger
# note that here 'operator' = reference to your Operator
# operator.log.info("\n".join(records))

La sortie de print () / log.info () apparaîtra dans le journal des tâches sur l'interface utilisateur


2 commentaires

vous avez raison. Nous devons exploiter notre MySqlHook, mais kaxil l'a montré sous une forme très claire et a donc marqué sa réponse comme correcte. Quoi qu'il en soit, merci pour votre contribution.


juste en notant que quelque chose comme MERGE / UPSERT fonctionne parfaitement via SQLOperator mais renvoie toujours des informations précieuses



0
votes

En général, avec Airflow, votre requête doit être écrite de telle sorte que les résultats soient placés dans une table temporaire (peut-être en incluant le results_name _ {{ds_nodash}} ). Vous pouvez ensuite utiliser MySqlToSomethingElse Operator pour déplacer les résultats de la table temporaire. Ensuite, nettoyez en laissant tomber la table.

Je ne vois aucune raison pour laquelle la journalisation des résultats dans les journaux Airflow serait un travail suffisant pour un DAG.


0 commentaires

4
votes

L'opérateur MySQL actuellement (airflow 1.10.1 au moment de l'écriture) ne prend pas en charge le retour de quoi que ce soit dans XCom, donc le correctif pour vous, pour le moment, est d'écrire un petit opérateur vous-même. Vous pouvez le faire directement dans votre fichier DAG:

from airflow.operators.python_operator import PythonOperator
from airflow.operators.mysql_operator import MySqlOperator
from airflow.hooks.mysql_hook import MySqlHook

class ReturningMySqlOperator(MySqlOperator):
    def execute(self, context):
        self.log.info('Executing: %s', self.sql)
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
                         schema=self.database)
        return hook.get_records(
            self.sql,
            parameters=self.parameters)

t1 = ReturningMySqlOperator(
    task_id='basic_mysql',
    mysql_conn_id='airflow_db',
    sql="select * from xcom",
    dag=dag)

def get_records(**kwargs):
    ti = kwargs['ti']
    xcom = ti.xcom_pull(task_ids='basic_mysql')
    string_to_print = 'Value in xcom is: {}'.format(xcom)
    # Get data in your logs
    logging.info(string_to_print)

t2 = PythonOperator(
    task_id='records',
    provide_context=True,
    python_callable=get_records,
    dag=dag)

t1 >> t2


4 commentaires

Merci @kaxil, ne pouvons-nous pas tirer la valeur de xcom sans utiliser d'opérateur? Comme j'ai créé une variable dans mon DAG et que je veux attribuer xcom à cette variable.


Pour ce cas d'utilisation, vous pouvez utiliser la variable Airflow ( airflow.apache.org/concepts.html#variables ). Et vous pouvez définir / obtenir la valeur dans le DAG et l'utiliser également dans une variable modèle dans cet opérateur.


vous voulez dire, je peux stocker la valeur xcom dans Airflow Variable?


@Ashish Kumar Variable.set (..) peut être utilisé pour conserver une Variable dans le backend de Airflow ( meta < / i>) db (même à l'exécution, c'est-à-dire depuis Operator ). Consultez son utilisation ici