J'ai le code de jouet suivant:
import pandas as pd from google.cloud import bigquery_storage_v1beta1 import os import google.auth os.environ["GOOGLE_APPLICATION_CREDENTIALS"]='key.json' credentials, your_project_id = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"]) bq_storage_client = bigquery_storage_v1beta1.BigQueryStorageClient(credentials=credentials) table_ref = bigquery_storage_v1beta1.types.TableReference() table_ref.project_id = "bigquery-public-data" table_ref.dataset_id = "libraries_io" table_ref.table_id = "versions" parent = "projects/{}".format(your_project_id) session = client.create_read_session( table_ref, parent, format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, sharding_strategy=(bigquery_storage_v1beta1.enums.ShardingStrategy.BALANCED), ) reader1 = bq_storage_client.read_rows(bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[0]), timeout=100000) reader2 = bq_storage_client.read_rows(bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[1]), timeout=100000) df = pd.concat([reader1.to_dataframe(session),reader2.to_dataframe(session)]) df
J'ai utilisé BALANCED ShardingStrategy pour lancer plus d'un flux qui peut être lu indépendamment.
La documentation de BigqueryStorage dit:
Cependant, si vous souhaitez déployer plusieurs lecteurs, vous pouvez le faire en avoir un lecteur pour traiter chaque flux individuel.
J'ai lancé deux lecteurs, un pour chacun des flux de la session. Ensuite, deux dataframes (1 créée à partir de chaque lecteur) sont concaténées en une seule. Cependant, cette approche ne donne aucune vitesse par rapport à la LIQUID ShardingStrategy.
J'essaye de faire en sorte que les deux lecteurs lisent les lignes en parallèle. Cependant, je n'ai rien trouvé sur la lecture de flux parallèles dans la documentation de la bibliothèque.
Les questions sont les suivantes:
1) BugQuery Storage fournit-il un moyen natif pour lire plusieurs flux simultanément si BALANCED ShardingStrategy est choisi?
2) Quel est le meilleur façon de lire le flux en parallèle? Dois-je utiliser le multitraitement ou asyncio pour cela?
3) J'apprécierais que quelqu'un puisse fournir un exemple de base sur le reding de flux parallèles
3 Réponses :
API BigQuery Storage prend en charge plusieurs flux, mais votre méthode d'exécution le prend ne pas. Vous pouvez créer plusieurs instances de lecteurs, puis chacun d'eux peut utiliser des flux individuels pour augmenter le débit.
Vous avez de nombreuses options pour effectuer un Traitement parallèle en python . Cependant, le plus simple à utiliser est le package multiprocesseur .
Une autre option consiste à utiliser Apache Beam < / a> qui prend en charge le traitement parallèle par défaut mais peut ne pas convenir à votre cas d'utilisation. Il dispose d'un pilote BigQuery IO intégré, mais sa version python ne prend pas encore en charge l'API BigQuery Storage, vous devrez donc peut-être écrire votre propre implémentation pour l'API BQ Storage.
'mais votre méthode d'exécution ne le fait pas' pourriez-vous s'il vous plaît élaborer là-dessus? J'ai créé deux lecteurs et j'ai deux flux, chaque flux individuel est utilisé par 1 lecteur. Qu'est-ce que je fais mal? Que faut-il changer pour utiliser plusieurs flux?
Vous lisez en série, reader2 ne sera traité qu'après le traitement de reader1. Cela s'appelle une exécution ligne par ligne. Ce que vous voulez, c'est déclencher des processus individuels pour reader1 et reader2 et les laisser traiter en parallèle.
@KunalDeo Pouvez-vous s'il vous plaît fournir plus d'informations sur le pilote personnalisé de faisceau? Je suppose que l'un des problèmes liés à la mise en œuvre d'une telle chose dans beam, est que par défaut et sur la base de l'environnement d'exécution parallèle de beam, nous ne pouvons pas fragmenter une énorme opération de lecture en plus petites parties (car nous ne savons pas combien de travailleurs fonctionneront dans prd). Ensuite, la chose devient un faisceau exécutant beaucoup de sessions de lecture flux unique en parallèle, chacune faisant individuellement la même chose, ne complétant pas réellement les parties manquantes de l'autre. Si vous avez d'autres idées, liens ou quoi que ce soit, c'est très apprécié si vous pouviez partager! :) Merci
J'ai fait quelques recherches et je me suis rendu compte que vous utilisiez du code de l'API BigQuery Storage, et vous avez raison, la stratégie équilibrée est utilisée si vous consommez plusieurs flux, il faut mentionner qu'elle est toujours active version bêta.
Une des raisons pour lesquelles cela se produit est que vous ne voyez peut-être qu'un seul flux car les données sont relativement "petites" pour l'algorithme d'allocation de flux, le nombre de flux peut être inférieur au nombre demandé en fonction de 2 facteurs: a parallélisme raisonnable de la table et de la limite du service. Actuellement, les détails de l'algorithme permettant de déterminer ce qui est «raisonnable» ne sont pas accessibles au public et peuvent changer une fois que l'API atteint la phase de disponibilité générale.
Vous pouvez également essayer le package multiprocesseur qui a été recommandé ci-dessus.
Il vous manque la valeur required_streams
:
readers = [] for i in range(n_streams): stream = session.streams[i] position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream) reader = bqstorageclient.read_rows(position) readers.append(reader) df = pd.concat([reader.to_dataframe(session) for reader in readers])
Vous pouvez concaténer les dataframes sur une seule ligne:
n_streams = 2 session = client.create_read_session( table_ref, parent, requested_streams=n_streams, format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, sharding_strategy=(bigquery_storage_v1beta1.enums.ShardingStrategy.BALANCED), )
J'espère que cela vous aidera.