5
votes

Y a-t-il des problèmes avec cette façon de démarrer une boucle infinie dans une application Spring Boot?

J'ai une application Spring Boot et elle doit traiter certaines données de streaming Kafka. J'ai ajouté une boucle infinie à une classe CommandLineRunner qui s'exécutera au démarrage. Il y a un consommateur Kafka qui peut être réveillé. J'ai ajouté un hook d'arrêt avec Runtime.getRuntime (). AddShutdownHook (new Thread (consumer :: wakeup)); . Vais-je rencontrer des problèmes? Y a-t-il une façon plus idiomatique de faire cela au printemps? Dois-je utiliser @Scheduled à la place? Le code ci-dessous est dépouillé des éléments spécifiques de l'implémentation de Kafka, mais par ailleurs complet.

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.Properties;


    @Component
    public class InfiniteLoopStarter implements CommandLineRunner {

        private final Logger logger = LoggerFactory.getLogger(this.getClass());

        @Override
        public void run(String... args) {
            Consumer<AccountKey, Account> consumer = new KafkaConsumer<>(new Properties());
            Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));

            try {
                while (true) {
                    ConsumerRecords<AccountKey, Account> records = consumer.poll(Duration.ofSeconds(10L));
                    //process records
                }
            } catch (WakeupException e) {
                logger.info("Consumer woken up for exiting.");
            } finally {
                consumer.close();
                logger.info("Closed consumer, exiting.");
            }
        }
    }


2 commentaires

Avez-vous envisagé d'utiliser Spring Cloud Stream , qui est en fait conçu pour gérer Kafka et outils similaires?


@jonrsharpe non Je ne l'ai pas fait, j'ai brièvement examiné la bibliothèque Spring for Kafka, mais je n'ai pas encore rencontré celle-ci. Je vais jeter un œil, merci.


3 Réponses :


4
votes

Je ne suis pas sûr que vous rencontriez des problèmes, mais c'est un peu sale - Spring a vraiment un support intégré pour travailler avec Kafka, donc je me pencherais vers cela (il y a beaucoup de documentation à ce sujet sur le Web , mais une belle est: https://www.baeldung.com/spring-kafka ).

Vous aurez besoin de la dépendance suivante:

@KafkaListener(topics = "topicName")
public void listenWithHeaders(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println("Received Message: " + message"+ "from partition: " + partition);
}

La configuration est aussi simple en ajoutant l'annotation @EnableKafka à une classe de configuration, puis configuration des beans Listener et ConsumerFactory

Une fois configuré, vous pouvez facilement configurer un consommateur comme suit:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.2.RELEASE</version>
</dependency>


0 commentaires

1
votes

L'implémentation semble correcte mais l'utilisation de CommandLineRunner n'est pas faite pour cela. CommandLineRunner est utilisé pour exécuter une tâche au démarrage une seule fois. Du point de vue du design, ce n'est pas très élégant. Je préférerais utiliser le composant d'adaptateur d'intégration de ressort avec kafka. Vous pouvez trouver un exemple ici https://github.com/raphaelbrugier/spring-integration-kafka-sample/blob/master/src/main/java/com/github/rbrugier/esb/consumer/Consumer.java .


0 commentaires

1
votes

Pour répondre à ma propre question, j'ai jeté un coup d'œil aux bibliothèques d'intégration Kafka comme Spring-Kafka et Spring Cloud Stream, mais l'intégration avec le registre de schémas de Confluent n'est pas terminée ou pas tout à fait claire pour moi. C'est tout simplement suffisant pour les primitives, mais nous en avons besoin pour les objets Avro typés qui sont validés par le registre de schémas. J'ai maintenant implémenté une solution indépendante de Kafka, basée sur la réponse à Spring Boot - Meilleur moyen de démarrer un thread d'arrière-plan lors du déploiement

Le code final ressemble à ceci:

@Component
public class AccountStreamConsumer implements DisposableBean, Runnable {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private final AccountService accountService;
    private final KafkaProperties kafkaProperties;
    private final Consumer<AccountKey, Account> consumer;

    @Autowired
    public AccountStreamConsumer(AccountService accountService, KafkaProperties kafkaProperties,
                                 ConfluentProperties confluentProperties) {

        this.accountService = accountService;
        this.kafkaProperties = kafkaProperties;

        if (!kafkaProperties.getEnabled()) {
            consumer = null;
            return;
        }

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, confluentProperties.getSchemaRegistryUrl());
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaProperties.getSecurityProtocolConfig());
        props.put(SaslConfigs.SASL_MECHANISM, kafkaProperties.getSaslMechanism());
        props.put(SaslConfigs.SASL_JAAS_CONFIG, PlainLoginModule.class.getName() + " required username=\"" + kafkaProperties.getUsername() + "\" password=\"" + kafkaProperties.getPassword() + "\";");
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getAccountConsumerGroupId());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);

        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(kafkaProperties.getAccountsTopicName()));

        Thread thread = new Thread(this);
        thread.start();
    }

    @Override
    public void run() {
        if (!kafkaProperties.getEnabled())
            return;

        logger.debug("Started account stream consumer");
        try {
            //noinspection InfiniteLoopStatement
            while (true) {
                ConsumerRecords<AccountKey, Account> records = consumer.poll(Duration.ofSeconds(10L));
                List<Account> accounts = new ArrayList<>();
                records.iterator().forEachRemaining(record -> accounts.add(record.value()));
                if (accounts.size() != 0)
                    accountService.store(accounts);
            }
        } catch (WakeupException e) {
            logger.info("Account stream consumer woken up for exiting.");
        } finally {
            consumer.close();
        }
    }

    @Override
    public void destroy() {
        if (consumer != null)
            consumer.wakeup();

        logger.info("Woke up account stream consumer, exiting.");
    }
}


0 commentaires