8
votes

Comment traitez-vous les messages en parallèle tout en assurant la FIFO par entité?

Disons que vous avez une entité, disons, "personne" dans votre système et que vous souhaitez traiter des événements qui modifient diverses entités de personnes. Il est important que:

  • Les événements pour la même personne sont traités dans l'ordre FIFO
  • Les flux d'événements multiples personnes sont traités en parallèle par différents threads / processus

    Nous avons une implémentation qui résout celle-ci en utilisant une base de données et des verrous partagés. Les threads se font concurrence pour acquérir la serrure pour une personne, puis traiter des événements dans l'ordre après avoir acquis la serrure. Nous aimerions passer à une file d'attente de message pour éviter les interrogations et le verrouillage, que nous pensons réduire la charge sur la base de données et simplifierait la mise en œuvre du code de consommation.

    J'ai fait des recherches sur Activemq, Rabbbitmq et Hornetq, mais je ne vois pas de manière évidente de mettre en œuvre cela.

    ActiveMQ soutient les caractères génériques de souscription des consommateurs, mais je ne vois pas un moyen de limiter la concurrence sur chaque file d'attente à 1. Si je pouvais le faire, la solution serait simple:

    • Dites à un courtier de permettre une concurrence de 1 pour toutes les files d'attente à partir de: / File d'attente / personne.
    • Éditeur écrit l'événement à la file d'attente à l'aide de la personne ID dans le nom de la file d'attente. E.g.: /queue/person.20
    • Les consommateurs s'abonnent à la file d'attente avec Wildcards: / File d'attente / personne.>
    • Chaque consommateur recevrait des messages pour différentes files d'attente. Si toutes les files d'attente de la personne étaient utilisées, certains consommateurs peuvent rester inactifs, ce qui est OK
    • Après le traitement d'un message, le consommateur envoie un ACK, qui indique au courtier, il est fait avec le message et permet à un autre message pour que la file d'attente de cette personne soit envoyée à un autre consommateur (éventuellement le même)

      ActiveMQ est arrivé à proximité: vous pouvez faire des abonnements génériques et activer le «consommateur exclusif», mais cette combinaison entraîne un seul consommateur recevant tous les messages envoyés à toutes les files d'attente correspondantes, réduisant ainsi votre concurrence à 1 à toutes les personnes. J'ai envie de me manquer quelque chose d'évident.

      questions:

      • Y a-t-il un moyen de mettre en œuvre l'approche ci-dessus avec une implémentation de la file d'attente de message majeure? Nous sommes assez ouverts aux options. La seule exigence est qu'elle fonctionne sur Linux.
      • Y a-t-il un moyen différent de résoudre le problème général que je ne considère pas?

        Merci!


0 commentaires

3 Réponses :


0
votes

Si vous avez déjà un système qui permet des verrous partagés, pourquoi ne pas avoir de verrou pour chaque file d'attente, que les consommateurs doivent acquérir avant de lire la file d'attente?


2 commentaires

Avec un protocole de file d'attente non interrogatoire (AMQP, Stomp), le courtier fournira déjà le message au consommateur en fonction des règles de souscription. Le consommateur pourrait consulter la table de verrouillage et attendre qu'il soit disponible, mais cela introduit la complexité et réduit le parallélisme car le consommateur pourrait travailler sur un message pour une autre entité au lieu d'attendre la serrure.


Oui, mais le consommateur n'a pas besoin d'attendre sur la serrure - elle peut transmettre la prochaine file d'attente. Vraisemblablement, votre protocole de file d'attente permettrait également aux consommateurs de refuser de lire le message de la file d'attente et de tester la serrure.



1
votes

Un moyen général de résoudre ce problème (si j'ai eu votre problème), c'est introduire une propriété unique pour la personne (par exemple, un identifiant de niveau de base de données) et utilisez hachage de cette propriété comme indice de la follefère FIFO pour mettre cela Personne in.
Étant donné que le hachage de cette propriété peut être lourd (vous ne pouvez pas vous permettre 2 ^ 32 files d'attente / fils), utilisez uniquement les bits les moins importants de ce hachage. Chaque file d'attente FIFO devrait avoir un travailleur dédié qui fonctionnera sur elle - voila, vos besoins sont satisfaits!

Cette approche dispose d'un inconvénient - vos personnes doivent avoir des ID bien distribués pour que toutes les files d'attente fonctionnent avec une charge plus ou moins égale. Si vous ne pouvez pas vous garantir cela, envisagez d'utiliser l'ensemble des files d'attente rond-robin et de suivre quelles personnes sont en cours de traitement pour assurer un traitement séquentiel pour la même personne.


3 commentaires

Dans votre solution, les consommateurs font-ils toujours un abonnement Wildcard? Si oui, comment puis-je vous assurer que chaque file d'attente n'a qu'un consommateur actif? Ou dites-vous que je hachane le nom de la file d'attente de sorte que n == # de consommateurs actifs?


@James Désolé de vous confondre, j'ai parlé de solution générale, sans JMS en tête. C'est juste des fils et des collections JDK. Donc, dans ma solution, il faut configurer manuellement la cartographie entre la file d'attente et le consommateur - il est généralement très facile à faire.


Ah je comprends. Merci.



4
votes

On dirait que jmsxgroupid est ce que je cherche. Du Docs Activemq:

http://activemq.apache.org/message-groups.html < / p>

Leur exemple de contrôle d'utilisation avec les cours des actions est exactement ce que je suis après. Ma seule préoccupation est ce qui se passe si le consommateur unique meurt. Espérons que le courtier détectera cela et choisissez un autre consommateur à associer à cet ID de groupe.


2 commentaires

On dirait que cela fonctionne totalement. Voici un exemple de Python que j'ai testé avec succès contre Activemq et HornetQ pour ce type de problème si quelqu'un est intéressé: Gist.github. COM / 923228


De JMS oracle 1.1 especification J'ai trouvé jmsxgroupid , on dirait que c'est nécessaire pour toutes les implémentations JMS. Mais cela n'explique pas le comportement attendu avec des détails, puis dépendra du fournisseur