3
votes

Comment configurer la stratégie de rétention des rubriques Kafka lors de la création dans spring-mvc?

J'ai besoin de configurer la politique de rétention d'un sujet particulier lors de la création. J'ai essayé de chercher une solution, je ne pouvais trouver que la commande de modification de niveau de commande comme ci-dessous

./bin/kafka-topics.sh --zookeeper localhost: 2181 --alter --topic my-topic --config retention.ms = 1680000

Quelqu'un peut-il me faire savoir un moyen de le configurer lors de la création, quelque chose comme xml ou la configuration du fichier de propriétés dans spring-mvc.


0 commentaires

3 Réponses :


1
votes

Je suppose que vous pouvez utiliser le client admin ( https://kafka.apache.org/22/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html ) pour cela. Vous pouvez créer une instance de client Admin dans votre application et utiliser la commande create ou alter topic pour manipuler les configurations de rubrique, y compris la rétention.


0 commentaires

5
votes

Spring Kafka vous permet de créer de nouveaux sujets en déclarant @Bean s dans le contexte de votre application. Cela nécessitera un bean de type KafkaAdmin dans le contexte de l'application, qui sera créé automatiquement si vous utilisez Spring Boot. Vous pouvez définir votre sujet comme suit:

Map<String, Object> config = new HashMap<>();                
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
                         
AdminClient client = AdminClient.create(config);
                         
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "new-topic");
            
// Update the retention.ms value
ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1680000");
Map<ConfigResource, Config> updateConfig = new HashMap<>();
updateConfig.put(resource, new Config(Collections.singleton(retentionEntry)));

AlterConfigOp op = new AlterConfigOp(retentionEntry, AlterConfigOp.OpType.SET);
Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>(1);
configs.put(resource, Arrays.asList(op));

AlterConfigsResult alterConfigsResult = client.incrementalAlterConfigs(configs);
        alterConfigsResult.all();

Si vous n'utilisez pas Spring Boot, vous devrez également définir le bean KafkaAdmin :

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
    return new KafkaAdmin(configs);
}

Si vous souhaitez modifier la configuration d'un sujet existant, vous devrez utiliser le AdminClient , voici l'extrait de code pour modifier le AdminClient retention.ms au niveau du sujet:

@Bean
public NewTopic myTopic() {
    return TopicBuilder.name("my-topic")
            .partitions(4)
            .replicas(3)
            .config(TopicConfig.RETENTION_MS_CONFIG, "1680000")
            .build();
}


2 commentaires

Merci pour l'exemple de code. @Sergi alterConfigsResult.all () lance-t-il une exception si nous essayons de modifier la rétention d'un sujet inexistant ? Comment savoir si nous modifions un sujet existant?


La mise à jour de la configuration d'un sujet existant ne lève aucune exception. Vous pouvez utiliser la méthode describeConfigs pour obtenir la configuration actuelle d'un sujet existant



0
votes

Pour créer une rubrique à l'aide d' AdminClient programmation avec le temps de rétention spécifié, procédez comme suit:

NewTopic topic = new NewTopic(topicName, numPartitions, replicationFactor);
topic.configs(Map.of(TopicConfig.RETENTION_MS_CONFIG, retentionMs.toString()));
adminClient.createTopics(List.of(topic));


0 commentaires