1
votes

Comment collecter une carte après groupe par dans Pyspark Dataframe?

J'ai un dataframe pyspark comme celui-ci:

|   id  |       cat       |
---------------------------
|   1   |  a -> 1, b -> 1 |
|   2   |  b -> 2, c -> 1 |
|   3   |  a -> 3         |

Maintenant, je veux les regrouper par "id" et les agréger dans une carte comme ceci:

|  id  |  time  |  cat  |
-------------------------
   1       t1       a
   1       t2       b
   2       t3       b
   2       t4       c
   2       t5       b
   3       t6       a
   3       t7       a
   3       t8       a

Je suppose que nous pouvons utiliser la fonction de collect_list pyspark sql pour les recueillir la liste, et alors je pourrais appliquer une fonction UDF pour transformer la liste en dict. Mais existe-t-il un autre moyen (plus court ou plus efficace) de le faire?


0 commentaires

3 Réponses :


0
votes

Voici comment je l'ai fait.

Code

(df.groupBy(['id', 'cat'])
   .count()
   .select(['id',F.create_map('cat', 'count').alias('map')])
   .groupBy('id')
   .agg(F.collect_list('map').alias('cat'))
   .show()
)

#+---+--------------------+
#| id|                 cat|
#+---+--------------------+
#|  1|[[a -> 1], [b -> 1]]|
#|  3|          [[a -> 3]]|
#|  2|[[b -> 2], [c -> 1]]|
#+---+--------------------+

Production

+-------+---------------------+
|     id|collect_list(arrowed)|
+-------+---------------------+
|   1   |    [a -> 1, b -> 1] |
|   3   |            [a -> 3] |
|   2   |    [b -> 2, c -> 1] |
+-------+---------------------+

Éditer

import pyspark.sql.functions as F
from pyspark.sql.types import StringType

df = spark.createDataFrame([(1,'t1','a'),(1,'t2','b'),(2,'t3','b'),(2,'t4','c'),(2,'t5','b'),\
(3,'t6','a'),(3,'t7','a'),(3,'t8','a')],\
('id','time','cat'))

(df.groupBy(['id', 'cat'])
   .agg(F.count(F.col('cat')).cast(StringType()).alias('counted'))
   .select(['id', F.concat_ws('->', F.col('cat'), F.col('counted')).alias('arrowed')])
   .groupBy('id')
   .agg(F.collect_list('arrowed'))
   .show()
)


3 commentaires

Vous avez simplement ajouté la chaîne -> entre les clés et les valeurs. Ce n'est pas du tout ce que la question pose.


@pault Ops, j'ai lu cette question trop rapidement. Noté et édité.


C'est mieux, mais ce n'est toujours pas la bonne sortie. Vous avez un tableau de cartes, plutôt qu'un MapType



4
votes

Vous pouvez utiliser cette fonction de pyspark.sql.functions.map_from_entries si nous considérons que votre dataframe est df, vous devez faire ceci:

import pyspark.sql.functions as F

df1 = df.groupby("id", "cat").count()
df2 = df1.groupby("id")\
         .agg(F.map_from_entries(F.collect_list(F.struct("cat","count"))).alias("cat"))


0 commentaires

0
votes

similaire à la réponse de yasi

import pyspark.sql.functions as F

df1 = df.groupby("id", "cat").count()
df2 = df1.groupby("id")\
         .agg(F.map_from_arrays(F.collect_list("cat"),F.collect_list("count"))).alias("cat"))


0 commentaires