J'ai un simple dag lors de l'exécution de la tâche Comment puis-je spécifier à BigQuery pour sauvegarder le résultat de la requête à une table partitionnée quotidienne? Ma première hypothèse a été d'utiliser J'utilise BQ_Query Code> La requête SQL est enregistrée dans une table fronde. Je veux que cela soit sauvé dans une table cloisonnée quotidienne. Pour ce faire, je n'ai changé que
Destination_DataSet_Table Code> sur
My_DataSet.MY_TABLE $ $ 20180524 CODE>. J'ai eu l'erreur ci-dessous lors de l'exécution du
bq_task code>: p>
Query_Params code> dans
bigqueryperator code>
Mais je n'ai trouvé aucun exemple sur la manière d'utiliser ce paramètre. p>
Google-Cloud == 0.27.0 Code> Python Client ... et c'est le un utilisé dans Prod: ( P> P>
4 Réponses :
Vous avez d'abord besoin de créer une table de destination partitionnée vide. Suivez les instructions ici: Lien pour créer une table partitionnée vide
puis courez à nouveau sous le pipeline de flux d'air. Vous pouvez essayer Code: P>
import datetime from airflow import DAG from airflow.contrib.operators.bigquery_operator import BigQueryOperator today_date = datetime.datetime.now().strftime("%Y%m%d") table_name = 'my_dataset.my_table' + '$' + today_date with DAG(dag_id='my_dags.my_dag') as dag: start = DummyOperator(task_id='start') end = DummyOperator(task_id='end') sql = """ SELECT * FROM 'another_dataset.another_table' """ bq_query = BigQueryOperator(bql=sql, destination_dataset_table={{ params.t_name }}), task_id='bq_query', bigquery_conn_id='my_bq_connection', use_legacy_sql=False, write_disposition='WRITE_TRUNCATE', create_disposition='CREATE_IF_NEEDED', query_params={'t_name': table_name}, dag=dag ) start >> bq_query >> end
Je ne reçois plus l'erreur: la tâche excitée avec le code de retour 0 code>. Mais, la table des résultats n'est pas créée, je ne le vois pas à BigQuery. Bizarre
Vous devez d'abord créer une table de destination partitionnée vide. Suivez les instructions ici: https://cloud.google.com/bigquery/docs/Creatting-Column-Parti tions # Création_an_em pty_partitioned_tabl e_with_a_schema_defi Nition code> Pour créer une table partitionnée vide
ou vous pouvez utiliser bigquerycreateempleratrice () code> pour créer une table vide avec partition par paramètre
TIME_PARTITION CODE> Paramètre
La chose est que je ne veux pas spécifier le schéma. Je ne pense pas qu'u bigquerycreateempleratrice () code> sans aucun schéma ne soit étouffé. Comme dans l'interface utilisateur de la bigquery, la création d'une table cloisonnée quotidienne vide et sans aucun champ entraîne une erreur
Schema Code> Dans
BigQuéryCreateempleTyTablePerator () Code> est un paramètre facultatif. Check Ce , nous avons construit cette Opérateur en gardant à l'esprit que nous ne voulons pas spécifier de schéma. Si vous cochez la chaîne DOC pour
BigQuéryCreateemptytableToutigeratrice Code> Classe ->
crée une nouvelle table vide dans le jeu de données de bigquery spécifié, éventuellement avec le schéma. Code>
Exemple: time_partitioning = {'Type': "Jour", "Champ": "Test_field", 'ExpirationMS': 1000} code>
Cette fonctionnalité n'a pas été fusionnée dans la version précédente, j'ai demandé à mon ami de vérifier s'il peut fusionner cela dans le flux d'air actuel. Bonnes nouvelles, il a fusionné la fonctionnalité de la version actuelle. Tout d'abord, vous devez installer la version actuelle puis exécuter l'exemple ci-dessus.
Merci @gruby, la version que j'utilise est 1.9.0. Je pense que c'est la dernière version stable (selon le Repo ). Comment mettre à jour ma version pour obtenir les dernières fonctionnalités?
Ceci est cher si vous avez beaucoup de données. Pourquoi ne pas utiliser l'opérateur de copie libre de copie à la place? Github.com/Apache/incubator-airflow/ BLOB / MASTER / AIRFLOW / CONT NIB / ...
J'ai le même doute, mais c'est pour BigQuéryCreateDeemperactertabilitor. Lorsque nous créons une table à l'aide de l'opérateur. Nous devons spécifier TABLE_ID = 'EMP_7 dans le paramètre qui est le nom de la table.My question est si la table existe déjà, il ne devrait pas créer de table, si la table n'existe pas, alors elle devrait être créée. Comment pouvons-nous faire cela ??
Le problème principal ici est que je n'ai pas accès à la nouvelle version de Google Cloud Python API, le produit utilise la version 0.27.0 . Donc, pour faire le travail, j'ai fait quelque chose de mauvais et de sale: p>
table_sharded code> li>
- obtenu
table_shardled code> s schéma, que ce soit table_schema code> li>
- Sauvé
"SELECT * à partir de DataSet.table_sharded" Code> La requête à une table partitionnée fournissant table_schema code> li>
ul>
Tout cela est extrait dans un seul opérateur qui utilise un crochet. Le crochet est responsable de la création / de la suppression de tableaux / partitions, à obtenir le schéma de table et à exécuter des requêtes sur BigQuery. P>
Regardez le code . S'il y a une autre solution, merci de me le faire savoir. p>
Pouvez-vous utiliser une commande (gratuit) de table de copie BQ au lieu de la requête?
Utilisation de BigQuéryoperator Vous pouvez transmettre le paramètre Time_Partitioning qui créera des tables partitionnées sur l'ingestion
bq_cmd = BigQueryOperator ( task_id= "task_id", sql= [query], destination_dataset_table= destination_tbl, use_legacy_sql= False, write_disposition= 'WRITE_TRUNCATE', time_partitioning= {'time_partitioning_type':'DAY'}, allow_large_results= True, trigger_rule= 'all_success', query_params= query_params, dag= dag )
from datetime import datetime,timedelta from airflow import DAG from airflow.models import Variable from airflow.contrib.operators.bigquery_operator import BigQueryOperator from airflow.operators.dummy_operator import DummyOperator DEFAULT_DAG_ARGS = { 'owner': 'airflow', 'depends_on_past': False, 'retries': 2, 'retry_delay': timedelta(minutes=10), 'project_id': Variable.get('gcp_project'), 'zone': Variable.get('gce_zone'), 'region': Variable.get('gce_region'), 'location': Variable.get('gce_zone'), } with DAG( 'test', start_date=datetime(2019, 1, 1), schedule_interval=None, catchup=False, default_args=DEFAULT_DAG_ARGS) as dag: bq_query = BigQueryOperator( task_id='create-partition', bql="""SELECT * FROM `dataset.table_name`""", -- table from which you want to pull data destination_dataset_table='project.dataset.table_name' + '$' + datetime.now().strftime('%Y%m%d'), -- Auto partitioned table in Bq write_disposition='WRITE_TRUNCATE', create_disposition='CREATE_IF_NEEDED', use_legacy_sql=False, ) I recommend to use Variable in Airflow and create all fields and use in DAG. By above code, partition will be added in Bigquery table for Todays date.
Ne
Créer une table ... Partition par ... comme sélectionnez ... Code> Travail?
@Elliottbrossard Je ne pense pas que cela va travailler, car le Dag va être exécuté chaque jour. Avec
Créer ... code> La table sera créée après chaque exécution. Je veux juste créer une nouvelle partition, pas la table entière.