0
votes

Spark Rejoignez les dataframes et les colonnes de mise à jour conditionnellement

salut j'ai 2 dataframes d'étincelles.
Le premier: xxx

et second one: xxx

partout pays, latitude, longitude, last_update et uid, le fond DF pourrait avoir différentes colonnes ajoutées.
L'idée est de faire une jointure complète par UID, de mettre à jour les colonnes communes et de conserver les colonnes communes.
Comment pourrais-je accomplir cette tâche? Merci.


2 commentaires

Les colonnes communes que vous souhaitez conserver ce que vous voulez faire avec eux en cas de valeurs différentes?


Je veux mettre à jour les colonnes communes avec les valeurs de la table inférieure


3 Réponses :


1
votes

Voici le code (vous n'avez pas spécifié, essayons donc SCALA):

// Your dataframes
val upper = ...
val lower = ...

// Find out the columns
val sharedCols = upper.columns.toSet & lower.columns.toSet
val disjointCols = (upper.columns.toSet | lower.columns.toSet) -- sharedCols
val columns = (sharedCols.map(c => coalesce(lower.col(c), upper.col(c)).as(c)) ++ disjointCols.map(c => col(c))).toList

// Join and project    
val joined = upper.join(lower, upper.col("uid") === lower.col("uid"), "full_outer").select(columns:_*)
joined.show


0 commentaires

0
votes

Si comme vous l'avez dit dans les commentaires, vous voulez toujours em> les colonnes communes de la table inférieure. Vous pouvez faire une simple jointure à perdre des cloums communs à partir de DF1 avant la jointure.

joined_df = df1.drop("some_common_columns").join(df2,Seq("uid"))


0 commentaires

0
votes

J'ai trouvé cette solution, pour éviter le mélange en raison de la jointure.
Que pensez-vous?
Toute amélioration ou raccourcis scala que je peux utiliser? XXX PRE>

Après avoir défini la fonction ci-dessus, je fais: P>

      val upper_col = tableToUpdate.columns.toSet
      val bottom_col = miniJoin.columns.toSet
      val union_cols = tableToUpdate_col ++ miniJoin_col

          upper
            .select(func_union_name(tableToUpdate_col, union_cols): _*)
            .union(bottom.select(func_union_name(bottom_col, union_cols): _*))            
            .withColumn("max_lu",max(col("last_update"))
                                  .over(Window.partitionBy(col("uid"))))
            .filter(col("last_update").geq(col("max_lu")))
            .drop(col("max_lu"))


0 commentaires