2
votes

BufferTime avec option principale

J'ai des événements que je voudrais mettre en mémoire tampon, mais je voudrais mettre en mémoire tampon uniquement après le premier élément.

[------bufferTime------]

Input over time:
[1, 2, 3, -------------|---4, 5, 6 ----------------]


Output over time:
[1]-----------------[2,3]---[4]------------------[5,6]

y a-t-il un moyen de le faire?


0 commentaires

4 Réponses :


2
votes

Vous pouvez utiliser multicast pour diviser le flux en deux et simplement passer la première valeur.

import { concat, Subject } from “rxjs”;
import { multicast, take, bufferCount } from “rxjs/operators”;

source.pipe(
  multicast(
    new Subject(),
    s => concat(
      s.pipe(take(1)),
      s.pipe(bufferCount(X)),
    )
  ),
);


3 commentaires

Wow c'est cool! Pour mon esprit simple, j'ai trouvé la réponse de Dan plus facile à comprendre. Celui-ci est toujours plus sexy :)


Je ne peux pas discuter avec ça @Kos


Seul le "1" est émis par lui-même, cela ne fonctionne pas pour le "4", voir codesandbox.io/s/focused-goodall-4r50e?file=/src/index.js



3
votes

Je pense que cela peut être résolu en divisant votre flux en deux, firstValue $ et afterFirstValue $, puis en les fusionnant.

  import { merge, Subject } from 'rxjs';
  import { take, skip, bufferTime } from 'rxjs/operators';
  import { Source } from 'webpack-sources';

  ...

  source$ = new Subject();

  firstValue$ = source$.pipe(
    take(1)
  );

  afterFirstValue$ = source$.pipe(
    skip(1),
    bufferTime(5000)
  );

  merge(firstValue$, afterFirstValue$)
    .subscribe(result => {
      // Do something
    });

  source$.next(1);
  source$.next(1);
  source$.next(1);

Réponse à la question de suivi concernant le sujet h2>

Donc je l'ai fait pour que la source originale soit un sujet ici. Ce n'est pas exactement comme vous l'avez décrit, mais je pense que c'est peut-être ce que vous voulez.

import { merge } from 'rxjs';
import { take, skip, bufferTime } from 'rxjs/operators';

...

firstValue$ = source$.pipe(
  take(1)
);

afterFirstValue$ = source$.pipe(
  skip(1),
  bufferTime(5000)
);

merge(firstValue$, afterFirstValue$)
  .subscribe(result => {
    // Do something
  });


9 commentaires

Merci, Dan & @martin, j'ai compris les deux approches. Comme je suis un peu nouveau dans Rx, je reste avec Dan pour l'instant, mais j'essaierai l'autre :) Maintenant j'ai du mal à l'intégrer dans mon code, pouvez-vous m'aider? J'ai besoin de la fusion de ces méthodes pour être un sujet, comme indiqué sur ce violon: jsfiddle.net/duj4htfa Comment puis-je transformer cet observable (de la fusion) en un sujet?


@melloc J'ai donc ajouté une réponse à cela également, voir ci-dessus. Veuillez me dire si ce n'était pas ce que vous vouliez. De plus, je vois dans votre violon que vous utilisez l'ancienne syntaxe pour beaucoup de choses RxJS. Par exemple, "buffered $ .skip (1)" est "buffered $ .pipe (skip (1))" avec la nouvelle syntaxe et "new Rx.Subject ()" est "new Subject ()".


Cool! Merci SnorreDan, mais take (1) ne fonctionnera qu'une seule fois, non? Je voulais que cela fonctionne sur chaque rafale d'événement, peut-être que je dois m'en tenir à l'option @martin? testé sur: codesandbox.io/s/1zl3q667ml


Oui, la prise (1) permet de ne prendre qu'un seul événement. Mais alors nous avons afterFirstValue $ qui prend en compte tous les autres événements. Ensuite, nous les fusionnons ensemble, de sorte que nous obtiendrons tous les événements de l'abonnement après la fusion.


Je pense que je vous comprends probablement mal quelque part @melloc


Essayez de lire mon commentaire ci-dessus et regardez l'exemple mis à jour sur la description. Je ne sais pas si je peux faire ça avec Rx, je vais peut-être écrire ma propre fonction


Je pense que vous pouvez également y parvenir. Les opérateurs Window, windowCount ou windowTime peuvent être utiles. La manière exacte de procéder dépend de la manière dont vous sélectionnez quel élément est le premier élément avant chaque tampon. Cela dépend-il du temps, s'agit-il toujours des premières valeurs 1, puis 2, ou y a-t-il une valeur spéciale qui indique que c'est la première valeur?


Non, l'algorithme serait quelque chose comme ceci: new [event]: si addToBuffer == false émet une sortie (prise 1), addToBuffer = false, sinon ajouter au tampon, l'abonné du tampon devra attribuer false au drapeau addToBuffer Je ne sais pas si cela sera compréhensible, haha ​​je vais essayer d'écrire ceci sur un jsfiddle


Je pense que je l'ai fait fonctionner, vérifiez ceci @SnorreDan: codesandbox.io/s/1r8jzwm764



0
votes

J'ai eu de très bonnes réponses qui ont éclairé ma vision du problème et m'ont amené à trouver la vraie chose dont j'avais besoin, c'était quelque chose comme ça:

function getLeadingBufferSubject (bufferTimeArg) {
    const source = new Subject()
    const result = new Subject()

    let didOutputLeading = false

    const buffered$ = source
        .pipe(bufferTime(bufferTimeArg))
        .pipe(filter(ar => ar.length > 0))
        .pipe(map(ar => [...new Set(ar)]))

    buffered$.subscribe(v => {
        didOutputLeading = false
        const slicedArray = v.slice(1)

        // emits buffered values (except the first)  and set flag to false
        if (.length > 0) result.next(v.slice(1))
    })

    // emits first value if buffer is empty
    source.subscribe(v => {
        if (!didOutputLeading) {
            didOutputLeading = true
             result.next(v)
        }
    })

    // call .next(value) on "source"
    // subscribe for results on "result"
    return {
        source,
        result
    }
}


1 commentaires

Vous souhaitez simplement proposer une autre solution, cela vous aide-t-il? stackoverflow.com/a/63994576/8578281



0
votes

J'ai eu le même problème et après avoir joué avec, j'ai trouvé cette solution supplémentaire:

source$.pipe(
  buffer(source$.pipe(
    throttleTime(bufferTime, asyncScheduler, {leading: true, trailing: true}),
    delay(10) // <-- This here bugs me like crazy though!
  )
)

Parce que throttle comporte déjà un leader code>, vous pouvez simplement l'utiliser pour déclencher manuellement les émissions de tampon.

J'aimerais vraiment me débarrasser de ce délai ici. Ceci est nécessaire car l'observable interne est d'abord déclenchée, ce qui entraîne l'émission prématurée du tampon.


0 commentaires