J'ai une application Spring Boot qui définit:
(J'ai changé le suffixe pour éviter toute confusion possible, et créé les sujets à la main, car sinon je recevais un avertissement, STREAM_TOPIC_IN_ xxx = LEADER_NOT_AVAILABLE et le flux ne fonctionnerait pas pendant une minute environ.)
Le premier auditeur et le flux semblent fonctionner, mais lorsque l'auditeur sur le STREAM_OUT_TOPIC essaie de désérialiser le message, j'obtiens l'exception ci-dessous. Je fournis le serde dans le flux avec Produced.with. Que dois-je faire pour que l'auditeur sache le type vers lequel désérialiser?
Journal
spring: kafka: bootstrap-servers: localhost:9092 consumer: group-id: foo auto-offset-reset: latest key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2 value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2 properties: spring.json.trusted.packages: com.teramedica.kafakaex001web.model spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer streams: application-id: kafka9000-v0.1 properties: # properties not explicitly handled by KafkaProperties.streams default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde default.value.serde: org.springframework.kafka.support.serializer.JsonSerde spring.json.trusted.packages: com.teramedica.kafakaex001web.model
Voici la configuration :
REST (spring mvc):
@Bean public KStream<String, Greeting> kStream(StreamsBuilder kStreamBuilder) { KStream<String, Greeting> stream = kStreamBuilder.stream(K9000Consts.STREAM_TOPIC_IN); stream.peek((k, greeting) -> { logger.info("-------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: {}", greeting.getContent()); }) .map((k, v) -> new KeyValue<>(k, new GreetingResponse(v))) .to(K9000Consts.STREAM_TOPIC_OUT, Produced.with(stringSerde, new JsonSerde<>(GreetingResponse.class))); return stream; } @KafkaListener(topics = K9000Consts.STREAM_TOPIC_OUT, groupId="oofda", errorHandler = "myTopicErrorHandler") public void listenForGreetingResponse(ConsumerRecord<String, GreetingResponse> cr) throws Exception { logger.info("STREAM_OUT_TOPIC Listener : {}" + cr.toString()); } @KafkaListener(topics = K9000Consts.STREAM_TOPIC_IN, groupId = "bar") public void listenForGreetingResponses(ConsumerRecord<String, Greeting> cr) throws Exception { logger.info("STREAM_IN_TOPIC Listener: ConsumerRecord: {}" + cr.toString()); }
Kafka Config (spring-kafka): strong >
@RequestMapping("/greeting") public Greeting greeting(@RequestParam(value = "name", defaultValue = "World") String name) { Greeting gr = new Greeting(counter.incrementAndGet(), String.format(msgTemplate, name)); this.kafkaTemplate.send(K9000Consts.STREAM_TOPIC_IN, "1", gr); logger.debug("Sending a Kafka Message"); return gr; }
application.yml
11 Mar 2019 14:34:00,194 DEBUG [KafkaMessageController [] http-nio-8080-exec-1] Sending a Kafka Message 11 Mar 2019 14:34:00,236 INFO [KafkaConfig [] kafka9000-v0.1-b0a60795-0258-48d9-8c87-30fa9a97d7b8-StreamThread-1] -------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: Hello, World! 11 Mar 2019 14:34:00,241 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener: ConsumerRecord: {}ConsumerRecord(topic = STREAM_TOPIC_IN_QQQ, partition = 0, offset = 0, CreateTime = 1552332840188, serialized key size = 1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value = com.teramedica.kafakaex001web.model.Greeting@7b6c8fcc) 11 Mar 2019 14:34:00,243 INFO [Metadata [] kafka-producer-network-thread | kafka9000-v0.1-b0a60795-0258-48d9-8c87-30fa9a97d7b8-StreamThread-1-producer] Cluster ID: y48IEZaGQWKcWDVGf4mD6g 11 Mar 2019 14:34:00,367 ERROR [LoggingErrorHandler [] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] Error while processing: ConsumerRecord(topic = STREAM_TOPIC_OUT_QQQ, partition = 0, offset = 0, CreateTime = 1552332840188, serialized key size = 1, serialized value size = 48, headers = RecordHeaders(headers = [RecordHeader(key = springDeserializerExceptionValue, value = [ REDACTED ])], isReadOnly = false), key = 1, value = null) org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is java.lang.IllegalStateException: No type information in headers and no default type provided at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2.deserializationException(ErrorHandlingDeserializer2.java:204) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
4 Réponses :
Voir la documentation .
Plus précisément ...
JsonDeserializer.VALUE_DEFAULT_TYPE
: Type de secours pour la désérialisation des valeurs si aucune information d'en-tête n'est présente.
C'est spring.json.value.default.type
Vous pouvez également définir spring.json.use.type.headers
(true par défaut) pour empêcher même de rechercher des en-têtes.
Le désérialiseur approuve automatiquement le package du type par défaut, il n'est donc pas nécessaire de l'ajouter ici.
< EDIT
Cependant, consultez également Spring Messaging Message Conversion a>.
Utilisez un BytesDeserializer
et un BytesJsonMessageConverter
et le framework passera le type de paramètre de méthode comme cible pour la conversion.
Le désérialiseur approuve automatiquement le package du type par défaut, il n'est donc pas nécessaire de l'ajouter ici également.
J'aurais dû préciser que je veux autre que la valeur par défaut. N'est-il pas courant de vouloir écouter différents types sur différents sujets? Dans tous les cas, pourquoi n'y a-t-il pas d'informations d'en-tête présentes en premier lieu?
De plus, je pense avoir trouvé une solution: le KafkaListener a un champ de propriétés qui permet de spécifier des propriétés qui remplacent celles de l'usine grand public, je peux donc ajouter ce qui suit à l'annotation KafkaListener: properties = {ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + " : com.teramedica.kafakaex001web.GreetingResponseDeserializer "}
, où GreetingResponseDeserializer étend JsonDeserializer et construit avec super (GreetingResponse.class)
Cela semble fonctionner. Je me demande pourquoi ce n'est pas plus directement pris en charge dans l'annotation.
Ah - ok - au lieu d'un JsonDeserializer
utilisez un StringDeserializer
(ou BytesDeserializer
) avec un StringJsonMessageConverter
(ou BytesJsonMessageConverter
) (les versions Bytes ...
sont plus efficaces. Ensuite, le framework indique au convertisseur le type trouvé dans la méthode d'écoute. Encore une fois, voir la documentation ,
Si la configuration automatique de démarrage trouve un convertisseur @Bean
, il le connectera automatiquement à l'usine de conteneurs.
Donc, j'ai ajouté @Bean RecordMessageConverter messageConverter () {return new StringJsonMessageConverter (); }
et a changé le désérialiseur: spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
(j'utilise ErrorHandlingDeserializer2). Cela a fonctionné mais a "cassé" un auditeur qui avait un enregistrement ConsumerRecord
comme paramètre, dans le sens où ce n'est pas un message d'accueil, c'est une chaîne json. Ce qui est en fait logique, rétrospectivement, puisque le désérialiseur n'analyse pas le json, le messageConverter l'est. Bref, merci!
Ah. Oui, vous pouvez utiliser Message
"Répondre" à ma propre question en grande partie pour consolider les informations dans les commentaires vers et depuis @GaryRussell, mais en gros, il a fourni la meilleure réponse. En bref, j'ai fait ce qui suit:
Une autre chose: par défaut, ajouter simplement le messageConverter l'ajoute également au kafkaTemplate configuré automatiquement lors de l'utilisation de la configuration automatique de spring boot. Cela ne semble pas être un problème lors de l'appel de kafkaTemplate.send (K9000Consts.STREAM_TOPIC_IN, "1", salutation)
, bien que je pense que cela peut être si vous utilisez send (Message).
Voici une configuration fonctionnelle, dans la mesure où je reçois les messages comme prévu avec une configuration minimale
application.yml:
13 Mar 2019 09:56:57,884 DEBUG [KafkaMessageController [] http-nio-8080-exec-1] Sending a Kafka Message 13 Mar 2019 09:56:57,913 INFO [KafkaConfig [] kafka9000-v0.1-b0589cc5-2fab-4b72-81f7-b0d5488c7478-StreamThread-1] -------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: Hello, World! 13 Mar 2019 09:56:57,919 INFO [Metadata [] kafka-producer-network-thread | kafka9000-v0.1-b0589cc5-2fab-4b72-81f7-b0d5488c7478-StreamThread-1-producer] Cluster ID: 8nREAmTCS0SZT-NzWsCacQ 13 Mar 2019 09:56:57,919 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener: Greeting: Hello, World! 13 Mar 2019 09:56:57,920 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener: Record: ConsumerRecord(topic = STREAM_TOPIC_IN_SSS, partition = 0, offset = 23, CreateTime = 1552489017878, serialized key size = 1, serialized value size = 34, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 116, 101, 114, 97, 109, 101, 100, 105, 99, 97, 46, 107, 97, 102, 97, 107, 97, 101, 120, 48, 48, 49, 119, 101, 98, 46, 109, 111, 100, 101, 108, 46, 71, 114, 101, 101, 116, 105, 110, 103])], isReadOnly = false), key = 1, value = {"id":1,"content":"Hello, World!"}) 13 Mar 2019 09:56:57,920 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener: From Headers: topic: STREAM_TOPIC_IN_SSS, partition: 0, key: 1 13 Mar 2019 09:56:57,920 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener:: From Record: topic: STREAM_TOPIC_IN_SSS, parition: 0, key: 1 13 Mar 2019 09:56:57,921 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] STREAM_IN_TOPIC Listener:: record value: {"id":1,"content":"Hello, World!"}, class: class java.lang.String 13 Mar 2019 09:56:58,030 INFO [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] STREAM_OUT_TOPIC Listener : GreetingResponse id: 1000, response: Hello, World!, yourself
KafkaConfig:
@Bean RecordMessageConverter messageConverter() { return new StringJsonMessageConverter(); } ... @Bean public KStream<String, Greeting> kStream(StreamsBuilder kStreamBuilder) { KStream<String, Greeting> stream = kStreamBuilder.stream(K9000Consts.STREAM_TOPIC_IN); stream.peek((k, greeting) -> { logger.info("-------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: {}", greeting.getContent()); }) .map((k, v) -> new KeyValue<>(k, new GreetingResponse(v))) .to(K9000Consts.STREAM_TOPIC_OUT, Produced.with(stringSerde, new JsonSerde<>(GreetingResponse.class))); return stream; } @KafkaListener(topics = K9000Consts.STREAM_TOPIC_OUT, groupId="oofda", errorHandler = "myTopicErrorHandler") public void listenForGreetingResponse(GreetingResponse gr) throws Exception { // logger.info("STREAM_OUT_TOPIC Listener : {}" + cr.toString()); logger.info("STREAM_OUT_TOPIC Listener : GreetingResponse is {}" + gr); } @KafkaListener(topics = K9000Consts.STREAM_TOPIC_IN, groupId = "bar") public void listenForGreetingResponses(@Payload Greeting gr, ConsumerRecord<String, String> record, // <--- NOTE: String, NOT Greeting @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) throws Exception { //logger.info("STREAM_IN_TOPIC Listener: ConsumerRecord: {}" + cr.toString()); logger.info("STREAM_IN_TOPIC Listener: Greeting: {}", gr.getContent()); logger.info("STREAM_IN_TOPIC Listener: From Headers: topic: {}, partition: {}, key: {}", topic, partition, key); logger.info("STREAM_IN_TOPIC Listener:: From Record: topic: {}, parition: {}, key: {}", record.topic(), record.partition(), record.key()); logger.info("STREAM_IN_TOPIC Listener:: record value: {}, class: {}", record.value(), record.value().getClass() ); } @Bean public KafkaListenerErrorHandler myTopicErrorHandler() { return (m, e) -> { logger.error("Got an error {}", e.getMessage()); return "some info about the failure"; }; }
Et la sortie d'un message est:
spring: kafka: bootstrap-servers: localhost:9092 consumer: group-id: foo auto-offset-reset: latest key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2 value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2 properties: spring.json.trusted.packages: com.teramedica.kafakaex001web.model spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer streams: application-id: kafka9000-v0.1 properties: # properties not explicitly handled by KafkaProperties.streams default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde default.value.serde: org.springframework.kafka.support.serializer.JsonSerde spring.json.trusted.packages: com.teramedica.kafakaex001web.model
tu as sauvé ma journée!
Ce n'est pas une réponse; mais cela pourrait aider les personnes qui arrivent ici à partir des moteurs de recherche.
Si vous rencontrez cette exception lors de l'exécution de l'application KafkaStreams.
Remarque 1: assurez-vous d'avoir initialisé jsonSerde comme décrit ci-dessous:
import org.springframework.kafka.support.serializer.JsonSerde; new JsonSerde<JsonNode>(); // This is wrong
Remarque 2: Erreur la plus courante
import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.connect.json.JsonDeserializer; import org.apache.kafka.connect.json.JsonSerializer; Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer(); Serializer<JsonNode> jsonSerializer = new JsonSerializer(); Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
Mon cas, le problème, j'avais déjà publié différents types de messages sur le sujet Kafka et obtenu cette exception.
Pour y remédier.
J'ai créé un nouveau sujet et publié des messages là-bas. Puis le consommateur a commencé à parler de ce sujet et cela a bien fonctionné.