2
votes

Jointure croisée entre deux grands ensembles de données dans Spark

J'ai 2 grands ensembles de données. Le premier ensemble de données contient environ 130 millions d'entrées.
Le deuxième ensemble de données contient environ 40000 entrées. Les données sont extraites des tables MySQL.

J'ai besoin de faire une jointure croisée mais j'obtiens

val df1 = (spark.read.jdbc(jdbcURL,configurationLoader.mysql_table1,"id",100,100000,40, MySqlConnection.getConnectionProperties))
val df2 = (spark.read.jdbc(jdbcURL,configurationLoader.mysql_table2, MySqlConnection.getConnectionProperties))
val df2Cache = df2.repartition(40).cache()
val crossProduct = df1.join(df2Cache)

Quelle est la meilleure technique optimale pour faire cela dans Scala?

Voici un extrait de mon code:

java.sql.SQLException: GC overhead limit exceeded

df1 est le plus grand ensemble de données et df2 est le plus petit.


5 commentaires

veuillez ajouter votre code avec des exemples qui produisent l'erreur. il sera très difficile de répondre aux questions sans voir le code et les scénarios.


Considérez si vous avez vraiment besoin de faire un crossJoin . Il n'y a pas de technique optimale. Les produits cartésiens sont chers.


Attendez-vous à 0,52 billion d'enregistrements


Le nombre d'enregistrements serait sans aucun doute élevé. Mais comment le partitionnement des données devrait-il idéalement être effectué? Je suis assez nouveau sur Spark. J'ai placé 40 comme nombre compte tenu du fait que j'ai 10 exécuteurs et 4 cœurs pour chaque exécuteur.


besoin de votre trace de pile d'erreurs et de soumission d'étincelles complète


3 Réponses :


4
votes

130M * 40K = 52 trillions d'enregistrements, c'est 52 téraoctets de mémoire requise pour stocker ces données, et c'est si nous supposons que chaque enregistrement fait 1 octet, ce qui n'est certainement pas vrai. S'il fait jusqu'à 64 octets (ce qui, je pense, est également une estimation très prudente), vous auriez besoin de 3,32 pétaoctets (!) De mémoire juste pour stocker les données. C'est une très grande quantité, donc à moins que vous n'ayez un très grand cluster et un réseau très rapide à l'intérieur de ce cluster, vous voudrez peut-être repenser votre algorithme pour le faire fonctionner.

Cela étant dit, lorsque vous effectuez une jointure de deux ensembles de données / dataframes SQL, le nombre de partitions que Spark utiliserait pour stocker le résultat de la jointure est contrôlé par l'étincelle . sql.shuffle.partitions (voir ici ). Vous voudrez peut-être le définir sur un très grand nombre et définir le nombre d'exécuteurs sur le plus grand nombre possible. Ensuite, vous pourrez peut-être exécuter votre traitement jusqu'à la fin.

En outre, vous voudrez peut-être consulter le option spark.shuffle.minNumPartitionsToHighlyCompress ; si vous le définissez sur moins que votre nombre de partitions de lecture aléatoire, vous pourriez obtenir une autre augmentation de mémoire. Notez que cette option était une constante codée en dur définie sur 2000 jusqu'à une version récente de Spark, donc en fonction de votre environnement, vous devrez simplement définir spark.sql.shuffle.partitions sur un nombre supérieur à 2000 pour faire utilisation de celui-ci.


0 commentaires

1
votes

D'accord avec Vladimir, j'ai pensé à ajouter plus de points.

voir MapStatus définir spark.sql.shuffle.partitions sur 2001 () (la valeur par défaut est 200).

nouvelle approche ( spark.shuffle.minNumPartitionsToHighlyCompress ) comme Vladimir l'a mentionné dans la réponse.

Pourquoi ce changement? : MapStatus a 2000 SPARK-24519 codés en dur em>

il appliquera un algorithme différent pour traiter

--conf spark.yarn.executor.memoryOverhead=<10% of executor memory>  -- conf spark.shuffle.compress=true --conf spark.shuffle.spill.compress=true 

HighlyCompressedMapStatus :

Une implémentation MapStatus qui stocke la taille exacte de blocs, qui sont plus grands que spark.shuffle.accurateBlockThreshold. Il stocke la taille moyenne des autres blocs non vides, plus une image bitmap pour savoir quels blocs sont vides.

spark.shuffle.accurateBlockThreshold - voir ici : Lorsque nous compressons la taille des blocs de lecture aléatoire dans HighlyCompressedMapStatus , nous enregistrerons la taille avec précision si elle est au-dessus de cette configuration . Cela permet d'éviter la MOO en évitant de sous-estimer la taille des blocs de lecture aléatoire lors de la récupération des blocs de lecture aléatoire.


CompressedMapStatus :

Une implémentation MapStatus qui suit la taille de chaque bloc. Taille pour chaque bloc est représenté en utilisant un seul octet.

Également défini sur votre spark-submit

def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
    if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) {
      HighlyCompressedMapStatus(loc, uncompressedSizes)
    } else {
      new CompressedMapStatus(loc, uncompressedSizes)
    }
  }

dans les deux cas, la compression utilisera spark.io.compression .codec

Conclusion : les tâches volumineuses doivent utiliser HighlyCompressedMapStatus et la surcharge de la mémoire de l'exécuteur peut représenter 10% de la mémoire de votre exécuteur.

De plus, jetez un œil à réglage de la mémoire des étincelles


0 commentaires

0
votes

Augmentez SPARK_EXECUTOR_MEMORY à une valeur plus élevée et répartissez-le sur plus de partitions


0 commentaires