J'ai un problème avec la désérialisation des messages des sujets Kafka. Les messages ont été sérialisés à l'aide de spring-cloud-stream et Apache Avro. Je les lis en utilisant Spring Kafka et j'essaye de les désérialiser. Si j'utilise spring-cloud à la fois pour produire et consommer les messages, je peux désérialiser les messages correctement. Le problème est lorsque je les consomme avec Spring Kafka et que j'essaye de les désérialiser.
J'utilise un registre de schémas (à la fois le registre de schémas à démarrage à ressort pour le développement, ainsi qu'un schéma Confluent en production), mais les problèmes de désérialisation semblent à se produire avant que l'événement appelle le registre de schéma.
Il est difficile de publier tout le code pertinent sur cette question, je l'ai donc publié dans un dépôt dans git hub: https://github.com/robjwilkins/avro-example
L'objet que j'envoie sur le sujet est juste un simple pojo:
public abstract class AbstractKafkaAvroDeserializer extends AbstractKafkaAvroSerDe { .... private ByteBuffer getByteBuffer(byte[] payload) { ByteBuffer buffer = ByteBuffer.wrap(payload); if (buffer.get() != 0) { throw new SerializationException("Unknown magic byte!"); } else { return buffer; } }
Le code qui produit des messages sur Kafka ressemble à ceci:
2019-01-30 20:01:39.900 ERROR 30876 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition test-request-0 at offset 43. If needed, please seek past the record to continue consumption. Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1 Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
et application.yaml:
spring: application.name: avro-consumer kafka: bootstrap-servers: localhost:9092 consumer: group-id: avro-consumer value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer # value-deserializer: org.apache.kafka.common.serialization.StringDeserializer key-deserializer: org.apache.kafka.common.serialization.StringDeserializer properties: schema.registry.url: http://localhost:8071
Ensuite j'ai un consommateur:
@Slf4j @Component public class TopicListener { @KafkaListener(topics = {"test-request"}) public void listenForMessage(ConsumerRecord<String, Request> consumerRecord) { log.info("listenForMessage. got a message: {}", consumerRecord); consumerRecord.headers().forEach(header -> log.info("header. key: {}, value: {}", header.key(), asString(header.value()))); } private String asString(byte[] byteArray) { return new String(byteArray, Charset.defaultCharset()); } }
Et le projet qui consomme a la configuration application.yaml: p >
spring: application.name: avro-producer kafka: bootstrap-servers: localhost:9092 consumer.group-id: avro-producer cloud: stream: schema-registry-client.endpoint: http://localhost:8071 schema.avro.dynamic-schema-generation-enabled: true kafka: binder: brokers: ${spring.kafka.bootstrap-servers} bindings: test-request: destination: test-request contentType: application/*+avro
Lorsque le consommateur reçoit un message, il en résulte une exception:
@EnableBinding(MessageChannels.class) @Slf4j @RequiredArgsConstructor @RestController public class ProducerController { private final MessageChannels messageChannels; @GetMapping("/produce") public void produceMessage() { Request request = new Request(); request.setMessage("hello world"); Message<Request> requestMessage = MessageBuilder.withPayload(request).build(); log.debug("sending message"); messageChannels.testRequest().send(requestMessage); } }
J'ai parcouru le code de désérialisation pour t Le point où cette exception est lancée
@Data public class Request { private String message; }
Cela se produit parce que le désérialiseur vérifie le contenu d'octets de l'objet sérialisé (tableau d'octets) et s'attend à ce qu'il soit 0, mais ce n'est pas . D'où la raison pour laquelle je me demande si le MessageConverter spring-cloud-stream qui a sérialisé l'objet est compatible avec l'objet io.confluent que j'utilise pour désérialiser l'objet. Et s'ils ne sont pas compatibles, que dois-je faire?
merci pour toute aide.
4 Réponses :
Vous devez définir explicitement le désérialiseur, en créant DefaultKafkaConsumerFactory
et votre bean TopicListener
dans une configuration, quelque chose comme ceci:
@Configuration @EnableKafka public class TopicListenerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value(("${spring.kafka.consumer.group-id}")) private String groupId; @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.wilkins.avro.consumer"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; } @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } @Bean public TopicListener topicListener() { return new TopicListener(); } }
p >
Remarque: la question utilise Avro, pas le JsonDeserializer
, qui ne peut pas lire ces données
Vous pouvez configurer la liaison pour utiliser à la place un sérialiseur Kafka nativement.
Définissez la propriété du producteur useNativeEncoding
sur true
et configurez le sérialiseur à l'aide du . ..producer.configuration
Propriétés Kafka.
MODIFIER
Exemple:
spring: cloud: stream: # Generic binding properties bindings: input: consumer: use-native-decoding: true destination: so54448732 group: so54448732 output: destination: so54448732 producer: use-native-encoding: true # Kafka-specific binding properties kafka: bindings: input: consumer: configuration: value.deserializer: com.example.FooDeserializer output: producer: configuration: value.serializer: com.example.FooSerializer
Je vous suggère d'ouvrir un problème ici avec vos suggestions d'améliorations.
il serait très utile si vous pouviez fournir un exemple des propriétés du producteur / consommateur que je devrais définir s'il vous plaît? Merci de votre aide.
Merci pour la réponse. Mon exemple n'utilise pas spring-cloud-stream en tant que consommateur, mais c'est là que j'ai le problème de désérialisation?
Cela n'a pas d'importance; J'ai montré l'utilisation de sérialiseurs / désérialiseurs natifs des deux côtés pour être complet. Votre question porte sur la compatibilité entre le convertisseur de messages et le désérialiseur avro. L'utilisation de l'encodage natif supprime les convertisseurs de messages; la charge utile est envoyée directement au client Kafka en tant que ProducerRecord.value
et la sérialisation est effectuée par Kafka; Le printemps n'y prend pas part. Si vous rencontrez toujours des problèmes lors de l'utilisation du codage natif, le problème réside ailleurs, peut-être dans les données elles-mêmes.
Le nœud de ce problème est que le producteur utilise spring-cloud-stream pour envoyer des messages à Kafka, mais le consommateur utilise spring-kaka. Les raisons en sont:
Spring-cloud-stream ne permet pas actuellement au consommateur de lier un auditeur à plusieurs sujets, et il n'y a aucun moyen de consommer une collection de messages à la fois (sauf erreur de ma part). p>
J'ai trouvé une solution qui ne nécessite aucune modification du code producteur qui utilise spring-cloud-stream pour publier des messages sur Kafka. Spring-cloud-stream utilise un MessageConverter
pour gérer la sérialisation et la désérialisation. Dans le AbstractAvroMessageConverter
, il existe des méthodes: convertFromInternal
et convertToInternal
qui gèrent la transformation vers / depuis un tableau d'octets. Ma solution était d'étendre ce code (en créant une classe qui étend AvroSchemaRegistryClientMessageConverter
), afin que je puisse réutiliser une grande partie de la fonctionnalité spring-cloud-stream, mais avec une interface accessible depuis mon spring-kafka KafkaListener
. J'ai ensuite modifié mon TopicListener pour utiliser cette classe pour effectuer la conversion:
Le convertisseur:
@Slf4j @Component @RequiredArgsConstructor public class TopicListener { private final AvroKafkaMessageConverter messageConverter; @KafkaListener(topics = {"test-request"}) public void listenForMessage(ConsumerRecord<?, ?> consumerRecord) { log.info("listenForMessage. got a message: {}", consumerRecord); Request request = messageConverter.convertFromInternal( consumerRecord, Request.class, MimeType.valueOf("application/vnd.*+avr")); log.info("request message: {}", request.getMessage()); } }
Le TopicListener
modifié:
@Component @Slf4j public class AvroKafkaMessageConverter extends AvroSchemaRegistryClientMessageConverter { public AvroKafkaMessageConverter(SchemaRegistryClient schemaRegistryClient) { super(schemaRegistryClient, new NoOpCacheManager()); } public <T> T convertFromInternal(ConsumerRecord<?, ?> consumerRecord, Class<T> targetClass, Object conversionHint) { T result; try { byte[] payload = (byte[]) consumerRecord.value(); Map<String, String> headers = new HashMap<>(); consumerRecord.headers().forEach(header -> headers.put(header.key(), asString(header.value()))); MimeType mimeType = messageMimeType(conversionHint, headers); if (mimeType == null) { return null; } Schema writerSchema = resolveWriterSchemaForDeserialization(mimeType); Schema readerSchema = resolveReaderSchemaForDeserialization(targetClass); @SuppressWarnings("unchecked") DatumReader<Object> reader = getDatumReader((Class<Object>) targetClass, readerSchema, writerSchema); Decoder decoder = DecoderFactory.get().binaryDecoder(payload, null); result = (T) reader.read(null, decoder); } catch (IOException e) { throw new RuntimeException("Failed to read payload", e); } return result; } private MimeType messageMimeType(Object conversionHint, Map<String, String> headers) { MimeType mimeType; try { String contentType = headers.get(MessageHeaders.CONTENT_TYPE); log.debug("contentType: {}", contentType); mimeType = MimeType.valueOf(contentType); } catch (InvalidMimeTypeException e) { log.error("Exception getting object MimeType from contentType header", e); if (conversionHint instanceof MimeType) { mimeType = (MimeType) conversionHint; } else { return null; } } return mimeType; } private String asString(byte[] byteArray) { String theString = new String(byteArray, Charset.defaultCharset()); return theString.replace("\"", ""); } }
Cette solution ne consomme qu’un message à la fois mais peut être facilement modifiée pour consommer des lots de messages.
La solution complète est ici: https://github.com/robjwilkins/avro-example/tree/develop
Merci d'avoir sauvé ma journée en utilisant le codage natif et le printemps: nuage: stream:
kafka: bindings: input: consumer: configuration: value.deserializer: com.example.FooDeserializer output: producer: configuration: value.serializer: com.example.FooSerializer
bindings: input: consumer: use-native-decoding: true destination: so54448732 group: so54448732 output: destination: so54448732 producer: use-native-encoding: true
Ne modifiez pas la trace de la pile; montrer le tout.
Cause par:
en eux-mêmes sont inutiles.Si à tout moment vous avez poussé une valeur de chaîne (ou non-avro) sur le sujet, alors
KafkaAvroDeserializer
ne pourra pas le lire ...@GaryRussell Je n'ai pas édité le stacktrace - c'est tout ce qui s'affiche sur la console. Ont également ajouté plus de description du problème
@ cricket_007 - il est possible que j'aie publié un message avec une charge utile de chaîne, cependant j'ai réinitialisé les décalages de sujet aux derniers pour m'assurer que les anciens messages ne sont pas repris.
Eh bien, tout ce que je peux dire, c'est que ce n'est pas une trace de pile appropriée; en plus des entrées
Causé par:
, une trace de pile normale contient des informations sur la pile d'appels (classes / méthodes / numéros de ligne). C'est pourquoi on l'appelle une trace stack . Si vous ne l'avez pas modifié, votre sous-système de journalisation est peut-être configuré pour supprimer ces informations importantes, ce qui serait vraiment très étrange.Tout ce que je peux vraiment dire est basé sur mon expérience avec cette erreur. Si vous êtes passé au dernier décalage, le consommateur ne lira rien car il n'y a pas de nouveaux messages. Si vous avez redémarré un producteur en envoyant des données, et alors le consommateur a la même erreur - alors le problème persiste; les messages n'ont pas été sérialisés à l'aide des sérialiseurs Confluent ... Je ne sais pas ce qu'est cette annotation
@Data
, mais elle n'est pas utilisée par les exemples Confluent github.com/confluentinc/examples/tree/5.1.0-post/clients/avr o a >@Data est une annotation Lombok - il crée juste automatiquement du code getter / setter