Existe-t-il un moyen propre de faire quelque chose lors de la première émission à partir de plusieurs abonnements?
par exemple :
1 2 Hello world ! 1 2 1 1 2 1 2 2 ...
Objectif de sortie :
this.subscription1 = this.service.getData1().subscribe(data => { this.data1 = data; console.log('1'); }); this.subscription2 = this.service.getData2().subscribe(data => { this.data2 = data2; console.log('2'); }); // Do something after first emit from subscription1 AND subscription2 doSomething(); ... doSomething() { console.log('Hello world !'); }
3 Réponses :
La stratégie la plus simple est d'avoir un 3e Observable
qui effectuera cette action.
Voir l'exemple ci-dessous
const track1Subject$ = new Subject(); const track1$ = track1Subject$.asObservable(); const track2Subject$ = new Subject(); const track2$ = track2Subject$.asObservable(); const Observable1$ = timer(1000, 2000).pipe( map(() => 1), tap(console.log), tap(() => track1Subject$.next()), take(5) ); const Observable2$ = timer(1700, 1700).pipe( map(() => 2), tap(console.log), tap(() => track2Subject$.next()), take(5) ); const Observable3$ = combineLatest([track1$, track2$]).pipe( take(1), map(() => "Hello World!"), tap(console.log) ); Observable1$.subscribe(); Observable2$.subscribe(); Observable3$.subscribe();
La sortie de la console est comme ci-dessous, car il y a deux abonnés à Observable1$
(c'est-à-dire Observable1$
et Observable3 $ same as two subscribers to
Observable2 $ (ie
Observable2 $ and Observable3$
nous voyons les journaux de console 1 1 2 2 'bonjour le monde. ».
Voici le lien vers le stackblitz
Dans ce qui précède, nous remarquons que nous obtenons 2 abonnements donc 2 journaux de console pour chacun. Pour résoudre ce problème, nous pouvons utiliser des Subjects
pour générer de nouveaux observables et les combiner à la place
const Observable1$ = timer(1000, 2000).pipe( map(() => 1), tap(console.log) ); const Observable2$ = timer(1700, 1700).pipe( map(() => 2), tap(console.log) ); const Observable3$ = combineLatest([Observable1$, Observable2$]).pipe( take(1), map(() => "Hello World!"), tap(console.log) ); Observable1$.subscribe(); Observable2$.subscribe(); Observable3$.subscribe();
Il y a eu plusieurs fois où j'ai également eu besoin d'un isFirst
opérateur isFirst
qui isFirst
un prédicat que pour la première émission. Je l' ai giflé ensemble un opérateur personnalisé rapide qui utilise une seule variable d'état d' first
décider si l'émission est en effet d' abord et exécuter un certain prédicat en utilisant le tap
opérateur.
Comme il utilise le tap
interne, il ne modifie en aucun cas l'émission de la source. Il exécute uniquement le prédicat passé lorsque l'émission est effectivement la première.
Essayez ce qui suit
opérateur isFirst()
this.subscription = combineLatest( this.service.getData1().pipe( tap({ next: data => { this.data1 = data; console.log('1'); } }) ), this.service.getData2().pipe( tap({ next: data => { this.data2 = data; console.log('2'); } }) ) ).pipe( isFirst(_ => { console.log("first"); }) ).subscribe({ next: r => console.log("inside subscription:", r) });
Pour combiner plusieurs flux qui seront déclenchés lorsque l' une des sources émet, vous pouvez utiliser la fonction combineLatest de combineLatest
.
Exemple
import { Component } from "@angular/core"; import { timer, Observable, Subject, combineLatest } from "rxjs"; import { tap, takeUntil } from "rxjs/operators"; @Component({ selector: "my-app", template: `<button (mouseup)="stop$.next()">Stop</button>` }) export class AppComponent { stop$ = new Subject<any>(); constructor() { combineLatest(timer(2000, 1000), timer(3000, 500)) .pipe( isFirst(_ => { console.log("first"); }), takeUntil(this.stop$) ) .subscribe({ next: r => console.log("inside subscription:", r) }); } }
Exemple de travail: Stackblitz
Dans votre cas, cela pourrait ressembler à quelque chose comme
export const isFirst = (predicate: any) => { let first = true; return <T>(source: Observable<T>) => { return source.pipe( tap({ next: _ => { if (first) { predicate(); first = false; } } }) ); }; };
Ça a l'air sympa! forkJoin
ne serait-il pas préférable d'exécuter des abonnements en parallèle?
forkJoin
n'émettra que lorsque les sources seront complètes. Donc, si vous deviez faire quelque chose dans l'abonnement ou certains opérateurs sous le forkJoin
, ils ne seront pas touchés tant que toutes les observables ne seront pas terminées. Et le rappel d'abonnement n'obtiendra que le complet. De plus, combineLatest
déclenche également toutes les observables sources en parallèle. La différence réside dans la manière dont les notifications sont émises aux opérateurs suivants et à l'abonnement.
Avec quelques restrictions supplémentaires, ce problème devient plus facile. Malheureusement, des opérateurs tels que combineLatest,
et zip
ajoutent une structure supplémentaire à vos données. Je vais vous fournir une solution avec zip
ci-dessous, mais cela ne s'étend pas du tout (si vous souhaitez ajouter plus de logique en aval de votre zip
, vous n'avez pas de chance dans de nombreux cas).
En supposant, cependant, que getData1
et getData2
sont complètement orthogonaux (comment ils émettent et comment ils sont consommés par votre application ne sont pas liés de manière prévisible), alors une solution à cela nécessitera plusieurs abonnements ou un opérateur personnalisé chargé de suivre émissions .
Il est presque certain que vous pouvez faire quelque chose de plus élégant que cela, mais c'est la solution la plus générale à laquelle je pourrais penser et qui répond à vos critères très généraux.
Ici, je fusionne les appels de service, étiquette chaque appel et passe par les émissions jusqu'à ce que chaque appel ait été émis au moins une fois.
zip( this.service.getData1().pipe( tap(datum => { console.log('1') this.data1 = datum; ) ), this.service.getData2().pipe( tap(datum => { console.log('2') this.data2 = datum; ) ) ).pipe( map((payload, index) => { if(index === 0) doSomething(); return payload; }) ).subscribe();
Dans l'abonnement, nous devons séparer à nouveau les deux appels. Puisque vous définissez une variable globale, vous pouvez lancer cette logique comme effet secondaire dans l'opérateur tap pour chaque appel. Cela devrait avoir des résultats similaires.
merge( this.service.getData1().pipe( tap(datum => { console.log('1') this.data1 = datum; ), map(payload => ({fromData: 1, payload})) ), ...
zip
Cette solution est beaucoup plus courte à écrire mais présente certains inconvénients.
merge( this.service.getData1().pipe( tap(_ => console.log('1')), map(payload => ({fromData: 1, payload})) ), this.service.getData2().pipe( tap(_ => console.log('2')), map(payload => ({fromData: 2, payload})) ) ).pipe( // Custom Operator s => defer(() => { let fromData1 = false; let fromData2 = false; let done = false; return s.pipe( tap(({fromData}) => { if(done) return; if(fromData === 1) fromData1 = true; if(fromData === 2) fromData2 = true; if(fromData1 && fromData2){ done = true; doSomething(); } }) ); }) ).subscribe(({fromData, payload}) => { if(fromData === 1) this.data1 = payload; if(fromData === 2) this.data2 = payload; });
Ce qui est transmis à votre abonnement, ce sont les appels de service couplés. Ici, vous devez absolument définir une variable globale comme effet secondaire de l'appel de service d'origine. L'option de le faire dans l'abonnement est perdue (à moins que vous ne vouliez les définir par paires).
Essayez
combineLatest
: learnrxjs.io/learn-rxjs/operators/combination/combinelatest@HarunYilmaz Je pense que
forkJoin
serait un choix légèrement meilleur ici