Je souhaite créer un service qui combine les résultats de deux sources réactives. L'un produit Mono et un autre produit Flux. Pour la fusion, j'ai besoin de la même valeur de mono pour chaque flux émis.
Pour l'instant, j'ai quelque chose comme ça
2020-01-20 12:55:20.505 INFO [] [] [ parallel-1] SampleTest : flux started 2020-01-20 12:55:20.508 INFO [] [] [ parallel-2] SampleTest : mono started 2020-01-20 12:55:21.523 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.528 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.529 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.529 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.529 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.529 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.530 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.530 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.530 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.530 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.531 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.531 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.531 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.531 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.531 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.532 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.532 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.532 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.532 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.534 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.534 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.534 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.534 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.534 WARN [] [] [ parallel-2] SampleTest : signal on mono 2020-01-20 12:55:21.535 WARN [] [] [ parallel-2] SampleTest : signal on mono
Cela me donne ce dont j'ai besoin,
Mais j'ai remarqué que repeat () émet une quantité massive d'éléments après la mise en cache du contexte. Est-ce un problème?
Est-ce que je peux faire quelque chose pour limiter le nombre de répétitions au nombre de configurations reçues, tout en faisant toujours les deux demandes simultanément? Ou ce n'est pas un problème et je peux ignorer en toute sécurité ces éléments supplémentaires émis?
J'ai essayé d'utiliser combineLatest
mais en fonction du timing, certains éléments de la configuration peuvent être perdus et ne pas être traités .
MODIFIER
En regardant les suggestions de @Ricard Kollcaku, j'ai créé un exemple de test qui montre pourquoi ce n'est pas ce que je recherche.
2020-01-20 12:55:22.542 INFO [] [] [ parallel-3] SampleTest : flux started 2020-01-20 12:55:24.547 INFO [] [] [ parallel-4] SampleTest : mono started 2020-01-20 12:55:24.547 INFO [] [] [ parallel-5] SampleTest : mono started 2020-01-20 12:55:24.548 INFO [] [] [ parallel-6] SampleTest : mono started expected: <1> but was: <3>
En regardant les résultats des tests test1 et test2
import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; public class SampleTest { Logger LOG = LoggerFactory.getLogger(SampleTest.class); AtomicLong counter = new AtomicLong(0); Flux<String> getFlux() { return Flux.fromStream(() -> { LOG.info("flux started"); sleep(1000); return Stream.of("a", "b", "c"); }).subscribeOn(Schedulers.parallel()); } Mono<String> getMono() { return Mono.defer(() -> { counter.incrementAndGet(); LOG.info("mono started"); sleep(1000); return Mono.just("mono"); }).subscribeOn(Schedulers.parallel()); } private void sleep(final long milis) { try { Thread.sleep(milis); } catch (final InterruptedException e) { e.printStackTrace(); } } @Test void test0() { final Flux<String> result = Flux.zip( getFlux(), getMono().cache().repeat() .doOnNext(n -> LOG.warn("signal on mono", n)), (s1, s2) -> s1 + " " + s2 ); assertResults(result); } @Test void test1() { final Flux<String> result = getFlux().flatMap(s -> Mono.zip(Mono.just(s), getMono(), (s1, s2) -> s1 + " " + s2)); assertResults(result); } @Test void test2() { final Flux<String> result = getFlux().flatMap(s -> getMono().map((s1 -> s + " " + s1))); assertResults(result); } void assertResults(final Flux<String> result) { final Flux<String> flux = result; StepVerifier.create(flux) .expectNext("a mono") .expectNext("b mono") .expectNext("c mono") .verifyComplete(); Assertions.assertEquals(1L, counter.get()); }
Je dois rejeter votre proposition. Dans les deux cas, getMono est - invoqué autant de fois que d'objets en flux - invoqué après l'arrivée du premier élément de flux Et ce sont des interactions que je veux éviter. Mes services font des requêtes http sous le capot et ils peuvent prendre du temps.
Ma solution actuelle n'a pas ce problème, mais si j'ajoute un logger à mon zip, j'obtiendrai ceci
Flux.zip( service1.getConfig(), //produces flux service2.getContext() //produces mono .cache().repeat() )
Comme vous pouvez le voir, il y a beaucoup d'éléments émis en combinant cache (). Repeat ()
ensemble et je veux savoir si c'est un problème et si oui alors comment l'éviter (mais gardez l'invocation unique de l'invocation mono et parallèle).
3 Réponses :
Les bibliothèques comme Project Reactor et RxJava essaient de fournir autant de combinaisons de leurs capacités que possible, mais ne donnent pas accès aux instruments de combinaison de capacités. Et par conséquent, il y a toujours des cas secondaires qui ne sont pas couverts.
Mon propre DF4J, pour autant que je sache, est la seule bibliothèque asynchrone qui offre les moyens de combiner des capacités. Par exemple, voici comment l'utilisateur peut compresser Flux et Mono: (bien sûr, cette classe ne fait pas partie de DF4J lui-même):
got:1 and:5 got:2 and:5 got:3 and:5
et voici comment elle peut être utilisée :
@Test public void ZipActorTest() { Flux<Integer> flux = Flux.just(1,2,3); Mono<Integer> mono = Mono.just(5); ZipActor<Integer, Integer> actor = new ZipActor<Integer, Integer>(flux, mono){ @Override protected void runAction(Integer element1, Integer element2) { System.out.println("got:"+element1+" and:"+element2); } }; actor.start(); actor.join(); }
La sortie de la console est la suivante:
import org.df4j.core.dataflow.Actor; import org.df4j.core.port.InpFlow; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; abstract class ZipActor<T1, T2> extends Actor { InpFlow<T1> inpFlow = new InpFlow<>(this); InpFlow<T2> inpScalar = new InpFlow<>(this); ZipActor(Flux<T1> flux, Mono<T2> mono) { flux.subscribe(inpFlow); mono.subscribe(inpScalar); } @Override protected void runAction() throws Throwable { if (inpFlow.isCompleted()) { stop(); return; } T1 element1 = inpFlow.removeAndRequest(); T2 element2 = inpScalar.current(); runAction(element1, element2); } protected abstract void runAction(T1 element1, T2 element2); }
Merci pour sa suggestion que cela résoudrait probablement mon problème. Malheureusement, j'aurais besoin de lutter avec la "politique d'entreprise" de mon entreprise juste pour pouvoir inclure votre bibliothèque dans notre projet (par exemple, des problèmes de support / correctifs, etc.): (Inclure également une bibliothèque externe pour résoudre un problème peut ne pas être la meilleure approche si il existe d'autres solutions (pas aussi bonnes mais toujours) viables. Néanmoins, merci d'avoir posté, je garderai votre lib à l'esprit pour de futurs cas d'utilisation :)
Je ne suis pas sûr de comprendre d'où vous venez d'ici - le réacteur offre des capacités pour combiner des flux via les méthodes Flux.zip, etc. Par souci de transparence, vous devez également noter que vous êtes l'auteur de DF4J.
Vous pouvez le faire avec un simple changement
getMono(). flatMapMany(s -> getFlux().map(s1 -> s1 + " " + s)) .subscribe(System.out::println); Flux<String> getFlux() { return Flux.defer(() -> { System.out.println("init flux"); return Flux.just("a", "b", "c"); }); } Mono<String> getMono() { return Mono.defer(() -> { System.out.println("init Mono"); return Mono.just("sss"); }); }
si vous souhaitez utiliser zip, mais vous pouvez obtenir les mêmes résultats en utilisant flatmap
getFlux() .flatMap(s -> getMono() .map((s1 -> s + " " + s1))) .subscribe(System.out::println); } Flux<String> getFlux() { return Flux.just("a", "b", "c"); } Mono<String> getMono() { return Mono.just("mono"); }
dans les deux résultats est: un mono b mono c mono
MODIFIER Ok maintenant je comprends mieux. Pouvez-vous essayer cette solution.
getFlux() .flatMap(s -> Mono.zip(Mono.just(s),getMono(), (s1, s2) -> s1+" "+s2)) .subscribe(System.out::println); Flux<String> getFlux(){ return Flux.just("a","b","c"); } Mono<String> getMono(){ return Mono.just("mono"); }
Merci pour la réponse mais cela ne fonctionnera pas. J'ai édité ma question pour mieux expliquer mes attentes et pourquoi votre proposition ne répond pas à mes besoins.
Cela fonctionne mieux - appel unique en mono. Mais le code attend toujours que mono émette le résultat avant d'appeler la demande de flux. Je préférerais commencer les deux en même temps (la logique zip avec l'annulation des autres demandes si l'une échoue est également bien ici) mais il semble que ce n'est pas possible
Je pense que ce que vous essayez de réaliser pourrait être fait avec Flux.join
Voici un exemple de code:
Flux<Integer> flux = Flux.concat(Mono.just(1).delayElement(Duration.ofMillis(100)), Mono.just(2).delayElement(Duration.ofMillis(500))).log(); Mono<String> mono = Mono.just("a").delayElement(Duration.ofMillis(50)).log(); List<String> list = flux.join(mono, (v1) -> Flux.never(), (v2) -> Flux.never(), (x, y) -> { return x + y; }).collectList().block(); System.out.println(list);
Pourquoi pas une cartographie à plat via Mono, par exemple quelque chose comme ceci
Mono.just (1) .flux (). flatMap (v1 -> Flux.just (2, 3) .map (v2 -> v1 + v2));
Votre problème est celui-ci? flux 1 émettant 1,2,3,4 flux 2 émettant seulement un. Et vous êtes capable de traiter seulement 1, a quand vous voulez traiter 1, a - 2, a - 3, a - 4, a?
@efan - cela fonctionnerait mais (d'après ma compréhension de react) je ferai une demande à
Flux.just (2, 3)
uniquement lorsqueMono
émettra une valeur. Et je voudrais commencer les deux demandes en même temps et combiner les résultats lorsqu'ils sont disponibles. @RicardKollcaku - pas exactement. Je suis capable de traiter1, a 2, a 3, a ...
mais pour y parvenir, je produis une quantité absurde dea
en raison de la façon dontcache (). repeat ()
fonctionne.@RobertOzga avec votre édition, j'explique mieux pourquoi vous ne créerez pas de mono plusieurs fois. J'ai modifié la réponse pour que Mono ne soit créé qu'une seule fois