3
votes

Opérateur RxJS waitUntil

a: 1---2-3-4--5---6
b: ------T---------

o: ------1234-5---6
Using RxJS, is there some operator that can accomplish the diagram above? I have stream A which is a random stream of events, given a stream B which has a single true event, can I have an output stream that doesn't emit anything until that true event, and then sends everything is had saved up until then and afterwards emits normally?I thought maybe I could use buffer(), but it seems like there is no way to do a one time buffer like this with that operator.

1 commentaires

Vous pouvez utiliser l' opérateur de délai pour cela


3 Réponses :


3
votes

<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.3/rxjs.umd.js"></script>
const { concat, interval, of, from } = rxjs;
const { share, delay, toArray, takeUntil, mergeMap } = rxjs.operators;

const waitUntil = signal$ => source$ => {
  const sharedSource$ = source$.pipe(share());
  return concat(
    sharedSource$.pipe(
      takeUntil(signal$),
      toArray(),
      mergeMap(from)
    ),
    sharedSource$
  );
}

const stopWaiting$ = of('signal').pipe(delay(2000));

const source$ = interval(500).pipe(
    waitUntil(stopWaiting$)
).subscribe(console.log);


0 commentaires

2
votes

Je pense que la solution de @ ZahiC est correcte, mais personnellement, je le ferais en une seule chaîne en utilisant l'opérateur multicast .

a$.pipe(
  multicast(new Subject(), s => concat(
    s.pipe(
      buffer(b$),
      take(1),
    ),
    s
  )),
)

multicast va essentiellement cracher le flux en deux où concat s'abonnera d'abord au premier qui est mis en mémoire tampon jusqu'à ce que b$ émette. Ensuite, il se termine immédiatement car take(1) et concat s'abonnent à nouveau au même steam mais cette fois sans tampon.


0 commentaires

0
votes

Voici ma solution, en utilisant TypeScript:

a$.pipe(
    queueUntil(b$)
)

et peut être utilisé comme ceci:

export const queueUntil = <T>(signal$: Observable<any>) => (source$: Observable<T>) => {
    let shouldBuffer = true;

    return source$.pipe(
        bufferWhen(() => shouldBuffer ? signal$.pipe(
            tap(() => shouldBuffer = false),
        ) : source$),
        concatMap(v => v),
    );
};


0 commentaires