a: 1---2-3-4--5---6 b: ------T--------- o: ------1234-5---6 Using RxJS, is there some operator that can accomplish the diagram above? I have stream A which is a random stream of events, given a stream B which has a single true event, can I have an output stream that doesn't emit anything until that true event, and then sends everything is had saved up until then and afterwards emits normally?I thought maybe I could use buffer(), but it seems like there is no way to do a one time buffer like this with that operator.
3 Réponses :
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.3/rxjs.umd.js"></script>
const { concat, interval, of, from } = rxjs;
const { share, delay, toArray, takeUntil, mergeMap } = rxjs.operators;
const waitUntil = signal$ => source$ => {
const sharedSource$ = source$.pipe(share());
return concat(
sharedSource$.pipe(
takeUntil(signal$),
toArray(),
mergeMap(from)
),
sharedSource$
);
}
const stopWaiting$ = of('signal').pipe(delay(2000));
const source$ = interval(500).pipe(
waitUntil(stopWaiting$)
).subscribe(console.log);Je pense que la solution de @ ZahiC est correcte, mais personnellement, je le ferais en une seule chaîne en utilisant l'opérateur multicast .
a$.pipe(
multicast(new Subject(), s => concat(
s.pipe(
buffer(b$),
take(1),
),
s
)),
)
multicast va essentiellement cracher le flux en deux où concat s'abonnera d'abord au premier qui est mis en mémoire tampon jusqu'à ce que b$ émette. Ensuite, il se termine immédiatement car take(1) et concat s'abonnent à nouveau au même steam mais cette fois sans tampon.
Voici ma solution, en utilisant TypeScript:
a$.pipe(
queueUntil(b$)
)
et peut être utilisé comme ceci:
export const queueUntil = <T>(signal$: Observable<any>) => (source$: Observable<T>) => {
let shouldBuffer = true;
return source$.pipe(
bufferWhen(() => shouldBuffer ? signal$.pipe(
tap(() => shouldBuffer = false),
) : source$),
concatMap(v => v),
);
};
Vous pouvez utiliser l' opérateur de délai pour cela