Je ne vois pas d'exemple d'utilisation du composant camel-avro pour produire et consommer des messages kafka avro? Actuellement, ma route à dos de chameau est la suivante. que doit-il être changé pour fonctionner avec schema-registry et d'autres accessoires comme celui-ci en utilisant le consommateur et le producteur camel-kafka-avro.
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class); props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); public void configure() { PropertiesComponent pc = getContext().getComponent("properties", PropertiesComponent.class); pc.setLocation("classpath:application.properties"); log.info("About to start route: Kafka Server -> Log "); from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}" + "&maxPollRecords={{consumer.maxPollRecords}}" + "&consumersCount={{consumer.consumersCount}}" + "&seekTo={{consumer.seekTo}}" + "&groupId={{consumer.group}}" +"&valueDeserializer="+KafkaAvroDeserializer.class +"&keyDeserializer="+StringDeserializer.class ) .routeId("FromKafka") .log("${body}");
3 Réponses :
Je réponds à ma propre question car je me suis assis sur ce problème pendant quelques jours. J'espère que cette réponse sera utile pour les autres.
J'ai essayé d'utiliser le désérialiseur io.confluent.kafka.serializers.KafkaAvroDeserializer et j'ai obtenu une exception kafka. j'ai donc dû écrire mon propre désérialiseur pour faire les choses suivantes:
alors nous devons accéder à "schemaRegistry", "useSpecificAvroReader" et définir ces champs du AbstractKafkaAvroDeserializer (io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer)
Voici la solution ...
package com.example.camel.kafka.avro; import java.util.Collections; import java.util.List; import java.util.Map; import io.confluent.common.config.ConfigException; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; import org.apache.kafka.common.serialization.Deserializer; public class CustomKafkaAvroDeserializer extends AbstractKafkaAvroDeserializer implements Deserializer<Object> { private static final String SCHEMA_REGISTRY_URL = "http://localhost:8081"; @Override public void configure(KafkaAvroDeserializerConfig config) { try { final List<String> schemas = Collections.singletonList(SCHEMA_REGISTRY_URL); this.schemaRegistry = new CachedSchemaRegistryClient(schemas, Integer.MAX_VALUE); this.useSpecificAvroReader = true; } catch (ConfigException e) { throw new org.apache.kafka.common.config.ConfigException(e.getMessage()); } } @Override public void configure(Map<String, ?> configs, boolean isKey) { configure(null); } @Override public Object deserialize(String s, byte[] bytes) { return deserialize(bytes); } @Override public void close() { } }
public static void main(String[] args) throws Exception { LOG.info("About to run Kafka-camel integration..."); CamelContext camelContext = new DefaultCamelContext(); // Add route to send messages to Kafka camelContext.addRoutes(new RouteBuilder() { public void configure() throws Exception { PropertiesComponent pc = getContext().getComponent("properties", PropertiesComponent.class); pc.setLocation("classpath:application.properties"); log.info("About to start route: Kafka Server -> Log "); from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}" + "&maxPollRecords={{consumer.maxPollRecords}}" + "&consumersCount={{consumer.consumersCount}}" + "&seekTo={{consumer.seekTo}}" + "&groupId={{consumer.group}}" + "&keyDeserializer="+ StringDeserializer.class.getName() + "&valueDeserializer="+CustomKafkaAvroDeserializer.class.getName() ) .routeId("FromKafka") .log("${body}"); } }); camelContext.start(); // let it run for 5 minutes before shutting down Thread.sleep(5 * 60 * 1000); camelContext.stop(); }
btw, il a été récemment ajouté - stackoverflow.com/a/55437784/2308683
@ cricket_007: Je ne pouvais pas voir à ce moment-là quand je cherchais et que je voulais une solution. Merci pour votre recherche approfondie.
Hé, je me demandais si c'était un projet de travail ou quelque chose que tu pourrais partager? J'aimerais voir l'exemple complet sur github ou quelque part comme ça? Merci
@ MickO'Gorman: désolé de ne pas avoir regardé ça avant. Regardez mon itinéraire: github.com/hykavitha/camel-kafka-microservice/blob/master/sr c /… Regardez le CustomKafkaAvroDeserializer github.com/hykavitha/camel- kafka-microservice / blob / master / sr c /…
En utilisant camel-kafka-starter
(pour spring boot) version: 3.6.0
, vous n'avez pas besoin de définir un CustomKafkaAvroDeserializer
. Au lieu de cela, ajoutez les détails de configuration suivants à votre fichier application.yaml
ou application.properties
et le composant camel-kafka (à la fois producteur et consommateur) appliquera le SerDe approprié au objets / octets en cours de traitement.
camel: springboot: main-run-controller: true # to keep the JVM running component: kafka: brokers: http://localhost:9092 schema-registry-u-r-l: http://localhost:8081 #Consumer key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer #Producer key-serializer-class: org.apache.kafka.common.serialization.StringSerializer serializer-class: io.confluent.kafka.serializers.KafkaAvroSerializer specific-avro-reader: true
Vous devez également vous assurer de télécharger les fichiers avro-schema-json
respectifs sur votre serveur de registre de schéma, par exemple confluent-schema -registry avant d'exécuter producteur / consommateur.
Serait-il possible pour vous de partager un exemple d'itinéraire correspondant?
J'ai lutté avec la même question pendant un moment. J'ai fait un exemple complet avec camel-quarkus et schema-registry de confluent: https://github.com/tstuber/camel-quarkus-kafka-schema- registre
Pourquoi est-ce voté à la baisse? pourraient-ils s'il vous plaît expliquer? même avec cette configuration -> cela ne fonctionne pas value.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer.
Pouvez-vous clarifier ce qui se passe ?
@ cricket_007: L'application démarre et définit le regard du consommateur kafka sur cette valeur qu'AvroDeserializer est ma propre classe Je n'ai pas pu établir de connexion kafka à partir du consommateur avro de confluent. -> value.deserializer = class org.apache.camel.example.kafka.AvroDeserializer 18/03/2019 07: 56: 40,663 [nsumer [avro-t1]] WARN KafkaConsumer - KafkaException consommant avro-t1-Thread 0 à partir du sujet avro-t1. Essaiera de se reconnecter à la prochaine exécution ----- c'est l'exception et l'application continue de se reconnecter au courtier kafka et échoue donc.
Hmm. Je ne reconnais pas ce
org.apache.camel.example.kafka.Avro Deserializer
. En théorie, tout désérialiseur sur le chemin de classe devrait fonctionner. Pourriez-vous activer la journalisation du débogage peut-être?@ cricket_007: merci de m'aider dans le processus de réflexion.