2
votes

Classement par ligne sur plusieurs colonnes dans Spark Dataframe

J'utilise spark avec Scala pour transformer un Dataframe, où je voudrais calculer une nouvelle variable qui calcule le rang d'une variable par ligne dans de nombreuses variables.

Exemple -

Input DF-

+---+---+---+
|c_0|c_1|c_2|
+---+---+---+
| 11| 11| 35|
| 22| 12| 66|
| 44| 22| 12|
+---+---+---+

Expected DF-

+---+---+---+--------+--------+--------+
|c_0|c_1|c_2|c_0_rank|c_1_rank|c_2_rank|
+---+---+---+--------+--------+--------+
| 11| 11| 35|        2|        3|        1|
| 22| 12| 66|       2|       3|       1|
| 44| 22| 12|       1|       2|       3|
+---+---+---+--------+--------+--------+




3 commentaires

Qu'est-ce que tu as essayé jusque-là?


J'ai essayé de créer une nouvelle colonne de type tableau avec tous les éléments, puis d'essayer de la mapper et d'utiliser zipwithindex après avoir trié le tableau pour obtenir l'index. Mais après avoir utilisé une carte sur un df, je suis bloqué et incapable d'utiliser withcolumn pour générer les trois colonnes de rang.


Pourriez-vous s'il vous plaît ajouter plus de détails? Quel est le rang que vous devez calculer?


3 Réponses :


0
votes

Une façon de procéder serait d'utiliser Windows.

(0 to 2)
    .map("c_"+_)
    .foldLeft(df)((d, column) => 
        zipWithIndex(d.orderBy(desc(column)), column+"_rank"))
    .show

Mais ce n'est pas une bonne idée. Toutes les données se retrouveront dans une partition, ce qui provoquera une erreur MOO si toutes les données ne rentrent pas dans un exécuteur.

Une autre méthode nécessiterait de trier la trame de données trois fois, mais au moins cela mettrait à l'échelle à n'importe quelle taille de données.

Définissons une fonction qui zippe une dataframe avec des index consécutifs (elle existe pour les RDD mais pas pour les dataframes)

def zipWithIndex(df : DataFrame, name : String) : DataFrame = {
    val rdd = df.rdd.zipWithIndex
      .map{ case (row, i) => Row.fromSeq(row.toSeq :+ (i+1)) }
    val newSchema = df.schema.add(StructField(name, LongType, false))
    df.sparkSession.createDataFrame(rdd, newSchema)
}

Et utilisons-le sur le même dataframe df:

val df = Seq((11,  21,  35),(22,  12, 66),(44, 22 , 12))
    .toDF("c_0", "c_1", "c_2")
(0 to 2)
    .map("c_"+_)
    .foldLeft(df)((d, column) => 
          d.withColumn(column+"_rank", rank() over Window.orderBy(desc(column))))
    .show
+---+---+---+--------+--------+--------+                                        
|c_0|c_1|c_2|c_0_rank|c_1_rank|c_2_rank|
+---+---+---+--------+--------+--------+
| 22| 12| 66|       2|       3|       1|
| 11| 21| 35|       3|       2|       2|
| 44| 22| 12|       1|       1|       3|
+---+---+---+--------+--------+--------+

qui fournit exactement le même résultat que ci-dessus.


5 commentaires

J'ai une autre suggestion pour utiliser la classe de cas. Cela aidera à ne pas le convertir en RDD et à utiliser directement le Dataset Spark.


merci pour la réponse rapide, mais la sortie ne correspond pas à la sortie attendue. J'attends le rang le plus bas pour la valeur la plus élevée ..eg - c_0_rank devrait être 1 pour c_0 ayant la valeur 44 dans la première ligne et non 3.


Bon, j'ai raté le fait qu'il était trié par ordre décroissant. J'ai édité ma réponse. C'est réparé.


la première ligne est correcte, pour la deuxième et la troisième la sortie est toujours incorrecte. Veuillez vérifier le DF attendu dans la question que je l'ai formatée pour une meilleure clarté.


Comme je n'avais pas compris votre question, j'ai posté une nouvelle réponse.



0
votes

Vous pourriez probablement créer une fonction de fenêtre. Notez que cela est sensible au MOO si vous avez trop de données. Mais, je voulais juste présenter le concept des fonctions de fenêtre ici.

inputDF.createOrReplaceTempView("my_df")
val expectedDF =  spark.sql("""
    select 
        c_0
        , c_1
        , c_2
        , rank(c_0) over (order by c_0 desc) c_0_rank
        , rank(c_1) over (order by c_1 desc) c_1_rank
        , rank(c_2) over (order by c_2 desc) c_2_rank 
    from my_df""")
expectedDF.show()

+---+---+---+--------+--------+--------+
|c_0|c_1|c_2|c_0_rank|c_1_rank|c_2_rank|
+---+---+---+--------+--------+--------+
| 44| 22| 12|       3|       3|       1|
| 11| 21| 35|       1|       2|       2|
| 22| 12| 66|       2|       1|       3|
+---+---+---+--------+--------+--------+


3 commentaires

la sortie souhaitée doit être: c_0, c_1, c_2, c_0_rank, c_1_rank, c_2_rank 44,22,12,1,2,3 11,21,35,3,2,1 22,12,66,2,3 , 1 numéro le plus élevé ayant le rang le plus bas où la colonne c_n_rank spécifie le rang correspondant pour la valeur de la colonne c_n.


Modifié pour le classement par rang desc


veuillez vérifier le DF attendu en question, l'avez édité pour plus de clarté



1
votes

Si je comprends bien, vous voulez avoir le rang de chaque colonne, dans chaque ligne.

Définissons d'abord les données, et les colonnes à "classer".

val colMap = df.columns.zipWithIndex.map(_.swap).toMap
val zip = udf((s: Seq[Int]) => s
    .zipWithIndex
    .sortBy(-_._1)
    .map(_._2)
    .zipWithIndex
    .toMap
    .mapValues(_+1))
val ranks = (0 until cols.size)
    .map(i => 'zip.getItem(i) as colMap(i) + "_rank")
val result = df
    .withColumn("zip", zip(array(cols.map(col) : _*)))
    .select(cols.map(col) ++ ranks :_*)


7 commentaires

Cela fonctionne, je l'ai accepté, je me demande simplement si l'utilisation de udf aurait un impact sur les performances s'il était utilisé sur un grand ensemble de données en raison de sa nature de ser-deser, également que pensez-vous du parallélisme, ne serait-ce pas sujet à MOO.


Ce code n'est pas du tout sujet au MOO. Il s'agit d'un simple calcul par ligne qui sera parfaitement réparti. Pas besoin de regrouper les lignes de quelque manière que ce soit, et rien sur le pilote pour que vous soyez en sécurité. Aussi parce que vous avez posé des questions sur l'UDF, j'ai vérifié et à partir de Spark 2.4, une fonction intégrée peut remplacer mon UDF (j'ai édité ma réponse pour la mentionner). Pourtant, même en utilisant l'UDF, je ne pense pas que les performances en souffriraient autant. Si vous essayez les deux, faites-le nous savoir ;-)


Merci Oli, vérifiera cela et reviendra avec les résultats de performance.


Je suppose que cela me renverrait le même index (rang dans ce cas) si les valeurs des éléments sont les mêmes. Je veux qu'ils s'incrémentent. (11,11,35 devrait retourner 2,3,1) Dois-je ajouter de la logique dans udf ou croire qu'il existe un moyen plus simple.


Avec cette méthode, il ne peut y avoir aucun index dupliqué. En effet, un élément ne peut avoir qu'un seul index dans un tableau. BTW, si vous utilisez Spark 2.4, vous n'avez même pas besoin d'un UDF;)


Je suis d'accord, le problème est quand vous faites un getindex pour un élément en double. Il renvoie l'index du premier élément. Si vous exécutez le code avec 11,11,35, il retourne 2,2,1 alors que 2,3,1bis attendu car le deuxième 11 devrait obtenir un rang incrémenté de 3 s'il a été répété plus tôt.


avez édité la question, veuillez jeter un œil. Merci