4
votes

confluent-python peut-il produire des données avec une valeur en avro et une clé en chaîne?

J'utilise python3 et confluent-python pour envoyer un message à Kafka . J'ai besoin d'envoyer des données avec une valeur au format Avro et la clé sous forme de chaîne. Mais j'ai trouvé que confluent-python ne peut envoyer les deux qu'en Avro ou les deux dans la chaîne. Le code source de confluent-python est aussi éclatant:

def produce(self, **kwargs):
    """
        Asynchronously sends a message to Kafka by encoding with specified or default Avro schema.

        :param str topic: topic name
        :param object value: An object to serialize
        :param str value_schema: Avro schema for value
        :param object key: An object to serialize
        :param str key_schema: Avro schema for key

        Plus any other parameters accepted by confluent_kafka.Producer.produce

        :raises SerializerError: On serialization failure
        :raises BufferError: If producer queue is full.
        :raises KafkaException: For other produce failures.
    """
    # get schemas from  kwargs if defined
    key_schema = kwargs.pop('key_schema', self._key_schema)
    value_schema = kwargs.pop('value_schema', self._value_schema)
    topic = kwargs.pop('topic', None)
    if not topic:
        raise ClientError("Topic name not specified.")
    value = kwargs.pop('value', None)
    key = kwargs.pop('key', None)

    if value is not None:
        if value_schema:
            value = self._serializer.encode_record_with_schema(topic, value_schema, value)
        else:
            raise ValueSerializerError("Avro schema required for values")

    if key is not None:
        if key_schema:
            key = self._serializer.encode_record_with_schema(topic, key_schema, key, True)
        else:
            raise KeySerializerError("Avro schema required for key")

    super(AvroProducer, self).produce(topic, value, key, **kwargs)

Quelqu'un le sait-il?


3 commentaires

J'ai actuellement le même problème - essayer de produire des données avec un schéma pour la valeur mais pas de schéma pour la clé (il suffit qu'il s'agisse d'une chaîne). Avez-vous trouvé une solution de contournement / solution?


Pas encore, j'ai regardé dans le code source et je suppose qu'il ne prend pas en charge cette fonctionnalité. Dans kafka, la clé décide de la partition vers laquelle le message ira, donc désérialiser la clé n'est pas si important.


Problème pour cela ici: github.com/confluentinc/confluent-kafka-python/issues / 428


3 Réponses :


1
votes

Ma solution de contournement était donc simplement de changer le code Python pour ne pas lever l'exception. Je suppose que les auteurs de la bibliothèque ne permettent pas la flexibilité d'utiliser un schéma uniquement pour la clé ou uniquement pour la valeur pour une raison, mais ne savent pas ce que c'est. Pour mon cas d'utilisation de la nécessité de publier ce type de données en développement, j'ai pensé que c'était une solution correcte.

Le changement de code est dans confluent_kafka / avro / __ init__.py et supprime simplement les lignes 87 et 88:

84    if key is not None:
85        if key_schema:
86            key = self._serializer.encode_record_with_schema(topic, key_schema, key, True)
87        else:
88            raise KeySerializerError("Avro schema required for key")


2 commentaires

En effet, en regardant Producer.produce (le super d'AvroProducer), la clé peut être un "str" ​​ou "bytes".


Cependant, voyez ma réponse - vous n'avez pas à changer le code de la bibliothèque pour sérialiser une clé sous forme de chaîne.



2
votes

Définissez une chaîne PrimitiveSchema - comme ceci:

key_schema = avro.loads ('{"type": "string"}')

et utilisez-le comme ceci lorsque vous construisez votre producteur:

worker = avro.AvroProducer (config = conf, default_key_schema = key_schema, default_value_schema = your_value_schema)


1 commentaires

Vous pouvez également utiliser from avro import schema et key_schema = schema.parse ('{"type": "string"}') pour définir le key_schema.



0
votes

Vous pouvez également envisager:


0 commentaires