2
votes

lire uniquement des messages spécifiques du sujet kafka

Scénario:

J'écris des données d'objets JSON dans le sujet kafka pendant la lecture. Je veux lire uniquement un ensemble spécifique de messages en fonction de la valeur présente dans le message. J'utilise la bibliothèque kafka-python.

exemples de messages:

{flow_status: "completed", value: 1, active: yes}
{flow_status:"failure",value 2, active:yes}

Ici, je veux lire uniquement les messages dont flow_Status est terminé.


0 commentaires

4 Réponses :


0
votes

Vous pouvez créer deux sujets différents; un pour l'état terminé et un autre pour l'état d'échec. Et puis lisez les messages des sujets terminés pour les gérer.

Sinon, si vous voulez qu'ils soient dans un seul sujet et que vous ne voulez lire que ceux terminés, je pense que vous devez les lire tous et ignorer ceux qui ont échoué en utilisant une simple condition if-else.


0 commentaires

1
votes

Dans Kafka, il n'est pas possible de faire quelque chose comme ça. Le consommateur consomme les messages un par un, l'un après l'autre à partir du dernier offset engagé (ou depuis le début, ou en cherchant un offset spécifique). Dépend de votre cas d'utilisation, peut-être que vous pourriez avoir un flux différent dans votre scénario: le message prenant le processus à faire entre dans un sujet mais ensuite l'application qui traite l'action, puis écrit le résultat (terminé ou échoué) dans deux sujets différents : de cette façon, vous avez tous terminé séparés de l'échec. Une autre façon est d'utiliser une application Kafka Streams pour faire le filtrage mais en tenant compte du fait que ce n'est qu'un sucre, en réalité l'application streams lira toujours tous les messages mais vous permettra de filtrer les messages facilement.


4 commentaires

donc je peux avoir 3 sujets, 1 pour le journal entier, 1 pour l'état terminé, 1 pour l'état d'échec ... le travail écrira dans le sujet 1, puis filtrera les données en fonction du statut vers un autre sujet.


exactement, en quelque sorte, le statut pour vous est le type de message qui mérite un sujet différent dans ce cas d'utilisation (un pour terminé et un pour échec)


est-ce une bonne approche, d'avoir un seul sujet avec deux partitons (un pour terminé, un pour l'échec), tandis que l'envoi gardera la logique du producteur pour envoyer des données aux partitions respectives ... du côté du consommateur, créera des groupes de consommateurs séparés, un groupe à lire depuis la partition défaillante et autre pour lire depuis la partition terminée


le côté producteur pourrait être bon oui mais vous devez implémenter un partitionneur personnalisé pour ce faire. Du côté des consommateurs, c'est tout le contraire, deux consommateurs doivent être dans le même groupe de consommateurs pour avoir une partition attribuée à chacun. S'ils font partie de différents groupes de consommateurs, ils recevront tous les messages des deux partitions. Dans tous les cas, cela ne fonctionne pas bien, car si un consommateur tombe en panne, l'autre recevra l'autre partition (recevant les messages terminés et échoués). Vous pouvez éviter d'utiliser des groupes de consommateurs mais une attribution directe de partitions.



0
votes

Le consommateur Kafka ne prend pas en charge ce type de fonctionnalité dès le départ. Vous devrez consommer tous les événements de manière séquentielle, filtrer les événements d'état terminés et les placer quelque part. Au lieu de cela, vous pouvez envisager d'utiliser l'application Kafka Streams où vous pouvez lire les données sous forme de flux et filtrer les événements où flow_status = "terminé" et publier dans une rubrique de sortie ou une autre destination.

Exemple:

KStream<String,JsonNode> inputStream= builder.stream(inputTopic);
KStream<String,JsonNode> completedFlowStream = inputStream.filter(value-> value.get("flow_status").equals("completed"));

P.S. Kafka n'a pas de version officielle pour l'API Python pour KStream mais il existe un projet open source: https: / /github.com/wintoncode/winton-kafka-streams


0 commentaires

0
votes

À ce jour, il n'est pas possible de l'atteindre du côté du courtier, il y a une demande de fonctionnalité Jira ouverte à apache kafka pour que cette fonctionnalité soit implémentée, vous pouvez la suivre ici, j'espère qu'ils l'implémenteront dans un proche avenir: https://issues.apache.org/jira/browse/KAFKA-6020

Je pense que le meilleur moyen est d'utiliser une interface RecordFilterStrategy (Java / spring) et de la filtrer côté consommateur.


0 commentaires