2
votes

Comment attraper une erreur de désérialisation dans Kafka-Spring?

Je lance une application qui consomme des messages kafka.

J'ai suivi Spring-docs sur la gestion des erreurs de désérialisation afin de détecter l'exception de désérialisation. J'ai essayé la méthode failedDeserializationFunction.

Ceci est ma classe de configuration consommateur

public class FailedNTCMessageBodyProvider implements BiFunction<byte[], Headers, NTCMessageBody> {

    @Override
    public NTCMessageBody apply(byte[] t, Headers u) {
        return new NTCBadMessageBody(t);
    }

}

public class NTCBadMessageBody extends NTCMessageBody{

    private final byte[] failedDecode;

    public NTCBadMessageBody(byte[] failedDecode) {
        this.failedDecode = failedDecode;
    }

    public byte[] getFailedDecode() {
        return this.failedDecode;
    }

}

Ceci est le fournisseur BiFunction

@Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);

        /*  Error Handling */
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
        consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
        consumerProps.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, FailedNTCMessageBodyProvider.class);

        return consumerProps;
    }

    @Bean
    public ConsumerFactory<String, NTCMessageBody> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new JsonDeserializer<>(NTCMessageBody.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, NTCMessageBody> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

Lorsque j'envoie un seul message corrompu sur le sujet, j'ai cette erreur (en boucle):

org.apache.kafka.common.errors.SerializationException: Erreur de désérialisation de la clé / valeur

J'ai compris que ErrorHandlingDeserializer2 devrait déléguer le type NTCBadMessageBody et continuer la consommation. J'ai également vu (en mode débogage) que cela n'allait jamais dans le constructeur de la classe NTCBadMessageBody.

Quelqu'un peut-il m'aider?

Merci beaucoup.


0 commentaires

3 Réponses :


3
votes

ErrorHandlingDeserializer

Lorsqu'un désérialiseur ne parvient pas à désérialiser un message, Spring n'a aucun moyen de gérer le problème car il se produit avant le retour de poll (). Pour résoudre ce problème, la version 2.2 a introduit le ErrorHandlingDeserializer. Ce désérialiseur délègue à un véritable désérialiseur (clé ou valeur). Si le délégué ne parvient pas à désérialiser le contenu de l'enregistrement, ErrorHandlingDeserializer renvoie à la place une DeserializationException, contenant la cause et les octets bruts. Lors de l'utilisation d'un MessageListener au niveau de l'enregistrement, si la clé ou la valeur contient une DeserializationException, le ErrorHandler du conteneur est appelé avec le ConsumerRecord qui a échoué. Lors de l'utilisation d'un BatchMessageListener, l'enregistrement ayant échoué est transmis à l'application avec les enregistrements restants du lot, il est donc de la responsabilité de l'écouteur de l'application de vérifier si la clé ou la valeur d'un enregistrement particulier est une DeserializationException.

Donc, selon votre code, vous utilisez record-level MessageListener puis ajoutez simplement ErrorHandler à Container

Gestion des exceptions

Si votre gestionnaire d'erreurs implémente cette interface, vous pouvez, par exemple, ajuster les décalages en conséquence. Par exemple, pour réinitialiser le décalage pour relire le message d'échec, vous pouvez faire quelque chose comme ce qui suit; notez cependant que ce sont des implémentations simplistes et vous voudrez probablement plus de vérification dans le gestionnaire d'erreurs.

@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericRecord>
kafkaListenerContainerFactory()  {

    ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setErrorHandler(new ErrorHandler() {
        @Override
        public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
            String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
            String topics = s.split("-")[0];
            int offset = Integer.valueOf(s.split("offset ")[1]);
            int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);

            TopicPartition topicPartition = new TopicPartition(topics, partition);
            //log.info("Skipping " + topic + "-" + partition + " offset " + offset);
            consumer.seek(topicPartition, offset + 1);
            System.out.println("OKKKKK");
        }

        @Override
        public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {

        }

        @Override
        public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord, Consumer<?,?> consumer) {
            String s = e.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
            String topics = s.split("-")[0];
            int offset = Integer.valueOf(s.split("offset ")[1]);
            int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);

            TopicPartition topicPartition = new TopicPartition(topics, partition);
            //log.info("Skipping " + topic + "-" + partition + " offset " + offset);
            consumer.seek(topicPartition, offset + 1);
            System.out.println("OKKKKK");


        }
    });


    return factory;
}

Ou vous pouvez faire une implémentation personnalisée comme dans cet exemple

@Bean
public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
return (m, e, c) -> {
    this.listen3Exception = e;
    MessageHeaders headers = m.getHeaders();
    c.seek(new org.apache.kafka.common.TopicPartition(
            headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
            headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
            headers.get(KafkaHeaders.OFFSET, Long.class));
    return null;
   };
}


2 commentaires

The method setErrorHandler(new ErrorHandler(){}) is undefined for the type ContainerProperties ... mais il semble qu'il y ait un problème dans votre deuxième exemple car j'ai obtenu The method setErrorHandler(new ErrorHandler(){}) is undefined for the type ContainerProperties


Faire cette correction factory.setErrorHandler(new ErrorHandler() ça marche très bien! Merci beaucoup!



1
votes

La réponse ci-dessus peut avoir un problème si le nom de la partition a un caractère comme «-». donc, j'ai modifié la même logique avec regex.

 @Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericType> macdStatusListenerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, GenericType> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(macdStatusConsumerFactory());
    factory.setRetryTemplate(retryTemplate());
    factory.setErrorHandler(new KafkaErrHandler());

    return factory;
}

enfin, utilisez le gestionnaire d'erreurs dans config.

    import java.util.List;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.SerializationException;
    import org.springframework.kafka.listener.ErrorHandler;
    import org.springframework.kafka.listener.MessageListenerContainer;
    
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class KafkaErrHandler implements ErrorHandler {
    
        /**
         * Method prevents serialization error freeze
         * 
         * @param e
         * @param consumer
         */
        private void seekSerializeException(Exception e, Consumer<?, ?> consumer) {
            String p = ".*partition (.*) at offset ([0-9]*).*";
            Pattern r = Pattern.compile(p);
    
            Matcher m = r.matcher(e.getMessage());
    
            if (m.find()) {
                int idx = m.group(1).lastIndexOf("-");
                String topics = m.group(1).substring(0, idx);
                int partition = Integer.parseInt(m.group(1).substring(idx));
                int offset = Integer.parseInt(m.group(2));
    
                TopicPartition topicPartition = new TopicPartition(topics, partition);
    
                consumer.seek(topicPartition, (offset + 1));
    
                log.info("Skipped message with offset {} from partition {}", offset, partition);
            }
        }
    
        @Override
        public void handle(Exception e, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer) {
            log.error("Error in process with Exception {} and the record is {}", e, record);
    
            if (e instanceof SerializationException)
                seekSerializeException(e, consumer);
        }
    
        @Override
        public void handle(Exception e, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
                MessageListenerContainer container) {
            log.error("Error in process with Exception {} and the records are {}", e, records);
    
            if (e instanceof SerializationException)
                seekSerializeException(e, consumer);
    
        }
    
        @Override
        public void handle(Exception e, ConsumerRecord<?, ?> record) {
            log.error("Error in process with Exception {} and the record is {}", e, record);
        }
    
    } 

Cependant, l'analyse de la chaîne d'erreur pour obtenir la parition, la rubrique et le décalage n'est pas recommandée. Si quelqu'un a une meilleure solution, veuillez poster ici.


0 commentaires

0
votes

Utilisez ErrorHandlingDeserializer.

Lorsqu'un désérialiseur ne parvient pas à désérialiser un message, Spring n'a aucun moyen de gérer le problème car il se produit avant le retour de poll (). Pour résoudre ce problème, la version 2.2 a introduit le ErrorHandlingDeserializer. Ce désérialiseur délègue à un véritable désérialiseur (clé ou valeur). Si le délégué ne parvient pas à désérialiser le contenu de l'enregistrement, ErrorHandlingDeserializer renvoie à la place une DeserializationException, contenant la cause et les octets bruts. Lors de l'utilisation d'un MessageListener au niveau de l'enregistrement, si la clé ou la valeur contient une DeserializationException, le ErrorHandler du conteneur est appelé avec le ConsumerRecord qui a échoué. Lors de l'utilisation d'un BatchMessageListener, l'enregistrement ayant échoué est transmis à l'application avec les enregistrements restants du lot, il est donc de la responsabilité de l'écouteur de l'application de vérifier si la clé ou la valeur d'un enregistrement particulier est une DeserializationException.

Vous pouvez utiliser le constructeur DefaultKafkaConsumerFactory qui prend les objets de désérialiseur de clé et de valeur et les connecte dans ErrorHandlingDeserializer approprié configuré avec les délégués appropriés. Vous pouvez également utiliser les propriétés de configuration du consommateur qui sont utilisées par ErrorHandlingDeserializer pour instancier les délégués. Les noms de propriété sont ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS et ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS; la valeur de la propriété peut être un nom de classe ou de classe

package com.mypackage.app.config;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import com.mypacakage.app.model.kafka.message.KafkaEvent;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

import lombok.extern.slf4j.Slf4j;

@EnableKafka
@Configuration
@Slf4j
public class KafkaConsumerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String servers;

    @Value("${listener.group-id}")
    private String groupId;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaEvent> ListenerFactory() {
    
        ConcurrentKafkaListenerContainerFactory<String, KafkaEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        factory.setRetryTemplate(retryTemplate());
        factory.setErrorHandler(((exception, data) -> {
            /*
             * here you can do you custom handling, I am just logging it same as default
             * Error handler does If you just want to log. you need not configure the error
             * handler here. The default handler does it for you. Generally, you will
             * persist the failed records to DB for tracking the failed records.
             */
            log.error("Error in process with Exception {} and the record is {}", exception, data);
        }));

        return factory;

    }

    @Bean
    public ConsumerFactory<String, KafkaEvent> consumerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
        config.put(JsonDeserializer.VALUE_DEFAULT_TYPE,
                "com.mypackage.app.model.kafka.message.KafkaEvent");
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.mypackage.app");

        return new DefaultKafkaConsumerFactory<>(config);
    }

    private RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        /*
         * here retry policy is used to set the number of attempts to retry and what
         * exceptions you wanted to try and what you don't want to retry.
         */
        retryTemplate.setRetryPolicy(retryPolicy());

        return retryTemplate;
    }

    private SimpleRetryPolicy retryPolicy() {
        Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();

        // the boolean value in the map determines whether exception should be retried
        exceptionMap.put(IllegalArgumentException.class, false);
        exceptionMap.put(TimeoutException.class, true);
        exceptionMap.put(ListenerExecutionFailedException.class, true);

        return new SimpleRetryPolicy(3, exceptionMap, true);
    }
}


0 commentaires