10
votes

Traitement simultané des messages avec un ordre strict

Dans mon application Web Javae, je dois traiter des messages entrants strictement dans l'ordre d'arrivée. Je suppose que mon conteneur WebApp (Tomcat 6) conserve l'ordre des messages lors de leur arrivée au port HTTP.

Qu'est-ce qui cause des maux de tête pour moi, c'est la façon dont je traite à l'intérieur de ces messages. Pour une charge de travail améliorée, j'appuie le traitement de chaque message à un threadpool autant de choses à faire ici. Analyse XML, parfois enrichissement des données à l'aide de services Web externes. Une fois le traitement terminé, je repousse la représentation Java du message dans un moteur de traitement de flux complexe esper.codehaus.org , qui est threadsafe. Ici, différents modèles sont vérifiés lorsque l'ordre d'entrée est le seuil exigeant le plus élevé d'un phénomène dépasse.

J'ai eu l'idée d'insérer chaque message traité dans une priorityQuiee avec un identifiant prioritaire qu'ils ont reçu au moment de votre arrivée (dans mon servlet, où il est incrémenté pour chaque message). Le problème est le suivant:

Le fil qui sonne l'interrogation Les éléments de la file d'attente (ID la plus basse correspond à la tête de la file d'attente) pour l'insérer dans Esper pourraient sauter un identifiant car il ne vérifie pas les éléments manquants. Je suppose qu'une illustration fonctionne mieux:

Entrez la description de l'image ici

pour les étapes (1) à (4) tout fonctionne comme prévu. Mais à l'étape (5), l'treeColler récupère l'élément 6 et non l'élément 4 (qui est ensuite inséré à l'étape (6)). Cela se traduit par la commande de message: 2; 3; 6; 4

Ce que j'ai essayé de faire était de modifier la mise en œuvre de l'interrogation de la tête de la file d'attente pour suivre l'ordre strict des identifiants. Signification, si l'élément de l'identifiant suivant n'est pas encore inséré dans la file d'attente, attendez une barrière jusqu'à ce que c'est là. Cela semblait fonctionner pendant les 10 premières minutes, puis pendu, probablement en raison d'un élément qui n'a jamais été inséré dans la file d'attente.

Quelqu'un est-il là-bas ayant un problème similaire dans le passé et a un indice pour moi?


5 commentaires

Vous ferez peut-être mieux la création des articles de la file d'attente dès qu'ils arrivent sur votre serveur, ne leur permettent qu'à partir de la file d'attente de la file d'attente et une fois le traitement d'entre eux terminé.


Ce que je ne comprends pas, c'est que si l'ordre des éléments de la file d'attente de sortie doit correspondre à la commande dans la file d'attente d'entrée, quelle est la bonne traitement hors de la commande? C'est-à-dire pourquoi 6 avant 4 que la queuePoller a besoin d'avoir 4 avant 6 ans?


J'ai pensé à ce sujet, mais je suppose que cela conduirait à des résultats similaires. En supposant, j'utiliserais un drapeau qui indique que le traitement est terminé, les mêmes problèmes pourraient survenir si quelque chose n'allait pas dans le traitement (délai d'attente du serveur, etc. qui est également le problème pourquoi l'élément n'est pas inséré). Ainsi, le drapeau indicateur ne serait jamais défini sur True.


@AIB: Profitez de la CPU multicœur et de l'utilisation matérielle Si le traitement des messages (analyse d'analyse XML, enrichissement à l'aide de services Web) est coûteux. J'ai fait des tests de stress sur mon application avec une solution à threads unique et il a montré moins de performances.


@matthes que vous sondant la solution pourrait fonctionner, si vous ajoutez un délai d'interrogation: si la file d'attente ne contient pas de n articles commandés après M Millisecondes, prenez la première partie des articles commandés et les traitera (peut-être que le morceau ne contiendra qu'un seul article dans la limite)


4 Réponses :


3
votes

Consultez perturbateur - une file d'attente haute performance avec une commande stricte (première entrée - premier servi)


6 commentaires

Bon sang, j'étais sur le point de dire ça :-)


Il semble que la situation de l'OP est que les éléments entrent dans le traitement du fil de la commande et il doit les traiter dans l'ordre donné par la clé de l'article. Comment le perturbateur va-t-il résoudre ce problème?


Merci pour le lien, je vais certainement vérifier cela et faire rapport, si cela résolvait mes problèmes. Après une lecture rapide, il ne semble toujours pas résoudre l'attente des éléments manquants - ou je n'ai pas lu assez.


La chose est la perturbatrice divise deux actions: 1) lorsque le thread "producteur" "revendique" un numéro, il se réserve essentiellement. 2) Lorsque le thread "producteur" a mis ses données dans la file d'attente, elle s'engage et permet aux "consommateurs" de le traiter. La perturbatrice garantit que les messages seraient traités dans l'ordre des revendications. Cela signifie que si un "producteur" a été réservé au n ° 5, alors le consommateur attendrait jusqu'à ce qu'il soit prêt même s'il y a déjà # 6 dans la file d'attente.


Ok, cela semble être la chose que je cherche. Je vais essayer d'essayer de faire rapport! Merci


C'est vraiment génial. Les gars, consultez le perturbateur! Il a résolu tous mes problèmes avec moins de 100 lignes de code. A pris un moment jusqu'à ce que j'ai eu le point que cela ne stocke pas d'objets, mais juste des identifiants comme une clé pour les objets réels. Cela le rend vite fouet. Je l'ai comparé à un conteneur auto-écrit avec des délais d'attente comme suggéré par AIB et Victor. Le mien était bien plus lent alors qu'il fonctionnait avec des barrières muniples.



0
votes

La bibliothèque de classe fournit une implémentation flexible de piscine de thread, ainsi que des configurations prédéfinies utiles. Vous pouvez créer une piscine de fil en appelant l'une des méthodes d'usine statiques dans les exécutants:

Pour vos besoins, je pense que les exécutors.Newsinglethreadexecutor () est le meilleur. Un exécuteur unique fileté crée un seul fil de travail pour traiter les tâches, le remplaçant s'il meurt de manière inattendue. Les tâches sont garanties pour être traitées de manière séquentielle selon l'ordre imposé par la file d'attente de la tâche (FIFO, LIFO, Ordinateur de priorité).


2 commentaires

Qui satisfait à la contrainte de commande en effet, au coût de la performance.


Yup, comme mentionné dans les commentaires ci-dessus, c'est la dernière solution que je réaliserai, si tout le reste ne fonctionnera pas. Le traitement des messages peut être très coûteux dans de nombreux cas.



0
votes

Comme en témoigne votre problème et la nécessité d'un diagramme (+1 pour cela, en passant) une file d'attente prioritaire n'est pas une bonne construction pour ce que vous voulez. En effet, une file d'attente est parfaitement heureuse de vous servir un 6 disponible plutôt que d'attendre un indisponible 4.

Je pense qu'il est temps de faire rouler votre propre conteneur synchronisé.


1 commentaires

Ne devrait pas être beaucoup plus que d'abuser d'une file d'attente prioritaire: D



1
votes

Vous pouvez ajouter instantanément un espace réservé pour les demandes entrantes à votre file d'attente de traitement. L'espace réservé est prétraité à l'arrière-plan par une piscine de fil, mais le traitement principal attend le prétraitement. La construction que j'ai à l'esprit est un avenir .


0 commentaires