12
votes

Comment Rx se comporte-t-il lorsque le flux de données est-il plus rapide que les abonnés peuvent consommer?

Je suis très excité de l'utilisation de RX dans la demande de production; où je vais écouter des mises à jour de notification entrantes provenant de différents canaux.

Je vais écrire une requête RX sur le dessus de ce flux où je vais étrangler à l'aide de l'opérateur .Window (). Abonné (dans mon cas c'est ActionBlock) traitera ces données en mode blocage; (c'est-à-dire que cela ne va pas accéder à la tâche de ActionBlock). Garder au-dessus à l'esprit si les données se présentent beaucoup plus rapidement que ce que mon abonné peut consommer, alors ce qui va arriver à des données entrantes. La requête RX utilise-t-elle un tampon en interne? sera-t-il débordé?


2 commentaires

Avez-vous absolument besoin de traiter chaque événement ou est-il possible d'ignorer certains d'entre eux (même si cela prend une logique fantaisie)?


Puisque vous utilisez déjà Dataflow en tant que consommateur, pourquoi ne pas l'utiliser aussi comme producteur? Il prend en charge un peu d'étranglement.


3 Réponses :


1
votes

Si l'abonné traite sur le même thread que l'émission observable, les données ne peuvent pas être plus rapides que l'abonné peuvent consommer. xxx

dans cet exemple, chaque int qui émet des données sera écrit sur la console avant que le prochain int est émis.

Si l'abonnement traverse des filets, alors ce qui précède ne tient pas.


2 commentaires
9
votes

Le phénomène que vous faites référence est appelé la pression arrière et l'équipe RX explore actuellement différentes façons de gérer cette situation. Une solution pourrait communiquer la pression arrière à l'observable de sorte qu'il pourrait "ralentir".

Pour atténuer la pression arrière, vous pouvez utiliser des opérateurs de perte tels que la manette des gaz ou l'échantillon.

La réponse de Timothy est principalement correcte, mais il est est possible d'avoir une contre-pression sur un seul fil. Cela peut arriver si vous utilisez du code asynchrone. En ce sens, la contre-pression est liée à la synchronisation et à la planification, pas de filetage (rappel que par défaut RX est uniforme à filetage).

Si vous rencontrez un scénario dans lequel les événements sont produits plus rapidement qu'ils ne peuvent être consommés et que vous n'utilisez pas un opérateur de perte pour atténuer la pression arrière, ces éléments sont généralement programmés / en file d'attente / tamponnées, qui peuvent conduire à beaucoup d'allocation de mémoire.

Personnellement, cela n'a pas été question pour moi, car généralement des événements sont traités plus rapidement qu'ils ne sont cédés, sinon la perte d'événements n'est tout simplement pas une option, et donc la consommation de mémoire supplémentaire est inévitable.


7 commentaires

Merci pour votre réponse; "Ces articles sont généralement programmés / en file d'attente / tamponnés, ce qui peut conduire à beaucoup d'allocation de mémoire" - alors ma question précédente était donc entourant cela; Où ce travail est tamponné (évidemment à l'intérieur de la mémoire); Mais car le développeur a-t-il géré ce tampon; et si nous pouvons dicter quelque chose de lié à cette mise en mémoire tampon de messages en tant que développeur. Merci


Vous devez utiliser un opérateur avec perte, comme le dit Chris - jetez un coup d'œil à cette approche pour reliever la pression arrière pour plus d'informations: Zerobugbuild.com/?p=192


Exactement. Vous avez quatre options. 1: Ralentissez la source, 2: tampon des événements (contre-pression), 3: accélérer le consommateur, 4: perdre des événements. Pour les opérateurs intégrés, le seul contrôle que vous avez sur Comment les événements sont programmés consiste à injecter un planificateur spécifique . Cela fournit suffisamment de contrôle dans certaines circonstances, mais de mon expérience sont programmés sur une base de souscription, pas une base par objet. Certains opérateurs ont également des surcharges de sélecteur, ce qui pourrait aider dans certaines zones. Si vous avez besoin d'un contrôle encore plus que cela, vous devrez peut-être écrire des opérateurs personnalisés.


Dans mon cas, je ne peux pas me permettre de perdre et des messages entrants car ils sont des mises à jour ...


Les positions sont-elles absolues? Si tel est le cas, vous pourriez, en théorie, perdez certaines des mises à jour lors d'une perte de douceur / précision. Nous pourrions aider plus si nous savions exactement quel était le problème.


Les positions de Chris sont indépendantes les unes des autres; Ce ne sont pas des tickers et je ne peux pas me permettre d'utiliser des opérateurs de perte. Il semble que ma seule solution utilise la mémoire tampon qui stockera toutes les mises à jour de position entrantes.


Vous devriez déterminer si le taux de notifications entrantes est vraiment aussi grand. Comme Chris l'a dit, ce n'est pas un problème qui vient souvent. Habituellement, quand il fait, c'est à cause d'un consommateur lent plutôt qu'à un producteur qui est si rapide rien ne pouvait éventuellement suivre. Des exemples de consommateurs lents sont tout ce qui concerne le réseau, la base de données, l'interface utilisateur, même les fichiers locaux, etc. Si vous faites de l'un d'entre eux dans votre observateur, vous devriez envisager de faire tamponner les mises à jour et la fusion du traitement si vous ne pouvez pas utiliser les opérateurs à perte de temps. . Profilage du code d'observateur en vaut la peine.