3
votes

Comment utiliser camel-avro-consumer & producteur?

Je ne vois pas d'exemple d'utilisation du composant camel-avro pour produire et consommer des messages kafka avro? Actuellement, ma route à dos de chameau est la suivante. que doit-il être changé pour fonctionner avec schema-registry et d'autres accessoires comme celui-ci en utilisant le consommateur et le producteur camel-kafka-avro.

props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);              
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); 

public void configure() {
        PropertiesComponent pc = getContext().getComponent("properties", PropertiesComponent.class); 
        pc.setLocation("classpath:application.properties");

        log.info("About to start route: Kafka Server -> Log ");

        from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
                + "&maxPollRecords={{consumer.maxPollRecords}}"
                + "&consumersCount={{consumer.consumersCount}}"
                + "&seekTo={{consumer.seekTo}}"
                + "&groupId={{consumer.group}}"
                +"&valueDeserializer="+KafkaAvroDeserializer.class
                +"&keyDeserializer="+StringDeserializer.class
                )
                .routeId("FromKafka")
            .log("${body}");


5 commentaires

Pourquoi est-ce voté à la baisse? pourraient-ils s'il vous plaît expliquer? même avec cette configuration -> cela ne fonctionne pas value.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer.


Pouvez-vous clarifier ce qui se passe ?


@ cricket_007: L'application démarre et définit le regard du consommateur kafka sur cette valeur qu'AvroDeserializer est ma propre classe Je n'ai pas pu établir de connexion kafka à partir du consommateur avro de confluent. -> value.deserializer = class org.apache.camel.example.kafka.AvroDeserializer 18/03/2019 07: 56: 40,663 [nsumer [avro-t1]] WARN KafkaConsumer - KafkaException consommant avro-t1-Thread 0 à partir du sujet avro-t1. Essaiera de se reconnecter à la prochaine exécution ----- c'est l'exception et l'application continue de se reconnecter au courtier kafka et échoue donc.


Hmm. Je ne reconnais pas ce org.apache.camel.example.kafka.Avro ​​Deserializer . En théorie, tout désérialiseur sur le chemin de classe devrait fonctionner. Pourriez-vous activer la journalisation du débogage peut-être?


@ cricket_007: merci de m'aider dans le processus de réflexion.


3 Réponses :


2
votes

Je réponds à ma propre question car je me suis assis sur ce problème pendant quelques jours. J'espère que cette réponse sera utile pour les autres.

J'ai essayé d'utiliser le désérialiseur io.confluent.kafka.serializers.KafkaAvroDeserializer et j'ai obtenu une exception kafka. j'ai donc dû écrire mon propre désérialiseur pour faire les choses suivantes:

  1. définir le registre de schémas
  2. utiliser un lecteur avro spécifique (ce qui ne signifie pas la chaîne par défautDeserializer)

alors nous devons accéder à "schemaRegistry", "useSpecificAvroReader" et définir ces champs du AbstractKafkaAvroDeserializer (io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer)

Voici la solution ...

CAMEL-KAFKA-AVRO-ROUTE-BUILDER

package com.example.camel.kafka.avro;

import java.util.Collections;
import java.util.List;
import java.util.Map;


import io.confluent.common.config.ConfigException;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.common.serialization.Deserializer;



public class CustomKafkaAvroDeserializer extends AbstractKafkaAvroDeserializer
    implements Deserializer<Object> {

    private static final String SCHEMA_REGISTRY_URL = "http://localhost:8081";

    @Override
    public void configure(KafkaAvroDeserializerConfig config) {

     try {
          final List<String> schemas = 
                              Collections.singletonList(SCHEMA_REGISTRY_URL);
          this.schemaRegistry = new CachedSchemaRegistryClient(schemas, 
                                  Integer.MAX_VALUE);
          this.useSpecificAvroReader = true;

       } catch (ConfigException e) {
              throw new org.apache.kafka.common.config.ConfigException(e.getMessage());
     }
   }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    configure(null);
  }

  @Override
  public Object deserialize(String s, byte[] bytes) {
    return deserialize(bytes);
  }

  @Override
  public void close() {
  }
}

DESERIALIZER CLASSS - ceci définit le schema.registry.url et use.specific.avro.reader au niveau abstrait AbstractKafkaAvroDeserializer. Si je ne règle pas ceci, j'obtiendrai kafka-config-exception.

public static void main(String[] args) throws Exception {
    LOG.info("About to run Kafka-camel integration...");
    CamelContext camelContext = new DefaultCamelContext();
    // Add route to send messages to Kafka
    camelContext.addRoutes(new RouteBuilder() {
        public void configure() throws Exception {                
            PropertiesComponent pc = getContext().getComponent("properties", 
                                      PropertiesComponent.class);
            pc.setLocation("classpath:application.properties");

            log.info("About to start route: Kafka Server -> Log ");

            from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
                    + "&maxPollRecords={{consumer.maxPollRecords}}"
                    + "&consumersCount={{consumer.consumersCount}}"
                    + "&seekTo={{consumer.seekTo}}"
                    + "&groupId={{consumer.group}}"
                    + "&keyDeserializer="+ StringDeserializer.class.getName() 
                    + "&valueDeserializer="+CustomKafkaAvroDeserializer.class.getName()
                    )
                    .routeId("FromKafka")
                .log("${body}");

        }
    });
    camelContext.start();
    // let it run for 5 minutes before shutting down
    Thread.sleep(5 * 60 * 1000);
    camelContext.stop();
}


4 commentaires

btw, il a été récemment ajouté - stackoverflow.com/a/55437784/2308683


@ cricket_007: Je ne pouvais pas voir à ce moment-là quand je cherchais et que je voulais une solution. Merci pour votre recherche approfondie.


Hé, je me demandais si c'était un projet de travail ou quelque chose que tu pourrais partager? J'aimerais voir l'exemple complet sur github ou quelque part comme ça? Merci


@ MickO'Gorman: désolé de ne pas avoir regardé ça avant. Regardez mon itinéraire: github.com/hykavitha/camel-kafka-microservice/blob/master/sr‌ c /… Regardez le CustomKafkaAvroDeserializer github.com/hykavitha/camel- kafka-microservice / blob / master / sr‌ c /…



1
votes

En utilisant camel-kafka-starter (pour spring boot) version: 3.6.0 , vous n'avez pas besoin de définir un CustomKafkaAvroDeserializer . Au lieu de cela, ajoutez les détails de configuration suivants à votre fichier application.yaml ou application.properties et le composant camel-kafka (à la fois producteur et consommateur) appliquera le SerDe approprié au objets / octets en cours de traitement.

camel:
  springboot:
    main-run-controller: true # to keep the JVM running
  component:
    kafka:
      brokers: http://localhost:9092
      schema-registry-u-r-l: http://localhost:8081

      #Consumer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer

      #Producer
      key-serializer-class: org.apache.kafka.common.serialization.StringSerializer
      serializer-class: io.confluent.kafka.serializers.KafkaAvroSerializer

      specific-avro-reader: true

Vous devez également vous assurer de télécharger les fichiers avro-schema-json respectifs sur votre serveur de registre de schéma, par exemple confluent-schema -registry avant d'exécuter producteur / consommateur.


1 commentaires

Serait-il possible pour vous de partager un exemple d'itinéraire correspondant?



0
votes

J'ai lutté avec la même question pendant un moment. J'ai fait un exemple complet avec camel-quarkus et schema-registry de confluent: https://github.com/tstuber/camel-quarkus-kafka-schema- registre


0 commentaires