6
votes

Flux d'air Bigqueryoperator: Comment sauver la requête Résultat dans une table partitionnée?

J'ai un simple dag xxx

lors de l'exécution de la tâche BQ_Query La requête SQL est enregistrée dans une table fronde. Je veux que cela soit sauvé dans une table cloisonnée quotidienne. Pour ce faire, je n'ai changé que Destination_DataSet_Table sur My_DataSet.MY_TABLE $ $ 20180524 . J'ai eu l'erreur ci-dessous lors de l'exécution du bq_task : xxx

Comment puis-je spécifier à BigQuery pour sauvegarder le résultat de la requête à une table partitionnée quotidienne? Ma première hypothèse a été d'utiliser Query_Params dans bigqueryperator Mais je n'ai trouvé aucun exemple sur la manière d'utiliser ce paramètre.

EDIT:

J'utilise Google-Cloud == 0.27.0 Python Client ... et c'est le un utilisé dans Prod: (


2 commentaires

Ne Créer une table ... Partition par ... comme sélectionnez ... Travail?


@Elliottbrossard Je ne pense pas que cela va travailler, car le Dag va être exécuté chaque jour. Avec Créer ... La table sera créée après chaque exécution. Je veux juste créer une nouvelle partition, pas la table entière.


4 Réponses :


8
votes

Vous avez d'abord besoin de créer une table de destination partitionnée vide. Suivez les instructions ici: Lien pour créer une table partitionnée vide

puis courez à nouveau sous le pipeline de flux d'air. Vous pouvez essayer Code: P>

import datetime
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
today_date = datetime.datetime.now().strftime("%Y%m%d")
table_name = 'my_dataset.my_table' + '$' + today_date
with DAG(dag_id='my_dags.my_dag') as dag:
    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')
    sql = """
         SELECT *
         FROM 'another_dataset.another_table'
          """
    bq_query = BigQueryOperator(bql=sql,
                        destination_dataset_table={{ params.t_name }}),
                        task_id='bq_query',
                        bigquery_conn_id='my_bq_connection',
                        use_legacy_sql=False,
                        write_disposition='WRITE_TRUNCATE',
                        create_disposition='CREATE_IF_NEEDED',
                        query_params={'t_name': table_name},
                        dag=dag
                        )
start >> bq_query >> end


10 commentaires

Je ne reçois plus l'erreur: la tâche excitée avec le code de retour 0 . Mais, la table des résultats n'est pas créée, je ne le vois pas à BigQuery. Bizarre


Vous devez d'abord créer une table de destination partitionnée vide. Suivez les instructions ici: https://cloud.google.com/bigquery/docs/Creatting-Column-Parti tions # Création_an_em pty_partitioned_tabl e_with_a_schema_defi Nition Pour créer une table partitionnée vide


ou vous pouvez utiliser bigquerycreateempleratrice () pour créer une table vide avec partition par paramètre TIME_PARTITION Paramètre


La chose est que je ne veux pas spécifier le schéma. Je ne pense pas qu'u bigquerycreateempleratrice () sans aucun schéma ne soit étouffé. Comme dans l'interface utilisateur de la bigquery, la création d'une table cloisonnée quotidienne vide et sans aucun champ entraîne une erreur


Schema Dans BigQuéryCreateempleTyTablePerator () est un paramètre facultatif. Check Ce , nous avons construit cette Opérateur en gardant à l'esprit que nous ne voulons pas spécifier de schéma. Si vous cochez la chaîne DOC pour BigQuéryCreateemptytableToutigeratrice Classe -> crée une nouvelle table vide dans le jeu de données de bigquery spécifié, éventuellement avec le schéma.


Exemple: time_partitioning = {'Type': "Jour", "Champ": "Test_field", 'ExpirationMS': 1000}


Cette fonctionnalité n'a pas été fusionnée dans la version précédente, j'ai demandé à mon ami de vérifier s'il peut fusionner cela dans le flux d'air actuel. Bonnes nouvelles, il a fusionné la fonctionnalité de la version actuelle. Tout d'abord, vous devez installer la version actuelle puis exécuter l'exemple ci-dessus.


Merci @gruby, la version que j'utilise est 1.9.0. Je pense que c'est la dernière version stable (selon le Repo ). Comment mettre à jour ma version pour obtenir les dernières fonctionnalités?


Ceci est cher si vous avez beaucoup de données. Pourquoi ne pas utiliser l'opérateur de copie libre de copie à la place? Github.com/Apache/incubator-airflow/ BLOB / MASTER / AIRFLOW / CONT NIB / ...


J'ai le même doute, mais c'est pour BigQuéryCreateDeemperactertabilitor. Lorsque nous créons une table à l'aide de l'opérateur. Nous devons spécifier TABLE_ID = 'EMP_7 dans le paramètre qui est le nom de la table.My question est si la table existe déjà, il ne devrait pas créer de table, si la table n'existe pas, alors elle devrait être créée. Comment pouvons-nous faire cela ??



0
votes

Le problème principal ici est que je n'ai pas accès à la nouvelle version de Google Cloud Python API, le produit utilise la version 0.27.0 . Donc, pour faire le travail, j'ai fait quelque chose de mauvais et de sale:

  • Enregistré le résultat de la requête dans une table fronde, que ce soit table_sharded
  • obtenu table_shardled s schéma, que ce soit table_schema
  • Sauvé "SELECT * à partir de DataSet.table_sharded" La requête à une table partitionnée fournissant table_schema

    Tout cela est extrait dans un seul opérateur qui utilise un crochet. Le crochet est responsable de la création / de la suppression de tableaux / partitions, à obtenir le schéma de table et à exécuter des requêtes sur BigQuery.

    Regardez le code . S'il y a une autre solution, merci de me le faire savoir.


1 commentaires

Pouvez-vous utiliser une commande (gratuit) de table de copie BQ au lieu de la requête?



0
votes

Utilisation de BigQuéryoperator Vous pouvez transmettre le paramètre Time_Partitioning qui créera des tables partitionnées sur l'ingestion

bq_cmd = BigQueryOperator (
            task_id=                    "task_id",
            sql=                        [query],
            destination_dataset_table=  destination_tbl,
            use_legacy_sql=             False,
            write_disposition=          'WRITE_TRUNCATE',
            time_partitioning=          {'time_partitioning_type':'DAY'},
            allow_large_results=        True,
            trigger_rule=               'all_success',
            query_params=               query_params,
            dag=                        dag
        )


0 commentaires

0
votes
from datetime import datetime,timedelta
from airflow import DAG
from airflow.models import Variable
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.dummy_operator import DummyOperator

DEFAULT_DAG_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=10),
    'project_id': Variable.get('gcp_project'),
    'zone': Variable.get('gce_zone'),
    'region': Variable.get('gce_region'),
    'location': Variable.get('gce_zone'),
}

with DAG(
    'test',
    start_date=datetime(2019, 1, 1),
    schedule_interval=None,
    catchup=False,
    default_args=DEFAULT_DAG_ARGS) as dag:

    bq_query = BigQueryOperator(
        task_id='create-partition',
        bql="""SELECT
                * 
                FROM
                `dataset.table_name`""",   -- table from which you want to pull data
        destination_dataset_table='project.dataset.table_name' + '$' + datetime.now().strftime('%Y%m%d'),             -- Auto partitioned table in Bq 
        write_disposition='WRITE_TRUNCATE',
        create_disposition='CREATE_IF_NEEDED',
        use_legacy_sql=False,
    )
I recommend to use Variable in Airflow and create all fields and use in DAG.
By above code, partition will be added in Bigquery table for Todays date.

0 commentaires