J'ai une application où les événements sont envoyés sur un sujet Kafka en fonction des actions de l'utilisateur comme la connexion de l'utilisateur, les actions intermédiaires de l'utilisateur (facultatives) et la déconnexion de l'utilisateur. Chaque événement a des informations dans un objet événement avec userId, par exemple un événement de connexion a loginTime; Ajouter une note a des notes (actions intermédiaires). De même, un événement de déconnexion a logoutTime. L'exigence est d'agréger les informations de tous ces événements en un seul objet après avoir reçu l'événement de déconnexion pour chaque utilisateur et de l'envoyer en aval.
Pour certaines raisons (retard du réseau, producteur d'événements multiples), les événements peuvent ne pas se produire (l'événement de déconnexion de l'utilisateur peut venir avant l'événement intermédiaire), la question est donc de savoir comment gérer de tels scénarios? Je ne peux pas attendre les événements intermédiaires après avoir reçu l'événement de déconnexion de l'utilisateur car les événements intermédiaires sont facultatifs en fonction des actions de l'utilisateur.
La seule option que je pense ici est d'attendre un certain temps après avoir reçu l'événement de déconnexion de l'utilisateur, de traiter les événements intermédiaires s'ils sont reçus dans ce délai d'attente et d'envoyer l'événement traité, mais encore une fois, je ne sais pas comment y parvenir.
3 Réponses :
Kafka
ne garantit pas la commande sur topic
, il garantit la commande sur partition
. Une rubrique peut avoir plus d'une partition, de sorte que chaque consommateur qui consomme votre rubrique consommera une partition. C'est ainsi que kafka atteint l'évolutivité. Donc, ce que vous rencontrez est un comportement normal (ce n'est pas un bug ou un retard du réseau ou quelque chose du genre). Ce que vous pouvez faire, c'est vous assurer que tous les messages que vous souhaitez traiter dans l'ordre sont envoyés sur la même partition. Vous pouvez le faire en définissant le nombre de partitions sur 1, c'est la manière la plus stupide. Lorsque vous envoyez un message avec le producteur, par défaut, kafka examine la clé, en prend le hachage et par ce hachage sait sur quelle partition doit envoyer un message. Vous pouvez vous assurer que pour tous les messages, la clé est la même. De cette façon, tous les hachages de clés seront les mêmes et tous les messages iront sur la même partition. En outre, vous pouvez implémenter un partitionneur personnalisé et remplacer la manière par défaut par laquelle kafka choisit sur quelle partition le message ira. De cette façon, tous les messages arriveront dans l'ordre. Si vous ne pouvez effectuer aucune de ces actions, vous recevrez des événements dans le désordre et vous devrez réfléchir à un moyen de les consommer dans le désordre, mais ce n'est pas une question liée à kafka.
mais ce n'est pas une question liée à kafka
- pourquoi pas? C'est absolument lié!
Si vous n'êtes pas en mesure de conserver l'ordre des événements (cette déconnexion sera le dernier événement), vous pouvez répondre à vos besoins en utilisant ProcesorApi sur Kafka Streams. Kafka Streams DSL peut être combiné avec l'API Processor (plus de détails ici ).
Vous pouvez avoir plusieurs partitions, mais tous les événements pour un utilisateur particulier doivent être envoyés à la même partition.
Vous devez implémenter un processeur / transformateur personnalisé. Votre processeur placera chaque événement / activité dans le magasin d'état (regroupez tous les événements d'un utilisateur particulier sous la même clé). L'API du processeur vous permet de créer une sorte de planificateur ( Ponctuateur ). Vous pouvez programmer pour vérifier toutes les X secondes événements pour un utilisateur particulier. Si Logout date de il y a longtemps , vous obtenez tous les événements / activités, effectuez une certaine agrégation et envoyez les résultats aux aval.
Pouvez-vous expliquer en détail ce que vous essayez de dire?
@Swapnil, veuillez d'abord lire les liens ci-joints, puis si vous avez des questions, veuillez les poser
Je me demande si l'API Supress confluent.io/blog / kafka-streams-take-on-watermarks-and-trigg ers devrait permettre aux utilisateurs de transmettre leur logique personnalisée (tout comme l'agrégation fenêtrée)
Comme indiqué dans d'autres réponses, l'ordre Kafka est maintenu par partition.
Puisque vous parlez d'événements utilisateur, pourquoi ne définissez-vous pas UserID comme clé de rubrique Kafka? Ainsi, tous les événements liés à un utilisateur spécifique seront toujours commandés (à condition qu'ils soient produits par un seul producteur).
Vous devez vous assurer (par conception ) qu’un seul producteur Kafka pousse tous les événements de modification utilisateur vers le sujet donné. De cette façon, vous pouvez éviter les messages hors service dus à plusieurs producteurs.
À partir des flux, vous pouvez également consulter Windows dans les flux Kafka. Tumbling windows par exemple est de taille fixe et sans chevauchement. Vous regroupez les enregistrements sur une période donnée.
Maintenant, vous voudrez peut-être trier les agrégés par leur horodatage (ou vous avez dit que vous avez l'heure de déconnexion, l'heure de connexion, etc.) et agir en conséquence.
Solution simple et efficace
Utilisez l'envoi synchrone et définissez delivery.timeout.ms
et tentatives
sur une valeur maximale.
Pour garantir la tolérance aux pannes, définissez acks = all
avec min.insync.replicas = 2
(configuration du sujet) et utilisez un seul producteur pour pousser vers ce sujet.
Vous devez également définir max.block.ms
sur une valeur maximale afin que votre send ()
ne retourne pas immédiatement en cas d'erreur lors de la récupération des métadonnées (par exemple, quand Kafka est en panne).
Comparez l'envoi synchrone avec votre taux et vérifiez s'il répond à vos exigences ou à votre numéro de référence.
Cela garantit qu'un message arrivé en premier est d'abord envoyé à Kafka, puis le message suivant n'est pas envoyé tant que le message précédent n'a pas été reconnu avec succès.
Si votre chiffre de référence n'est pas atteint, essayez d'avoir une contre-pression mécanisme comme la file d'attente en mémoire / persistante.
producteur.send (...). get ()
dans Thread-2