J'ai besoin d'accéder au paramètre passé par BigqueryOperator dans le fichier sql, mais j'obtiens une erreur ERROR - l'argument queryParameters doit avoir un type
J'utilise le code ci-dessous:
select cast(DATE_ADD(a.dt_2, interval 7 day) as DATE) as dt_1 ,a.dt_2 ,cast('2010-01-01' as DATE) as dt_3 from (select cast(@threshold_date as date) as dt_2) a
)
Sql:
t2 = bigquery_operator.BigQueryOperator( task_id='bq_from_source_to_clean', sql='prepare.sql', use_legacy_sql=False, allow_large_results=True, query_params=[{ 'name': 'threshold_date', 'parameterType': { 'type': 'STRING' },'parameterValue': { 'value': '2020-01-01' } }], destination_dataset_table="{}.{}.{}".format('xxxx', 'xxxx', 'temp_airflow_test'), create_disposition="CREATE_IF_NEEDED", write_disposition="WRITE_TRUNCATE", dag=dag
J'utilise la version Google composer composer-1.7.0-airflow-1.10.2
Merci d'avance.
3 Réponses :
Après avoir plongé dans le code source, il semble que BigQueryHook
avait un bogue corrigé dans Airflow 1.10.3.
La façon dont vous avez défini query_params
est correcte pour les plus récents versions d'Airflow et doit être une liste selon l'API BigQuery: voir https://cloud.google.com/bigquery/docs/parameterized-queries#bigquery_query_params_named-python .
Quoi qu'il en soit, vous obtenez cette erreur car dans Airflow 1.10. 2, query_params
est défini comme un dict
, voir:
https://github.com/apache/airflow/blob/1.10.2/airflow/contrib/hooks/bigquery_hook.py # L678
def _validate_value(key, value, expected_type): """ function to check expected type and raise error if type is not correct """ if not isinstance(value, expected_type): raise TypeError("{} argument must have a type {} not {}".format( key, expected_type, type(value)))
Cela entraîne la fonction interne _validate_value
à lancer une TypeError
:
https://github.com/apache/airflow/blob/1.10.2/airflow/ contrib / hooks / bigquery_hook.py # L1954
query_param_list = [ ... (query_params, 'queryParameters', None, dict), ... ]
Je n'ai trouvé aucun exemple de query_params
dans Airflow 1.10.2 (ou tous les tests unitaires ...), mais je pense que c'est juste parce qu'il n'est pas utilisable.
Ces bogues ont été corrigés par ces commits:
dict
par list
Ces modifications ont été intégrées dans Airflow 1.10.3, mais, pour le moment, Airflow 1.10.3 n'est pas disponible dans Composer ( https://cloud.google.com/composer/docs/concepts/versioning/composer-versions#new_environments ): dernière version ont été publiés le 16 mai 2019 et intègrent la version 1.10.2.
En attendant cette nouvelle version, je vois 2 façons de résoudre votre problème:
BigQueryOperator
et BigQueryHook
et intégrez-les dans vos sources pour les utiliser, ou étendez le BigQueryHook
existant et remplacez-les méthodes buggées. Je ne suis pas sûr que vous puissiez corriger BigQueryHook
directement (pas d'accès à ces fichiers dans l'environnement Composer) query_params
) C'est certainement un bogue avec composer (Airflow 1.10.2) nous l'avons corrigé en extrayant les fichiers de flux d'air de github et en corrigeant le fichier bigquery_hook.py, puis en référençant le fichier corrigé dans bigquery_operator.py (tous deux téléchargés dans une lib dossier), les correctifs sont:
bigquery_operator.py (ligne 21)
de lib.bigquery_hook importer BigQueryHook
bigquery_hook.py
(ligne 678) (query_params, 'queryParameters', None, list),
(ligne 731) si 'useLegacySql' dans la configuration ['query'] et la configuration ['query'] ['useLegacySql'] et \
puis dans votre dag, référencez l'opérateur BQ téléchargé: "from lib.bigquery_operator import BigQueryOperator"
Partage de deux façons de transmettre des paramètres de requête dans l'opérateur BigQuery -
Jinja Templating - Dans la requête ci-dessous, vous voyez '{{(execution_date - macros.timedelta (hours = 1)). strftime ('% Y-% m- % d% H: 00: 00 ')}}' est le tempate jina qui sera résolu lors de l'exécution.
SELECT owner_display_name, title, view_count FROM bigquery-public-data.stackoverflow.posts_questions
WHERE creation_date> CAST (' {{(execution_date - macros.timedelta (hours = 1)). strftime ('% Y-% m-% d% H: 00: 00')}} 'AS TIMESTAMP) ORDER BY view_count DESC LIMIT 100
query_params - pour la clause in, le type serait tableau et le type de type tableau devrait être le type de la colonne dans la grande requête.
query_params = [{'name': 'DATE_IN_CLAUSE', 'parameterType': {'type': 'ARRAY', 'arrayType': {'type': 'TIMESTAMP'}}, 'parameterValue': {'arrayValues ': [{' valeur ': datetime.utcnow (). strftime ('% Y-% m-% d% H: 00: 00 ')}, {' valeur ': (datetime.utcnow () - timedelta (heures = 1)). Strftime ('% Y-% m-% d% H: 00: 00')}]}}, {'name': 'COUNT', 'parameterType': {'type': 'INTEGER'}, 'parameterValue': {'value': 1}}]
SELECT owner_display_name, title, view_count FROM bigquery-public-data.stackoverflow.posts_questions WHERE creation_date dans UNNEST (@DATE_IN_CLAUSE) et view_count> @COUNT ORDER BY view_count DESC LIMIT 100
Remarque - La requête supérieure et les paramètres peuvent ne pas vous donner de résultats mais ils réussiront sans aucune erreur. Ces exemples sont juste pour montrer comment passer des paramètres.