4
votes

Comment réduire par clé dans "Scala" [Not In Spark]

J'essaie de réduireByKeys dans Scala, y a-t-il une méthode pour réduire les valeurs basées sur les clés de Scala. [Je sais que nous pouvons faire par la méthode reductionByKey dans Spark, mais comment faire la même chose dans Scala? ]

Les données d'entrée sont:

(2,379.99)
(5,499.93)
.......

Après l'étape ci-dessus, j'obtiens le résultat comme suit:

(2,250.0)
(2,129.99)
(4,49.98)
(4,299.95)
(4,150.0)
(4,199.92)
(5,299.98)
(5,299.95)

Attendu Résultat:

val File = Source.fromFile("C:/Users/svk12/git/data/retail_db/order_items/part-00000")
                 .getLines()
                 .toList

 val map = File.map(x => x.split(","))
               .map(x => (x(1),x(4)))

  map.take(10).foreach(println)


1 commentaires

Je pense qu'il manque un groupe par étape.


5 Réponses :


1
votes

Il n'y a rien de intégré, mais vous pouvez l'écrire comme ceci:

def reduceByKey[A, B](items: Traversable[(A, B)])(f: (B, B) => B): Map[A, B] = {
  items
    .groupBy { case (a, _) => a }
    .mapValues(_.map { case (_, b) => b }.reduce(f))
    // mapValues returns a view, view.force changes it back to a realized map
    .view.force
}

Il y a un peu d'espace pour l'optimiser (par exemple utiliser des cartes mutables), mais l'idée générale reste la même .

Une autre approche, plus déclarative mais moins efficace (crée plusieurs collections intermédiaires; peut être réécrite mais avec perte de clarté:

def reduceByKey[A, B](items: Traversable[(A, B)])(f: (B, B) => B): Map[A, B] = {
  var result = Map.empty[A, B]
  items.foreach {
    case (a, b) =>
      result += (a -> result.get(a).map(b1 => f(b1, b)).getOrElse(b))
  }
  result
}


2 commentaires

Il serait préférable de curry cette fonction pour que le compilateur connaisse le type de B pour le paramètre f . De plus, fold est préférable à map / getOrElse .


Oui, le curry a du sens, mais fold est une question de goût :) Personnellement, je n'aime pas que cela ressemble à fold (a) (_ + b) plutôt qu'à fold (none = a, some = _ + b) (comme par exemple cata dans scalaz).



1
votes

Commencez par grouper le tuple en utilisant la clé, le premier élément ici, puis réduisez. Le code suivant fonctionnera -

val reducedList = map.groupBy(_._1).map(l => (l._1, l._2.map(_._2).reduce(_+_)))
print(reducedList)


0 commentaires

3
votes

Il semble que vous vouliez la somme de certaines valeurs d'un fichier. Un problème est que les fichiers sont des chaînes, vous devez donc convertir la String en un format numérique avant de pouvoir être additionnée.

Voici les étapes que vous pouvez utiliser.

io.Source.fromFile("so.txt") //open file
  .getLines()                //read line-by-line
  .map(_.split(","))         //each line is Array[String]
  .toSeq                     //to something that can groupBy()
  .groupBy(_(1))             //now is Map[String,Array[String]]
  .mapValues(_.map(_(4).toInt).sum) //now is Map[String,Int]
  .toSeq                     //un-Map it to (String,Int) tuples
  .sorted                    //presentation order
  .take(10)                  //sample
  .foreach(println)          //report

Ceci sera, bien sûr, déclenché si des données de fichier ne sont pas au format requis.


0 commentaires

2
votes

À partir de Scala 2.13 , vous pouvez utiliser le groupMapReduce qui est (comme son nom l'indique) l'équivalent d'un groupBy suivi de mapValues ​​ code > et une étape de réduction :

seq.groupBy(_(1)).mapValues(_.map(_(4).toDouble).reduce(_ + _))

L'étape groupMapReduce :

  • group s les tableaux divisés par leur 2ème élément ( _ (1) ) (partie du groupe du groupe MapReduce) p>

  • mappe chaque occurrence de tableau dans chaque groupe à son 4ème élément et le transtape en Double ( _ (4) .toDouble code >) (partie de la carte du groupe Carte Réduire)

  • réduire les valeurs de dans chaque groupe ( _ + _ ) en les additionnant (réduire une partie de groupMap Réduire ).

  • / p>

Ceci est une version en un seul passage de ce qui peut être traduit par:

io.Source.fromFile("file.txt")
  .getLines.to(LazyList)
  .map(_.split(','))
  .groupMapReduce(_(1))(_(4).toDouble)(_ + _)

Notez également le cast de Iterator vers LazyList afin d'utiliser une collection qui fournit groupMapReduce (nous n'utilisons pas de Stream , depuis le démarrage de Scala 2.13 code >, LazyList est le remplacement recommandé de Streams).


0 commentaires

0
votes

Voici une autre solution utilisant un foldLeft:

val File : List[String] = ???

File.map(x => x.split(","))
  .map(x => (x(1),x(4).toInt))
  .foldLeft(Map.empty[String,Int]){case (state, (key,value)) => state.updated(key,state.get(key).getOrElse(0)+value)}
  .toSeq
  .sortBy(_._1)
  .take(10)
  .foreach(println)


0 commentaires