0
votes

Comptage des instances de valeurs dans les listes dans une seule colonne

J'ai un dataframe PySpark où 1 colonne se compose de listes de chaînes. Je voudrais compter le nombre d'instances de chaque élément dans chaque liste de chaînes sur toutes les lignes. Pseudocode:

counter = Counter()
for attr_list in df['attr_list']:
   counter.update(attr_list)

Une autre façon de faire serait de concaténer toutes les listes sur toutes les lignes et de construire un compteur à partir de la seule énorme liste. Y a-t-il un moyen efficace de faire cela dans PySpark?

La sortie correcte serait un seul objet collections.Counter () rempli avec le nombre d'occurrences de chaque élément dans tous les listes dans toutes les colonnes, c'est-à-dire si pour une colonne donnée, la ligne 1 a la liste ['a', 'b', 'c'] et la ligne 2 a la liste ['b' , 'c', 'd'] , nous obtiendrions un compteur qui ressemble à {'a': 1, 'b': 2, 'c': 2, 'd': 1} .


2 commentaires

quelle est votre version Spark?


Est-ce que cela répond à votre question? Obtenir la taille / longueur d'une colonne de tableau


3 Réponses :


0
votes

Vous pouvez essayer d'utiliser les méthodes distinct et flatMap de rdd, pour cela convertissez simplement la colonne en et rdd et effectuez ces opérations.

counter = (df
           .select("attr_list")
           .rdd
           # join all strings in the list and then split to get each word
           .map(lambda x: " ".join(x).split(" ")) 
           .flatMap(lambda x: x)
           # make a tuple for each word so later it can be grouped by to get its frequency count
           .map(lambda x: (x, 1))
           .reduceByKey(lambda a,b: a+b)
           .collectAsMap())


0 commentaires

0
votes

Une option de conversion en RDD est de fusionner tous les tableaux en un seul puis d'utiliser un objet Counter dessus.

from pyspark.sql.functions import explode
explode_df = df.withColumn('exploded_list',explode(df.listCol))
counts = explode_df.groupBy('exploded_list').count()
counts_tuple = counts.rdd.reduce(lambda a,b : a+b)
print({counts_tuple[i]:counts_tuple[i+1] for i in range(0,len(counts_tuple)-1,2)})

Une autre option avec exploser et groupBy et fusionner le résultat dans un dictionnaire .

from collections import Counter
all_lists = df.select('listCol').rdd
print(Counter(all_lists.map(lambda x: [i for i in x[0]]).reduce(lambda x,y: x+y)))


0 commentaires

2
votes

Si vous connaissez les éléments que vous devez compter, vous pouvez les utiliser avec spark2.4 +. et ce sera très rapide. (en utilisant filtre de fonction d'ordre supérieur et structs )

from pyspark.sql import functions as F

a=df.withColumn("atr", F.expr("""transform(array_distinct(atr_list),x->aggregate(atr_list,0,(acc,y)->\
                               IF(y=x, acc+1,acc)))"""))\
  .withColumn("zip", F.explode(F.arrays_zip(F.array_distinct("atr_list"),("atr"))))\
  .select("zip.*").withColumnRenamed("0","elements")\
  .groupBy("elements").agg(F.sum("atr").alias("sum"))\
  .collect()

{a[i][0]: a[i][1] for i in range(len(a))} 

Sortie: {'a': 1, 'b': 3, 'c': 2, 'd': 1}

2ème Méthode, utilisant transformer, agréger, exploser et grouper (ne nécessite pas la spécification d'éléments):

df.show()

#+------------+
#|    atr_list|
#+------------+
#|[a, b, b, c]|
#|   [b, c, d]|
#+------------+

elements=['a','b','c','d']

from pyspark.sql import functions as F
collected=df.withColumn("struct", F.struct(*[(F.struct(F.expr("size(filter(atr_list,x->x={}))"\
                                                    .format("'"+y+"'"))).alias(y)) for y in elements]))\
            .select(*[F.sum(F.col("struct.{}.col1".format(x))).alias(x) for x in elements])\
            .collect()[0]

{elements[i]: [x for x in collected][i] for i in range(len(elements))} 


4 commentaires

Merci. Malheureusement, dans cette situation, il y a plus de 3000 éléments, c'est pourquoi j'espérais utiliser quelque chose qui ressemble à un dict python Counter (). Utiliser une table de hachage pour incrémenter les variables accessibles en utilisant uniquement les éléments de liste dans la ligne donnée devrait être beaucoup plus rapide que d'itérer une fois pour chaque élément.


Je ne suis pas d'accord, le code ci-dessus n'est pas itératif, il se produit immédiatement sous le capot, et il n'y a pas de moyen rapide d'utiliser quelque chose comme counter () dict in spark. Avez-vous essayé de mettre tous vos 3000 éléments distincts dans la liste des éléments comme ci-dessus et d'exécuter le code?


Salut Mohammad, merci pour la solution. J'ai eu des résultats très intéressants. Je faisais cela en faisant simplement un rdd.collect () sur la colonne d'attribut, puis en parcourant la liste en mettant à jour un compteur dict un par un. En faisant cela, c'est en fait le rdd.collect () qui prend le plus de temps. Votre solution est beaucoup plus rapide, mais les deux solutions évoluent de manière très différente. J'ai testé un échantillon de 10k et un échantillon de 50k. La méthode avec le compteur dict et collect () a pris respectivement 700 et 713 secondes pour chaque échantillon. Votre méthode a pris respectivement 31 secondes et 48 secondes. Merci!


@theShmoo pas de problème et merci pour les commentaires car j'étais curieux de connaître la vitesse. et si vous aviez des données beaucoup plus volumineuses, la différence de vitesse serait beaucoup plus grande aussi