J'ai besoin de configurer la politique de rétention d'un sujet particulier lors de la création. J'ai essayé de chercher une solution, je ne pouvais trouver que la commande de modification de niveau de commande comme ci-dessous
./bin/kafka-topics.sh --zookeeper localhost: 2181 --alter --topic my-topic --config retention.ms = 1680000
Quelqu'un peut-il me faire savoir un moyen de le configurer lors de la création, quelque chose comme xml ou la configuration du fichier de propriétés dans spring-mvc.
3 Réponses :
Je suppose que vous pouvez utiliser le client admin ( https://kafka.apache.org/22/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html ) pour cela. Vous pouvez créer une instance de client Admin dans votre application et utiliser la commande create ou alter topic pour manipuler les configurations de rubrique, y compris la rétention.
Spring Kafka vous permet de créer de nouveaux sujets en déclarant @Bean
s dans le contexte de votre application. Cela nécessitera un bean de type KafkaAdmin
dans le contexte de l'application, qui sera créé automatiquement si vous utilisez Spring Boot. Vous pouvez définir votre sujet comme suit:
Map<String, Object> config = new HashMap<>(); config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); AdminClient client = AdminClient.create(config); ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "new-topic"); // Update the retention.ms value ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1680000"); Map<ConfigResource, Config> updateConfig = new HashMap<>(); updateConfig.put(resource, new Config(Collections.singleton(retentionEntry))); AlterConfigOp op = new AlterConfigOp(retentionEntry, AlterConfigOp.OpType.SET); Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>(1); configs.put(resource, Arrays.asList(op)); AlterConfigsResult alterConfigsResult = client.incrementalAlterConfigs(configs); alterConfigsResult.all();
Si vous n'utilisez pas Spring Boot, vous devrez également définir le bean KafkaAdmin
:
@Bean public KafkaAdmin admin() { Map<String, Object> configs = new HashMap<>(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); return new KafkaAdmin(configs); }
Si vous souhaitez modifier la configuration d'un sujet existant, vous devrez utiliser le AdminClient
, voici l'extrait de code pour modifier le AdminClient
retention.ms
au niveau du sujet:
@Bean public NewTopic myTopic() { return TopicBuilder.name("my-topic") .partitions(4) .replicas(3) .config(TopicConfig.RETENTION_MS_CONFIG, "1680000") .build(); }
Merci pour l'exemple de code. @Sergi alterConfigsResult.all () lance-t-il une exception si nous essayons de modifier la rétention d'un sujet inexistant ? Comment savoir si nous modifions un sujet existant?
La mise à jour de la configuration d'un sujet existant ne lève aucune exception. Vous pouvez utiliser la méthode describeConfigs pour obtenir la configuration actuelle d'un sujet existant
Pour créer une rubrique à l'aide d' AdminClient
programmation avec le temps de rétention spécifié, procédez comme suit:
NewTopic topic = new NewTopic(topicName, numPartitions, replicationFactor); topic.configs(Map.of(TopicConfig.RETENTION_MS_CONFIG, retentionMs.toString())); adminClient.createTopics(List.of(topic));