5
votes

spring kafka Aucune information de type dans les en-têtes et aucun type par défaut fourni

J'ai une application Spring Boot qui définit:

  • un contrôleur REST qui écrit dans une rubrique kafka, STREAM_TOPIC_IN_QQQ
  • un KafkaListener qui lit à partir de STREAM_TOPIC_IN_QQQ (groupId = "bar") et des journaux
  • un KStream qui examine le sujet et le consigne, le convertit en un autre type, puis l'écrit dans STREAM_TOPIC_OUT_QQQ
  • un autre KafkaListener qui lit à partir de STREAM_TOPIC_OUT_QQQ.

(J'ai changé le suffixe pour éviter toute confusion possible, et créé les sujets à la main, car sinon je recevais un avertissement, STREAM_TOPIC_IN_ xxx = LEADER_NOT_AVAILABLE et le flux ne fonctionnerait pas pendant une minute environ.)

Le premier auditeur et le flux semblent fonctionner, mais lorsque l'auditeur sur le STREAM_OUT_TOPIC essaie de désérialiser le message, j'obtiens l'exception ci-dessous. Je fournis le serde dans le flux avec Produced.with. Que dois-je faire pour que l'auditeur sache le type vers lequel désérialiser?

Journal

spring:
kafka:
  bootstrap-servers:  localhost:9092
  consumer:
    group-id: foo
    auto-offset-reset: latest
    key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
    value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
    properties:
      spring.json.trusted.packages: com.teramedica.kafakaex001web.model
      spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
      spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
  producer:
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  streams:
    application-id: kafka9000-v0.1
    properties: # properties not explicitly handled by KafkaProperties.streams
      default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
      spring.json.trusted.packages: com.teramedica.kafakaex001web.model

Voici la configuration :

REST (spring mvc):

@Bean
public KStream<String, Greeting> kStream(StreamsBuilder kStreamBuilder) {
    KStream<String, Greeting> stream = kStreamBuilder.stream(K9000Consts.STREAM_TOPIC_IN);
    stream.peek((k, greeting) -> {
        logger.info("-------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: {}", greeting.getContent());
    })
          .map((k, v) -> new KeyValue<>(k, new GreetingResponse(v)))
          .to(K9000Consts.STREAM_TOPIC_OUT, Produced.with(stringSerde, new JsonSerde<>(GreetingResponse.class)));
    return stream;
}

@KafkaListener(topics = K9000Consts.STREAM_TOPIC_OUT, groupId="oofda", errorHandler = "myTopicErrorHandler")
public void listenForGreetingResponse(ConsumerRecord<String, GreetingResponse> cr) throws Exception {
    logger.info("STREAM_OUT_TOPIC Listener : {}" + cr.toString());
}

@KafkaListener(topics = K9000Consts.STREAM_TOPIC_IN, groupId = "bar")
public void listenForGreetingResponses(ConsumerRecord<String, Greeting> cr) throws Exception {
    logger.info("STREAM_IN_TOPIC Listener: ConsumerRecord: {}" + cr.toString());
}

Kafka Config (spring-kafka): strong >

@RequestMapping("/greeting")
public Greeting greeting(@RequestParam(value = "name", defaultValue = "World") String name) {
    Greeting gr = new Greeting(counter.incrementAndGet(), String.format(msgTemplate, name));
    this.kafkaTemplate.send(K9000Consts.STREAM_TOPIC_IN, "1", gr);
    logger.debug("Sending a Kafka Message");
    return gr;
}

application.yml

11 Mar 2019 14:34:00,194   DEBUG    [KafkaMessageController [] http-nio-8080-exec-1]   Sending a Kafka Message
11 Mar 2019 14:34:00,236   INFO     [KafkaConfig [] kafka9000-v0.1-b0a60795-0258-48d9-8c87-30fa9a97d7b8-StreamThread-1]   -------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: Hello, World!
11 Mar 2019 14:34:00,241   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1]   STREAM_IN_TOPIC Listener: ConsumerRecord: {}ConsumerRecord(topic = STREAM_TOPIC_IN_QQQ, partition = 0, offset = 0, CreateTime = 1552332840188, serialized key size = 1, serialized value size = 34, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value = com.teramedica.kafakaex001web.model.Greeting@7b6c8fcc)
11 Mar 2019 14:34:00,243   INFO     [Metadata [] kafka-producer-network-thread | kafka9000-v0.1-b0a60795-0258-48d9-8c87-30fa9a97d7b8-StreamThread-1-producer]   Cluster ID: y48IEZaGQWKcWDVGf4mD6g
11 Mar 2019 14:34:00,367   ERROR    [LoggingErrorHandler [] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]   Error while processing: ConsumerRecord(topic = STREAM_TOPIC_OUT_QQQ, partition = 0, offset = 0, CreateTime = 1552332840188, serialized key size = 1, serialized value size = 48, headers = RecordHeaders(headers = [RecordHeader(key = springDeserializerExceptionValue, value = [ REDACTED ])], isReadOnly = false), key = 1, value = null)
org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is java.lang.IllegalStateException: No type information in headers and no default type provided
    at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2.deserializationException(ErrorHandlingDeserializer2.java:204) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]


0 commentaires

4 Réponses :


5
votes

Voir la documentation .

Plus précisément ...

JsonDeserializer.VALUE_DEFAULT_TYPE : Type de secours pour la désérialisation des valeurs si aucune information d'en-tête n'est présente.

C'est spring.json.value.default.type

Vous pouvez également définir spring.json.use.type.headers (true par défaut) pour empêcher même de rechercher des en-têtes.

Le désérialiseur approuve automatiquement le package du type par défaut, il n'est donc pas nécessaire de l'ajouter ici.

< EDIT

Cependant, consultez également Spring Messaging Message Conversion .

Utilisez un BytesDeserializer et un BytesJsonMessageConverter et le framework passera le type de paramètre de méthode comme cible pour la conversion.


7 commentaires

Le désérialiseur approuve automatiquement le package du type par défaut, il n'est donc pas nécessaire de l'ajouter ici également.


J'aurais dû préciser que je veux autre que la valeur par défaut. N'est-il pas courant de vouloir écouter différents types sur différents sujets? Dans tous les cas, pourquoi n'y a-t-il pas d'informations d'en-tête présentes en premier lieu?


De plus, je pense avoir trouvé une solution: le KafkaListener a un champ de propriétés qui permet de spécifier des propriétés qui remplacent celles de l'usine grand public, je peux donc ajouter ce qui suit à l'annotation KafkaListener: properties = {ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + " : com.teramedica.kafakaex001web.GreetingResponseDeserializer‌ "} , où GreetingResponseDeserializer étend JsonDeserializer et construit avec super (GreetingResponse.class) Cela semble fonctionner. Je me demande pourquoi ce n'est pas plus directement pris en charge dans l'annotation.


Ah - ok - au lieu d'un JsonDeserializer utilisez un StringDeserializer (ou BytesDeserializer ) avec un StringJsonMessageConverter (ou BytesJsonMessageConverter ) (les versions Bytes ... sont plus efficaces. Ensuite, le framework indique au convertisseur le type trouvé dans la méthode d'écoute. Encore une fois, voir la documentation ,


Si la configuration automatique de démarrage trouve un convertisseur @Bean , il le connectera automatiquement à l'usine de conteneurs.


Donc, j'ai ajouté @Bean RecordMessageConverter messageConverter () {return new StringJsonMessageConverter (); } et a changé le désérialiseur: spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer (j'utilise ErrorHandlingDeserializer2). Cela a fonctionné mais a "cassé" un auditeur qui avait un enregistrement ConsumerRecord comme paramètre, dans le sens où ce n'est pas un message d'accueil, c'est une chaîne json. Ce qui est en fait logique, rétrospectivement, puisque le désérialiseur n'analyse pas le json, le messageConverter l'est. Bref, merci!


Ah. Oui, vous pouvez utiliser Message ou simplement Salutation. Vous pouvez obtenir d'autres champs à partir de ConsumerRecord. Par exemple "@Header (KafkaHeaders.OFFSET) long offset" si vous en avez besoin (ou message.getHeaders (). Get (...) si vous prenez la première option.



5
votes

"Répondre" à ma propre question en grande partie pour consolider les informations dans les commentaires vers et depuis @GaryRussell, mais en gros, il a fourni la meilleure réponse. En bref, j'ai fait ce qui suit:

  • Définir le désérialiseur consommateur sur un StringDeserializer
  • Ajouter un bean messageConverter en tant que StringJsonMessageConverter
  • Dans les méthodes annotées KafkaListener, utilisez simplement le type attendu pour la charge utile
  • Si vous utilisez un ConsumerRecord dans la méthode annotée KafaListener, ne vous attendez PAS à ce qu'il soit du type Payload. Ce sera désormais String (puisque le convertisseur de message, et non le désérialiseur, le fait).

Une autre chose: par défaut, ajouter simplement le messageConverter l'ajoute également au kafkaTemplate configuré automatiquement lors de l'utilisation de la configuration automatique de spring boot. Cela ne semble pas être un problème lors de l'appel de kafkaTemplate.send (K9000Consts.STREAM_TOPIC_IN, "1", salutation) , bien que je pense que cela peut être si vous utilisez send (Message).

Voici une configuration fonctionnelle, dans la mesure où je reçois les messages comme prévu avec une configuration minimale

application.yml:

13 Mar 2019 09:56:57,884   DEBUG    [KafkaMessageController [] http-nio-8080-exec-1]   Sending a Kafka Message
13 Mar 2019 09:56:57,913   INFO     [KafkaConfig [] kafka9000-v0.1-b0589cc5-2fab-4b72-81f7-b0d5488c7478-StreamThread-1]   -------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: Hello, World!
13 Mar 2019 09:56:57,919   INFO     [Metadata [] kafka-producer-network-thread | kafka9000-v0.1-b0589cc5-2fab-4b72-81f7-b0d5488c7478-StreamThread-1-producer]   Cluster ID: 8nREAmTCS0SZT-NzWsCacQ
13 Mar 2019 09:56:57,919   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1]   STREAM_IN_TOPIC Listener:   Greeting: Hello, World!
13 Mar 2019 09:56:57,920   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1]   STREAM_IN_TOPIC Listener:   Record: ConsumerRecord(topic = STREAM_TOPIC_IN_SSS, partition = 0, offset = 23, CreateTime = 1552489017878, serialized key size = 1, serialized value size = 34, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 116, 101, 114, 97, 109, 101, 100, 105, 99, 97, 46, 107, 97, 102, 97, 107, 97, 101, 120, 48, 48, 49, 119, 101, 98, 46, 109, 111, 100, 101, 108, 46, 71, 114, 101, 101, 116, 105, 110, 103])], isReadOnly = false), key = 1, value = {"id":1,"content":"Hello, World!"})
13 Mar 2019 09:56:57,920   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1]   STREAM_IN_TOPIC Listener:  From Headers: topic: STREAM_TOPIC_IN_SSS, partition: 0, key: 1
13 Mar 2019 09:56:57,920   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1]   STREAM_IN_TOPIC Listener:: From Record: topic: STREAM_TOPIC_IN_SSS, parition: 0, key: 1
13 Mar 2019 09:56:57,921   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1]   STREAM_IN_TOPIC Listener:: record value: {"id":1,"content":"Hello, World!"}, class: class java.lang.String
13 Mar 2019 09:56:58,030   INFO     [KafkaConfig [] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]   STREAM_OUT_TOPIC Listener : GreetingResponse id: 1000, response: Hello, World!, yourself

KafkaConfig:

        @Bean RecordMessageConverter messageConverter() {  return new StringJsonMessageConverter();  }

...
    @Bean
    public KStream<String, Greeting> kStream(StreamsBuilder kStreamBuilder) {
        KStream<String, Greeting> stream = kStreamBuilder.stream(K9000Consts.STREAM_TOPIC_IN);
        stream.peek((k, greeting) -> {
            logger.info("-------------- STREAM_IN_TOPIC peek: Got a greeting in the stream: {}", greeting.getContent());
        })
              .map((k, v) -> new KeyValue<>(k, new GreetingResponse(v)))
              .to(K9000Consts.STREAM_TOPIC_OUT, Produced.with(stringSerde, new JsonSerde<>(GreetingResponse.class)));
        return stream;
    }

    @KafkaListener(topics = K9000Consts.STREAM_TOPIC_OUT, groupId="oofda", errorHandler = "myTopicErrorHandler")
    public void listenForGreetingResponse(GreetingResponse gr) throws Exception {
    //    logger.info("STREAM_OUT_TOPIC Listener : {}" + cr.toString());
        logger.info("STREAM_OUT_TOPIC Listener : GreetingResponse is {}" + gr);
    }

    @KafkaListener(topics = K9000Consts.STREAM_TOPIC_IN, groupId = "bar")
    public void listenForGreetingResponses(@Payload Greeting gr,
            ConsumerRecord<String, String> record, // <--- NOTE: String, NOT Greeting
            @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) throws Exception {
        //logger.info("STREAM_IN_TOPIC Listener: ConsumerRecord: {}" + cr.toString());
        logger.info("STREAM_IN_TOPIC Listener:   Greeting: {}", gr.getContent());
        logger.info("STREAM_IN_TOPIC Listener:  From Headers: topic: {}, partition: {}, key: {}", topic, partition,
                    key);
        logger.info("STREAM_IN_TOPIC Listener:: From Record: topic: {}, parition: {}, key: {}",
                    record.topic(), record.partition(), record.key());
        logger.info("STREAM_IN_TOPIC Listener:: record value: {}, class: {}", record.value(), record.value().getClass() );
    }

    @Bean
    public KafkaListenerErrorHandler myTopicErrorHandler() {
        return (m, e) -> {
            logger.error("Got an error {}", e.getMessage());
            return "some info about the failure";
        };
    }

Et la sortie d'un message est:

  spring:
    kafka:
      bootstrap-servers:  localhost:9092
      consumer:
        group-id: foo
        auto-offset-reset: latest
        key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
        value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
        properties:
          spring.json.trusted.packages: com.teramedica.kafakaex001web.model
          spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
          spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
      producer:
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      streams:
        application-id: kafka9000-v0.1
        properties: # properties not explicitly handled by KafkaProperties.streams
          default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
          default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
          spring.json.trusted.packages: com.teramedica.kafakaex001web.model


1 commentaires

tu as sauvé ma journée!



0
votes

Ce n'est pas une réponse; mais cela pourrait aider les personnes qui arrivent ici à partir des moteurs de recherche.

Si vous rencontrez cette exception lors de l'exécution de l'application KafkaStreams.

  • Avez-vous enregistré jsonSerde à tous les endroits nécessaires de votre DSL?
  • Avez-vous fourni jsonSerde lors de l'instatioation du magasin d'état?

Remarque 1: assurez-vous d'avoir initialisé jsonSerde comme décrit ci-dessous:

import org.springframework.kafka.support.serializer.JsonSerde;
new JsonSerde<JsonNode>(); // This is wrong

Remarque 2: Erreur la plus courante

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;

Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
Serializer<JsonNode> jsonSerializer = new JsonSerializer();
Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);


0 commentaires

0
votes

Mon cas, le problème, j'avais déjà publié différents types de messages sur le sujet Kafka et obtenu cette exception.

Pour y remédier.

J'ai créé un nouveau sujet et publié des messages là-bas. Puis le consommateur a commencé à parler de ce sujet et cela a bien fonctionné.


0 commentaires