6
votes

Pourquoi les décalages du groupe de consommateurs (app-id) de mon application Kafka Streams sont-ils réinitialisés après le redémarrage de l'application?

J'ai une application Kafka Streams pour laquelle, chaque fois que je la redémarre, les décalages pour le sujet qu'elle consomme sont réinitialisés. Par conséquent, pour toutes les partitions, les retards augmentent et l'application doit retraiter toutes les données.

MISE À JOUR: Le sujet de sortie reçoit une rafale d'événements qui ont déjà été traités après le redémarrage de l'application, ce n'est pas que les décalages de sujet d'entrée soient réinitialisés comme je l'ai dit dans le paragraphe précédent. Cependant, les décalages de sujet interne ( KTABLE-SUPPRESS-STATE-STORE ) sont en cours de réinitialisation, voir les commentaires ci-dessous.

Je me suis assuré que le décalage est de 1 pour chaque partition avant le redémarrage (ceci est pour le sujet de sortie). Tous les consommateurs qui appartiennent à cet identifiant de groupe de consommateurs (identifiant d'application) sont actifs. Le redémarrage est immédiat, cela prend environ 30 secondes.

L'application utilise exactement une fois comme garantie de traitement.

J'ai lu cette réponse Comment un offset expire-t-il pour un groupe de consommateurs Apache Kafka? . p >

J'ai essayé avec auto.offset.reset = latest et auto.offset.reset = early .

Il semble que les décalages pour ces sujets ne sont pas effectivement validés, (mais je ne suis pas sûr à ce sujet).

Je suppose qu'après le redémarrage, l'application devrait reprendre le dernier décalage engagé pour ce groupe de consommateurs. p >

MISE À JOUR: Je suppose cela pour le sujet interne ( KTABLE-SUPPRESS-STATE-STORE )

L'API Kafka Stream s'assure-t-elle de valider tout le décalage consommé avant de s'arrêter? (après avoir appelé streams.close())

J'apprécierais vraiment tout indice à ce sujet.

MISE À JOUR : p>

Voici le code que l'application exécute:

final StreamsBuilder builder = new StreamsBuilder();
final KStream<..., ...> events = builder
        .stream(inputTopicNames, Consumed.with(..., ...)
        .withTimestampExtractor(...);

events
    .filter((k, v) -> ...)
    .flatMapValues(v -> ...)
    .flatMapValues(v -> ...)
    .selectKey((k, v) -> v)
    .groupByKey(Grouped.with(..., ...))
    .windowedBy(
        TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))              
            .advanceBy(Duration.ofSeconds(windowSizeInSecs))
            .grace(Duration.ofSeconds(windowSizeGraceInSecs)))
    .reduce((agg, new) -> {
        ...
        return agg;
    })
    .suppress(Suppressed.untilWindowCloses(
                  Suppressed.BufferConfig.unbounded()))
    .toStream()
    .to(outPutTopicNameOfGroupedData, Produced.with(..., ...));

La réinitialisation du décalage se produit juste et toujours (après le redémarrage) avec le KTABLE-SUPPRESS-STATE- STORE thème interne créé par l'API Kafka Stream.

J'ai essayé avec la garantie de traitement exactement une fois et au moins une fois .

Encore une fois, j'apprécierai vraiment tout indice à ce sujet.

MISE À JOUR: Ce problème a été résolu dans la version 2.2.1 ( https: //issues.apache.org/jira/browse/KAFKA-7895 )


10 commentaires

Vous dites que vous n'êtes pas sûr que les compensations soient réellement engagées. Avez-vous regardé le sujet consumer__offsets pour vérifier cela? Cet article de blog peut vous aider à découvrir ceci: medium.com/@felipedutratine/ … .


Avez-vous essayé si cela fonctionne sans la garantie de traitement exactement une fois? C'est une fonctionnalité relativement nouvelle.


Merci pour le commentaire @ user152468, oui j'ai vérifié avec l'outil bin / kafka-consumer-groups.sh . Je n'ai pas essayé avec d'autres garanties de traitement car dans mon cas, j'en ai besoin d'exactement une, mais cela vaut la peine d'essayer de supprimer cela pourrait être le problème. Cependant, je serai surpris si c'est le cas.


Les décalages doivent être validés sur streams.close () - essayez de le vérifier. De plus, les décalages doivent être validés tous les 100 ms par défaut si une seule fois est activée. Je ne sais pas pourquoi les décalages ne sont pas repris au démarrage. Je recommanderais d'inspecter les journaux - peut-être augmenter le niveau de journalisation à DEBUG pour obtenir plus d'informations.


Bonjour @ MatthiasJ.Sax merci pour la réponse, j'ai trouvé ceci dans les journaux: INFO [MI-APP-ID-xxx-StreamThread-4] internals.StoreChangelogReader (StoreChangelogReader.java:215) - stream-thread [MI-APP -ID-xxx-StreamThread-4] Aucun point de contrôle trouvé pour la tâche 1_5 magasin d'état KTABLE-SUPPRESS-STATE-STORE-0000000011 changelog MI-APP-ID-KTABLE-SUPPRESS-STATE-STORE-0000000011- changelog-5 avec EOS activé. Réinitialisation de la tâche et restauration de son état depuis le début . ... suivre le commentaire suivant


INFO [MI-APP-ID-XXXStreamThread-4] internals.Fetcher (Fetcher.java:583) - [Consumer clientId = MI-APP-ID-XXX-StreamThread-4-restore-consumer, groupId = ] Remise à zéro de l'offset de la partition MI-APP-ID-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog-5 à l'offset 0 . Est clair que réinitialise le décalage pour le changelog, mais je ne sais pas pourquoi, est probablement quelque chose d'évident que je ne sais pas. Le fait que groupId = soit vide est normal ?. Le journal Aucun point de contrôle trouvé est le point de contrôle dans le magasin local, n'est-ce pas? est-ce lié à la Réinitialisation du décalage pour la partition ... au décalage 0 ?


Sur KafkaStreams # close () un fichier de point de contrôle local est écrit. Si ce fichier de point de contrôle n'est pas trouvé au démarrage, KafkaStreams doit effacer le magasin et le recréer à partir de zéro. Il utilise un deuxième Consumer sans identifiant de groupe pour effectuer la récupération du magasin. La question est, pourquoi il n'y a pas de fichier de point de contrôle? Lorsque vous signalez que vos décalages de rubrique d'entrée ne sont pas non plus validés, cela indique que votre close () ne se termine pas proprement?


Je ne vois aucune erreur ou exception après avoir appelé KafkaStreams # close () : INFO [Thread-3] streams.KafkaStreams (KafkaStreams.java:902) - stream-client [MY-APP -ID-XXX] Le client Streams s'est complètement arrêté . Pensez-vous que je pourrais être confronté à ce bug @ MatthiasJ.Sax issues.apache.org/jira / parcourir / KAFKA-7672


Pas certain. Dur à dire.


Bonjour @ MatthiasJ.Sax J'ai ajouté l'application de code à la question au cas où vous pourriez me guider ou tout indice serait apprécié.


3 Réponses :


-2
votes

La fréquence de validation est contrôlée par le paramètre commit.interval.ms . Vérifiez si vos offsets sont bien engagés. Par défaut, les décalages sont validés toutes les 100 ms ou 30 secondes, selon votre configuration de garantie de traitement. Vérifiez ceci


4 commentaires

Merci pour la réponse @senseiwu. Oui, la valeur de commit.interval.ms , dans ce cas, est 100ms (par défaut, une seule fois). Je vérifie la position du consommateur avec bin / kafka-consumer-groups.sh, avant de redémarrer, le LAG est 1, après le redémarrage, le LAG va sur le toit.


Consultez ce message, en particulier les commentaires de Matthias J. Sax 21 juin 18 à 17:24 ce qui explique un peu en détail le mécanisme de validation d'offset


Ce n'est pas une réponse à la question.


Merci de l'avoir signalé. Mais d'après la question, il semble que l'OP ne soit pas sûr de la façon dont les commits périodiques basés sur commit.interval.ms fonctionnent réellement. Je ne sais toujours pas si c'est là qu'il a un problème ou si les compensations sont réinitialisées comme il le spécule davantage.



2
votes

La réinitialisation du décalage se produit juste et toujours (après le redémarrage) avec le sujet interne KTABLE-SUPPRESS-STATE-STORE créé par l'API Kafka Stream.

Il s'agit actuellement du comportement attendu (version 2.1), car l'opérateur suppress () ne fonctionne qu'en mémoire. Ainsi, au redémarrage, le tampon de suppression doit être recréé à partir du sujet du journal des modifications avant que le traitement ne puisse démarrer.

Attention, il est prévu de laisser suppress () écrire sur le disque dans les prochaines versions (cf. https://issues.apache.org/jira/browse/KAFKA-7224 ). Cela évitera la surcharge liée à la recréation du tampon à partir du sujet du journal des modifications.


12 commentaires

Merci @ MatthiasJ.Sax, donc, il est normal de s'attendre à ce que l'application lise le sujet KTABLE-SUPPRESS-STATE-STORE à partir de offset 0 après le redémarrage, d'où , passera en aval aux événements de rubrique de sortie qui ont déjà été traités?


Les données déjà traitées ne seront pas renvoyées en aval. Les données du journal des modifications sont simplement ajoutées au magasin de suppression avant le début du traitement. Si les données ne sont pas ajoutées au magasin, vous pourriez avoir des résultats manquants (pensez, un enregistrement a été ajouté au tampon de suppression, mais jamais émis, et aucune mise à jour nécessaire pour la même clé ne se produit). Notez également que la suppression de l'expulsion est basée sur l'heure de l'événement, il n'est pas nécessaire de supprimer quoi que ce soit si le tampon est rechargé au redémarrage (l'heure de l'événement est toujours la même).


Merci encore @ MatthiasJ.Sax, les données du journal des modifications sont ajoutées au magasin de suppression (je suppose en mémoire) avant le début du traitement, la lecture de la rubrique KTABLE-SUPPRESS-STATE-STORE toujours à partir de décalage 0 ?. Je demande parce que les journaux montrent toujours que le décalage est réinitialisé à 0 (pour les partitions de rubrique KTABLE-SUPPRESS-STATE-STORE ). Les décalages pour les partitions de rubrique KTABLE-SUPPRESS-STATE-STORE ne sont pas validés après une suppression réussie ou la fermeture d'une fenêtre? Je m'attendrais à ajouter au tampon à nouveau des enregistrements qui n'ont jamais été émis, ou ce n'est pas comme ça? Merci beaucoup d'avance @ MatthiasJ.Sax.


(1) les décalages des sujets du journal des modifications ne sont jamais validés (2) parce que le tampon est en mémoire, même si le décalage serait validé, il serait nécessaire de l'ignorer et de recharger de toute façon à partir du décalage zéro - sinon, le tampon ne serait pas chargé correctement (rien n'est écrit sur le disque local - c'est un atm de tampon en mémoire). "Je m'attendrais à ajouter à nouveau au tampon des enregistrements qui n'ont jamais été émis, ou ce n'est pas comme ça?" - c'est possible bien sûr.


Oh, beaucoup plus clair @ MatthiasJ.Sax, merci! si j'ai bien compris puisque les décalages du sujet du journal des modifications supprimer ne sont jamais validés, il n'y a aucun moyen pour le moment pour l'application Kafka Stream de savoir quels enregistrements ont déjà été émis (puisque l'application doit être rejouée depuis offset 0 ) à moins qu'il y ait une sorte de métadonnées dans les enregistrements stockés dans la rubrique suppress , n'est-ce pas ?. Encore une fois, merci beaucoup Matthias pour votre aide, votre temps et vos explications.


Correct - cependant, les décalages ne concernent pas les enregistrements déjà émis. L'émission est basée sur l'heure de l'événement et chaque enregistrement dans la mémoire tampon a son heure d'événement stockée dans le champ d'horodatage du message.


Merci @ MatthiasJ.Sax, ça a du sens. 1.- L'application répond à toute la rubrique du journal des modifications suppress pour mettre en mémoire tampon le magasin d'état avant le début du traitement. 2.- Une fois l'ensemble du magasin recréé (mis en mémoire tampon), le traitement des événements commence. 3.- Puisque le processus d'émission est basé sur l'heure de l'événement, lorsque de nouveaux enregistrements non encore émis arrivent (je suppose par rapport à l'heure de l'événement du dernier enregistrement émis situé dans le tampon), la suppression recommence jusqu'à la fin des fenêtres. Si c'est correct, mon application ne le fait pas. Après le redémarrage, de nombreux enregistrements déjà traités aboutissent dans la rubrique de sortie.


Cela semble correct. - À propos de votre dernière déclaration: vous voulez dire que vous obtenez des enregistrements de sortie en double? Ce serait un bug.


Oui @ MatthiasJ.Sax, bien que ce ne soit pas déterministe, après le redémarrage, les enregistrements déjà traités arrivent dans le sujet de sortie de l'ordre de grandeur de 10, env. J'ai lu le code, mais pas facile de repérer s'il y a un bug. Si vous pouvez me guider sur quoi ou deviez vérifier, je l'apprécierai beaucoup. Merci!


Si un enregistrement est dans le tampon et qu'il est émis, il doit être supprimé du tampon (cf. InMemoryTimeOrderedKeyValueBuffer # evictWhile () ) - l'ensemble dirtyKey est écrit dans le sujet du journal des modifications sur flush () et devrait supprimer les enregistrements émis. Ainsi, au redémarrage, ils doivent être à nouveau émis, car ils doivent être rechargés dans le tampon.


Merci @ MatthiasJ.Sax, je vais vérifier ça :)


J ai exactement le même problème :(



0
votes

Je pense que la réponse de @Matthias J. Sax couvre la plupart des éléments internes de la suppression. Une chose que je dois clarifier cependant: quand vous dites "redémarrer l'application", qu'avez-vous fait exactement? Avez-vous arrêté l'ensemble de l'application en douceur, puis l'avez-vous redémarré?


5 commentaires

Bonjour @GuozhangWang merci pour la réponse. Le redémarrage implique arrêter l'application appelant KafkaStreams # .close () dans un hook d'arrêt, puis démarrer l'application appelant le java. .. Commande .


Cela semble raisonnable. Je pense que le dernier commentaire de Matthias explique alors la situation.


Salut @GuozhangWang J'ai fait face au problème similaire (v2.3.1) lors du redémarrage de l'application, le sujet de répartition a bytesin mais pas d'octets alors que le sujet changelog a un énorme bytesout, pas d'octetsin. J'ai vérifié le journal et j'ai remarqué que le décalage de suppression des modifications est réinitialisé à 0. Et l'état ne cesse de changer entre PARTITIONS_REVOKED et PARTITIONS_ASSIGNED. Est-ce prévu? Je n'ai pas vu le statut redevenu RUNNING. Veuillez aider! Merci beaucoup!!


Quelle est la taille de votre état pour redémarrer? D'après la description, il semble que le tampon de suppression a été restauré à partir de zéro et prend beaucoup de temps et par conséquent, le consommateur est à nouveau exclu du groupe.


Salut @GuozhangWang, le trafic est d'environ 150 000 à 180 000 messages / s. Donc je suppose que c'est un grand état. Chaque fois que je redéploie la version, cela se produit. Y a-t-il quelque chose que je puisse faire pour empêcher la restauration de la totalité du tampon de suppression? Merci beaucoup!