J'essaie de valider un message juste après l'avoir lu à partir du sujet. J'ai suivi ce lien ( https://www.confluent.io / blog / apache-kafka-spring-boot-application ) pour créer un consommateur Kafka avec spring. Normalement, cela fonctionne parfaitement et le consommateur reçoit le message et attend qu'une autre personne entre dans la file d'attente. Mais le problème est que lorsque je traite ces messages, cela prend beaucoup de temps (environ 10 minutes), la file d'attente kafka pense que le message n'est pas consommé (validé) et les consommateurs le relisent encore et encore. Je dois dire que lorsque mon temps de traitement est inférieur à 5 minutes, cela fonctionne bien, mais quand il dure plus longtemps, cela ne valide pas le message.
J'ai cherché des réponses mais cela ne m'aide pas car je n'utilise pas le même code source (et bien sûr une structure différente). J'ai essayé d'envoyer des méthodes asynchrones et aussi de valider le message de manière asynchrone mais j'ai échoué. Certaines des sources sont:
Spring Boot Kafka: La validation ne peut pas être terminée car le groupe a déjà rééquilibré
https://dzone.com/articles/kafka-clients-at-most-once-at-least-once-exactly- o
Kafka 0.10 Le consommateur Java ne lit pas le message du sujet
https://github.com/confluentinc/confluent-kafka-dotnet/issues/470
La classe principale est ici:
@Service public class Consumer { @Autowired AppPropert prop; Consumer cons; @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); factory.getContainerProperties().setAckOnError(false); factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE); //factory.setErrorHandler(new SeekToCurrentErrorHandler());//getting error here despite I've loaded the library return factory; } @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerProperties()); } @Bean public Map<String, Object> consumerProperties() { Map<String, Object> props = new HashMap<>(); Properties propsManu=prop.startProperties();// here I'm getting my porperties file where I retrive the configuration from a remote server (you have to trust that this method works) //props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configProperties.getBrokerAddress()); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsManu.getProperty("bootstrap-servers")); //props.put(ConsumerConfig.GROUP_ID_CONFIG, "s1pGroup"); props.put(ConsumerConfig.GROUP_ID_CONFIG, propsManu.getProperty("group-id")); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000); //props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("key-deserializer")); //props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("value-deserializer")); return props; } @KafkaListener(topics = "${app.topic.pro}", groupId = "group_id") public void consume(String message) throws IOException { /*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */ acknowledgment.acknowledge();// commit immediately Properties props=prop.startProp();//just getting my properties from my config-file ControllerPRO pro = new ControllerPRO(); List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me try { CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method /*This works fine when the processLaunch method takes less than 5 minutes, if it takes longer the consumer will get the same message from the topic and start again with this operation */ } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("End of consumer method "); } } ``````````````````````````````````````````````````````````
La classe consommateur (où je dois valider mon message)
@Service public class Consumer { @Autowired AppPropert prop; Consumer cons; @KafkaListener(topics = "${app.topic.pro}", groupId = "group_id") public void consume(String message) throws IOException { /*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */ Properties props=prope.startProp();//just getting my properties from my config-file ControllerPRO pro = new ControllerPRO(); List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me try { CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method /*This works fine when the processLaunch method takes less than 5 minutes, if it takes longer the consumer will get the same message from the topic and start again with this operation */ } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("End of consumer method "); } }
Comment puis-je valider le message juste après l'avoir lu dans la file d'attente.
Je veux être sûr que lorsque je reçois le message, je le valide immédiatement. À l'heure actuelle, le message est validé lorsque j'ai terminé d'exécuter la méthode juste après le (System.out.println). Est-ce que quelqu'un peut me dire comment faire cela?
----- update -------
Désolé pour la réponse tardive, mais comme @GirishB l'a suggéré, j'ai recherché la configuration de GirishB mais je ne vois pas où je peux définir le sujet que je veux lire / écouter à partir de mon fichier de configuration (applications.yml). Tous les exemples que je vois utilisent une structure similaire à celle-ci ( http: // tutoriels .jenkov.com / java-util-concurrent / blockingqueue.html ). Existe-t-il une option permettant de lire une rubrique déclarée sur un autre serveur? Utilisation de quelque chose de similaire à ce @KafkaListener (topics = "$ {app.topic.pro}", groupId = "group_id")
=========== SOLUTION 1 === ======================================
J'ai suivi les conseils de @victor gallet et inclus la déclaration des propriétés de confiseur afin d'accueillir l'objet "Acknowledgement" dans la méthode de consommation. J'ai également suivi ce lien ( https://www.programcreek.com/java-api-examples/?code=SpringOnePlatform2016/grussell-spring-kafka/ grussell-spring-kafka-master / s1p-kafka / src / main / java / org / s1p / CommonConfiguration.java ) pour obtenir toutes les méthodes que j'ai utilisées pour déclarer et définir toutes les propriétés (consumerProperties, consumerFactory, kafkaListenerContainerFactory). Le seul problème que j'ai trouvé est le "new SeekToCurrentErrorHandler ()" déclaration car j'obtiens une erreur et pour le moment je ne suis pas en mesure de la résoudre (ce serait bien si quelqu'un me l'explique).
@SpringBootApplication @EnableAsync public class SpringBootKafkaApp { public static void main(String[] args) { SpringApplication.run(SpringBootKafkaApp .class, args); }
3 Réponses :
Vous pouvez utiliser un java.util.concurrent.BlockingQueue
pour pousser le message lorsque vous consommez et validez le décalage Kafka. Ensuite, en utilisant un autre thread, récupérez le message de blockingQueue et processus. De cette façon, vous n'avez pas à attendre la fin du traitement.
Vous devez modifier la configuration de votre consommateur avec la propriété enable.auto.commit
définie sur false:
@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id") public void consume(String message, Acknowledgment acknowledgment) { // commit immediately acknowledgment.acknowledge(); }
Ensuite, vous devez modifier la fabrique Spring Kafka Listener et définissez le mode d'acquittement sur MANUAL_IMMEDIATE
. Voici un exemple de ConcurrentKafkaListenerContainerFactory
:
@Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); factory.getContainerProperties().setAckOnError(false); factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE); factory.setErrorHandler(new SeekToCurrentErrorHandler()); return factory; }
Comme expliqué dans la documentation, MANUAL_IMMEDIATE
signifie: Validez le décalage immédiatement lorsque le La méthode Acknowledgment.acknowledge () est appelée par l'auditeur.
Vous pouvez trouver toutes les méthodes de validation ici .
Ensuite, dans votre code auditeur, vous pouvez valider l'offset manuellement en ajoutant un objet Acknowledgement
, pour exemple:
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
properties.put (ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
Après avoir défini la propriété ci-dessus, si vous souhaitez traiter par lots, vous pouvez suivre les configurations suivantes.
factory.getContainerProperties().setAckOnError(true); //specifying batch error handler because i have enabled to listen records in batch factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler()); factory.setBatchListener(true); factory.getContainerProperties().setSyncCommits(false);
// vous pouvez définir Manuel ou MANUAL_IMMEDIATE car // KafkaMessageListenerContainer invoque // ConsumerBatchAcknowledgment pour tout type d'ackmode manuel
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
si votre temps de traitement est élevé et que vous ne souhaitez vous engager qu'APRÈS la réussite du processus, envisagez d'ajuster ces paramètres:
max.poll.interval.ms
,max.poll.interval.ms
etmax.poll.records
voir kafka.apache.org/documentation