0
votes

Collection Scala non matérialisée dans une simple opération de flux AKKA

Mes données d'une source de flux non liée ressemblent à ceci:

List(start, d4, d5, d6, d7, end)
List(start, d4, d5, d6, d7, end, d9)
List(start, d4, d5, d6, d7, end, d9, d10)


0 commentaires

3 Réponses :


2
votes

Une approche consiste à utiliser stitfulmapconcat : xxx

Le code ci-dessus imprime les éléments suivants: xxx

si vous Voulez-vous accumuler les éléments entre «Démarrer» et «Fin» au lieu d'imprimer individuellement ces éléments sur une base en streaming, vous pouvez ajuster l'extrait ci-dessus pour le faire. Alternativement, jetez un coup d'œil à Accumulumdwhilechanchantillé à partir du Akka Streams Contrib projet.


0 commentaires

0
votes

Voici une façon qui transforme d'abord les éléments source en listes coulissantes à 2 éléments, laisse tomber les listes pré-"code> start " suivies de la prise de la fin de l'extrémité avant "" " listes, puis conditionnellement capture de manière conditionnelle les éléments de liste à l'aide de mapconcat : xxx

Pour capturer les éléments dans une collection, il suffit de remplacer runforeach (println) avec Runwith (Sing.Seq [String]) .


2 commentaires

Merci @ Leo-C - Très belle approche en utilisant glissement !


Votre solution fonctionne, mais je n'ai pas expliqué correctement, que ma liste de messages a également "démarrage" et "fin" répéter plusieurs fois! J'ai mis à jour la question initiale en conséquence.



0
votes

alternative peut être d'utiliser GroupéWithIn (poids maximum: long, maxduration: finesse) code> , qui regroupe des éléments par fonction de temps et de poids. L'astuce consiste à attribuer zéro poids à tous les éléments sauf "fin" et à faire "extrémité" d'être suffisamment lourd pour être égal ou supérieur au poids maximum code> Poids:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._

import scala.concurrent.Await
import scala.concurrent.duration.DurationInt

implicit val system = ActorSystem("collection-accumulator")
implicit val materializer = ActorMaterializer()

val source = Source(list) // actual Source is unbound, has many more items between "start" and "end"

val maxDuration = 120.seconds // put arbitrarily high duration ehre

val resultFuture = Source(list)
  // accumulates everything up until and including "end" element
  // essentially splits at "end" elements
  .groupedWeightedWithin(1L, maxDuration)({
    case "end" => 1L
    case _ => 0
  })
  .map(accumulated => 
    accumulated
     .dropWhile(_ != "start") // drop everything till "start" element
     .drop(1)                 // drop "start"
     .takeWhile(_ != "end")   // take everything until "end" is seen
  ) 
  // Run and accumulate into seq - result will be Seq[Seq[String]]
  .runWith(Sink.seq)

println()
Await.result(resultFuture, 1.second) # Vector(Vector(d4, d5, d6, d7), Vector(d11, d12, d13))


0 commentaires