11
votes

Analogue de la file d'attente.peek () pour le blocageCollection lors de l'écoute de consommer iEnumerable

J'utilise motif de pipelines Mise en œuvre pour découpler les messages consommateurs de Un producteur pour éviter une question lente consommation.

en cas d'exception sur une étape de traitement de message [1] code> il sera perdu et non envoyé à un autre service / couche [2 ] code>. Comment puis-je gérer un tel problème dans [3] code> afin que le message ne soit pas perdu et ce qui est important! Les commandes de messages ne seront pas mélangées afin que le service supérieur / couche obtiendra des messages dans l'ordre dans lequel ils sont entrés. J'ai une idée qui implique une autre queue intermédiaire code> mais il semble complexe? Malheureusement BlockingCollection code> n'expose aucun analogue de la méthode de la file d'attente Queue.peek () Code> pour que je puisse simplement lire le prochain message disponible et en cas de traitement réussi, do Dequeue () code> p> xxx pré>

modifier strud>: oublié de dire que c'est le service WCF hébergé IIS qui renvoie des messages à Silverlight Client WCF Proxy via client Contrat de rappel. p>

Edit2: strong> ci-dessous est de savoir comment je ferais cela en utilisant peek () code>, je manque quelque chose? P>

bool successfullySent = true;
try
{
   var item = queue.Peek();
   PreProcessItem(item);
   SendItem(item);       
}
catch(Exception exception)
{
   successfullySent = false;
}
finally
{
   if (successfullySent)
   {
       // just remove already sent item from the queue
       queue.Dequeue();
   }
}


7 commentaires

Vous attendez-vous à ce que l'exception ne soit pas lancée lorsque vous essayez à nouveau? Si oui, mettez simplement une boucle de temps jusqu'à ce qu'aucune exception ne soit lancée. Si non, vous devez décider de quoi faire dans le cas, un élément ne peut pas être traité. Peut-être transmettre un message d'erreur au prochain calque?


Il s'agit d'un message d'expédition de serveur WCF Server WHCIH à un proxy client, il peut s'agir de la connexion si le client a échoué et en 5 secondes, le client se reconnectera à un service et prêt à recevoir un message, tandis que est ce que je ' M faisant maintenant avec n renvoient des tentatives, je dois en parler, mais il y a toujours un numéro de Magic Whsih pour reconectérétries et délai d'attente à choisir


On dirait que vous êtes sur le point de réinventer les files d'attente de messages. Avez-vous envisagé d'utiliser quelque chose comme MSMQ ou RabbitMQ?


@DTB: Il s'agit d'une technologie lourde, je ne suis pas sûr que c'est la meilleure façon d'aller pour le service WCF ?! mais doit admettre que c'est le plus robuste et fiable


@sll et pourquoi pensez-vous qu'à utiliser quelque chose comme peek () vous aiderait à éviter ces nombres magiques?


Fait éditer Stackoverflow.com/a/13583279/659190


@SLL, cela signifie que s'il y a une défaillance permanente, vous n'allez jamais passer au-delà de cet article. Si c'est ce que vous voulez, vous pouvez simplement utiliser un nombre infini de tentatives, vous n'avez pas besoin Peek () pour cela.


4 Réponses :


0
votes

Vous pouvez utiliser Concurrentqueur à la place, il a trydeffeue () méthode.

Concurrentqueue .tryDequeue (résultat t) essaie de supprimer et de retourner l'objet au début de la file d'attente simultanée, il retourne vrai si Un élément a été retiré et retourné du début de la concurrence avec succès.

Donc, pas besoin de vérifier un coup d'œil en premier.

trydequeue () est le fil de sécurité:

Concurrentqueue gère toutes la synchronisation interne . Si deux threads appellent TryDequeue (T) à précisément le même moment, aucune opération n'est bloquée.

Aussi loin que je comprenne il ne renvoie faux que si la file d'attente est vide :

Si la file d'attente était remplie de code tel que q.enqueue ("A"); q.Enqueue ("B"); q.Enqueue ("c"); et deux threads essaient simultanément de déroger un élément, un thread va résoudre un et l'autre thread va déléillir b. Les deux appels à TryDequeue (T) reviendront tristes, car ils étaient tous deux capables de résoudre un élément. Si chaque thread remonte à dequoise un élément supplémentaire, l'un des threads va ressusueuler C et revenir vrai, tandis que l'autre thread trouvera la file d'attente vide et retournera false.

http://msdn.microsoft. COM / EN-US / Bibliothèque / DD287208% 28V = vs.100% 29.aspx

mise à jour

Peut-être que l'option la plus simple utiliserait TasksCHEDuler classe. Avec cela, vous pouvez envelopper toutes vos tâches de traitement dans les articles de la file d'attente et simplifier la mise en œuvre de la synchronisation.


6 commentaires

BlockingCollection est un wrapper autour de Concurrentqueue par défaut. Il y a une différence entre eux: blocagecollection permet d'attendre qu'il y ait des éléments disponibles dans la file d'attente, tandis que Concurrentqueue n'a pas cette fonctionnalité.


Ok trydequeue () permet de vérifier si un élément a été détresuisé, c'est comme int32.tryparse () - fonctionne sans exception en cas de défaillance


@sll elle ne vérifie pas seulement, elle supprime effectivement (DEQUUE) l'élément: "tente de supprimer et de renvoyer l'objet au début de la file d'attente simultanée." Il ne renvoie faux que si la file d'attente est vide.


Bien sûr, il vérifie et supprimez, il suffit donc de la standard file d'attente.dequeue () : il ne jette pas invalidOperationException si la file d'attente est vide et de retourner faux, rien de plus. Alors, comment recommanderiez-vous l'utiliser dans mon cas? Surtout à la file d'attente? Je n'aime pas tandis que (file d'attente.tryDequeue (), timout) boucle


@sll Désolé, j'ai peut-être mal compris le problème, je pensais que c'était seulement environ Peek () Méthode. Vous avez sûrement besoin d'un objet de synchronisation supplémentaire; J'utiliserais un événement, manuelResetevent peut-être ...


Je pense qu'il vous manque un point de ma question, il n'y a essentiellement rien à propos de plusieurs threads Dequeue d'éléments, le problème est quand un élément délégé et le service n'a pas pu l'envoyer à un autre service - élément est principalement perdu car il est déjà sorti d'un La file d'attente et non envoyée à un service cible, donc en cas de défaillance de ce moment, un article sera perdu. À l'aide de la file d'attente, je peux jeter un coup d'œil article, envoyez-le, et si vous avez envoyé avec succès - dequoise, sinon quittez la file d'attente, puisque Peek ne le supprime pas.



9
votes

Vous devez envisager une file d'attente intermédiaire.

blocageCollection ne peut pas "jeter d'œil" des objets à cause de sa nature - il peut y avoir plus d'un consommateur. L'un d'entre eux peut PEEK un article, et un autre peut prise de .


4 commentaires

S'il n'y a qu'un seul client, vous pouvez faire un coup d'œil avec la méthode (code> pour ienumerable , qui ne supprime pas un élément de la collection de blocage .


@pvoosten - merci pour cela. Il "ressent" dangereux de la mettre en œuvre comme ça, puisque ces types d'hypothèses s'avèrent souvent faux à une date ultérieure lorsque l'architecture est difficile à annuler. Mais si les gens savent il n'y a qu'un seul consommateur, alors cela semble un excellent moyen de le faire.


@PVoosten: premier () lancera une exception s'il n'y a rien dans la collection. Et si vous voulez bloquer jusqu'à ce qu'il y ait au moins un élément de la collection (comme prendre () mais sans supprimer l'élément).


@tigrou: Suivez ensuite la réponse et envisagez une file d'attente intermédiaire. La vérification des articles de la collection aura toujours toujours introduire une condition de course ou la nécessité de verrouiller.



4
votes

comme Dennis dit dans son commentaire, BlockingCollection fournit une enveloppe de blocage à n'importe quel implémenteur du iProducerconsumercollection interface.

Comme vous pouvez le constater, iProducerconsumerclection , par conception, ne définit pas un peek ou d'autres méthodes nécessaires pour en mettre en oeuvre une. Cela signifie que BlockingCollection ne peut pas, comme il se trouve, offre une analyouge à Peek .

Si vous considérez, cela réduit considérablement les problèmes de concurrence créés par le commerce des services publics d'un PEEK implémentation. Comment pouvez-vous consommer sans consommer? PEEK Parallèlement, vous devez verrouiller la tête de la collection jusqu'à ce que le fonctionnement PEEK a été terminé que i et les concepteurs de blockingcollection voir comme sous-optimal. Je pense que ce serait également désordonné et difficile à mettre en œuvre, nécessitant une sorte de contexte de peink jetable.

Si vous consommez un message et que sa consommation échoue, vous devrez gérer avec elle. Vous pouvez l'ajouter à une autre file d'attente de défaillance, ré-ajoutez-la à la file d'attente de traitement normale pour une nouvelle tentative de future ou simplement enregistrer sa défaillance de la postérité ou une autre action appropriée à votre contexte.

Si vous ne voulez pas consommer les messages simultanément, il n'est pas nécessaire d'utiliser BlockingCollection puisque vous n'avez pas besoin de consommation simultanée. Vous pouvez utiliser Concurrentqueue Directement, vous obtiendrez toujours la synchronicité des ajouts et vous pouvez utiliser Trypeek en toute sécurité car vous contrôlez un seul consommateur. Si la consommation échoue, vous pouvez arrêter la consommation avec une boucle infinie de nouvelle tentative dans votre désir bien que, je suggère que cela nécessite une pensée de conception.


1 commentaires

Je crois que le BlockingCollection ajoute un avantage potentiel de plus, à savoir que tryque () attendra jusqu'à ce qu'un élément soit disponible. Si nous voulons conserver le BlockingCollection à cause de cela, nous pouvons toujours jeter un coup d'œil à l'aide de tout () , correct? (Sous l'hypothèse d'un thread de consommation unique.)



3
votes

blocageCollection est un wrapper autour de iproducerconsumercollection , qui est plus générique que par exemple par ex. Concurrentqueue et donne à l'agent de mise en œuvre la liberté de ne pas avoir à mettre en œuvre un (Essayez) jeter la méthode .

Toutefois, vous pouvez toujours appeler trypseek Sur la file d'attente sous-jacente directement: xxx

note quelle que soit vous ne devez pas modifier votre file d'attente via utillyforpeking , sinon BlockingCollection sera confus et peut lancer invalidOperationException s à vous, mais je serais surpris si vous appelez le Trypeek non modifiant sur cette structure de données simultanée serait un problème.


0 commentaires