J'ai des événements que je voudrais mettre en mémoire tampon, mais je voudrais mettre en mémoire tampon uniquement après le premier élément.
[------bufferTime------] Input over time: [1, 2, 3, -------------|---4, 5, 6 ----------------] Output over time: [1]-----------------[2,3]---[4]------------------[5,6]
y a-t-il un moyen de le faire?
4 Réponses :
Vous pouvez utiliser multicast
pour diviser le flux en deux et simplement passer la première valeur.
import { concat, Subject } from ârxjsâ; import { multicast, take, bufferCount } from ârxjs/operatorsâ; source.pipe( multicast( new Subject(), s => concat( s.pipe(take(1)), s.pipe(bufferCount(X)), ) ), );
Wow c'est cool! Pour mon esprit simple, j'ai trouvé la réponse de Dan plus facile à comprendre. Celui-ci est toujours plus sexy :)
Je ne peux pas discuter avec ça @Kos
Seul le "1" est émis par lui-même, cela ne fonctionne pas pour le "4", voir codesandbox.io/s/focused-goodall-4r50e?file=/src/index.js
Je pense que cela peut être résolu en divisant votre flux en deux, firstValue $ et afterFirstValue $, puis en les fusionnant.
import { merge, Subject } from 'rxjs'; import { take, skip, bufferTime } from 'rxjs/operators'; import { Source } from 'webpack-sources'; ... source$ = new Subject(); firstValue$ = source$.pipe( take(1) ); afterFirstValue$ = source$.pipe( skip(1), bufferTime(5000) ); merge(firstValue$, afterFirstValue$) .subscribe(result => { // Do something }); source$.next(1); source$.next(1); source$.next(1);
Donc je l'ai fait pour que la source originale soit un sujet ici. Ce n'est pas exactement comme vous l'avez décrit, mais je pense que c'est peut-être ce que vous voulez.
import { merge } from 'rxjs'; import { take, skip, bufferTime } from 'rxjs/operators'; ... firstValue$ = source$.pipe( take(1) ); afterFirstValue$ = source$.pipe( skip(1), bufferTime(5000) ); merge(firstValue$, afterFirstValue$) .subscribe(result => { // Do something });
Merci, Dan & @martin, j'ai compris les deux approches. Comme je suis un peu nouveau dans Rx, je reste avec Dan pour l'instant, mais j'essaierai l'autre :) Maintenant j'ai du mal à l'intégrer dans mon code, pouvez-vous m'aider? J'ai besoin de la fusion de ces méthodes pour être un sujet, comme indiqué sur ce violon: jsfiddle.net/duj4htfa Comment puis-je transformer cet observable (de la fusion) en un sujet?
@melloc J'ai donc ajouté une réponse à cela également, voir ci-dessus. Veuillez me dire si ce n'était pas ce que vous vouliez. De plus, je vois dans votre violon que vous utilisez l'ancienne syntaxe pour beaucoup de choses RxJS. Par exemple, "buffered $ .skip (1)" est "buffered $ .pipe (skip (1))" avec la nouvelle syntaxe et "new Rx.Subject ()" est "new Subject ()".
Cool! Merci SnorreDan, mais take (1) ne fonctionnera qu'une seule fois, non? Je voulais que cela fonctionne sur chaque rafale d'événement, peut-être que je dois m'en tenir à l'option @martin? testé sur: codesandbox.io/s/1zl3q667ml
Oui, la prise (1) permet de ne prendre qu'un seul événement. Mais alors nous avons afterFirstValue $ qui prend en compte tous les autres événements. Ensuite, nous les fusionnons ensemble, de sorte que nous obtiendrons tous les événements de l'abonnement après la fusion.
Je pense que je vous comprends probablement mal quelque part @melloc
Essayez de lire mon commentaire ci-dessus et regardez l'exemple mis à jour sur la description. Je ne sais pas si je peux faire ça avec Rx, je vais peut-être écrire ma propre fonction
Je pense que vous pouvez également y parvenir. Les opérateurs Window, windowCount ou windowTime peuvent être utiles. La manière exacte de procéder dépend de la manière dont vous sélectionnez quel élément est le premier élément avant chaque tampon. Cela dépend-il du temps, s'agit-il toujours des premières valeurs 1, puis 2, ou y a-t-il une valeur spéciale qui indique que c'est la première valeur?
Non, l'algorithme serait quelque chose comme ceci: new [event]: si addToBuffer == false émet une sortie (prise 1), addToBuffer = false, sinon ajouter au tampon, l'abonné du tampon devra attribuer false code > au drapeau
addToBuffer
Je ne sais pas si cela sera compréhensible, haha je vais essayer d'écrire ceci sur un jsfiddle
Je pense que je l'ai fait fonctionner, vérifiez ceci @SnorreDan: codesandbox.io/s/1r8jzwm764
J'ai eu de très bonnes réponses qui ont éclairé ma vision du problème et m'ont amené à trouver la vraie chose dont j'avais besoin, c'était quelque chose comme ça:
function getLeadingBufferSubject (bufferTimeArg) { const source = new Subject() const result = new Subject() let didOutputLeading = false const buffered$ = source .pipe(bufferTime(bufferTimeArg)) .pipe(filter(ar => ar.length > 0)) .pipe(map(ar => [...new Set(ar)])) buffered$.subscribe(v => { didOutputLeading = false const slicedArray = v.slice(1) // emits buffered values (except the first) and set flag to false if (.length > 0) result.next(v.slice(1)) }) // emits first value if buffer is empty source.subscribe(v => { if (!didOutputLeading) { didOutputLeading = true result.next(v) } }) // call .next(value) on "source" // subscribe for results on "result" return { source, result } }
Vous souhaitez simplement proposer une autre solution, cela vous aide-t-il? stackoverflow.com/a/63994576/8578281
J'ai eu le même problème et après avoir joué avec, j'ai trouvé cette solution supplémentaire:
source$.pipe( buffer(source$.pipe( throttleTime(bufferTime, asyncScheduler, {leading: true, trailing: true}), delay(10) // <-- This here bugs me like crazy though! ) )
Parce que throttle
comporte déjà un leader code>, vous pouvez simplement l'utiliser pour déclencher manuellement les émissions de tampon.
J'aimerais vraiment me débarrasser de ce délai
ici. Ceci est nécessaire car l'observable interne est d'abord déclenchée, ce qui entraîne l'émission prématurée du tampon.