3
votes

Problème de jointure Spark Dataframe

L'extrait de code ci-dessous fonctionne très bien. (Lisez CSV, Lisez Parquet et rejoignez-vous)

//Reading from Hbase -- getting three columns: Number of records: 1
 df1=read from Hbase code

**df1.persist(StorageLevel.MEMORY_AND_DISK)
val cnt = df1.count()**

df2=spark.read.parquet(inputFilePath)

//Join with Another table : Number of records: 50 Million, total 
columns: 15
df2.join(broadcast(df1), col("df2col1") === col("df1col1")  "right")

Il a été dit que l'extrait de code ci-dessous ne fonctionne pas. (Lisez Hbase, Lisez Parquet et rejoignez-vous) (La différence est la lecture de Hbase)

//Reading from Hbase (It read from hbase properly -- getting three columns: Number of records: 1
 df1=read from Hbase code
 // It read from Hbase properly and able to show one record.
 df1.show

df2=spark.read.parquet(inputFilePath)

//Join with Another table : Number of records: 50 Million, total 
columns: 15
df2.join(broadcast(df1), col("df2col1") === col("df1col1")  "right")

Erreur : causée par: org .apache.spark.SparkException: Job abandonné en raison d'un échec d'étape: La taille totale des résultats sérialisés de 56 tâches (1024,4 Mo) est plus grande que spark.driver.maxResultSize (1024,0 Mo)

Ensuite, j'ai ajouté spark .driver.maxResultSize = 5g, puis une autre erreur a commencé à se produire, erreur d'espace Java Heap (exécutée sur ThreadPoolExecutor.java). Si j'observe l'utilisation de la mémoire dans Manager, je vois que l'utilisation continue d'augmenter jusqu'à ce qu'elle atteigne ~ 50 Go, auquel point l'erreur OOM se produit. Donc, quelle qu'en soit la raison, la quantité de RAM utilisée pour effectuer cette opération est environ 10 fois supérieure à la taille du RDD que j'essaie d'utiliser.

Si je persiste df1 dans la mémoire et le disque et que je fais un compter(). Le programme fonctionne très bien. L'extrait de code est ci-dessous

//Reading csv file -- getting three columns: Number of records: 1
 df1=spark.read.format("csv").load(filePath) 

df2=spark.read.parquet(inputFilePath)

//Join with Another table : Number of records: 30 Million, total 
columns: 15
df2.join(broadcast(df1), col("df2col1") === col("df1col1")  "right")

Il fonctionne avec un fichier même s'il contient les mêmes données mais pas avec Hbase. L'exécution sur un cluster de 100 nœuds de travail avec 125 Go de mémoire sur chacun. La mémoire n'est donc pas le problème.

Ma question ici est à la fois le fichier et Hbase a les mêmes données et à la fois lu et capable d'afficher () les données. Mais pourquoi seule l'Hbase échoue. J'ai du mal à comprendre ce qui pourrait ne pas aller avec ce code. Toutes les suggestions seront appréciées.


0 commentaires

3 Réponses :


2
votes

Lorsque les données sont extraites, Spark n'est pas conscient du nombre de lignes qui sont récupérées à partir de HBase, par conséquent, la stratégie choisie serait de trier la jointure par fusion. / p>

pour éviter le problème, nous pouvons utiliser la jointure de diffusion en même temps que nous n'avons pas l'habitude de trier et de mélanger les données à partir de df2 en utilisant la colonne clé, qui montre la dernière instruction de votre extrait de code.

cependant, pour contourner cela (puisqu'il ne s'agit que d'une ligne), nous pouvons utiliser l'expression Case pour les colonnes à remplir.

exemple:

df.withColumn(
"newCol"
,when(col("df2col1").eq(lit(hbaseKey))
    ,lit(hbaseValueCol1))
 .otherwise(lit(null))


1 commentaires

Merci d'avoir répondu. Juste une info. La table hbase peut augmenter à l'avenir mais pas plus de 100 lignes. Je n'ai pas pu suivre la hbaseKey. Pouvez-vous s'il vous plaît expliquer un peu plus l'expression de cas que vous mentionnez.



2
votes

J'ai aussi parfois du mal avec cette erreur. Cela se produit souvent lorsque Spark tente de diffuser une grande table pendant une jointure (cela se produit lorsque l'optimiseur de Spark sous-estime la taille de la table ou que les statistiques ne sont pas correctes). Comme il n'y a aucun indice pour forcer la jointure par tri-fusion ( Comment indiquer une jointure de tri par fusion ou une jointure par hachage aléatoire (et ignorer la jointure par hachage de diffusion)? ), la seule option est de désactiver les jointures de diffusion en définissant spark.sql.autoBroadcastJoinThreshold = -1


0 commentaires

0
votes

Lorsque j'ai des problèmes de mémoire lors d'une jointure, cela signifie généralement l'une des deux raisons suivantes:

  1. Vous avez trop peu de partitions dans les dataframes (les partitions sont trop grandes)
  2. Il existe de nombreux doublons dans les deux dataframes de la clé sur laquelle vous joignez, et la jointure explose votre mémoire.

Annonce 1. Je pense que vous devriez regarder le nombre de partitions que vous avez dans chaque table avant de rejoindre. Lorsque Spark lit un fichier il ne garde pas forcément le même nombre de partitions que la table d'origine (parquet, csv ou autre). La lecture à partir de csv vs la lecture à partir de HBase peut créer un nombre différent de partitions et c'est pourquoi vous constatez des différences de performances. Des partitions trop grandes deviennent encore plus grandes après la jointure et cela crée un problème de mémoire. Jetez un œil à la mémoire d'exécution maximale par tâche dans Spark UI. Cela vous donnera une idée de votre utilisation de la mémoire par tâche. J'ai trouvé préférable de le maintenir en dessous de 1 Go.

Solution: repartitionnez vos tables avant la jointure.

Annonce. 2 Ce n'est peut-être pas le cas ici, mais cela vaut la peine d'être vérifié.


1 commentaires

Pourquoi le nombre de partitions devrait-il être important dans une jointure de diffusion? Et qu'est-ce que cela devrait avoir à voir avec la mémoire du pilote?