2
votes

Kafka Consumer ne lit pas les données de rubrique via Java

J'envoie des données JSON d'entrée au sujet Kafka. Je peux voir les mêmes données JSON dans kafka consumer en utilisant la commande ci-dessous.

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic mytopic --from-beginning

Cas 1: Mais lorsque j'essaie de lire les enregistrements du consommateur kafka (java), je n'obtiens aucun enregistrement dans la console java. J'ai essayé les exemples donnés dans ce lien https : //kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

Cas 2: Si j'envoie un message du producteur (fenêtre de commande), le même est dans le consommateur (fenêtre de commande). et capable de voir les mêmes enregistrements dans la console java. Ce scénario fonctionne.

Si je soumets les données au sujet via le programme java. Puis les mêmes données JSON apparaissant dans consumer (fenêtre de commande). Mais ne vient pas dans la console java. Case1 ne fonctionne pas.
Case2 fonctionne. Veuillez me faire part de toute configuration à effectuer?


4 commentaires

alors vous commencez à consommer à partir de java, puis envoyez de nouvelles données au sujet et aucun message n'apparaît (cas 1)?


Exécutez-vous case2 en utilisant une seule JVM?


Comment avez-vous écrit votre producteur? Partagez-nous votre code.


Démarrez à la fois le consommateur de la console et le consommateur personnalisé. Ensuite, lancez votre producteur java. Assurez-vous également que votre producteur envoie des messages à Kafka. Voyez-vous des messages sur les deux consommateurs?


3 Réponses :


0
votes

Vous devez définir ConsumerConfig.AUTO_OFFSET_RESET_CONFIG pour pouvoir lire depuis le début.

kafkaConsumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); 

Et assurez-vous également que vous n'exécutez pas des processus consommateurs différents en utilisant le même ID de groupe de consommateurs , car les données d'une partition peuvent être lues à partir d'un processus et un autre ne verra rien.


0 commentaires

0
votes

En supposant que le code du producteur et du consommateur est correct.

  • Arrêtez tous les consommateurs.
  • Réinitialisez votre groupe de consommateurs

$ KAFKA_HOME / bin / kafka-consumer-groups.sh --bootstrap-server localhost: 9092 --group group_name --topic topic_name --reset-offsets --au plus tôt --exécuter

  • Démarrez maintenant votre consommateur

Cela devrait résoudre votre problème.

Voici quelques propriétés des consommateurs Kafka:

    bootstrap.servers: 'localhost:9092'
    group.id: 'group_id'
    auto.offset.reset: 'earliest'
    key.deserializer: 'org.apache.kafka.common.serialization.*' //Replace * with class
    value.deserializer: 'org.apache.kafka.common.serialization.*'

Merci

p >


0 commentaires

1
votes

Vous devez vous assurer que vous n'envoyez pas de lignes vides à la fin et que vous ne consommez que le dernier - utilisez auto.offset.reset: 'early' comme ci-dessus, ou properties.put (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "le plus tôt");

Et puis, consumer.seekToBeginning (consumer.assignment ()); pour vous en assurer.


0 commentaires