J'ai un dataframe PySpark où 1 colonne se compose de listes de chaînes. Je voudrais compter le nombre d'instances de chaque élément dans chaque liste de chaînes sur toutes les lignes. Pseudocode:
counter = Counter() for attr_list in df['attr_list']: counter.update(attr_list)
Une autre façon de faire serait de concaténer toutes les listes sur toutes les lignes et de construire un compteur à partir de la seule énorme liste. Y a-t-il un moyen efficace de faire cela dans PySpark?
La sortie correcte serait un seul objet collections.Counter ()
rempli avec le nombre d'occurrences de chaque élément dans tous les listes dans toutes les colonnes, c'est-à-dire si pour une colonne donnée, la ligne 1 a la liste ['a', 'b', 'c']
et la ligne 2 a la liste ['b' , 'c', 'd']
, nous obtiendrions un compteur qui ressemble à {'a': 1, 'b': 2, 'c': 2, 'd': 1}
.
3 Réponses :
Vous pouvez essayer d'utiliser les méthodes distinct
et flatMap
de rdd, pour cela convertissez simplement la colonne en et rdd et effectuez ces opérations.
counter = (df .select("attr_list") .rdd # join all strings in the list and then split to get each word .map(lambda x: " ".join(x).split(" ")) .flatMap(lambda x: x) # make a tuple for each word so later it can be grouped by to get its frequency count .map(lambda x: (x, 1)) .reduceByKey(lambda a,b: a+b) .collectAsMap())
Une option de conversion en RDD est de fusionner tous les tableaux en un seul puis d'utiliser un objet Counter
dessus.
from pyspark.sql.functions import explode explode_df = df.withColumn('exploded_list',explode(df.listCol)) counts = explode_df.groupBy('exploded_list').count() counts_tuple = counts.rdd.reduce(lambda a,b : a+b) print({counts_tuple[i]:counts_tuple[i+1] for i in range(0,len(counts_tuple)-1,2)})
Une autre option avec exploser
et groupBy
et fusionner le résultat dans un dictionnaire
.
from collections import Counter all_lists = df.select('listCol').rdd print(Counter(all_lists.map(lambda x: [i for i in x[0]]).reduce(lambda x,y: x+y)))
Si vous connaissez les 2ème Méthode, utilisant éléments
que vous devez compter, vous pouvez les utiliser avec spark2.4 +.
et ce sera très rapide. (en utilisant filtre de fonction d'ordre supérieur
et structs
) from pyspark.sql import functions as F
a=df.withColumn("atr", F.expr("""transform(array_distinct(atr_list),x->aggregate(atr_list,0,(acc,y)->\
IF(y=x, acc+1,acc)))"""))\
.withColumn("zip", F.explode(F.arrays_zip(F.array_distinct("atr_list"),("atr"))))\
.select("zip.*").withColumnRenamed("0","elements")\
.groupBy("elements").agg(F.sum("atr").alias("sum"))\
.collect()
{a[i][0]: a[i][1] for i in range(len(a))}
Sortie: {'a': 1, 'b': 3, 'c': 2, 'd': 1}
transformer, agréger, exploser et grouper
(ne nécessite pas la spécification d'éléments): df.show()
#+------------+
#| atr_list|
#+------------+
#|[a, b, b, c]|
#| [b, c, d]|
#+------------+
elements=['a','b','c','d']
from pyspark.sql import functions as F
collected=df.withColumn("struct", F.struct(*[(F.struct(F.expr("size(filter(atr_list,x->x={}))"\
.format("'"+y+"'"))).alias(y)) for y in elements]))\
.select(*[F.sum(F.col("struct.{}.col1".format(x))).alias(x) for x in elements])\
.collect()[0]
{elements[i]: [x for x in collected][i] for i in range(len(elements))}
Merci. Malheureusement, dans cette situation, il y a plus de 3000 éléments, c'est pourquoi j'espérais utiliser quelque chose qui ressemble à un dict python Counter (). Utiliser une table de hachage pour incrémenter les variables accessibles en utilisant uniquement les éléments de liste dans la ligne donnée devrait être beaucoup plus rapide que d'itérer une fois pour chaque élément.
Je ne suis pas d'accord, le code ci-dessus n'est pas itératif, il se produit immédiatement sous le capot, et il n'y a pas de moyen rapide d'utiliser quelque chose comme counter () dict in spark. Avez-vous essayé de mettre tous vos 3000 éléments distincts dans la liste des éléments comme ci-dessus et d'exécuter le code?
Salut Mohammad, merci pour la solution. J'ai eu des résultats très intéressants. Je faisais cela en faisant simplement un rdd.collect () sur la colonne d'attribut, puis en parcourant la liste en mettant à jour un compteur dict un par un. En faisant cela, c'est en fait le rdd.collect () qui prend le plus de temps. Votre solution est beaucoup plus rapide, mais les deux solutions évoluent de manière très différente. J'ai testé un échantillon de 10k et un échantillon de 50k. La méthode avec le compteur dict et collect () a pris respectivement 700 et 713 secondes pour chaque échantillon. Votre méthode a pris respectivement 31 secondes et 48 secondes. Merci!
@theShmoo pas de problème et merci pour les commentaires car j'étais curieux de connaître la vitesse. et si vous aviez des données beaucoup plus volumineuses, la différence de vitesse serait beaucoup plus grande aussi
quelle est votre version Spark?
Est-ce que cela répond à votre question? Obtenir la taille / longueur d'une colonne de tableau