Mes données d'une source de flux non liée ressemblent à ceci:
List(start, d4, d5, d6, d7, end) List(start, d4, d5, d6, d7, end, d9) List(start, d4, d5, d6, d7, end, d9, d10)
3 Réponses :
Une approche consiste à utiliser Le code ci-dessus imprime les éléments suivants: p> si vous Voulez-vous accumuler les éléments entre «Démarrer» et «Fin» au lieu d'imprimer individuellement ces éléments sur une base en streaming, vous pouvez ajuster l'extrait ci-dessus pour le faire. Alternativement, jetez un coup d'œil à stitfulmapconcat code>
: Accumulumdwhilechanchantillé Code>
à partir du Akka Streams Contrib projet. P> P>
Voici une façon qui transforme d'abord les éléments source en listes coulissantes à 2 éléments, laisse tomber les listes pré-"code> start code>" suivies de la prise de la fin de l'extrémité avant " Pour capturer les éléments dans une collection, il suffit de remplacer " code> " listes, puis conditionnellement capture de manière conditionnelle les éléments de liste à l'aide de
mapconcat code>
: runforeach (println) code> avec
Runwith (Sing.Seq [String]) Code>. P> P>
Merci @ Leo-C - Très belle approche en utilisant glissement code>!
Votre solution fonctionne, mais je n'ai pas expliqué correctement, que ma liste de messages a également "démarrage" et "fin" répéter plusieurs fois! J'ai mis à jour la question initiale en conséquence.
alternative peut être d'utiliser GroupéWithIn (poids maximum: long, maxduration: finesse) code>
, qui regroupe des éléments par fonction de temps et de poids. L'astuce consiste à attribuer zéro poids à tous les éléments sauf "fin" et à faire "extrémité" d'être suffisamment lourd pour être égal ou supérieur au poids maximum code> Poids:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.Await
import scala.concurrent.duration.DurationInt
implicit val system = ActorSystem("collection-accumulator")
implicit val materializer = ActorMaterializer()
val source = Source(list) // actual Source is unbound, has many more items between "start" and "end"
val maxDuration = 120.seconds // put arbitrarily high duration ehre
val resultFuture = Source(list)
// accumulates everything up until and including "end" element
// essentially splits at "end" elements
.groupedWeightedWithin(1L, maxDuration)({
case "end" => 1L
case _ => 0
})
.map(accumulated =>
accumulated
.dropWhile(_ != "start") // drop everything till "start" element
.drop(1) // drop "start"
.takeWhile(_ != "end") // take everything until "end" is seen
)
// Run and accumulate into seq - result will be Seq[Seq[String]]
.runWith(Sink.seq)
println()
Await.result(resultFuture, 1.second) # Vector(Vector(d4, d5, d6, d7), Vector(d11, d12, d13))