Je suis très excité de l'utilisation de RX dans la demande de production; où je vais écouter des mises à jour de notification entrantes provenant de différents canaux. p>
Je vais écrire une requête RX sur le dessus de ce flux où je vais étrangler à l'aide de l'opérateur .Window (). Abonné (dans mon cas c'est ActionBlock) traitera ces données en mode blocage; (c'est-à-dire que cela ne va pas accéder à la tâche de ActionBlock). Garder au-dessus à l'esprit si les données se présentent beaucoup plus rapidement que ce que mon abonné peut consommer, alors ce qui va arriver à des données entrantes. La requête RX utilise-t-elle un tampon en interne? sera-t-il débordé? P>
3 Réponses :
Si l'abonné traite sur le même thread que l'émission observable, les données ne peuvent pas être plus rapides que l'abonné peuvent consommer. dans cet exemple, chaque Si l'abonnement traverse des filets, alors ce qui précède ne tient pas. p> p> int code> qui émet des données
code> sera écrit sur la console avant que le prochain
int code> est émis. p>
Alors; Qu'est-ce que vous dites en tant que requête RX et abonné (c'est-à-dire Observateur) toujours exécuté sur le même thread? Si je suis iobservable sur FileStream; Comment cela se comportera-t-il? Parce que FileStream est indépendant de Iobservable et peut donner aux données si la requête RX assis sur elle est en train d'exécuter ou non ...
De quoi vous parlez dans ce commentaire est la différence entre les observables chauds et les observables à froid. Jetez un coup d'œil à cette question:
Ceci est effectivement bas à la mise en œuvre d'opérateurs individuels, mais les bâtiments intégrés seront tampon sur une base EM> par abonnement EM>, donc un consommateur lent ne bloquera pas d'autres abonnés, à moins bien sûr Les abonnés partagent des threads. P>
sur une note connexe, RX ne protège pas toujours la grammaire RX; Par exemple, il est de votre responsabilité de vous assurer que vous ne faites pas d'appels concurrents à ONNext sur un sujet. Vous pouvez utiliser
Est-ce que cela crée-t-il à l'interne plus d'une instance d'abonné ou une fois-t-il toujours?
Au lieu de "abonné", dites "Observateur".
votes
observable.synchroniser () code> pour résoudre ce problème. P>
iobservable code> Un
S'abonner CODE> La méthode accepte un
iobserver code>. Les surcharges qui acceptent les délégués sont en fait des méthodes d'extension qui créent un observateur anonyme dans les coulisses - il y a toujours un observateur par abonnement.
Le phénomène que vous faites référence est appelé la pression arrière et l'équipe RX explore actuellement différentes façons de gérer cette situation. Une solution pourrait communiquer la pression arrière à l'observable de sorte qu'il pourrait "ralentir". P>
Pour atténuer la pression arrière, vous pouvez utiliser des opérateurs de perte tels que la manette des gaz ou l'échantillon. P>
La réponse de Timothy est principalement correcte, mais il est est em> possible d'avoir une contre-pression sur un seul fil. Cela peut arriver si vous utilisez du code asynchrone. En ce sens, la contre-pression est liée à la synchronisation et à la planification, pas de filetage (rappel que par défaut RX est uniforme à filetage). P>
Si vous rencontrez un scénario dans lequel les événements sont produits plus rapidement qu'ils ne peuvent être consommés et que vous n'utilisez pas un opérateur de perte pour atténuer la pression arrière, ces éléments sont généralement programmés / en file d'attente / tamponnées, qui peuvent conduire à beaucoup d'allocation de mémoire. p>
Personnellement, cela n'a pas été question pour moi, car généralement des événements sont traités plus rapidement qu'ils ne sont cédés, sinon la perte d'événements n'est tout simplement pas une option, et donc la consommation de mémoire supplémentaire est inévitable. P>
Merci pour votre réponse; "Ces articles sont généralement programmés / en file d'attente / tamponnés, ce qui peut conduire à beaucoup d'allocation de mémoire" - alors ma question précédente était donc entourant cela; Où ce travail est tamponné (évidemment à l'intérieur de la mémoire); Mais car le développeur a-t-il géré ce tampon; et si nous pouvons dicter quelque chose de lié à cette mise en mémoire tampon de messages en tant que développeur. Merci
Vous devez utiliser un opérateur avec perte, comme le dit Chris - jetez un coup d'œil à cette approche pour reliever la pression arrière pour plus d'informations: Zerobugbuild.com/?p=192
Exactement. Vous avez quatre options. 1: Ralentissez la source, 2: tampon des événements (contre-pression), 3: accélérer le consommateur, 4: perdre des événements. Pour les opérateurs intégrés, le seul contrôle que vous avez sur Comment i> les événements sont programmés consiste à injecter un planificateur spécifique code>. Cela fournit suffisamment de contrôle dans certaines circonstances, mais de mon expérience sont programmés sur une base de souscription, pas une base par objet. Certains opérateurs ont également des surcharges de sélecteur, ce qui pourrait aider dans certaines zones. Si vous avez besoin d'un contrôle encore plus que cela, vous devrez peut-être écrire des opérateurs personnalisés.
Dans mon cas, je ne peux pas me permettre de perdre et des messages entrants car ils sont des mises à jour ...
Les positions sont-elles absolues? Si tel est le cas, vous pourriez, en théorie, perdez certaines des mises à jour lors d'une perte de douceur / précision. Nous pourrions aider plus si nous savions exactement quel était le problème.
Les positions de Chris sont indépendantes les unes des autres; Ce ne sont pas des tickers et je ne peux pas me permettre d'utiliser des opérateurs de perte. Il semble que ma seule solution utilise la mémoire tampon qui stockera toutes les mises à jour de position entrantes.
Vous devriez déterminer si le taux de notifications entrantes est vraiment aussi grand. Comme Chris l'a dit, ce n'est pas un problème qui vient souvent. Habituellement, quand il fait, c'est à cause d'un consommateur lent plutôt qu'à un producteur qui est si rapide rien ne pouvait éventuellement suivre. Des exemples de consommateurs lents sont tout ce qui concerne le réseau, la base de données, l'interface utilisateur, même les fichiers locaux, etc. Si vous faites de l'un d'entre eux dans votre observateur, vous devriez envisager de faire tamponner les mises à jour et la fusion du traitement si vous ne pouvez pas utiliser les opérateurs à perte de temps. . Profilage du code d'observateur en vaut la peine.
Avez-vous absolument besoin de traiter chaque événement ou est-il possible d'ignorer certains d'entre eux (même si cela prend une logique fantaisie)?
Puisque vous utilisez déjà Dataflow en tant que consommateur, pourquoi ne pas l'utiliser aussi comme producteur? Il prend en charge un peu d'étranglement.