3
votes

Pyspark UDF pour renvoyer un résultat similaire à groupby (). Sum () entre deux colonnes

J'ai l'exemple de dataframe suivant

+---+----+--------------------+-------------+-------------------------+
| ID|name|               fruit|          qty|                   Result|
+---+----+--------------------+-------------+-------------------------+
|101|Mark|[apple, apple, or...|[16, 2, 3, 1]|[(apple,19), (orange,3)] |
+---+----+--------------------+-------------+-------------------------+

et je voudrais créer une autre colonne qui contient un résultat similaire à ce que j'obtiendrais avec un pandas groupby ('fruit') .sum ()

sum_cols = udf(lambda x: pd.DataFrame({'fruits': x[0], 'qty': x[1]}).groupby('fruits').sum())
spark_df.withColumn('Result', sum_cols(F.struct('fruit', 'qty'))).show()

Le résultat ci-dessus pourrait être stocké dans la nouvelle colonne sous n'importe quelle forme (soit une chaîne, un dictionnaire, une liste de tuples ...) .

J'ai essayé une approche similaire à la suivante qui ne fonctionne pas

        qty
fruits     
apple    19
orange    3

Un exemple de dataframe de résultat pourrait être

XXX

Avez-vous des suggestions sur la manière dont je pourrais y parvenir?

Merci

Modifier: sous Spark 2.4 .3


5 commentaires

Quelle est votre sortie souhaitée? Cela ne ressort pas clairement de la description, veuillez le montrer explicitement.


merci pour votre commentaire, c'est fait!


Quelle version d'étincelle? S'il s'agit de spark 2.4+, vous pouvez utiliser array_zip . Les anciennes versions rendent cela un peu plus difficile.


J'utilise la version 2.4.3. Pouvez-vous me fournir un exemple d'utilisation pour cela dans mon cas?


Dans mon expérience (limitée), j'ai vu du code pyspark "natif" fonctionner 10 fois plus vite que les UDF (et en particulier les UDAF), même lorsqu'un exploser était utilisé. Juste quelque chose à garder à l'esprit..


3 Réponses :


1
votes

Si vous avez une étincelle cette réponse ):

+---+--------------------------+
|ID |Result                    |
+---+--------------------------+
|101|[[orange, 3], [apple, 19]]|
+---+--------------------------+

Sortie:

df_aggregated.groupby('ID').agg(F.collect_list(F.struct(F.col('fruit'), F.col('qty'))).alias('Result')).show()

Ensuite, préparez le résultat souhaité. Trouvez d'abord le dataframe agrégé:

+---+------+---+
| ID| fruit|qty|
+---+------+---+
|101|orange|  3|
|101| apple| 19|
+---+------+---+

Résultat:

df_aggregated = df_split.groupby('ID', 'fruit').agg(F.sum('qty').alias('qty'))
df_aggregated.show()

Et enfin changez-le au format souhaité: p >

+---+----+------+---+
| ID|name| fruit|qty|
+---+----+------+---+
|101|Mark| apple| 16|
|101|Mark| apple|  2|
|101|Mark|orange|  3|
|101|Mark| apple|  1|
+---+----+------+---+

Résultat:

df_split = (spark_df.rdd.flatMap(lambda row: [(row.ID, row.name, f, q) for f, q in zip(row.fruit, row.qty)]).toDF(["ID", "name", "fruit", "qty"]))

df_split.show()


5 commentaires

udf a probablement de meilleures performances que exploser dans ce cas


Je ne sais pas, mais j'ai changé la réponse en RDD au lieu d'exploser.


J'essaye ça. Je pense que la distribution du code sur plusieurs lignes aiderait à la lisibilité.


udf serait presque sûrement (certainement?) meilleur que rdd aussi


Cela fonctionne également, mais la solution pault est beaucoup plus propre et plus facile à comprendre.



3
votes

Il peut y avoir un moyen sophistiqué de le faire en utilisant uniquement les fonctions API sur Spark 2.4+, peut-être avec une combinaison de arrays_zip et aggregate , mais je ne peux penser à aucun qui n'implique pas un exploser étape suivie d'un groupBy . Dans cet esprit, utiliser un udf peut en fait être mieux pour vous dans ce cas.

Je pense que créer un pandas DataFrame juste dans le but d'appeler .groupby (). sum () est exagéré. De plus, même si vous l'avez fait de cette façon, vous devrez convertir la sortie finale en une structure de données différente car un udf ne peut pas renvoyer un pandas DataFrame.

Voici un moyen avec un udf en utilisant collections.defaultdict:

from pyspark.sql.functions import array, col

spark_df.withColumn(
    "Result",
    sum_cols(array([col("fruit"), col("qty")]))
).show(truncate=False)
#+---+----+-----------------------------+-------------+--------------------------+
#|ID |name|fruit                        |qty          |Result                    |
#+---+----+-----------------------------+-------------+--------------------------+
#|101|Mark|[apple, apple, orange, apple]|[16, 2, 3, 1]|[[orange, 3], [apple, 19]]|
#+---+----+-----------------------------+-------------+--------------------------+


7 commentaires

J'aime ta solution pault, merci pour ton temps. Cependant, j'obtiens cette erreur Py4JJavaError: Une erreur s'est produite lors de l'appel de o3564.showString .... Causée par: net.razorvine.pickle.PickleException: aucun argument attendu pour la construction de ClassDict (pour numpy.dtype) .. ça vous dit quelque chose?


vient d'ajouter à partir de pyspark.sql.types import ArrayType, StructType, StringType, IntegerType, StructField


Essayez également de forcer le type int Python dans la sortie return [(k, int (sum (v))) pour k, v dans d.items ()] , de sorte que le résultat du udf sera certainement de type natif Python.


@RichardNemeth c'est un excellent point - ce qui me fait me demander: êtes-vous sûr d'utiliser __builtin __. Sum et non numpy.sum ou pyspark.sql.functions .sum ? Pourquoi vous ne devriez pas utiliser import * . Modifier Si vous obtenez un objet numpy, cela suggère que vous utilisez numpy.sum .


oui, la solution de Richard semblait résoudre le problème!


@crash J'ai mis à jour la réponse en utilisant un defaultdict de int au lieu d'une liste . de cette façon, vous pouvez faire la somme à l'intérieur de la boucle et éviter d'appeler sum à la fin. Cela devrait également résoudre votre problème!


C'est aussi plus propre! Merci beaucoup @pault, c'était extrêmement intéressant et formateur.



4
votes

Comme @pault l'a mentionné, à partir de Spark 2.4+ , vous pouvez utiliser la fonction intégrée Spark SQL pour gérer votre tâche, voici un moyen avec array_distinct + transform + aggregate

transform(sequence(0,size(fruit)-1), i -> IF(fruit[i] = x, qty[i], 0))

Configurer l'instruction SQL:

aggregate(array_T, 0, (y,z) -> y + z)

Explication :

  1. Utilisez array_distinct (fruit) pour trouver toutes les entrées distinctes dans le tableau fruit
  2. transformer ce nouveau tableau (avec l'élément x ) de x en (x, agréger (.. x ..)) li>
  3. la fonction ci-dessus agréger (.. x ..) prend la forme simple de la somme de tous les éléments de array_T

    stmt = '''
        transform(array_distinct(fruit), x -> (x, aggregate(
              transform(sequence(0,size(fruit)-1), i -> IF(fruit[i] = x, qty[i], 0))
            , 0
            , (y,z) -> int(y + z)
        ))) AS sum_fruit
    '''
    
    >>> spark_df.withColumn('sum_fruit', expr(stmt)).show(10,0)
    +---+-----+-----------------------------------------+----------------+----------------------------------------+
    |ID |name |fruit                                    |qty             |sum_fruit                               |
    +---+-----+-----------------------------------------+----------------+----------------------------------------+
    |101|Mark |[apple, apple, orange, apple]            |[16, 2, 3, 1]   |[[apple, 19], [orange, 3]]              |
    |102|Twin |[apple, banana, avocado, banana, avocado]|[5, 2, 11, 3, 1]|[[apple, 5], [banana, 5], [avocado, 12]]|
    |103|Smith|[avocado]                                |[10]            |[[avocado, 10]]                         |
    +---+-----+-----------------------------------------+----------------+----------------------------------------+
    

    où le array_T provient de la transformation suivante:

    from pyspark.sql.functions import expr
    
    # set up data
    spark_df = spark.createDataFrame([
            (101, 'Mark', ['apple', 'apple', 'orange', 'apple'], [16, 2, 3, 1])
          , (102, 'Twin', ['apple', 'banana', 'avocado', 'banana', 'avocado'], [5, 2, 11, 3, 1])
          , (103, 'Smith', ['avocado'], [10])
        ], ['ID', 'name', 'fruit', 'qty']
    )
    
    >>> spark_df.show(5,0)
    +---+-----+-----------------------------------------+----------------+
    |ID |name |fruit                                    |qty             |
    +---+-----+-----------------------------------------+----------------+
    |101|Mark |[apple, apple, orange, apple]            |[16, 2, 3, 1]   |
    |102|Twin |[apple, banana, avocado, banana, avocado]|[5, 2, 11, 3, 1]|
    |103|Smith|[avocado]                                |[10]            |
    +---+-----+-----------------------------------------+----------------+
    
    >>> spark_df.printSchema()
    root
     |-- ID: long (nullable = true)
     |-- name: string (nullable = true)
     |-- fruit: array (nullable = true)
     |    |-- element: string (containsNull = true)
     |-- qty: array (nullable = true)
     |    |-- element: long (containsNull = true)
    

    qui parcourt le tableau fruit , si la valeur de fruit [i] = x, renvoie alors la quantité correspondante [i], sinon renvoie 0. par exemple pour ID = 101, quand x = 'orange', il renvoie un tableau [0, 0, 3, 0]


4 commentaires

wow c'est une très belle réponse merci @jxc! Pensez-vous que cela fonctionne mieux que la solution @pault?


@crash J'imagine que c'est mieux que d'utiliser un udf +1


@pault, vous êtes un utilisateur expérimenté, pensez-vous que cela devrait être la solution acceptée?


Oui, mais c'est votre décision à prendre.