3
votes

Kafka Streams - Erreur de rééquilibrage impossible

J'ai une application Kafka Streams de base qui lit à partir d'un in_topic , effectue un agrégat de roulement et effectue une jointure pour publier sur un out_topic . Cela fonctionne bien depuis des semaines, mais il s'est écrasé ce matin et ne démarrera plus. Je ne pense pas que cela ait quoi que ce soit à voir avec le code. Le journal avant l'erreur est:

2019-01-21 17:46:32,803 localhost org.apache.kafka.clients.producer.KafkaProducer: [Producer clientId=rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1-0_0-producer, transactionalId=rtt-healthscore-stream-0_0] Instantiated a transactional producer.
2019-01-21 17:46:32,803 localhost org.apache.kafka.clients.producer.KafkaProducer: [Producer clientId=rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1-0_0-producer, transactionalId=rtt-healthscore-stream-0_0] Overriding the default acks to all since idempotence is enabled.
2019-01-21 17:46:32,818 localhost org.apache.kafka.common.utils.AppInfoParser: Kafka version : 2.0.0
2019-01-21 17:46:32,818 localhost org.apache.kafka.common.utils.AppInfoParser: Kafka commitId : 3402a8361b734732
2019-01-21 17:46:32,832 localhost org.apache.kafka.clients.producer.internals.TransactionManager: [Producer clientId=rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1-0_0-producer, transactionalId=rtt-healthscore-stream-0_0] ProducerId set to -1 with epoch -1
2019-01-21 17:47:32,833 localhost org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1] Error caught during partition assignment, will abort the current process and re-throw at the end of rebalance: {}
org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.
2019-01-21 17:47:32,843 localhost org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1] partition assignment took 60062 ms.
    current active tasks: []
    current standby tasks: []
    previous active tasks: []

2019-01-21 17:47:32,845 localhost org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN
2019-01-21 17:47:32,845 localhost org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1] Shutting down
2019-01-21 17:47:32,860 localhost org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
2019-01-21 17:47:32,860 localhost org.apache.kafka.streams.KafkaStreams: stream-client [rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804] State transition from REBALANCING to ERROR
2019-01-21 17:47:32,860 localhost org.apache.kafka.streams.KafkaStreams: stream-client [rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804] All stream threads have died. The instance will be in error state and should be closed.
2019-01-21 17:47:32,860 localhost org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1] Shutdown complete
Exception in thread "rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1] Failed to rebalance.
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:870)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:810)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.

Aucun des paramètres / configurations de kafka n'a changé et tous les courtiers sont disponibles. Ma version Kafka est 2.0. Je suis capable de lire à partir du in_topic du consommateur de la console, donc tout avant cette application va bien. Toute aide est appréciée.


3 commentaires

Pouvez-vous vérifier les journaux du courtier pour tout message ERROR ou WARN?


Journaux d'avertissement / d'erreurs : il s'agit de tous les journaux lorsque l'application a cessé de traiter Les données. J'ai essayé de ne changer que l'app_id de l'application défectueuse, et tout fonctionnait bien. Cela semble donc être un problème d'accès lié à app_id. Pourrait-il essayer d'accéder à des données corrompues et rester bloqué / ne pas savoir chercher ailleurs ces données. Nous avons une réplication de 2 et 4 courtiers.


Donc, juste pour faire un suivi avec vous, j'ai essayé une réinitialisation complète de l'application (globale / locale), et j'ai toujours le même problème. Par coïncidence, l'un des nœuds du courtier est tombé en panne à peu près au moment où cette erreur s'est produite.


3 Réponses :


7
votes

Notre projet a le même échec de timeout après la mise à niveau vers Kafka 2.1, et nous ne connaissons pas encore la raison.

Notre solution temporaire consiste à désactiver la configuration exactement_once qui ignore l'état transactionnel d'initialisation.


0 commentaires

1
votes

Nous avons également eu ces erreurs après une mise à niveau vers la version 2.1 (et je pense aussi lors de la mise à niveau précédente vers des versions antérieures.)

Nous fonctionnons dans un environnement kubernetes où, après une mise à niveau progressive, les courtiers peuvent changer d'adresse IP. Depuis le journal du courtier:

[2019-02-20 02:20:20,085] WARN [TransactionCoordinator id=1001] Connection 
to node 0 (khaki-joey-kafka-0.khaki-joey-kafka-headless.hyperspace-dev/10.233.124.181:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2019-02-20 02:20:57,205] WARN [TransactionCoordinator id=1001] Connection to node 1 (khaki-joey-kafka-1.khaki-joey-kafka-headless.hyperspace-dev/10.233.122.67:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

Je peux voir que le coordinateur de transaction utilise toujours des adresses IP périmées pour les 2 courtiers qui ont été redémarrés après lui-même (un jour après la mise à niveau.)

Options possibles:

  • Comme le dit cette réponse , désactivez Exactly Once pour votre streamer. Il n'utilise alors pas de transactions et tout semble fonctionner correctement. Inutile si vous avez besoin d'EOS ou d'un autre code client nécessitant des transactions.
  • redémarrez tous les courtiers qui signalent des avertissements pour les forcer à résoudre à nouveau l'adresse IP. Ils devraient être redémarrés de manière à ne pas modifier eux-mêmes leur adresse IP. Ce n'est généralement pas possible dans kubernetes.

Problème soulevé Problème KAFKA-7958 - Les transactions sont interrompues avec l'hébergement Kubernetes courtiers

Mise à jour 20/02/2017 Ce problème a peut-être été résolu dans Kafka 2.1.1 (Confluent 5.1.2 ) publié aujourd'hui . Voir le problème associé.


1 commentaires

D'après mon expérience (Kafka 2.3.x), ce n'est pas encore résolu (Confluent version 5.3.x).



0
votes
It's resolved after upgrade
https://kafka.apache.org/25/documentation/streams/developer-guide/write-streams.html

<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.5.0</version>
    </dependency>
    <!-- Optionally include Kafka Streams DSL for Scala for Scala 2.12 -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams-scala_2.12</artifactId>
        <version>2.5.0</version>
    </dependency>

0 commentaires