6
votes

Données de fichier Json dans la rubrique Kafka

Comment insérer des données de fichier json dans la rubrique kafka à l'aide de kafka-console-producteur? Chaque ensemble de données JSON peut-il être stocké sous forme de message?

exemple-

[{
"id": 1,
  "first_name": "John",
  "last_name": "Lindt",
  "email": "jlindt@gmail.com",
  "gender": "Male",
  "ip_address": "1.2.3.4"
}, {
  "id": 2,
  "first_name": "Peter",
  "last_name": "Friz",
  "email": "Friz3@gmail.com",
  "gender": "Male",
  "ip_address": "4.5.6.7"
}, {
  "id": 3,
  "first_name": "Dell",
  "last_name": "Chang",
  "email": "Dellc@gmail.com",
  "gender": "Female",
  "ip_address": "8.9.10.11"
}, {
"id": 4,
  "first_name": "Lolita",
  "last_name": "John",
  "email": "LolitaJ@gmail.com",
  "gender": "Female",
  "ip_address": "12.13.14.15"
}, {
"id": 5,
  "first_name": "Pele",
  "last_name": "Wang",
  "email": "Pele@gmail.com",
  "gender": "Male",
  "ip_address": "16.17.18.19"
}, {
  "id": 6,
  "first_name": "Rene",
  "last_name": "Charm",
  "email": "Rene3@gmail.com",
  "gender": "Male",
  "ip_address": "20.21.22.23"

Si vous utilisez cette commande -

cat sampledata.json|kafka-console-producer --broker-list localhost:9092 --topic  stream-test-topic

Chaque ligne est considérée comme un message distinct.

Quel est le bon moyen de le faire ?

Merci!

ps-

Le sujet est lu par Elastic Search. exemple de fichier de message json -

{
  "id": 1,
  "first_name": "John",
  "last_name": "Lindt",
  "email": "jlindt@gmail.com",
  "gender": "Male",
  "ip_address": "1.2.3.4"
}


9 commentaires

Quel est votre délimiteur de message? Simplement un nouvel objet JSON? D'où vient le JSON?


Mise à jour de ma question avec un exemple de fichier json.


Vous avez donc un fichier avec un seul tableau JSON et vous souhaitez exploser ce tableau en messages individuels? Est-ce correct?


Et; d'où viennent ces données? Il est presque certain qu'un meilleur modèle ici serait pour le processus de construction du tableau JSON et d'écriture de ce fichier pour l'envoyer directement à Kafka


Oui, chaque donnée json doit être divisée en un message distinct. Ex- un message unique comme - {"id": 1, "first_name": "John", "last_name": "Lindt", "email": "jlindt@gmail.com", "gender": "Male", " adresse_ip ":" 1.2.3.4 "}


6 messages individuels pour cet ensemble de données json.


d'où viennent ces données? - Ce sont des données aléatoires que nous avons placées dans la rubrique en utilisant kafka-console-producteur via un fichier json. Nous voulons que ce sujet soit lu par une recherche élastique. (Un simple poc)


"Ce sont des données aléatoires que nous avons mises dans le sujet en utilisant kafka-console-producteur via un fichier json"… haha, ok, d'accord, remettons ça en place. Vous souhaitez créer une application ou un pipeline, qui transmet des données à Elasticsearch, et vous souhaitez le faire via Kafka. Maintenant, les données qui vont à Elasticsearch, d'où vont-elles provenir? Je ne veux pas dire comment construisez-vous votre prototype, je veux dire, d'où viennent ces données? Pourquoi l'écrivez-vous dans un fichier et non directement dans Kafka avec une bibliothèque client native ou un proxy REST?


Si vous voulez littéralement prendre des données factices d'un fichier et les pousser dans une rubrique à des fins de test, sortez-les simplement du tableau et mettez un objet sur chaque ligne. C'est tout.


3 Réponses :


8
votes

Du point de vue de Kafka, chaque message est un tableau d'octets. C'est à l'application du client (producteur, consommateur, etc.), comment il la traite. Kafka Producer, Consumer utilise Deserializer, Serializer pour transformer de / vers Tableau d'octets vers / depuis un objet métier (String, POJO)

Le problème auquel vous êtes confronté est la manière dont le producteur de la console Kafka lit le message à partir de l'entrée standard. Par défaut, il utilise LineMessageReader , qui traite chaque ligne comme un nouveau message. Vous pouvez implémenter le vôtre, ou avant d'envoyer, traduire chaque nouveau caractère de ligne dans json vers un autre espace blanc.

Par exemple, vous pouvez utiliser la commande suivante:

jq -rc . sampledata.json | kafka-console-producer --broker-list localhost:9092 --topic stream-test-topic


0 commentaires

9
votes

Si vous avez des messages JSON dans le fichier, vous pouvez utiliser la méthode suivante pour écrire dans la rubrique kafka:

kafka-console-producer --broker-list localhost:9092 --topic testTopic--property value.serializer=custom.class.serialization.JsonSerializer 

Les producteurs Kafka lit les messages ligne par ligne à l'aide de LineMessageReader par défaut. Les sérialiseurs de clé et de valeur par défaut sont StringSerializer . Il ne validera pas s'il existe un json approprié ou non, mais le considérera plutôt comme un objet de chaîne brute comme une publication dans une rubrique kafka. Mais si vous souhaitez valider, vous pouvez définir la configuration ci-dessous dans la commande console-producteur.

key.serializer
value.serializer

Exemple :

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic user-timeline < samplerecords.json

Du côté des consommateurs, vous pouvez adopter une approche similaire. Utilisez JsonDeserializer pour lire les données.


1 commentaires

J'ai trouvé que celui-ci ne fonctionnait pas pour moi, même si j'ai également essayé d'utiliser key.serializer.



0
votes

Je suis également nouveau sur Kafka et j'ai eu le même cas d'utilisation que vous. Après quelques R&D, j'ai trouvé une brève réponse qui pourrait vous aider. Vous pouvez écrire quelque chose comme ci-dessous:

bin/kafka-console-producer --broker-list localhost:9092 --topic blogpost
{"title":"The Matrix","year":1999,"cast":["Keanu Reeves","Laurence Fishburne","Carrie-Anne Moss","Hugo Weaving","Joe Pantoliano"],"genres":["Science Fiction"]}.

Pour un aperçu plus détaillé, cliquez ici


0 commentaires