1
votes

Joindre 2 DataFrame basé sur la recherche dans une colonne de collections - Spark, Scala

J'ai 2 dataframes comme ci-dessous,

val x = Seq((Seq(4,5),"XXX"),(Seq(7),"XYX")).toDF("X","NAME")

val y = Seq((5)).toDF("Y")

Je veux joindre les deux dataframes en recherchant la valeur de y et en recherchant le Seq / Array dans x.select ("X") s'il existe, puis joignez la ligne complète avec y

Comment puis-je réaliser que c'est Spark?

Cheers! p >


0 commentaires

3 Réponses :


2
votes

Spark 2.4.3 vous pouvez utiliser des fonctions d'ordre supérieur spark

scala> val x = Seq((Seq(4,5),"XXX"),(Seq(7),"XYX")).toDF("X","NAME")
scala> val y = Seq((5)).toDF("Y")

scala> x.join(y,expr("array_contains(X, y)"),"left").show
+------+----+----+
|     X|NAME|   Y|
+------+----+----+
|[4, 5]| XXX|   5|
|   [7]| XYX|null|
+------+----+----+

Veuillez confirmer que c'est ce que vous voulez réaliser?


2 commentaires

@mani_nz est-ce ce que vous voulez réaliser si oui alors veuillez accepter la réponse sinon laissez-moi savoir quelle est votre sortie souhaitée


+1 pour la solution. Mon application est sur 2.4.0 et elle échoue. Existe-t-il un moyen de le faire d'une autre manière pour que cela fonctionne dans la version 2.4.0? Acclamations!



2
votes

Vous pouvez utiliser un UDF pour la jointure, fonctionne pour toutes les versions de Spark:

val array_contains = udf((arr:Seq[Int],element:Int) => arr.contains(element))

x
 .join(y, array_contains($"X",$"Y"),"left")
 .show()


0 commentaires

1
votes

Une autre approche que vous pouvez utiliser consiste à exploser votre tableau en lignes avec la nouvelle colonne temporaire. Si vous exécutez le code suivant:

x.withColumn("temp", explode('X))
  .distinct()
  .join(y, 'temp === 'Y, "left")
  .drop('temp)

il afficherait:

x.withColumn("temp", explode('X))
  .join(y, 'temp === 'Y)
  .drop('temp)

Comme vous pouvez le voir, vous pouvez maintenant simplement rejoindre en utilisant colonnes temp et Y (puis supprimez temp):

+------+----+----+
|     X|NAME|temp|
+------+----+----+
|[4, 5]| XXX|   4|
|[4, 5]| XXX|   5|
|   [7]| XYX|   7|
+------+----+----+

Cela peut échouer en créant dupliquer les lignes si X contient des doublons. Dans ce cas, vous devrez également appeler distinct:

x.withColumn("temp", explode('X)).show()

Étant donné que cette approche utilise des méthodes natives Spark, elle sera un peu plus rapide que celui utilisant UDF, mais sans doute moins élégant.


1 commentaires

Merci! mais udf est élégant pour mon cas d'utilisation et ce n'est pas beaucoup de données qui auraient besoin d'améliorations de perf, donc je vais avec la réponse @Raphael Roth! Apprécier ton aide!