Mon application se connecte au hub d'événement Azure pour recevoir des messages et les traiter. Je vois que chaque fois que je redémarre ma demande, tous les messages de la période de conservation sont rejoués. J'ai lu sur le décalage pour éviter ce problème et j'ai une méthode qui configure une connexion à un hub d'événement Azure comme: Le code commenté sur le décalage était ce que j'essayais après avoir lu ici : https://azure.github.io/amqpnetlite/articles/azure_eventhubs.htmlLec a> p> Quelle est la bonne façon de définir le décalage afin que les messages ne soient pas réaffectés lorsque l'application redémarre? p> p>
3 Réponses :
APCHE QPID ne prend pas en charge les filtres AMQP (le proton J sous-jacent Apache Proton J) ..
J'ai corrigé l'amqpontumerbuilder.configuresource () métho en ajoutant les lignes suivantes à la fin: P>
Symbol filterKey = Symbol.valueOf("apache.org:selector-filter:string"); UnknownDescribedType filterValue = new UnknownDescribedType(filterKey, String.format("%s > '%s'",amqp.annotation.x-opt-offset", lastOffset)); filters.put(filterKey, filterValue);
C'était beaucoup plus facile que j'avais pensé! Trouvé ce lien: https://timjansen.github.io/jarfrefer/guide/ jms / selectors.xhtml
Donc, tout ce que je devais faire était d'ajouter la condition de filtrage comme ceci: p>
Cela n'a pas fonctionné pour moi et entraînera une exception comme: javax.jms.invalidselectOrxception: amqp.annotation.x-Opt-Offset> = '1'
Ce doit être une valeur de compensation valide, si vous commencez avec une file d'attente vide, la valeur doit être -1.
Lorsque vous utilisez Qpid-JMS-Client, un filtre avec un point dans le nom, entraînera une exception invalide. Vous avez raison sur la file d'attente vide. Je l'ai couru aussi. Mais cela entraînera une autre erreur. Utilisation de la version forcée, j'ai montré comme réponse, fonctionne bien avec Qpid-JMS-Client
Inspiré de la réponse de Mourad Zouabi, j'ai créé une fourchette du référentiel Qpid-JMS et a modifié la classe AMQPCONSumerBuilder comme suit pour permettre de passer le filtre à un messageconsumer:
MessageConsumer consumer = session.createConsumer(queue, String.format("x-opt-enqueued-time > %s", timeStamp));