3
votes

Compatibilité Spring Kafka, Spring Cloud Stream et Avro Octet magique inconnu

J'ai un problème avec la désérialisation des messages des sujets Kafka. Les messages ont été sérialisés à l'aide de spring-cloud-stream et Apache Avro. Je les lis en utilisant Spring Kafka et j'essaye de les désérialiser. Si j'utilise spring-cloud à la fois pour produire et consommer les messages, je peux désérialiser les messages correctement. Le problème est lorsque je les consomme avec Spring Kafka et que j'essaye de les désérialiser.

J'utilise un registre de schémas (à la fois le registre de schémas à démarrage à ressort pour le développement, ainsi qu'un schéma Confluent en production), mais les problèmes de désérialisation semblent à se produire avant que l'événement appelle le registre de schéma.

Il est difficile de publier tout le code pertinent sur cette question, je l'ai donc publié dans un dépôt dans git hub: https://github.com/robjwilkins/avro-example

L'objet que j'envoie sur le sujet est juste un simple pojo:

public abstract class AbstractKafkaAvroDeserializer extends AbstractKafkaAvroSerDe {
....
private ByteBuffer getByteBuffer(byte[] payload) {
  ByteBuffer buffer = ByteBuffer.wrap(payload);
  if (buffer.get() != 0) {
    throw new SerializationException("Unknown magic byte!");
  } else {
    return buffer;
  }
}

Le code qui produit des messages sur Kafka ressemble à ceci:

2019-01-30 20:01:39.900 ERROR 30876 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: null

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition test-request-0 at offset 43. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

et application.yaml:

spring:
  application.name: avro-consumer
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: avro-consumer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
#      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        schema.registry.url: http://localhost:8071

Ensuite j'ai un consommateur:

@Slf4j
@Component
public class TopicListener {

    @KafkaListener(topics = {"test-request"})
    public void listenForMessage(ConsumerRecord<String, Request> consumerRecord) {
        log.info("listenForMessage. got a message: {}", consumerRecord);
        consumerRecord.headers().forEach(header -> log.info("header. key: {}, value: {}", header.key(), asString(header.value())));
    }

    private String asString(byte[] byteArray) {
        return new String(byteArray, Charset.defaultCharset());
    }
}

Et le projet qui consomme a la configuration application.yaml: p >

spring:
  application.name: avro-producer
  kafka:
    bootstrap-servers: localhost:9092
    consumer.group-id: avro-producer
  cloud:
    stream:
      schema-registry-client.endpoint: http://localhost:8071
      schema.avro.dynamic-schema-generation-enabled: true
      kafka:
        binder:
          brokers: ${spring.kafka.bootstrap-servers}
      bindings:
        test-request:
          destination: test-request
          contentType: application/*+avro

Lorsque le consommateur reçoit un message, il en résulte une exception:

@EnableBinding(MessageChannels.class)
@Slf4j
@RequiredArgsConstructor
@RestController
public class ProducerController {

  private final MessageChannels messageChannels;

  @GetMapping("/produce")
  public void produceMessage() {
    Request request = new Request();
    request.setMessage("hello world");
    Message<Request> requestMessage = MessageBuilder.withPayload(request).build();
    log.debug("sending message");
    messageChannels.testRequest().send(requestMessage);
  }
}

J'ai parcouru le code de désérialisation pour t Le point où cette exception est lancée

@Data
public class Request {
  private String message;
}

Cela se produit parce que le désérialiseur vérifie le contenu d'octets de l'objet sérialisé (tableau d'octets) et s'attend à ce qu'il soit 0, mais ce n'est pas . D'où la raison pour laquelle je me demande si le MessageConverter spring-cloud-stream qui a sérialisé l'objet est compatible avec l'objet io.confluent que j'utilise pour désérialiser l'objet. Et s'ils ne sont pas compatibles, que dois-je faire?

merci pour toute aide.


7 commentaires

Ne modifiez pas la trace de la pile; montrer le tout. Cause par: en eux-mêmes sont inutiles.


Si à tout moment vous avez poussé une valeur de chaîne (ou non-avro) sur le sujet, alors KafkaAvroDeserializer ne pourra pas le lire ...


@GaryRussell Je n'ai pas édité le stacktrace - c'est tout ce qui s'affiche sur la console. Ont également ajouté plus de description du problème


@ cricket_007 - il est possible que j'aie publié un message avec une charge utile de chaîne, cependant j'ai réinitialisé les décalages de sujet aux derniers pour m'assurer que les anciens messages ne sont pas repris.


Eh bien, tout ce que je peux dire, c'est que ce n'est pas une trace de pile appropriée; en plus des entrées Causé par: , une trace de pile normale contient des informations sur la pile d'appels (classes / méthodes / numéros de ligne). C'est pourquoi on l'appelle une trace stack . Si vous ne l'avez pas modifié, votre sous-système de journalisation est peut-être configuré pour supprimer ces informations importantes, ce qui serait vraiment très étrange.


Tout ce que je peux vraiment dire est basé sur mon expérience avec cette erreur. Si vous êtes passé au dernier décalage, le consommateur ne lira rien car il n'y a pas de nouveaux messages. Si vous avez redémarré un producteur en envoyant des données, et alors le consommateur a la même erreur - alors le problème persiste; les messages n'ont pas été sérialisés à l'aide des sérialiseurs Confluent ... Je ne sais pas ce qu'est cette annotation @Data , mais elle n'est pas utilisée par les exemples Confluent github.com/confluentinc/examples/tree/5.1.0-post/clients/avr‌ o


@Data est une annotation Lombok - il crée juste automatiquement du code getter / setter


4 Réponses :


0
votes

Vous devez définir explicitement le désérialiseur, en créant DefaultKafkaConsumerFactory et votre bean TopicListener dans une configuration, quelque chose comme ceci:

@Configuration
@EnableKafka
public class TopicListenerConfig {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Value(("${spring.kafka.consumer.group-id}"))
private String groupId;


@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.wilkins.avro.consumer");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    return props;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    return factory;
}

@Bean
public TopicListener topicListener() {
    return new TopicListener();
}
}

p >


1 commentaires

Remarque: la question utilise Avro, pas le JsonDeserializer , qui ne peut pas lire ces données



0
votes

Vous pouvez configurer la liaison pour utiliser à la place un sérialiseur Kafka nativement.

Définissez la propriété du producteur useNativeEncoding sur true et configurez le sérialiseur à l'aide du . ..producer.configuration Propriétés Kafka.

MODIFIER

Exemple:

spring:
  cloud:
    stream:
# Generic binding properties
      bindings:
        input:
          consumer:
            use-native-decoding: true
          destination: so54448732
          group: so54448732
        output:
          destination: so54448732
          producer:
            use-native-encoding: true
# Kafka-specific binding properties
      kafka:
        bindings:
          input:
            consumer:
              configuration:
                value.deserializer: com.example.FooDeserializer
          output:
            producer:
              configuration:
                value.serializer: com.example.FooSerializer


4 commentaires

Je vous suggère d'ouvrir un problème ici avec vos suggestions d'améliorations.


il serait très utile si vous pouviez fournir un exemple des propriétés du producteur / consommateur que je devrais définir s'il vous plaît? Merci de votre aide.


Merci pour la réponse. Mon exemple n'utilise pas spring-cloud-stream en tant que consommateur, mais c'est là que j'ai le problème de désérialisation?


Cela n'a pas d'importance; J'ai montré l'utilisation de sérialiseurs / désérialiseurs natifs des deux côtés pour être complet. Votre question porte sur la compatibilité entre le convertisseur de messages et le désérialiseur avro. L'utilisation de l'encodage natif supprime les convertisseurs de messages; la charge utile est envoyée directement au client Kafka en tant que ProducerRecord.value et la sérialisation est effectuée par Kafka; Le printemps n'y prend pas part. Si vous rencontrez toujours des problèmes lors de l'utilisation du codage natif, le problème réside ailleurs, peut-être dans les données elles-mêmes.



1
votes

Le nœud de ce problème est que le producteur utilise spring-cloud-stream pour envoyer des messages à Kafka, mais le consommateur utilise spring-kaka. Les raisons en sont:

  • Le système existant est déjà bien établi et utilise spring-cloud-stream
  • Un nouveau consommateur doit écouter plusieurs sujets en utilisant la même méthode, en se liant uniquement sur une liste csv de noms de sujets
  • Il est nécessaire de consommer une collection de messages à la fois, plutôt qu'individuellement, afin que leur contenu puisse être écrit en masse dans une base de données.

Spring-cloud-stream ne permet pas actuellement au consommateur de lier un auditeur à plusieurs sujets, et il n'y a aucun moyen de consommer une collection de messages à la fois (sauf erreur de ma part). p>

J'ai trouvé une solution qui ne nécessite aucune modification du code producteur qui utilise spring-cloud-stream pour publier des messages sur Kafka. Spring-cloud-stream utilise un MessageConverter pour gérer la sérialisation et la désérialisation. Dans le AbstractAvroMessageConverter , il existe des méthodes: convertFromInternal et convertToInternal qui gèrent la transformation vers / depuis un tableau d'octets. Ma solution était d'étendre ce code (en créant une classe qui étend AvroSchemaRegistryClientMessageConverter ), afin que je puisse réutiliser une grande partie de la fonctionnalité spring-cloud-stream, mais avec une interface accessible depuis mon spring-kafka KafkaListener . J'ai ensuite modifié mon TopicListener pour utiliser cette classe pour effectuer la conversion:

Le convertisseur:

@Slf4j
@Component
@RequiredArgsConstructor
public class TopicListener {

  private final AvroKafkaMessageConverter messageConverter;

  @KafkaListener(topics = {"test-request"})
  public void listenForMessage(ConsumerRecord<?, ?> consumerRecord) {
    log.info("listenForMessage. got a message: {}", consumerRecord);
    Request request = messageConverter.convertFromInternal(
        consumerRecord, Request.class, MimeType.valueOf("application/vnd.*+avr"));
    log.info("request message: {}", request.getMessage());
  }
}

Le TopicListener modifié:

@Component
@Slf4j
public class AvroKafkaMessageConverter extends AvroSchemaRegistryClientMessageConverter {

  public AvroKafkaMessageConverter(SchemaRegistryClient schemaRegistryClient) {
    super(schemaRegistryClient, new NoOpCacheManager());
  }

  public <T> T convertFromInternal(ConsumerRecord<?, ?> consumerRecord, Class<T> targetClass,
      Object conversionHint) {
    T result;
    try {
      byte[] payload = (byte[]) consumerRecord.value();

      Map<String, String> headers = new HashMap<>();
      consumerRecord.headers().forEach(header -> headers.put(header.key(), asString(header.value())));

      MimeType mimeType = messageMimeType(conversionHint, headers);
      if (mimeType == null) {
        return null;
      }

      Schema writerSchema = resolveWriterSchemaForDeserialization(mimeType);
      Schema readerSchema = resolveReaderSchemaForDeserialization(targetClass);

      @SuppressWarnings("unchecked")
      DatumReader<Object> reader = getDatumReader((Class<Object>) targetClass, readerSchema, writerSchema);
      Decoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
      result = (T) reader.read(null, decoder);
    }
    catch (IOException e) {
      throw new RuntimeException("Failed to read payload", e);
    }
    return result;
  }

  private MimeType messageMimeType(Object conversionHint, Map<String, String> headers) {
    MimeType mimeType;
    try {
      String contentType = headers.get(MessageHeaders.CONTENT_TYPE);
      log.debug("contentType: {}", contentType);
      mimeType = MimeType.valueOf(contentType);
    } catch (InvalidMimeTypeException e) {
      log.error("Exception getting object MimeType from contentType header", e);
      if (conversionHint instanceof MimeType) {
        mimeType = (MimeType) conversionHint;
      }
      else {
        return null;
      }
    }
    return mimeType;
  }

  private String asString(byte[] byteArray) {
    String theString = new String(byteArray, Charset.defaultCharset());
    return theString.replace("\"", "");
  }
}

Cette solution ne consomme qu’un message à la fois mais peut être facilement modifiée pour consommer des lots de messages.

La solution complète est ici: https://github.com/robjwilkins/avro-example/tree/develop


0 commentaires

0
votes

Merci d'avoir sauvé ma journée en utilisant le codage natif et le printemps: nuage: stream:

Propriétés de liaison générique

  kafka:
    bindings:
      input:
        consumer:
          configuration:
            value.deserializer: com.example.FooDeserializer
      output:
        producer:
          configuration:
            value.serializer: com.example.FooSerializer

Propriétés de liaison spécifiques à Kafka

  bindings:
    input:
      consumer:
        use-native-decoding: true
      destination: so54448732
      group: so54448732
    output:
      destination: so54448732
      producer:
        use-native-encoding: true


0 commentaires