J'ai un RDD avec un grand nombre d'entrées KeyVal. La même clé sera présente plusieurs fois et je suis intéressé par l'extraction des N premières entrées pour chaque clé. Étant très nouveau dans Spark, je n'ai jusqu'à présent pas été en mesure de comprendre comment faire cela, donc toute aide serait appréciée.
L'entrée pourrait ressembler à:
[('a',1),('a',2),('b',3),('b',4)]
Sortie finale souhaitée pour, par exemple, les 2 premières entrées pour chaque clé:
output = {'a':[1,2], 'b':[3,4]}
Si je n'étais intéressé que par les n premières entrées en général, je pourrais bien sûr simplement appliquer prenez (n)
au RDD initial. Ce que je recherche, c'est un moyen de faire quelque chose de similaire à la fonction take (n)
, mais en itérant sur chaque clé. Si je peux créer un RDD qui contient juste le sous-ensemble souhaité du RDD d'origine, ce serait très bien. La collecte dans un dictionnaire par la suite est moins un problème.
Donc, la sortie intermédiaire (style RDD) serait:
rdd = sc.parallelize([('a',1),('a',2),('b',3),('a',5),('b',4),('b',6)])
Comment puis-je y parvenir en PySpark?
Modifier: La question en double suggérée nécessitait spécifiquement une solution utilisant reductionByKey
, ce qui n'est pas obligatoire dans ce cas.
3 Réponses :
Écoutez ici enfant ...
.map(lambda x: (x[0], list(x[1])[:nLength]))
Explication:
rdd.groupByKey()
Regroupez le RDD par la clé (dans notre cas: 'a' ou 'b'). Résultats en:
[('a', ResultIterable), ('b', ResultIterable)]
nLength = 2 rdd.groupByKey().map(lambda x: (x[0], list(x[1])[:nLength]))
Cette partie a créé un tuple: Sur le côté gauche, la clé ('a' ou 'b'), et sur le côté droit, nous créons une liste à partir de ResultIterable (x [1]), puis coupons la liste de 0 à nLength ([: nLength] ).
Profitez-en!
Peut-être que quelque chose de simple comme celui-ci ferait l'affaire:
[('b', [3, 4]), ('a', [1, 2])]
Sortie:
rdd = sc.parallelize([('a',1),('a',2),('b',3),('a',5),('b',4),('b',6)]) n = 2 rdd.groupByKey().map(lambda x : (x[0], list(x[1])[:n])).collect()
essayez ceci:
def slice_list(s,no_of_values): return s[0:no_of_values] rdd.groupByKey().map(lambda x: (x[0],slice_list( list(x[1]),2))).collect()
Copie possible de PYSPARK: utilisez le tri avec reductionByKey