0
votes

Comment définir X-OPT-décalsets lors de l'établissement de la connexion au hub d'événement à l'aide d'AMQP pour éviter la relecture de message

Mon application se connecte au hub d'événement Azure pour recevoir des messages et les traiter. Je vois que chaque fois que je redémarre ma demande, tous les messages de la période de conservation sont rejoués. J'ai lu sur le décalage pour éviter ce problème et j'ai une méthode qui configure une connexion à un hub d'événement Azure comme: xxx

Le code commenté sur le décalage était ce que j'essayais après avoir lu ici : https://azure.github.io/amqpnetlite/articles/azure_eventhubs.htmlLec a>

Quelle est la bonne façon de définir le décalage afin que les messages ne soient pas réaffectés lorsque l'application redémarre?


0 commentaires

3 Réponses :


2
votes

APCHE QPID ne prend pas en charge les filtres AMQP (le proton J sous-jacent Apache Proton J) ..

J'ai corrigé l'amqpontumerbuilder.configuresource () métho en ajoutant les lignes suivantes à la fin: P>

Symbol filterKey = Symbol.valueOf("apache.org:selector-filter:string");
UnknownDescribedType filterValue = new UnknownDescribedType(filterKey, String.format("%s > '%s'",amqp.annotation.x-opt-offset", lastOffset));

filters.put(filterKey, filterValue);


0 commentaires

0
votes

C'était beaucoup plus facile que j'avais pensé! Trouvé ce lien: https://timjansen.github.io/jarfrefer/guide/ jms / selectors.xhtml

Donc, tout ce que je devais faire était d'ajouter la condition de filtrage comme ceci: xxx


3 commentaires

Cela n'a pas fonctionné pour moi et entraînera une exception comme: javax.jms.invalidselectOrxception: amqp.annotation.x-Opt-Offset> = '1'


Ce doit être une valeur de compensation valide, si vous commencez avec une file d'attente vide, la valeur doit être -1.


Lorsque vous utilisez Qpid-JMS-Client, un filtre avec un point dans le nom, entraînera une exception invalide. Vous avez raison sur la file d'attente vide. Je l'ai couru aussi. Mais cela entraînera une autre erreur. Utilisation de la version forcée, j'ai montré comme réponse, fonctionne bien avec Qpid-JMS-Client



0
votes

Inspiré de la réponse de Mourad Zouabi, j'ai créé une fourchette du référentiel Qpid-JMS et a modifié la classe AMQPCONSumerBuilder comme suit pour permettre de passer le filtre à un messageconsumer:

MessageConsumer consumer = session.createConsumer(queue,  String.format("x-opt-enqueued-time > %s", timeStamp));


0 commentaires