1
votes

Combinez Mono avec Flux

Je souhaite créer un service qui combine les résultats de deux sources réactives. L'un produit Mono et un autre produit Flux. Pour la fusion, j'ai besoin de la même valeur de mono pour chaque flux émis.

Pour l'instant, j'ai quelque chose comme ça

2020-01-20 12:55:20.505 INFO  [] [] [     parallel-1]  SampleTest  : flux started  
2020-01-20 12:55:20.508 INFO  [] [] [     parallel-2]  SampleTest  : mono started  
2020-01-20 12:55:21.523 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.528 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.529 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.529 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.529 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.529 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.530 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.530 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.530 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.530 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.531 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.531 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.531 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.531 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.531 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.532 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.532 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.532 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.532 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.533 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.533 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.533 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.533 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.533 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.533 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.533 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.534 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.534 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.534 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.534 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.534 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.535 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  

Cela me donne ce dont j'ai besoin,

  • service2 n'est appelé qu'une seule fois
  • Le contexte est fourni pour chaque configuration
  • le flux résultant a autant d'éléments que de configurations

Mais j'ai remarqué que repeat () émet une quantité massive d'éléments après la mise en cache du contexte. Est-ce un problème?

Est-ce que je peux faire quelque chose pour limiter le nombre de répétitions au nombre de configurations reçues, tout en faisant toujours les deux demandes simultanément? Ou ce n'est pas un problème et je peux ignorer en toute sécurité ces éléments supplémentaires émis?

J'ai essayé d'utiliser combineLatest mais en fonction du timing, certains éléments de la configuration peuvent être perdus et ne pas être traités .

MODIFIER

En regardant les suggestions de @Ricard Kollcaku, j'ai créé un exemple de test qui montre pourquoi ce n'est pas ce que je recherche.

2020-01-20 12:55:22.542 INFO  [] [] [     parallel-3]  SampleTest  : flux started  
2020-01-20 12:55:24.547 INFO  [] [] [     parallel-4]  SampleTest  : mono started  
2020-01-20 12:55:24.547 INFO  [] [] [     parallel-5]  SampleTest  : mono started  
2020-01-20 12:55:24.548 INFO  [] [] [     parallel-6]  SampleTest  : mono started  

expected: <1> but was: <3>

En regardant les résultats des tests test1 et test2

import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

public class SampleTest
{
    Logger LOG = LoggerFactory.getLogger(SampleTest.class);
    AtomicLong counter = new AtomicLong(0);

    Flux<String> getFlux()
    {
        return Flux.fromStream(() -> {
            LOG.info("flux started");
            sleep(1000);
            return Stream.of("a", "b", "c");
        }).subscribeOn(Schedulers.parallel());
    }

    Mono<String> getMono()
    {
        return Mono.defer(() -> {
            counter.incrementAndGet();
            LOG.info("mono started");
            sleep(1000);
            return Mono.just("mono");
        }).subscribeOn(Schedulers.parallel());
    }

    private void sleep(final long milis)
    {
        try
        {
            Thread.sleep(milis);
        }
        catch (final InterruptedException e)
        {
            e.printStackTrace();
        }
    }

    @Test
    void test0()
    {
        final Flux<String> result = Flux.zip(
                getFlux(),
                getMono().cache().repeat()
                         .doOnNext(n -> LOG.warn("signal on mono", n)),
                (s1, s2) -> s1 + " " + s2
        );

        assertResults(result);
    }

    @Test
    void test1()
    {
        final Flux<String> result =
                getFlux().flatMap(s -> Mono.zip(Mono.just(s), getMono(),
                        (s1, s2) -> s1 + " " + s2));
        assertResults(result);
    }

    @Test
    void test2()
    {
        final Flux<String> result = getFlux().flatMap(s -> getMono().map((s1 -> s + " " + s1)));
        assertResults(result);
    }

    void assertResults(final Flux<String> result)
    {
        final Flux<String> flux = result;

        StepVerifier.create(flux)
                    .expectNext("a mono")
                    .expectNext("b mono")
                    .expectNext("c mono")
                    .verifyComplete();

        Assertions.assertEquals(1L, counter.get());
    }

Je dois rejeter votre proposition. Dans les deux cas, getMono est - invoqué autant de fois que d'objets en flux - invoqué après l'arrivée du premier élément de flux Et ce sont des interactions que je veux éviter. Mes services font des requêtes http sous le capot et ils peuvent prendre du temps.

Ma solution actuelle n'a pas ce problème, mais si j'ajoute un logger à mon zip, j'obtiendrai ceci

Flux.zip(
   service1.getConfig(), //produces flux
   service2.getContext() //produces mono
           .cache().repeat()
)

Comme vous pouvez le voir, il y a beaucoup d'éléments émis en combinant cache (). Repeat () ensemble et je veux savoir si c'est un problème et si oui alors comment l'éviter (mais gardez l'invocation unique de l'invocation mono et parallèle).


4 commentaires

Pourquoi pas une cartographie à plat via Mono, par exemple quelque chose comme ceci Mono.just (1) .flux (). flatMap (v1 -> Flux.just (2, 3) .map (v2 -> v1 + v2));


Votre problème est celui-ci? flux 1 émettant 1,2,3,4 flux 2 émettant seulement un. Et vous êtes capable de traiter seulement 1, a quand vous voulez traiter 1, a - 2, a - 3, a - 4, a?


@efan - cela fonctionnerait mais (d'après ma compréhension de react) je ferai une demande à Flux.just (2, 3) uniquement lorsque Mono émettra une valeur. Et je voudrais commencer les deux demandes en même temps et combiner les résultats lorsqu'ils sont disponibles. @RicardKollcaku - pas exactement. Je suis capable de traiter 1, a 2, a 3, a ... mais pour y parvenir, je produis une quantité absurde de a en raison de la façon dont cache (). repeat () fonctionne.


@RobertOzga avec votre édition, j'explique mieux pourquoi vous ne créerez pas de mono plusieurs fois. J'ai modifié la réponse pour que Mono ne soit créé qu'une seule fois


3 Réponses :


0
votes

Les bibliothèques comme Project Reactor et RxJava essaient de fournir autant de combinaisons de leurs capacités que possible, mais ne donnent pas accès aux instruments de combinaison de capacités. Et par conséquent, il y a toujours des cas secondaires qui ne sont pas couverts.

Mon propre DF4J, pour autant que je sache, est la seule bibliothèque asynchrone qui offre les moyens de combiner des capacités. Par exemple, voici comment l'utilisateur peut compresser Flux et Mono: (bien sûr, cette classe ne fait pas partie de DF4J lui-même):

got:1 and:5
got:2 and:5
got:3 and:5

et voici comment elle peut être utilisée :

@Test
public void ZipActorTest() {
    Flux<Integer> flux = Flux.just(1,2,3);
    Mono<Integer> mono = Mono.just(5);
    ZipActor<Integer, Integer> actor = new ZipActor<Integer, Integer>(flux, mono){
        @Override
        protected void runAction(Integer element1, Integer element2) {
            System.out.println("got:"+element1+" and:"+element2);
        }
    };
    actor.start();
    actor.join();
}

La sortie de la console est la suivante:

import org.df4j.core.dataflow.Actor;
import org.df4j.core.port.InpFlow;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

abstract class ZipActor<T1, T2> extends Actor {
    InpFlow<T1> inpFlow = new InpFlow<>(this);
    InpFlow<T2> inpScalar = new InpFlow<>(this);

    ZipActor(Flux<T1> flux, Mono<T2> mono) {
        flux.subscribe(inpFlow);
        mono.subscribe(inpScalar);
    }

    @Override
    protected void runAction() throws Throwable {
        if (inpFlow.isCompleted()) {
            stop();
            return;
        }
        T1 element1 = inpFlow.removeAndRequest();
        T2 element2 = inpScalar.current();
        runAction(element1, element2);
    }

    protected abstract void runAction(T1 element1, T2 element2);
}


2 commentaires

Merci pour sa suggestion que cela résoudrait probablement mon problème. Malheureusement, j'aurais besoin de lutter avec la "politique d'entreprise" de mon entreprise juste pour pouvoir inclure votre bibliothèque dans notre projet (par exemple, des problèmes de support / correctifs, etc.): (Inclure également une bibliothèque externe pour résoudre un problème peut ne pas être la meilleure approche si il existe d'autres solutions (pas aussi bonnes mais toujours) viables. Néanmoins, merci d'avoir posté, je garderai votre lib à l'esprit pour de futurs cas d'utilisation :)


Je ne suis pas sûr de comprendre d'où vous venez d'ici - le réacteur offre des capacités pour combiner des flux via les méthodes Flux.zip, etc. Par souci de transparence, vous devez également noter que vous êtes l'auteur de DF4J.



0
votes

Vous pouvez le faire avec un simple changement

   getMono().
            flatMapMany(s -> getFlux().map(s1 -> s1 + " " + s))
            .subscribe(System.out::println);


Flux<String> getFlux() {
    return Flux.defer(() -> {
        System.out.println("init flux");
        return Flux.just("a", "b", "c");
    });
}

Mono<String> getMono() {
    return Mono.defer(() -> {
        System.out.println("init Mono");
        return Mono.just("sss");
    });
}

si vous souhaitez utiliser zip, mais vous pouvez obtenir les mêmes résultats en utilisant flatmap

      getFlux()
            .flatMap(s -> getMono()
                    .map((s1 -> s + " " + s1)))
            .subscribe(System.out::println);
}

Flux<String> getFlux() {
    return Flux.just("a", "b", "c");
}

Mono<String> getMono() {
    return Mono.just("mono");
}

dans les deux résultats est: un mono b mono c mono

MODIFIER Ok maintenant je comprends mieux. Pouvez-vous essayer cette solution.

    getFlux()
    .flatMap(s -> Mono.zip(Mono.just(s),getMono(), (s1, s2) -> s1+" "+s2))
    .subscribe(System.out::println);

Flux<String> getFlux(){
    return Flux.just("a","b","c");
}
Mono<String> getMono(){
    return  Mono.just("mono");
}


2 commentaires

Merci pour la réponse mais cela ne fonctionnera pas. J'ai édité ma question pour mieux expliquer mes attentes et pourquoi votre proposition ne répond pas à mes besoins.


Cela fonctionne mieux - appel unique en mono. Mais le code attend toujours que mono émette le résultat avant d'appeler la demande de flux. Je préférerais commencer les deux en même temps (la logique zip avec l'annulation des autres demandes si l'une échoue est également bien ici) mais il semble que ce n'est pas possible



1
votes

Je pense que ce que vous essayez de réaliser pourrait être fait avec Flux.join

Voici un exemple de code:

Flux<Integer> flux = Flux.concat(Mono.just(1).delayElement(Duration.ofMillis(100)),
        Mono.just(2).delayElement(Duration.ofMillis(500))).log();

Mono<String> mono = Mono.just("a").delayElement(Duration.ofMillis(50)).log();

List<String> list = flux.join(mono, (v1) -> Flux.never(), (v2) -> Flux.never(), (x, y) -> {
    return x + y;
}).collectList().block();

System.out.println(list);


0 commentaires