Je lance une application qui consomme des messages kafka.
J'ai suivi Spring-docs sur la gestion des erreurs de désérialisation afin de détecter l'exception de désérialisation. J'ai essayé la méthode failedDeserializationFunction.
Ceci est ma classe de configuration consommateur
public class FailedNTCMessageBodyProvider implements BiFunction<byte[], Headers, NTCMessageBody> { @Override public NTCMessageBody apply(byte[] t, Headers u) { return new NTCBadMessageBody(t); } } public class NTCBadMessageBody extends NTCMessageBody{ private final byte[] failedDecode; public NTCBadMessageBody(byte[] failedDecode) { this.failedDecode = failedDecode; } public byte[] getFailedDecode() { return this.failedDecode; } }
Ceci est le fournisseur BiFunction
@Bean public Map<String, Object> consumerConfigs() { Map<String, Object> consumerProps = new HashMap<>(); consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset); consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit); /* Error Handling */ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class); consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName()); consumerProps.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, FailedNTCMessageBodyProvider.class); return consumerProps; } @Bean public ConsumerFactory<String, NTCMessageBody> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(NTCMessageBody.class)); } @Bean public ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; }
Lorsque j'envoie un seul message corrompu sur le sujet, j'ai cette erreur (en boucle):
org.apache.kafka.common.errors.SerializationException: Erreur de désérialisation de la clé / valeur
J'ai compris que ErrorHandlingDeserializer2 devrait déléguer le type NTCBadMessageBody et continuer la consommation. J'ai également vu (en mode débogage) que cela n'allait jamais dans le constructeur de la classe NTCBadMessageBody.
Quelqu'un peut-il m'aider?
Merci beaucoup.
3 Réponses :
Lorsqu'un désérialiseur ne parvient pas à désérialiser un message, Spring n'a aucun moyen de gérer le problème car il se produit avant le retour de poll (). Pour résoudre ce problème, la version 2.2 a introduit le ErrorHandlingDeserializer. Ce désérialiseur délègue à un véritable désérialiseur (clé ou valeur). Si le délégué ne parvient pas à désérialiser le contenu de l'enregistrement, ErrorHandlingDeserializer renvoie à la place une DeserializationException, contenant la cause et les octets bruts. Lors de l'utilisation d'un MessageListener au niveau de l'enregistrement, si la clé ou la valeur contient une DeserializationException, le ErrorHandler du conteneur est appelé avec le ConsumerRecord qui a échoué. Lors de l'utilisation d'un BatchMessageListener, l'enregistrement ayant échoué est transmis à l'application avec les enregistrements restants du lot, il est donc de la responsabilité de l'écouteur de l'application de vérifier si la clé ou la valeur d'un enregistrement particulier est une DeserializationException.
Donc, selon votre code, vous utilisez record-level MessageListener
puis ajoutez simplement ErrorHandler
à Container
Si votre gestionnaire d'erreurs implémente cette interface, vous pouvez, par exemple, ajuster les décalages en conséquence. Par exemple, pour réinitialiser le décalage pour relire le message d'échec, vous pouvez faire quelque chose comme ce qui suit; notez cependant que ce sont des implémentations simplistes et vous voudrez probablement plus de vérification dans le gestionnaire d'erreurs.
@Bean public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.getContainerProperties().setErrorHandler(new ErrorHandler() { @Override public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) { String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0]; String topics = s.split("-")[0]; int offset = Integer.valueOf(s.split("offset ")[1]); int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]); TopicPartition topicPartition = new TopicPartition(topics, partition); //log.info("Skipping " + topic + "-" + partition + " offset " + offset); consumer.seek(topicPartition, offset + 1); System.out.println("OKKKKK"); } @Override public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) { } @Override public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord, Consumer<?,?> consumer) { String s = e.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0]; String topics = s.split("-")[0]; int offset = Integer.valueOf(s.split("offset ")[1]); int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]); TopicPartition topicPartition = new TopicPartition(topics, partition); //log.info("Skipping " + topic + "-" + partition + " offset " + offset); consumer.seek(topicPartition, offset + 1); System.out.println("OKKKKK"); } }); return factory; }
Ou vous pouvez faire une implémentation personnalisée comme dans cet exemple
@Bean public ConsumerAwareListenerErrorHandler listen3ErrorHandler() { return (m, e, c) -> { this.listen3Exception = e; MessageHeaders headers = m.getHeaders(); c.seek(new org.apache.kafka.common.TopicPartition( headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class), headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)), headers.get(KafkaHeaders.OFFSET, Long.class)); return null; }; }
The method setErrorHandler(new ErrorHandler(){}) is undefined for the type ContainerProperties
... mais il semble qu'il y ait un problème dans votre deuxième exemple car j'ai obtenu The method setErrorHandler(new ErrorHandler(){}) is undefined for the type ContainerProperties
Faire cette correction factory.setErrorHandler(new ErrorHandler()
ça marche très bien! Merci beaucoup!
La réponse ci-dessus peut avoir un problème si le nom de la partition a un caractère comme «-». donc, j'ai modifié la même logique avec regex.
@Bean public ConcurrentKafkaListenerContainerFactory<String, GenericType> macdStatusListenerFactory() { ConcurrentKafkaListenerContainerFactory<String, GenericType> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(macdStatusConsumerFactory()); factory.setRetryTemplate(retryTemplate()); factory.setErrorHandler(new KafkaErrHandler()); return factory; }
enfin, utilisez le gestionnaire d'erreurs dans config.
import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.SerializationException; import org.springframework.kafka.listener.ErrorHandler; import org.springframework.kafka.listener.MessageListenerContainer; import lombok.extern.slf4j.Slf4j; @Slf4j public class KafkaErrHandler implements ErrorHandler { /** * Method prevents serialization error freeze * * @param e * @param consumer */ private void seekSerializeException(Exception e, Consumer<?, ?> consumer) { String p = ".*partition (.*) at offset ([0-9]*).*"; Pattern r = Pattern.compile(p); Matcher m = r.matcher(e.getMessage()); if (m.find()) { int idx = m.group(1).lastIndexOf("-"); String topics = m.group(1).substring(0, idx); int partition = Integer.parseInt(m.group(1).substring(idx)); int offset = Integer.parseInt(m.group(2)); TopicPartition topicPartition = new TopicPartition(topics, partition); consumer.seek(topicPartition, (offset + 1)); log.info("Skipped message with offset {} from partition {}", offset, partition); } } @Override public void handle(Exception e, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer) { log.error("Error in process with Exception {} and the record is {}", e, record); if (e instanceof SerializationException) seekSerializeException(e, consumer); } @Override public void handle(Exception e, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) { log.error("Error in process with Exception {} and the records are {}", e, records); if (e instanceof SerializationException) seekSerializeException(e, consumer); } @Override public void handle(Exception e, ConsumerRecord<?, ?> record) { log.error("Error in process with Exception {} and the record is {}", e, record); } }
Cependant, l'analyse de la chaîne d'erreur pour obtenir la parition, la rubrique et le décalage n'est pas recommandée. Si quelqu'un a une meilleure solution, veuillez poster ici.
Utilisez ErrorHandlingDeserializer.
Lorsqu'un désérialiseur ne parvient pas à désérialiser un message, Spring n'a aucun moyen de gérer le problème car il se produit avant le retour de poll (). Pour résoudre ce problème, la version 2.2 a introduit le ErrorHandlingDeserializer. Ce désérialiseur délègue à un véritable désérialiseur (clé ou valeur). Si le délégué ne parvient pas à désérialiser le contenu de l'enregistrement, ErrorHandlingDeserializer renvoie à la place une DeserializationException, contenant la cause et les octets bruts. Lors de l'utilisation d'un MessageListener au niveau de l'enregistrement, si la clé ou la valeur contient une DeserializationException, le ErrorHandler du conteneur est appelé avec le ConsumerRecord qui a échoué. Lors de l'utilisation d'un BatchMessageListener, l'enregistrement ayant échoué est transmis à l'application avec les enregistrements restants du lot, il est donc de la responsabilité de l'écouteur de l'application de vérifier si la clé ou la valeur d'un enregistrement particulier est une DeserializationException.
Vous pouvez utiliser le constructeur DefaultKafkaConsumerFactory qui prend les objets de désérialiseur de clé et de valeur et les connecte dans ErrorHandlingDeserializer approprié configuré avec les délégués appropriés. Vous pouvez également utiliser les propriétés de configuration du consommateur qui sont utilisées par ErrorHandlingDeserializer pour instancier les délégués. Les noms de propriété sont ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS et ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS; la valeur de la propriété peut être un nom de classe ou de classe
package com.mypackage.app.config; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; import com.mypacakage.app.model.kafka.message.KafkaEvent; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ListenerExecutionFailedException; import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.retry.policy.SimpleRetryPolicy; import org.springframework.retry.support.RetryTemplate; import lombok.extern.slf4j.Slf4j; @EnableKafka @Configuration @Slf4j public class KafkaConsumerConfig { @Value("${kafka.bootstrap-servers}") private String servers; @Value("${listener.group-id}") private String groupId; @Bean public ConcurrentKafkaListenerContainerFactory<String, KafkaEvent> ListenerFactory() { ConcurrentKafkaListenerContainerFactory<String, KafkaEvent> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setRetryTemplate(retryTemplate()); factory.setErrorHandler(((exception, data) -> { /* * here you can do you custom handling, I am just logging it same as default * Error handler does If you just want to log. you need not configure the error * handler here. The default handler does it for you. Generally, you will * persist the failed records to DB for tracking the failed records. */ log.error("Error in process with Exception {} and the record is {}", exception, data); })); return factory; } @Bean public ConsumerFactory<String, KafkaEvent> consumerFactory() { Map<String, Object> config = new HashMap<>(); config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class); config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class); config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName()); config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.mypackage.app.model.kafka.message.KafkaEvent"); config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.mypackage.app"); return new DefaultKafkaConsumerFactory<>(config); } private RetryTemplate retryTemplate() { RetryTemplate retryTemplate = new RetryTemplate(); /* * here retry policy is used to set the number of attempts to retry and what * exceptions you wanted to try and what you don't want to retry. */ retryTemplate.setRetryPolicy(retryPolicy()); return retryTemplate; } private SimpleRetryPolicy retryPolicy() { Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>(); // the boolean value in the map determines whether exception should be retried exceptionMap.put(IllegalArgumentException.class, false); exceptionMap.put(TimeoutException.class, true); exceptionMap.put(ListenerExecutionFailedException.class, true); return new SimpleRetryPolicy(3, exceptionMap, true); } }