J'ai l'exemple de dataframe suivant
+---+----+--------------------+-------------+-------------------------+ | ID|name| fruit| qty| Result| +---+----+--------------------+-------------+-------------------------+ |101|Mark|[apple, apple, or...|[16, 2, 3, 1]|[(apple,19), (orange,3)] | +---+----+--------------------+-------------+-------------------------+
et je voudrais créer une autre colonne qui contient un résultat similaire à ce que j'obtiendrais avec un pandas groupby ('fruit') .sum ()
sum_cols = udf(lambda x: pd.DataFrame({'fruits': x[0], 'qty': x[1]}).groupby('fruits').sum()) spark_df.withColumn('Result', sum_cols(F.struct('fruit', 'qty'))).show()
Le résultat ci-dessus pourrait être stocké dans la nouvelle colonne sous n'importe quelle forme (soit une chaîne, un dictionnaire, une liste de tuples ...) .
J'ai essayé une approche similaire à la suivante qui ne fonctionne pas
qty fruits apple 19 orange 3
Un exemple de dataframe de résultat pourrait être
XXX
Avez-vous des suggestions sur la manière dont je pourrais y parvenir?
Merci
Modifier: sous Spark 2.4 .3
3 Réponses :
Si vous avez une étincelle cette réponse ):
+---+--------------------------+ |ID |Result | +---+--------------------------+ |101|[[orange, 3], [apple, 19]]| +---+--------------------------+
Sortie:
df_aggregated.groupby('ID').agg(F.collect_list(F.struct(F.col('fruit'), F.col('qty'))).alias('Result')).show()
Ensuite, préparez le résultat souhaité. Trouvez d'abord le dataframe agrégé:
+---+------+---+ | ID| fruit|qty| +---+------+---+ |101|orange| 3| |101| apple| 19| +---+------+---+
Résultat:
df_aggregated = df_split.groupby('ID', 'fruit').agg(F.sum('qty').alias('qty')) df_aggregated.show()
Et enfin changez-le au format souhaité: p >
+---+----+------+---+ | ID|name| fruit|qty| +---+----+------+---+ |101|Mark| apple| 16| |101|Mark| apple| 2| |101|Mark|orange| 3| |101|Mark| apple| 1| +---+----+------+---+
Résultat:
df_split = (spark_df.rdd.flatMap(lambda row: [(row.ID, row.name, f, q) for f, q in zip(row.fruit, row.qty)]).toDF(["ID", "name", "fruit", "qty"])) df_split.show()
udf
a probablement de meilleures performances que exploser
dans ce cas
Je ne sais pas, mais j'ai changé la réponse en RDD
au lieu d'exploser.
J'essaye ça. Je pense que la distribution du code sur plusieurs lignes aiderait à la lisibilité.
udf
serait presque sûrement (certainement?) meilleur que rdd
aussi
Cela fonctionne également, mais la solution pault est beaucoup plus propre et plus facile à comprendre.
Il peut y avoir un moyen sophistiqué de le faire en utilisant uniquement les fonctions API sur Spark 2.4+, peut-être avec une combinaison de arrays_zip
et aggregate
, mais je ne peux penser à aucun qui n'implique pas un exploser
étape suivie d'un groupBy
. Dans cet esprit, utiliser un udf
peut en fait être mieux pour vous dans ce cas.
Je pense que créer un pandas
DataFrame juste dans le but d'appeler .groupby (). sum ()
est exagéré. De plus, même si vous l'avez fait de cette façon, vous devrez convertir la sortie finale en une structure de données différente car un udf
ne peut pas renvoyer un pandas
DataFrame.
Voici un moyen avec un udf
en utilisant collections.defaultdict
:
from pyspark.sql.functions import array, col spark_df.withColumn( "Result", sum_cols(array([col("fruit"), col("qty")])) ).show(truncate=False) #+---+----+-----------------------------+-------------+--------------------------+ #|ID |name|fruit |qty |Result | #+---+----+-----------------------------+-------------+--------------------------+ #|101|Mark|[apple, apple, orange, apple]|[16, 2, 3, 1]|[[orange, 3], [apple, 19]]| #+---+----+-----------------------------+-------------+--------------------------+
J'aime ta solution pault, merci pour ton temps. Cependant, j'obtiens cette erreur Py4JJavaError: Une erreur s'est produite lors de l'appel de o3564.showString .... Causée par: net.razorvine.pickle.PickleException: aucun argument attendu pour la construction de ClassDict (pour numpy.dtype) code> .. ça vous dit quelque chose?
vient d'ajouter à partir de pyspark.sql.types import ArrayType, StructType, StringType, IntegerType, StructField
Essayez également de forcer le type int Python dans la sortie return [(k, int (sum (v))) pour k, v dans d.items ()]
, de sorte que le résultat du udf sera certainement de type natif Python.
@RichardNemeth c'est un excellent point - ce qui me fait me demander: êtes-vous sûr d'utiliser __builtin __. Sum
et non numpy.sum
ou pyspark.sql.functions .sum
? Pourquoi vous ne devriez pas utiliser import *
. Modifier Si vous obtenez un objet numpy, cela suggère que vous utilisez numpy.sum
.
oui, la solution de Richard semblait résoudre le problème!
@crash J'ai mis à jour la réponse en utilisant un defaultdict
de int
au lieu d'une liste
. de cette façon, vous pouvez faire la somme à l'intérieur de la boucle et éviter d'appeler sum
à la fin. Cela devrait également résoudre votre problème!
C'est aussi plus propre! Merci beaucoup @pault, c'était extrêmement intéressant et formateur.
Comme @pault l'a mentionné, à partir de Spark 2.4+ , vous pouvez utiliser la fonction intégrée Spark SQL pour gérer votre tâche, voici un moyen avec array_distinct + transform + aggregate Configurer l'instruction SQL: la fonction ci-dessus agréger (.. x ..) prend la forme simple de la somme de tous les éléments de array_T où le array_T provient de la transformation suivante: qui parcourt le tableau transform(sequence(0,size(fruit)-1), i -> IF(fruit[i] = x, qty[i], 0))
aggregate(array_T, 0, (y,z) -> y + z)
Explication :
array_distinct (fruit)
pour trouver toutes les entrées distinctes dans le tableau fruit
x
) de x
en (x, agréger (.. x ..))
li>
stmt = '''
transform(array_distinct(fruit), x -> (x, aggregate(
transform(sequence(0,size(fruit)-1), i -> IF(fruit[i] = x, qty[i], 0))
, 0
, (y,z) -> int(y + z)
))) AS sum_fruit
'''
>>> spark_df.withColumn('sum_fruit', expr(stmt)).show(10,0)
+---+-----+-----------------------------------------+----------------+----------------------------------------+
|ID |name |fruit |qty |sum_fruit |
+---+-----+-----------------------------------------+----------------+----------------------------------------+
|101|Mark |[apple, apple, orange, apple] |[16, 2, 3, 1] |[[apple, 19], [orange, 3]] |
|102|Twin |[apple, banana, avocado, banana, avocado]|[5, 2, 11, 3, 1]|[[apple, 5], [banana, 5], [avocado, 12]]|
|103|Smith|[avocado] |[10] |[[avocado, 10]] |
+---+-----+-----------------------------------------+----------------+----------------------------------------+
from pyspark.sql.functions import expr
# set up data
spark_df = spark.createDataFrame([
(101, 'Mark', ['apple', 'apple', 'orange', 'apple'], [16, 2, 3, 1])
, (102, 'Twin', ['apple', 'banana', 'avocado', 'banana', 'avocado'], [5, 2, 11, 3, 1])
, (103, 'Smith', ['avocado'], [10])
], ['ID', 'name', 'fruit', 'qty']
)
>>> spark_df.show(5,0)
+---+-----+-----------------------------------------+----------------+
|ID |name |fruit |qty |
+---+-----+-----------------------------------------+----------------+
|101|Mark |[apple, apple, orange, apple] |[16, 2, 3, 1] |
|102|Twin |[apple, banana, avocado, banana, avocado]|[5, 2, 11, 3, 1]|
|103|Smith|[avocado] |[10] |
+---+-----+-----------------------------------------+----------------+
>>> spark_df.printSchema()
root
|-- ID: long (nullable = true)
|-- name: string (nullable = true)
|-- fruit: array (nullable = true)
| |-- element: string (containsNull = true)
|-- qty: array (nullable = true)
| |-- element: long (containsNull = true)
fruit
, si la valeur de fruit [i] = x, renvoie alors la quantité correspondante [i], sinon renvoie 0. par exemple pour ID = 101, quand x = 'orange', il renvoie un tableau [0, 0, 3, 0]
wow c'est une très belle réponse merci @jxc! Pensez-vous que cela fonctionne mieux que la solution @pault?
@crash J'imagine que c'est mieux que d'utiliser un udf
+1
@pault, vous êtes un utilisateur expérimenté, pensez-vous que cela devrait être la solution acceptée?
Oui, mais c'est votre décision à prendre.
Quelle est votre sortie souhaitée? Cela ne ressort pas clairement de la description, veuillez le montrer explicitement.
merci pour votre commentaire, c'est fait!
Quelle version d'étincelle? S'il s'agit de spark 2.4+, vous pouvez utiliser
array_zip
. Les anciennes versions rendent cela un peu plus difficile.J'utilise la version 2.4.3. Pouvez-vous me fournir un exemple d'utilisation pour cela dans mon cas?
Dans mon expérience (limitée), j'ai vu du code pyspark "natif" fonctionner 10 fois plus vite que les UDF (et en particulier les UDAF), même lorsqu'un
exploser
était utilisé. Juste quelque chose à garder à l'esprit..