Je souhaite traiter un sujet au démarrage de l'application à l'aide du client Confluent dotnet . Supposons l'exemple suivant:
while (true) { try { var cr = c.Consume(); Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'."); } catch (ConsumeException e) { Console.WriteLine($"Error occured: {e.Error.Reason}"); } }
Lorsqu'il n'y a pas de nouveau message dans Kafka, c.Consume sera bloqué. Parce que je veux l'utiliser pour le démarrage de l'application (comme le réchauffement du cache), je veux continuer mon code lorsque je trouve qu'il n'y a pas de nouveau message.
Je sais qu'il y a une surcharge pour définir le délai d'expiration comme c.Consume (timeout)
mais le problème avec cette approche est que si vous avez un message dans votre sujet et la durée de lecture du message dépassait votre délai, vous recevez une sortie nulle, ce qui n'est pas souhaitable.
3 Réponses :
Le ou les consommateurs ne sont pas censés connaître le (s) producteur (s).
Maintenant, si vous voulez savoir que vous avez tout lu dans le sujet à partir du moment où vous commencez à consommer, vous pouvez:
Je ne suis pas un développeur C #
mais d'après ce que j'ai lu dans la documentation dotnet confluent, vous pouvez appeler QueryWatermarkOffsets
sur le consommateur pour obtenir le décalage le plus ancien et le plus récent. https: // docs. confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Message.html#Confluent_Kafka_Message_Offset
Merci. Cela me donne le décalage final et fonctionne très bien! Pourriez-vous ajouter la méthode QueryWatermarkOffsets
à votre réponse? Je reçois le filigrane par la requête suivante: var watermark = c.QueryWatermarkOffsets (new TopicPartition ("mytopic", new Partition (0)), TimeSpan.FromMilliseconds (1));
Bonjour, j'ai mis à jour ma réponse pour mentionner QueryWatermarkOffsets
au lieu de GetWatermarkOffsets
. Heureux que cela vous ait aidé.
Vous pouvez utiliser l'événement OnPartitionEOF
qui indique que vous avez atteint la fin de la partition.
CancellationTokenSource source = new CancellationTokenSource(); bool isContinue = true; c.OnPartitionEOF += (o, e) => { Console.WriteLine($"You have reached end of partition"); isContinue = false; source.Cancel(); }; while (isContinue) { try { var cr = c.Consume(source.Token); Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'."); } catch (ConsumeException e) { Console.WriteLine($"Error occured: {e.Error.Reason}"); } }
Merci. Mais OnPartitionEOF
a été supprimé comme décrit dans github.com/confluentinc/confluent-kafka-dotnet/blob/master/...
Vous êtes le bienvenu. Ainsi, vous pouvez détecter la fin de la partition en vérifiant cr.IsPartitionEOF
après avoir consommé.
IsPartitionEOF
devrait fonctionner, mais je ne sais pas pourquoi il renvoie toujours false (je viens de créer un sujet avec un seul message). De plus, je dois attendre le premier appel Consume (qui pourrait être bloqué) pour vérifier IsPartitionEOF
. Peut-être qu'une version stable résout le problème.
Je n'ai pas travaillé car je ne l'ai pas activé dans config: var config = new ConsumerConfig {EnablePartitionEof = true}; Pourriez-vous modifier votre réponse pour refléter la fonctionnalité IsPartitionEOF afin que d'autres utilisateurs puissent l'utiliser?
J'ai trouvé le Consumer.IsPartitionEOF utile.
Veuillez essayer d'expliquer correctement la réponse.
Alors, quelle est votre approche souhaitable pour arrêter de consommer?
Existe-t-il une méthode qui indique que nous sommes en fin de partition?