J'ai une application Kafka Streams de base qui lit à partir d'un in_topic
, effectue un agrégat de roulement et effectue une jointure pour publier sur un out_topic
. Cela fonctionne bien depuis des semaines, mais il s'est écrasé ce matin et ne démarrera plus. Je ne pense pas que cela ait quoi que ce soit à voir avec le code. Le journal avant l'erreur est:
2019-01-21 17:46:32,803 localhost org.apache.kafka.clients.producer.KafkaProducer: [Producer clientId=rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1-0_0-producer, transactionalId=rtt-healthscore-stream-0_0] Instantiated a transactional producer. 2019-01-21 17:46:32,803 localhost org.apache.kafka.clients.producer.KafkaProducer: [Producer clientId=rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1-0_0-producer, transactionalId=rtt-healthscore-stream-0_0] Overriding the default acks to all since idempotence is enabled. 2019-01-21 17:46:32,818 localhost org.apache.kafka.common.utils.AppInfoParser: Kafka version : 2.0.0 2019-01-21 17:46:32,818 localhost org.apache.kafka.common.utils.AppInfoParser: Kafka commitId : 3402a8361b734732 2019-01-21 17:46:32,832 localhost org.apache.kafka.clients.producer.internals.TransactionManager: [Producer clientId=rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1-0_0-producer, transactionalId=rtt-healthscore-stream-0_0] ProducerId set to -1 with epoch -1 2019-01-21 17:47:32,833 localhost org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1] Error caught during partition assignment, will abort the current process and re-throw at the end of rebalance: {} org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms. 2019-01-21 17:47:32,843 localhost org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1] partition assignment took 60062 ms. current active tasks: [] current standby tasks: [] previous active tasks: [] 2019-01-21 17:47:32,845 localhost org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN 2019-01-21 17:47:32,845 localhost org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1] Shutting down 2019-01-21 17:47:32,860 localhost org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD 2019-01-21 17:47:32,860 localhost org.apache.kafka.streams.KafkaStreams: stream-client [rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804] State transition from REBALANCING to ERROR 2019-01-21 17:47:32,860 localhost org.apache.kafka.streams.KafkaStreams: stream-client [rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804] All stream threads have died. The instance will be in error state and should be closed. 2019-01-21 17:47:32,860 localhost org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1] Shutdown complete Exception in thread "rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1] Failed to rebalance. at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:870) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:810) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736) Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.
Aucun des paramètres / configurations de kafka n'a changé et tous les courtiers sont disponibles. Ma version Kafka est 2.0. Je suis capable de lire à partir du in_topic
du consommateur de la console, donc tout avant cette application va bien. Toute aide est appréciée.
3 Réponses :
Notre projet a le même échec de timeout après la mise à niveau vers Kafka 2.1, et nous ne connaissons pas encore la raison.
Notre solution temporaire consiste à désactiver la configuration exactement_once
qui ignore l'état transactionnel d'initialisation.
Nous avons également eu ces erreurs après une mise à niveau vers la version 2.1 (et je pense aussi lors de la mise à niveau précédente vers des versions antérieures.)
Nous fonctionnons dans un environnement kubernetes où, après une mise à niveau progressive, les courtiers peuvent changer d'adresse IP. Depuis le journal du courtier:
[2019-02-20 02:20:20,085] WARN [TransactionCoordinator id=1001] Connection to node 0 (khaki-joey-kafka-0.khaki-joey-kafka-headless.hyperspace-dev/10.233.124.181:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) [2019-02-20 02:20:57,205] WARN [TransactionCoordinator id=1001] Connection to node 1 (khaki-joey-kafka-1.khaki-joey-kafka-headless.hyperspace-dev/10.233.122.67:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Je peux voir que le coordinateur de transaction utilise toujours des adresses IP périmées pour les 2 courtiers qui ont été redémarrés après lui-même (un jour après la mise à niveau.)
Options possibles:
Problème soulevé Problème KAFKA-7958 - Les transactions sont interrompues avec l'hébergement Kubernetes courtiers
Mise à jour 20/02/2017 Ce problème a peut-être été résolu dans Kafka 2.1.1 (Confluent 5.1.2 ) publié aujourd'hui . Voir le problème associé.
D'après mon expérience (Kafka 2.3.x), ce n'est pas encore résolu (Confluent version 5.3.x).
It's resolved after upgrade https://kafka.apache.org/25/documentation/streams/developer-guide/write-streams.html <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.5.0</version> </dependency> <!-- Optionally include Kafka Streams DSL for Scala for Scala 2.12 --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams-scala_2.12</artifactId> <version>2.5.0</version> </dependency>
Pouvez-vous vérifier les journaux du courtier pour tout message ERROR ou WARN?
Journaux d'avertissement / d'erreurs : il s'agit de tous les journaux lorsque l'application a cessé de traiter Les données. J'ai essayé de ne changer que l'app_id de l'application défectueuse, et tout fonctionnait bien. Cela semble donc être un problème d'accès lié à app_id. Pourrait-il essayer d'accéder à des données corrompues et rester bloqué / ne pas savoir chercher ailleurs ces données. Nous avons une réplication de 2 et 4 courtiers.
Donc, juste pour faire un suivi avec vous, j'ai essayé une réinitialisation complète de l'application (globale / locale), et j'ai toujours le même problème. Par coïncidence, l'un des nœuds du courtier est tombé en panne à peu près au moment où cette erreur s'est produite.