J'ai un dataframe pyspark comme celui-ci:
| id | cat | --------------------------- | 1 | a -> 1, b -> 1 | | 2 | b -> 2, c -> 1 | | 3 | a -> 3 |
Maintenant, je veux les regrouper par "id" et les agréger dans une carte comme ceci:
| id | time | cat | ------------------------- 1 t1 a 1 t2 b 2 t3 b 2 t4 c 2 t5 b 3 t6 a 3 t7 a 3 t8 a
Je suppose que nous pouvons utiliser la fonction de collect_list pyspark sql pour les recueillir la liste, et alors je pourrais appliquer une fonction UDF pour transformer la liste en dict. Mais existe-t-il un autre moyen (plus court ou plus efficace) de le faire?
3 Réponses :
Voici comment je l'ai fait.
Code
(df.groupBy(['id', 'cat']) .count() .select(['id',F.create_map('cat', 'count').alias('map')]) .groupBy('id') .agg(F.collect_list('map').alias('cat')) .show() ) #+---+--------------------+ #| id| cat| #+---+--------------------+ #| 1|[[a -> 1], [b -> 1]]| #| 3| [[a -> 3]]| #| 2|[[b -> 2], [c -> 1]]| #+---+--------------------+
Production
+-------+---------------------+ | id|collect_list(arrowed)| +-------+---------------------+ | 1 | [a -> 1, b -> 1] | | 3 | [a -> 3] | | 2 | [b -> 2, c -> 1] | +-------+---------------------+
Éditer
import pyspark.sql.functions as F from pyspark.sql.types import StringType df = spark.createDataFrame([(1,'t1','a'),(1,'t2','b'),(2,'t3','b'),(2,'t4','c'),(2,'t5','b'),\ (3,'t6','a'),(3,'t7','a'),(3,'t8','a')],\ ('id','time','cat')) (df.groupBy(['id', 'cat']) .agg(F.count(F.col('cat')).cast(StringType()).alias('counted')) .select(['id', F.concat_ws('->', F.col('cat'), F.col('counted')).alias('arrowed')]) .groupBy('id') .agg(F.collect_list('arrowed')) .show() )
Vous avez simplement ajouté la chaîne ->
entre les clés et les valeurs. Ce n'est pas du tout ce que la question pose.
@pault Ops, j'ai lu cette question trop rapidement. Noté et édité.
C'est mieux, mais ce n'est toujours pas la bonne sortie. Vous avez un tableau de cartes, plutôt qu'un MapType
Vous pouvez utiliser cette fonction de pyspark.sql.functions.map_from_entries
si nous considérons que votre dataframe est df, vous devez faire ceci:
import pyspark.sql.functions as F df1 = df.groupby("id", "cat").count() df2 = df1.groupby("id")\ .agg(F.map_from_entries(F.collect_list(F.struct("cat","count"))).alias("cat"))
similaire à la réponse de yasi
import pyspark.sql.functions as F df1 = df.groupby("id", "cat").count() df2 = df1.groupby("id")\ .agg(F.map_from_arrays(F.collect_list("cat"),F.collect_list("count"))).alias("cat"))