4
votes

Valider de manière asynchrone un message juste après la lecture du sujet

J'essaie de valider un message juste après l'avoir lu à partir du sujet. J'ai suivi ce lien ( https://www.confluent.io / blog / apache-kafka-spring-boot-application ) pour créer un consommateur Kafka avec spring. Normalement, cela fonctionne parfaitement et le consommateur reçoit le message et attend qu'une autre personne entre dans la file d'attente. Mais le problème est que lorsque je traite ces messages, cela prend beaucoup de temps (environ 10 minutes), la file d'attente kafka pense que le message n'est pas consommé (validé) et les consommateurs le relisent encore et encore. Je dois dire que lorsque mon temps de traitement est inférieur à 5 minutes, cela fonctionne bien, mais quand il dure plus longtemps, cela ne valide pas le message.

J'ai cherché des réponses mais cela ne m'aide pas car je n'utilise pas le même code source (et bien sûr une structure différente). J'ai essayé d'envoyer des méthodes asynchrones et aussi de valider le message de manière asynchrone mais j'ai échoué. Certaines des sources sont:

Spring Boot Kafka: La validation ne peut pas être terminée car le groupe a déjà rééquilibré

https://www.confluent.io/blog/tutorial-getting-started -avec-le-nouveau-client-apache-kafka-0-9-consommateur /

https://dzone.com/articles/kafka-clients-at-most-once-at-least-once-exactly- o

Kafka 0.10 Le consommateur Java ne lit pas le message du sujet

https://github.com/confluentinc/confluent-kafka-dotnet/issues/470

La classe principale est ici:

@Service
public class Consumer {

@Autowired
    AppPropert prop;

   Consumer cons;


   @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();

        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        //factory.setErrorHandler(new SeekToCurrentErrorHandler());//getting error here despite I've loaded the library
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProperties());
    }

     @Bean
    public Map<String, Object> consumerProperties() {
        Map<String, Object> props = new HashMap<>();
        Properties  propsManu=prop.startProperties();// here I'm getting my porperties file where I retrive the configuration from a remote server (you have to trust that this method works)
        //props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configProperties.getBrokerAddress());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsManu.getProperty("bootstrap-servers"));
        //props.put(ConsumerConfig.GROUP_ID_CONFIG, "s1pGroup");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, propsManu.getProperty("group-id"));
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        //props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("key-deserializer"));
        //props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("value-deserializer")); 
        return props;
    }




    @KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
    public void consume(String message) throws IOException {
        /*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */
        acknowledgment.acknowledge();// commit immediately
        Properties  props=prop.startProp();//just getting my properties from my config-file
        ControllerPRO pro = new ControllerPRO();

        List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me
        try {

            CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method 
            /*This works fine when the processLaunch method takes less than 5 minutes, 
            if it takes longer the consumer will get the same message from the topic and start again with this operation 
            */

        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("End of consumer method ");

    }

    }

``````````````````````````````````````````````````````````

La classe consommateur (où je dois valider mon message)

@Service
public class Consumer {

@Autowired
    AppPropert prop;

   Consumer cons;
@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
    public void consume(String message) throws IOException {
        /*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */

        Properties  props=prope.startProp();//just getting my properties from my config-file
        ControllerPRO pro = new ControllerPRO();

        List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me
        try {

            CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method 
            /*This works fine when the processLaunch method takes less than 5 minutes, 
            if it takes longer the consumer will get the same message from the topic and start again with this operation 
            */

        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("End of consumer method ");

    }

    }


Comment puis-je valider le message juste après l'avoir lu dans la file d'attente.

Je veux être sûr que lorsque je reçois le message, je le valide immédiatement. À l'heure actuelle, le message est validé lorsque j'ai terminé d'exécuter la méthode juste après le (System.out.println). Est-ce que quelqu'un peut me dire comment faire cela?

----- update -------

Désolé pour la réponse tardive, mais comme @GirishB l'a suggéré, j'ai recherché la configuration de GirishB mais je ne vois pas où je peux définir le sujet que je veux lire / écouter à partir de mon fichier de configuration (applications.yml). Tous les exemples que je vois utilisent une structure similaire à celle-ci ( http: // tutoriels .jenkov.com / java-util-concurrent / blockingqueue.html ). Existe-t-il une option permettant de lire une rubrique déclarée sur un autre serveur? Utilisation de quelque chose de similaire à ce @KafkaListener (topics = "$ {app.topic.pro}", groupId = "group_id")

=========== SOLUTION 1 === ======================================

J'ai suivi les conseils de @victor gallet et inclus la déclaration des propriétés de confiseur afin d'accueillir l'objet "Acknowledgement" dans la méthode de consommation. J'ai également suivi ce lien ( https://www.programcreek.com/java-api-examples/?code=SpringOnePlatform2016/grussell-spring-kafka/ grussell-spring-kafka-master / s1p-kafka / src / main / java / org / s1p / CommonConfiguration.java ) pour obtenir toutes les méthodes que j'ai utilisées pour déclarer et définir toutes les propriétés (consumerProperties, consumerFactory, kafkaListenerContainerFactory). Le seul problème que j'ai trouvé est le "new SeekToCurrentErrorHandler ()" déclaration car j'obtiens une erreur et pour le moment je ne suis pas en mesure de la résoudre (ce serait bien si quelqu'un me l'explique).

@SpringBootApplication
@EnableAsync
public class SpringBootKafkaApp {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootKafkaApp .class, args);
    }



1 commentaires

si votre temps de traitement est élevé et que vous ne souhaitez vous engager qu'APRÈS la réussite du processus, envisagez d'ajuster ces paramètres: max.poll.interval.ms , max.poll.interval.ms et max.poll.records voir kafka.apache.org/documentation


3 Réponses :


1
votes

Vous pouvez utiliser un java.util.concurrent.BlockingQueue pour pousser le message lorsque vous consommez et validez le décalage Kafka. Ensuite, en utilisant un autre thread, récupérez le message de blockingQueue et processus. De cette façon, vous n'avez pas à attendre la fin du traitement.


0 commentaires

3
votes

Vous devez modifier la configuration de votre consommateur avec la propriété enable.auto.commit définie sur false:

@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
public void consume(String message, Acknowledgment acknowledgment) {
   // commit immediately
    acknowledgment.acknowledge();
}

Ensuite, vous devez modifier la fabrique Spring Kafka Listener et définissez le mode d'acquittement sur MANUAL_IMMEDIATE . Voici un exemple de ConcurrentKafkaListenerContainerFactory :

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.setErrorHandler(new SeekToCurrentErrorHandler());
    return factory;
}

Comme expliqué dans la documentation, MANUAL_IMMEDIATE signifie: Validez le décalage immédiatement lorsque le La méthode Acknowledgment.acknowledge () est appelée par l'auditeur.

Vous pouvez trouver toutes les méthodes de validation ici .

Ensuite, dans votre code auditeur, vous pouvez valider l'offset manuellement en ajoutant un objet Acknowledgement , pour exemple:

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);


0 commentaires

0
votes

properties.put (ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

Après avoir défini la propriété ci-dessus, si vous souhaitez traiter par lots, vous pouvez suivre les configurations suivantes.

    factory.getContainerProperties().setAckOnError(true);
    //specifying batch error handler because i have enabled to listen records in batch
    factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
    factory.setBatchListener(true);
    factory.getContainerProperties().setSyncCommits(false);

// vous pouvez définir Manuel ou MANUAL_IMMEDIATE car // KafkaMessageListenerContainer invoque // ConsumerBatchAcknowledgment pour tout type d'ackmode manuel

     factory.getContainerProperties().setAckMode(AckMode.MANUAL);


0 commentaires