0
votes

Rxjs fait quelque chose lors de la première émission à partir de plusieurs abonnements

Existe-t-il un moyen propre de faire quelque chose lors de la première émission à partir de plusieurs abonnements?

par exemple :

1
2
Hello world !
1
2
1
1
2
1
2
2
...

Objectif de sortie :

this.subscription1 = this.service.getData1().subscribe(data => {
    this.data1 = data;
    console.log('1');
});

this.subscription2 = this.service.getData2().subscribe(data => {
    this.data2 = data2;
    console.log('2');
});

// Do something after first emit from subscription1 AND subscription2
doSomething();

...

doSomething() {
    console.log('Hello world !');
}


2 commentaires

Essayez combineLatest : learnrxjs.io/learn-rxjs/operators/combination/combinelatest


@HarunYilmaz Je pense que forkJoin serait un choix légèrement meilleur ici


3 Réponses :


0
votes

La stratégie la plus simple est d'avoir un 3e Observable qui effectuera cette action.

Voir l'exemple ci-dessous

const track1Subject$ = new Subject();
const track1$ = track1Subject$.asObservable();
const track2Subject$ = new Subject();
const track2$ = track2Subject$.asObservable();
const Observable1$ = timer(1000, 2000).pipe(
  map(() => 1),
  tap(console.log),
  tap(() => track1Subject$.next()),
  take(5)
);
const Observable2$ = timer(1700, 1700).pipe(
  map(() => 2),
  tap(console.log),
  tap(() => track2Subject$.next()),
  take(5)
);
const Observable3$ = combineLatest([track1$, track2$]).pipe(
  take(1),
  map(() => "Hello World!"),
  tap(console.log)
);
Observable1$.subscribe();
Observable2$.subscribe();
Observable3$.subscribe();

La sortie de la console est comme ci-dessous, car il y a deux abonnés à Observable1$ (c'est-à-dire Observable1$ et Observable3 $ same as two subscribers to Observable2 $ (ie Observable2 $ and Observable3$ nous voyons les journaux de console 1 1 2 2 'bonjour le monde. ».

entrez la description de l'image ici

Voici le lien vers le stackblitz

Dans ce qui précède, nous remarquons que nous obtenons 2 abonnements donc 2 journaux de console pour chacun. Pour résoudre ce problème, nous pouvons utiliser des Subjects pour générer de nouveaux observables et les combiner à la place

const Observable1$ = timer(1000, 2000).pipe(
  map(() => 1),
  tap(console.log)
);
const Observable2$ = timer(1700, 1700).pipe(
  map(() => 2),
  tap(console.log)
);
const Observable3$ = combineLatest([Observable1$, Observable2$]).pipe(
  take(1),
  map(() => "Hello World!"),
  tap(console.log)
);
Observable1$.subscribe();
Observable2$.subscribe();
Observable3$.subscribe();

entrez la description de l'image ici

Voir Lien vers la solution finale


0 commentaires

0
votes

Il y a eu plusieurs fois où j'ai également eu besoin d'un isFirst opérateur isFirst qui isFirst un prédicat que pour la première émission. Je l' ai giflé ensemble un opérateur personnalisé rapide qui utilise une seule variable d'état d' first décider si l'émission est en effet d' abord et exécuter un certain prédicat en utilisant le tap opérateur.

Comme il utilise le tap interne, il ne modifie en aucun cas l'émission de la source. Il exécute uniquement le prédicat passé lorsque l'émission est effectivement la première.

Essayez ce qui suit

opérateur isFirst()

this.subscription = combineLatest(
  this.service.getData1().pipe(
    tap({
      next: data => {
        this.data1 = data;
        console.log('1');
      }
    })
  ), 
  this.service.getData2().pipe(
    tap({
      next: data => {
        this.data2 = data;
        console.log('2');
      }
    })
  )
).pipe(
  isFirst(_ => { 
    console.log("first"); 
  })
).subscribe({
  next: r => console.log("inside subscription:", r)
});

Pour combiner plusieurs flux qui seront déclenchés lorsque l' une des sources émet, vous pouvez utiliser la fonction combineLatest de combineLatest .

Exemple

import { Component } from "@angular/core";

import { timer, Observable, Subject, combineLatest } from "rxjs";
import { tap, takeUntil } from "rxjs/operators";

@Component({
  selector: "my-app",
  template: `<button (mouseup)="stop$.next()">Stop</button>`
})
export class AppComponent {
  stop$ = new Subject<any>();

  constructor() {
    combineLatest(timer(2000, 1000), timer(3000, 500))
      .pipe(
        isFirst(_ => {
          console.log("first");
        }),
        takeUntil(this.stop$)
      )
      .subscribe({
        next: r => console.log("inside subscription:", r)
      });
  }
}

Exemple de travail: Stackblitz

Dans votre cas, cela pourrait ressembler à quelque chose comme

export const isFirst = (predicate: any) => {
  let first = true;
  return <T>(source: Observable<T>) => {
    return source.pipe(
      tap({
        next: _ => {
          if (first) {
            predicate();
            first = false;
          }
        }
      })
    );
  };
};


2 commentaires

Ça a l'air sympa! forkJoin ne serait-il pas préférable d'exécuter des abonnements en parallèle?


forkJoin n'émettra que lorsque les sources seront complètes. Donc, si vous deviez faire quelque chose dans l'abonnement ou certains opérateurs sous le forkJoin , ils ne seront pas touchés tant que toutes les observables ne seront pas terminées. Et le rappel d'abonnement n'obtiendra que le complet. De plus, combineLatest déclenche également toutes les observables sources en parallèle. La différence réside dans la manière dont les notifications sont émises aux opérateurs suivants et à l'abonnement.



0
votes

Avec quelques restrictions supplémentaires, ce problème devient plus facile. Malheureusement, des opérateurs tels que combineLatest, et zip ajoutent une structure supplémentaire à vos données. Je vais vous fournir une solution avec zip ci-dessous, mais cela ne s'étend pas du tout (si vous souhaitez ajouter plus de logique en aval de votre zip , vous n'avez pas de chance dans de nombreux cas).

Solution générale.

En supposant, cependant, que getData1 et getData2 sont complètement orthogonaux (comment ils émettent et comment ils sont consommés par votre application ne sont pas liés de manière prévisible), alors une solution à cela nécessitera plusieurs abonnements ou un opérateur personnalisé chargé de suivre émissions .

Il est presque certain que vous pouvez faire quelque chose de plus élégant que cela, mais c'est la solution la plus générale à laquelle je pourrais penser et qui répond à vos critères très généraux.

Ici, je fusionne les appels de service, étiquette chaque appel et passe par les émissions jusqu'à ce que chaque appel ait été émis au moins une fois.

zip(
  this.service.getData1().pipe(
    tap(datum => {
      console.log('1')
      this.data1 = datum;
    )
  ),
  this.service.getData2().pipe(
    tap(datum => {
      console.log('2')
      this.data2 = datum;
    )
  )
).pipe(
  map((payload, index) => {
    if(index === 0) doSomething();
    return payload;
  })
).subscribe();

Dans l'abonnement, nous devons séparer à nouveau les deux appels. Puisque vous définissez une variable globale, vous pouvez lancer cette logique comme effet secondaire dans l'opérateur tap pour chaque appel. Cela devrait avoir des résultats similaires.

merge(
  this.service.getData1().pipe(
    tap(datum => {
      console.log('1')
      this.data1 = datum;
    ),
    map(payload => ({fromData: 1, payload}))
  ),
  ...

La solution zip

Cette solution est beaucoup plus courte à écrire mais présente certains inconvénients.

merge(
  this.service.getData1().pipe(
    tap(_ => console.log('1')),
    map(payload => ({fromData: 1, payload}))
  ),
  this.service.getData2().pipe(
    tap(_ => console.log('2')),
    map(payload => ({fromData: 2, payload}))
  )
).pipe(
  // Custom Operator 
  s => defer(() => {
    let fromData1 = false;
    let fromData2 = false;
    let done = false;
    return s.pipe(
      tap(({fromData}) => {
        if(done) return;
        if(fromData === 1) fromData1 = true;
        if(fromData === 2) fromData2 = true;
        if(fromData1 && fromData2){
          done = true;
          doSomething();
        }
      })
    );
  })
).subscribe(({fromData, payload}) => {
  if(fromData === 1) this.data1 = payload;
  if(fromData === 2) this.data2 = payload;
});

Ce qui est transmis à votre abonnement, ce sont les appels de service couplés. Ici, vous devez absolument définir une variable globale comme effet secondaire de l'appel de service d'origine. L'option de le faire dans l'abonnement est perdue (à moins que vous ne vouliez les définir par paires).


0 commentaires