3
votes

comment passer le paramètre de requête au fichier sql à l'aide de l'opérateur bigquery

J'ai besoin d'accéder au paramètre passé par BigqueryOperator dans le fichier sql, mais j'obtiens une erreur ERROR - l'argument queryParameters doit avoir un type pas J'utilise le code ci-dessous:

select  cast(DATE_ADD(a.dt_2, interval 7 day) as DATE) as dt_1
,a.dt_2
,cast('2010-01-01' as DATE) as dt_3 
from (select cast(@threshold_date as date) as dt_2) a

)

Sql:

t2 = bigquery_operator.BigQueryOperator(
task_id='bq_from_source_to_clean',
sql='prepare.sql',
use_legacy_sql=False,
allow_large_results=True,
query_params=[{ 'name': 'threshold_date', 'parameterType': { 'type': 'STRING' },'parameterValue': { 'value': '2020-01-01' } }],
destination_dataset_table="{}.{}.{}".format('xxxx',
                                            'xxxx',
                                            'temp_airflow_test'),
create_disposition="CREATE_IF_NEEDED",
write_disposition="WRITE_TRUNCATE",
dag=dag

J'utilise la version Google composer composer-1.7.0-airflow-1.10.2

Merci d'avance.


0 commentaires

3 Réponses :


8
votes

Après avoir plongé dans le code source, il semble que BigQueryHook avait un bogue corrigé dans Airflow 1.10.3.

La façon dont vous avez défini query_params est correcte pour les plus récents versions d'Airflow et doit être une liste selon l'API BigQuery: voir https://cloud.google.com/bigquery/docs/parameterized-queries#bigquery_query_params_named-python .

Quoi qu'il en soit, vous obtenez cette erreur car dans Airflow 1.10. 2, query_params est défini comme un dict , voir:

https://github.com/apache/airflow/blob/1.10.2/airflow/contrib/hooks/bigquery_hook.py # L678

def _validate_value(key, value, expected_type):
    """ function to check expected type and raise
    error if type is not correct """
    if not isinstance(value, expected_type):
        raise TypeError("{} argument must have a type {} not {}".format(
            key, expected_type, type(value)))

Cela entraîne la fonction interne _validate_value à lancer une TypeError :

https://github.com/apache/airflow/blob/1.10.2/airflow/ contrib / hooks / bigquery_hook.py # L1954

query_param_list = [
    ...
    (query_params, 'queryParameters', None, dict),
    ...
]

Je n'ai trouvé aucun exemple de query_params dans Airflow 1.10.2 (ou tous les tests unitaires ...), mais je pense que c'est juste parce qu'il n'est pas utilisable.

Ces bogues ont été corrigés par ces commits:

Ces modifications ont été intégrées dans Airflow 1.10.3, mais, pour le moment, Airflow 1.10.3 n'est pas disponible dans Composer ( https://cloud.google.com/composer/docs/concepts/versioning/composer-versions#new_environments ): dernière version ont été publiés le 16 mai 2019 et intègrent la version 1.10.2.

En attendant cette nouvelle version, je vois 2 façons de résoudre votre problème:

  • copiez / collez les versions fixes de BigQueryOperator et BigQueryHook et intégrez-les dans vos sources pour les utiliser, ou étendez le BigQueryHook existant et remplacez-les méthodes buggées. Je ne suis pas sûr que vous puissiez corriger BigQueryHook directement (pas d'accès à ces fichiers dans l'environnement Composer)
  • modélisez vous-même votre requête SQL (et n'utilisez pas query_params )


0 commentaires

1
votes

C'est certainement un bogue avec composer (Airflow 1.10.2) nous l'avons corrigé en extrayant les fichiers de flux d'air de github et en corrigeant le fichier bigquery_hook.py, puis en référençant le fichier corrigé dans bigquery_operator.py (tous deux téléchargés dans une lib dossier), les correctifs sont:

  1. bigquery_operator.py (ligne 21)

    de lib.bigquery_hook importer BigQueryHook

  2. bigquery_hook.py

    (ligne 678) (query_params, 'queryParameters', None, list),

    (ligne 731) si 'useLegacySql' dans la configuration ['query'] et la configuration ['query'] ['useLegacySql'] et \

puis dans votre dag, référencez l'opérateur BQ téléchargé: "from lib.bigquery_operator import BigQueryOperator"


0 commentaires

1
votes

Partage de deux façons de transmettre des paramètres de requête dans l'opérateur BigQuery -

  1. Jinja Templating - Dans la requête ci-dessous, vous voyez '{{(execution_date - macros.timedelta (hours = 1)). strftime ('% Y-% m- % d% H: 00: 00 ')}}' est le tempate jina qui sera résolu lors de l'exécution.

    SELECT owner_display_name, title, view_count FROM bigquery-public-data.stackoverflow.posts_questions WHERE creation_date> CAST (' {{(execution_date - macros.timedelta (hours = 1)). strftime ('% Y-% m-% d% H: 00: 00')}} 'AS TIMESTAMP) ORDER BY view_count DESC LIMIT 100

  2. query_params - pour la clause in, le type serait tableau et le type de type tableau devrait être le type de la colonne dans la grande requête.

    query_params = [{'name': 'DATE_IN_CLAUSE', 'parameterType': {'type': 'ARRAY', 'arrayType': {'type': 'TIMESTAMP'}}, 'parameterValue': {'arrayValues ': [{' valeur ': datetime.utcnow (). strftime ('% Y-% m-% d% H: 00: 00 ')}, {' valeur ': (datetime.utcnow () - timedelta (heures = 1)). Strftime ('% Y-% m-% d% H: 00: 00')}]}}, {'name': 'COUNT', 'parameterType': {'type': 'INTEGER'}, 'parameterValue': {'value': 1}}]

    SELECT owner_display_name, title, view_count FROM bigquery-public-data.stackoverflow.posts_questions WHERE creation_date dans UNNEST (@DATE_IN_CLAUSE) et view_count> @COUNT ORDER BY view_count DESC LIMIT 100

Remarque - La requête supérieure et les paramètres peuvent ne pas vous donner de résultats mais ils réussiront sans aucune erreur. Ces exemples sont juste pour montrer comment passer des paramètres.


0 commentaires