2
votes

Stockage Python BigQuery. Lecture de plusieurs flux en parallèle

J'ai le code de jouet suivant:

import pandas as pd
from google.cloud import bigquery_storage_v1beta1
import os
import google.auth

os.environ["GOOGLE_APPLICATION_CREDENTIALS"]='key.json'
credentials, your_project_id = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
bq_storage_client = bigquery_storage_v1beta1.BigQueryStorageClient(credentials=credentials)

table_ref = bigquery_storage_v1beta1.types.TableReference()
table_ref.project_id = "bigquery-public-data"
table_ref.dataset_id = "libraries_io"
table_ref.table_id = "versions"

parent = "projects/{}".format(your_project_id)
session = client.create_read_session(
    table_ref,
    parent,
    format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
    sharding_strategy=(bigquery_storage_v1beta1.enums.ShardingStrategy.BALANCED),
)

reader1 = bq_storage_client.read_rows(bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[0]), timeout=100000)
reader2 = bq_storage_client.read_rows(bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[1]), timeout=100000)

df = pd.concat([reader1.to_dataframe(session),reader2.to_dataframe(session)])
df

J'ai utilisé BALANCED ShardingStrategy pour lancer plus d'un flux qui peut être lu indépendamment.

La documentation de BigqueryStorage dit:

Cependant, si vous souhaitez déployer plusieurs lecteurs, vous pouvez le faire en avoir un lecteur pour traiter chaque flux individuel.

J'ai lancé deux lecteurs, un pour chacun des flux de la session. Ensuite, deux dataframes (1 créée à partir de chaque lecteur) sont concaténées en une seule. Cependant, cette approche ne donne aucune vitesse par rapport à la LIQUID ShardingStrategy.

J'essaye de faire en sorte que les deux lecteurs lisent les lignes en parallèle. Cependant, je n'ai rien trouvé sur la lecture de flux parallèles dans la documentation de la bibliothèque.

Les questions sont les suivantes:

1) BugQuery Storage fournit-il un moyen natif pour lire plusieurs flux simultanément si BALANCED ShardingStrategy est choisi?

2) Quel est le meilleur façon de lire le flux en parallèle? Dois-je utiliser le multitraitement ou asyncio pour cela?

3) J'apprécierais que quelqu'un puisse fournir un exemple de base sur le reding de flux parallèles


0 commentaires

3 Réponses :


2
votes

API BigQuery Storage prend en charge plusieurs flux, mais votre méthode d'exécution le prend ne pas. Vous pouvez créer plusieurs instances de lecteurs, puis chacun d'eux peut utiliser des flux individuels pour augmenter le débit.

Vous avez de nombreuses options pour effectuer un Traitement parallèle en python . Cependant, le plus simple à utiliser est le package multiprocesseur .

Une autre option consiste à utiliser Apache Beam < / a> qui prend en charge le traitement parallèle par défaut mais peut ne pas convenir à votre cas d'utilisation. Il dispose d'un pilote BigQuery IO intégré, mais sa version python ne prend pas encore en charge l'API BigQuery Storage, vous devrez donc peut-être écrire votre propre implémentation pour l'API BQ Storage.


3 commentaires

'mais votre méthode d'exécution ne le fait pas' pourriez-vous s'il vous plaît élaborer là-dessus? J'ai créé deux lecteurs et j'ai deux flux, chaque flux individuel est utilisé par 1 lecteur. Qu'est-ce que je fais mal? Que faut-il changer pour utiliser plusieurs flux?


Vous lisez en série, reader2 ne sera traité qu'après le traitement de reader1. Cela s'appelle une exécution ligne par ligne. Ce que vous voulez, c'est déclencher des processus individuels pour reader1 et reader2 et les laisser traiter en parallèle.


@KunalDeo Pouvez-vous s'il vous plaît fournir plus d'informations sur le pilote personnalisé de faisceau? Je suppose que l'un des problèmes liés à la mise en œuvre d'une telle chose dans beam, est que par défaut et sur la base de l'environnement d'exécution parallèle de beam, nous ne pouvons pas fragmenter une énorme opération de lecture en plus petites parties (car nous ne savons pas combien de travailleurs fonctionneront dans prd). Ensuite, la chose devient un faisceau exécutant beaucoup de sessions de lecture flux unique en parallèle, chacune faisant individuellement la même chose, ne complétant pas réellement les parties manquantes de l'autre. Si vous avez d'autres idées, liens ou quoi que ce soit, c'est très apprécié si vous pouviez partager! :) Merci




1
votes

Il vous manque la valeur required_streams :

readers = []
for i in range(n_streams):
    stream = session.streams[i]
    position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
    reader = bqstorageclient.read_rows(position)
    readers.append(reader)
df = pd.concat([reader.to_dataframe(session) for reader in readers])

Vous pouvez concaténer les dataframes sur une seule ligne:

n_streams = 2
session = client.create_read_session(
    table_ref,
    parent,
    requested_streams=n_streams,
    format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
    sharding_strategy=(bigquery_storage_v1beta1.enums.ShardingStrategy.BALANCED),
)

J'espère que cela vous aidera.


0 commentaires