a: 1---2-3-4--5---6 b: ------T--------- o: ------1234-5---6 Using RxJS, is there some operator that can accomplish the diagram above? I have stream A which is a random stream of events, given a stream B which has a single true event, can I have an output stream that doesn't emit anything until that true event, and then sends everything is had saved up until then and afterwards emits normally?I thought maybe I could use buffer(), but it seems like there is no way to do a one time buffer like this with that operator.
3 Réponses :
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.3/rxjs.umd.js"></script>
const { concat, interval, of, from } = rxjs; const { share, delay, toArray, takeUntil, mergeMap } = rxjs.operators; const waitUntil = signal$ => source$ => { const sharedSource$ = source$.pipe(share()); return concat( sharedSource$.pipe( takeUntil(signal$), toArray(), mergeMap(from) ), sharedSource$ ); } const stopWaiting$ = of('signal').pipe(delay(2000)); const source$ = interval(500).pipe( waitUntil(stopWaiting$) ).subscribe(console.log);
Je pense que la solution de @ ZahiC est correcte, mais personnellement, je le ferais en une seule chaîne en utilisant l'opérateur multicast
.
a$.pipe( multicast(new Subject(), s => concat( s.pipe( buffer(b$), take(1), ), s )), )
multicast
va essentiellement cracher le flux en deux où concat
s'abonnera d'abord au premier qui est mis en mémoire tampon jusqu'à ce que b$
émette. Ensuite, il se termine immédiatement car take(1)
et concat
s'abonnent à nouveau au même steam mais cette fois sans tampon.
Voici ma solution, en utilisant TypeScript:
a$.pipe( queueUntil(b$) )
et peut être utilisé comme ceci:
export const queueUntil = <T>(signal$: Observable<any>) => (source$: Observable<T>) => { let shouldBuffer = true; return source$.pipe( bufferWhen(() => shouldBuffer ? signal$.pipe( tap(() => shouldBuffer = false), ) : source$), concatMap(v => v), ); };
Vous pouvez utiliser l' opérateur de délai pour cela