J'essaie de réduireByKeys dans Scala, y a-t-il une méthode pour réduire les valeurs basées sur les clés de Scala. [Je sais que nous pouvons faire par la méthode reductionByKey dans Spark, mais comment faire la même chose dans Scala? ]
Les données d'entrée sont:
(2,379.99) (5,499.93) .......
Après l'étape ci-dessus, j'obtiens le résultat comme suit:
(2,250.0) (2,129.99) (4,49.98) (4,299.95) (4,150.0) (4,199.92) (5,299.98) (5,299.95)
Attendu Résultat:
val File = Source.fromFile("C:/Users/svk12/git/data/retail_db/order_items/part-00000") .getLines() .toList val map = File.map(x => x.split(",")) .map(x => (x(1),x(4))) map.take(10).foreach(println)
5 Réponses :
Il n'y a rien de intégré, mais vous pouvez l'écrire comme ceci:
def reduceByKey[A, B](items: Traversable[(A, B)])(f: (B, B) => B): Map[A, B] = { items .groupBy { case (a, _) => a } .mapValues(_.map { case (_, b) => b }.reduce(f)) // mapValues returns a view, view.force changes it back to a realized map .view.force }
Il y a un peu d'espace pour l'optimiser (par exemple utiliser des cartes mutables), mais l'idée générale reste la même .
Une autre approche, plus déclarative mais moins efficace (crée plusieurs collections intermédiaires; peut être réécrite mais avec perte de clarté:
def reduceByKey[A, B](items: Traversable[(A, B)])(f: (B, B) => B): Map[A, B] = { var result = Map.empty[A, B] items.foreach { case (a, b) => result += (a -> result.get(a).map(b1 => f(b1, b)).getOrElse(b)) } result }
Il serait préférable de curry cette fonction pour que le compilateur connaisse le type de B
pour le paramètre f
. De plus, fold
est préférable à map / getOrElse
.
Oui, le curry a du sens, mais fold
est une question de goût :) Personnellement, je n'aime pas que cela ressemble à fold (a) (_ + b)
plutôt qu'à fold (none = a, some = _ + b)
(comme par exemple cata
dans scalaz).
Commencez par grouper le tuple en utilisant la clé, le premier élément ici, puis réduisez. Le code suivant fonctionnera -
val reducedList = map.groupBy(_._1).map(l => (l._1, l._2.map(_._2).reduce(_+_))) print(reducedList)
Il semble que vous vouliez la somme de certaines valeurs d'un fichier. Un problème est que les fichiers sont des chaînes, vous devez donc convertir la String
en un format numérique avant de pouvoir être additionnée.
Voici les étapes que vous pouvez utiliser.
io.Source.fromFile("so.txt") //open file .getLines() //read line-by-line .map(_.split(",")) //each line is Array[String] .toSeq //to something that can groupBy() .groupBy(_(1)) //now is Map[String,Array[String]] .mapValues(_.map(_(4).toInt).sum) //now is Map[String,Int] .toSeq //un-Map it to (String,Int) tuples .sorted //presentation order .take(10) //sample .foreach(println) //report
Ceci sera, bien sûr, déclenché si des données de fichier ne sont pas au format requis.
À partir de L'étape / p> Ceci est une version en un seul passage de ce qui peut être traduit par: Notez également le cast de Scala 2.13
, vous pouvez utiliser le groupMapReduce
qui est (comme son nom l'indique) l'équivalent d'un groupBy
suivi de mapValues code > et une étape de
réduction
: seq.groupBy(_(1)).mapValues(_.map(_(4).toDouble).reduce(_ + _))
groupMapReduce
:
group
s les tableaux divisés par leur 2ème élément ( _ (1)
) (partie du groupe du groupe MapReduce) p> mappe
chaque occurrence de tableau dans chaque groupe à son 4ème élément et le transtape en Double
( _ (4) .toDouble code >) (partie de la carte du groupe Carte Réduire)
réduire les valeurs de
dans chaque groupe ( _ + _
) en les additionnant (réduire une partie de groupMap Réduire ). io.Source.fromFile("file.txt")
.getLines.to(LazyList)
.map(_.split(','))
.groupMapReduce(_(1))(_(4).toDouble)(_ + _)
Iterator
vers LazyList
afin d'utiliser une collection qui fournit groupMapReduce
(nous n'utilisons pas de Stream
, depuis le démarrage de Scala 2.13 code >,
LazyList
est le remplacement recommandé de Stream
s).
Voici une autre solution utilisant un foldLeft:
val File : List[String] = ??? File.map(x => x.split(",")) .map(x => (x(1),x(4).toInt)) .foldLeft(Map.empty[String,Int]){case (state, (key,value)) => state.updated(key,state.get(key).getOrElse(0)+value)} .toSeq .sortBy(_._1) .take(10) .foreach(println)
Je pense qu'il manque un groupe par étape.