1
votes

Retour du consommateur Kafka en l'absence de message

Je souhaite traiter un sujet au démarrage de l'application à l'aide du client Confluent dotnet . Supposons l'exemple suivant:

    while (true)
    {
        try
        {
            var cr = c.Consume();
            Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
        }
        catch (ConsumeException e)
        {
            Console.WriteLine($"Error occured: {e.Error.Reason}");
        }
    }

Lorsqu'il n'y a pas de nouveau message dans Kafka, c.Consume sera bloqué. Parce que je veux l'utiliser pour le démarrage de l'application (comme le réchauffement du cache), je veux continuer mon code lorsque je trouve qu'il n'y a pas de nouveau message.

Je sais qu'il y a une surcharge pour définir le délai d'expiration comme c.Consume (timeout) mais le problème avec cette approche est que si vous avez un message dans votre sujet et la durée de lecture du message dépassait votre délai, vous recevez une sortie nulle, ce qui n'est pas souhaitable.


2 commentaires

Alors, quelle est votre approche souhaitable pour arrêter de consommer?


Existe-t-il une méthode qui indique que nous sommes en fin de partition?


3 Réponses :


1
votes

Le ou les consommateurs ne sont pas censés connaître le (s) producteur (s).

Maintenant, si vous voulez savoir que vous avez tout lu dans le sujet à partir du moment où vous commencez à consommer, vous pouvez:

  1. Chargez le dernier décalage avant de commencer à consommer.
  2. Ensuite, commencez à consommer des messages.
  3. Si le décalage du message est le même que le dernier décalage que vous avez chargé auparavant, arrêtez de consommer.

Je ne suis pas un développeur C # mais d'après ce que j'ai lu dans la documentation dotnet confluent, vous pouvez appeler QueryWatermarkOffsets sur le consommateur pour obtenir le décalage le plus ancien et le plus récent. https: // docs. confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Message.html#Confluent_Kafka_Message_Offset


2 commentaires

Merci. Cela me donne le décalage final et fonctionne très bien! Pourriez-vous ajouter la méthode QueryWatermarkOffsets à votre réponse? Je reçois le filigrane par la requête suivante: var watermark = c.QueryWatermarkOffsets (new TopicPartition ("mytopic", new Partition (0)), TimeSpan.FromMilliseconds (1));


Bonjour, j'ai mis à jour ma réponse pour mentionner QueryWatermarkOffsets au lieu de GetWatermarkOffsets . Heureux que cela vous ait aidé.



2
votes

Vous pouvez utiliser l'événement OnPartitionEOF qui indique que vous avez atteint la fin de la partition.

CancellationTokenSource source = new CancellationTokenSource();
bool isContinue = true;

c.OnPartitionEOF += (o, e) =>
    {
        Console.WriteLine($"You have reached end of partition");
        isContinue = false;
        source.Cancel();
    };    
while (isContinue)
{
    try
    {
        var cr = c.Consume(source.Token);
        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
    }
    catch (ConsumeException e)
    {
        Console.WriteLine($"Error occured: {e.Error.Reason}");
    }
}


4 commentaires

Merci. Mais OnPartitionEOF a été supprimé comme décrit dans github.com/confluentinc/confluent-kafka-dotnet/blob/master/...


Vous êtes le bienvenu. Ainsi, vous pouvez détecter la fin de la partition en vérifiant cr.IsPartitionEOF après avoir consommé.


IsPartitionEOF devrait fonctionner, mais je ne sais pas pourquoi il renvoie toujours false (je viens de créer un sujet avec un seul message). De plus, je dois attendre le premier appel Consume (qui pourrait être bloqué) pour vérifier IsPartitionEOF . Peut-être qu'une version stable résout le problème.


Je n'ai pas travaillé car je ne l'ai pas activé dans config: var config = new ConsumerConfig {EnablePartitionEof = true}; Pourriez-vous modifier votre réponse pour refléter la fonctionnalité IsPartitionEOF afin que d'autres utilisateurs puissent l'utiliser?



0
votes

J'ai trouvé le Consumer.IsPartitionEOF utile.


1 commentaires

Veuillez essayer d'expliquer correctement la réponse.