J'ai un générateur de messages sur ma machine locale et un courtier sur l'hôte distant (aws).
Après avoir envoyé un message du producteur, J'attends et j'appelle le consommateur de console sur l'hôte distant et voir les journaux excessifs. Sans la valeur du producteur.
Le producteur vide les données après avoir appelé la méthode send. Tout est configuré correctement.
Comment puis-je vérifier que le courtier a reçu le message du producteur et voir si le producteur a reçu la réponse?
3 Réponses :
La méthode Send envoie le message de manière asynchrone au sujet et
renvoie un Future de RecordMetadata.
java.util.concurrent.Future
Envoie de manière asynchrone un enregistrement à un sujet
Après l'appel de flush ,
vérifiez que le futur est terminé en appelant la méthode isDone .
(par exemple, Future.isDone () == true )
L'appel de cette méthode rend immédiatement disponibles tous les enregistrements mis en mémoire tampon (même si linger.ms est supérieur à 0) et bloque à la fin des requêtes associées à ces enregistrements. La post-condition de flush () est que tout enregistrement précédemment envoyé sera terminé (par exemple Future.isDone () == true). Une requête est considérée comme terminée lorsqu'elle a été acquittée avec succès selon la configuration d'acks que vous avez spécifiée ou bien elle entraîne une erreur.
Le RecordMetadata contient le offset et la partition
partition public int ( )
La partition à laquelle l'enregistrement a été envoyé
offset long public ()
le décalage de l'enregistrement, ou -1 si {hasOffset ()} renvoie false.
Ou vous pouvez également utiliser Fonction de rappel pour s'assurer que les messages ont été envoyés au sujet ou non
Une utilisation totalement non bloquante peut utiliser le paramètre Callback pour fournir un rappel qui sera appelé lorsque la requête est terminée.
voici un exemple clair dans la documentation
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
producer.send(myRecord,
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null) {
e.printStackTrace();
} else {
System.out.println("The offset of the record we just sent is: " + metadata.offset());
}
}
});
J'ai envoyé le lot à des courtiers à distance. Tant qu'il atteignait le courtier, une nouvelle partition était créée (c'était une nouvelle partition). Mais je ne peux pas voir les messages que j'ai envoyés à cette partition.
sont ces partitions modifiées @voipp
Je viens de déplacer le courtier sur la machine locale et tout fonctionne bien
@Deadpool, si hasOffset () renvoie true, puis-je dire que le sujet a reçu le message?
Oui, vous pouvez @jumping_monkey
@Deadpool, merci. Serait-ce mal si j'appelle simplement .get () dans le bloc try. Si la rubrique reçoit le message, le flux n'ira jamais dans le bloc catch, et si la rubrique n'a pas reçu le message, le flux ira au bloc catch? Je voudrais savoir quelle est la meilleure pratique, mon objectif est de toujours vérifier que le message a atteint le sujet.
@Deadpool, votre réponse mise à jour concerne un envoi asynchrone. Pouvez-vous en ajouter un pour un envoi synchrone?
Vous pouvez essayer l'API get () d'envoi, qui renverra le Future of RecordMetadata
ProducerRecord<String, String> record =
new ProducerRecord<>("SampleTopic", "SampleKey", "SampleValue");
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}
@Chandra Mohan G, si le bloc catch n'est jamais exécuté, ce qui signifie que le .get () est "exécuté" avec succès, puis-je dire que le sujet a reçu le message? Ou dois-je vérifier les RecordMetadata renvoyés?
Utilisez la livraison exactement une fois et vous n'aurez pas à vous soucier de savoir si votre message a été atteint ou non: https://www.baeldung.com/kafka-exactly-once , https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka- fait-le /
le problème réside dans le placement des courtiers sur un serveur AWS distant. Si je les mets localement, tout fonctionne bien. Comment votre proposition pourrait-elle m'aider?
Voulez-vous dire que votre producteur ne travaille pas? Qu'entendez-vous par «voir les journaux excessifs»? Avez-vous correctement configuré les écouteurs sur vos courtiers?
@RobinMoffatt Je voulais dire qu'il y avait beaucoup de journaux sans signification (sans valeur réelle) dans la console après avoir exécuté le consommateur de la console. Ce qui est intéressant, c'est qu'après avoir déplacé les courtiers vers la machine locale et modifié l'URL, tout fonctionne correctement. Je me demande comment puis-je aborder la communication avec les courtiers sur AWS. Encore une fois, j'ouvre tous les ports entrants et sortants.