2
votes

Comment lire tous les enregistrements dans un sujet Kafka

J'utilise kafka: kafka_2.12-2.1.0, spring kafka du côté client et je suis coincé avec un problème.

J'ai besoin de charger une carte en mémoire en lisant tous les messages existants dans un sujet kafka. Je l'ai fait en créant un nouveau consommateur (avec un identifiant de groupe de consommateurs unique et en définissant le décalage au earliest ). Ensuite, j'effectue une itération sur le consommateur (méthode de sondage) pour obtenir tous les messages et je m'arrête lorsque les enregistrements du consommateur deviennent vides.

Mais j'ai remarqué que, lorsque je commence à interroger, les premières itérations renvoient les enregistrements des consommateurs comme vides, puis il commence à renvoyer les enregistrements réels. Maintenant, cela brise ma logique car notre code pense qu'il n'y a aucun enregistrement dans le sujet.

J'ai essayé quelques autres méthodes (comme utiliser le nombre de décalages) mais je n'ai pas été en mesure de trouver une solution, à part garder un autre enregistrement quelque part qui me dit combien de messages il y a dans le sujet qui doit être lu avant que je m'arrête .

Une idée, s'il vous plaît?


0 commentaires

3 Réponses :


2
votes

À ma connaissance, ce que vous essayez de réaliser est d'avoir une carte construite dans votre application en fonction des valeurs qui sont déjà dans un sujet spécifique.

Pour cette tâche, au lieu d'interroger manuellement la rubrique, vous pouvez utiliser Ktable dans Kafka Streams DSL qui construira automatiquement un magasin de valeurs-clés lisible, tolérant aux pannes, la réplication activée et automatiquement remplie de nouvelles valeurs.

Vous pouvez le faire simplement en appelant groupByKey sur un flux, puis en utilisant l'agrégat.

KStreamBuilder builder = new KStreamBuilder();
KStream<String, Long> myKStream = builder.stream(Serdes.String(), Serdes.Long(), "topic_name");
KTable<String, Long> totalCount = myKStream.groupByKey().aggregate(this::initializer, this::aggregator);

(Le code réel peut varier en fonction de la version de kafka, de vos configurations, etc.)

En savoir plus sur les concepts de Kafka Stream ici

Ensuite, j'itère sur le consommateur (méthode de sondage) pour obtenir tous les messages et je m'arrête lorsque les enregistrements du consommateur deviennent vides

Kafka est une plateforme de diffusion de messages. Toutes les données que vous diffusez sont mises à jour en permanence et vous ne devriez probablement pas les utiliser d'une manière dont vous vous attendez à ce que la consommation s'arrête après un certain nombre de messages. Comment allez-vous gérer si un nouveau message arrive après avoir arrêté le consommateur?

Aussi la raison pour laquelle vous obtenez des enregistrements nuls peut-être probablement liée à des enregistrements se trouvant dans des partitions différentes, etc.

Quel est votre cas d'utilisation spécifique ici ?, Il pourrait y avoir un bon moyen de le faire avec la sémantique Kafka elle-même.


4 commentaires

Le cas d'utilisation est simplement de créer en mémoire la carte des messages existants dans kafka au moment du démarrage. Une fois créé, ce consommateur ne sert à rien. Il y a d'autres consommateurs qui écoutent kakfa pour des événements en temps réel, mais ils ne peuvent pas commencer tant que la carte n'est pas remplie. Ces consommateurs utilisent le flux kafka, mais ce consommateur en particulier a juste besoin de lire ce qui est présent et d'arrêter de le publier.


Est-ce toujours les premières itérations qui renvoient des enregistrements nuls ?, avez-vous essayé de mettre un délai d'attente plus long pour le sondage ?, pouvez-vous modifier la question d'origine avec un extrait de votre code consommateur?


Oui, ce sont les 2-3 premières itérations. Vous avez raison, si nous maintenons le temps de sondage à 1-2 secondes, nous ne trouvons pas ces enregistrements vides. Mais encore une fois, nous ne pouvons trouver aucune logique pour décider de l'heure car elle peut différer selon les environnements.


Quel est le délai d'expiration actuel que vous avez maintenant?, Il est impossible de suggérer quoi que ce soit sans voir le code.



0
votes

Vous devez utiliser 2 consommateurs un pour charger les offsets et un autre pour lire tous les enregistrements.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

public class KafkaRecordReader {

    static final Map<String, Object> props = new HashMap<>();
    static {
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "sample-client");
    }

    public static void main(String[] args) {
        final Map<TopicPartition, OffsetInfo> partitionOffsetInfos = getOffsets(Arrays.asList("world, sample"));
        final List<ConsumerRecord<byte[], byte[]>> records = readRecords(partitionOffsetInfos);

        System.out.println(partitionOffsetInfos);
        System.out.println("Read : " + records.size() + " records");
    }

    private static List<ConsumerRecord<byte[], byte[]>> readRecords(final Map<TopicPartition, OffsetInfo> offsetInfos) {
        final Properties readerProps = new Properties();
        readerProps.putAll(props);
        readerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "record-reader");

        final Map<TopicPartition, Boolean> partitionToReadStatusMap = new HashMap<>();
        offsetInfos.forEach((tp, offsetInfo) -> {
            partitionToReadStatusMap.put(tp, offsetInfo.beginOffset == offsetInfo.endOffset);
        });

        final List<ConsumerRecord<byte[], byte[]>> cachedRecords = new ArrayList<>();
        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(readerProps)) {
            consumer.assign(offsetInfos.keySet());
            for (final Map.Entry<TopicPartition, OffsetInfo> entry : offsetInfos.entrySet()) {
                consumer.seek(entry.getKey(), entry.getValue().beginOffset);
            }

            boolean close = false;
            while (!close) {
                final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(100));
                for (final ConsumerRecord<byte[], byte[]> record : consumerRecords) {
                    cachedRecords.add(record);
                    final TopicPartition currentTp = new TopicPartition(record.topic(), record.partition());
                    if (record.offset() + 1 == offsetInfos.get(currentTp).endOffset) {
                        partitionToReadStatusMap.put(currentTp, true);
                    }
                }

                boolean done = true;
                for (final Map.Entry<TopicPartition, Boolean> entry : partitionToReadStatusMap.entrySet()) {
                    done &= entry.getValue();
                }
                close = done;
            }
        }
        return cachedRecords;
    }

    private static Map<TopicPartition, OffsetInfo> getOffsets(final List<String> topics) {
        final Properties offsetReaderProps = new Properties();
        offsetReaderProps.putAll(props);
        offsetReaderProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "offset-reader");

        final Map<TopicPartition, OffsetInfo> partitionOffsetInfo = new HashMap<>();
        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(offsetReaderProps)) {
            final List<PartitionInfo> partitionInfos = new ArrayList<>();
            topics.forEach(topic -> partitionInfos.addAll(consumer.partitionsFor("sample")));
            final Set<TopicPartition> topicPartitions = partitionInfos
                    .stream()
                    .map(x -> new TopicPartition(x.topic(), x.partition()))
                    .collect(Collectors.toSet());
            consumer.assign(topicPartitions);
            final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);
            final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);

            for (final TopicPartition tp : topicPartitions) {
                partitionOffsetInfo.put(tp, new OffsetInfo(beginningOffsets.get(tp), endOffsets.get(tp)));
            }
        }
        return partitionOffsetInfo;
    }

    private static class OffsetInfo {

        private final long beginOffset;
        private final long endOffset;

        private OffsetInfo(long beginOffset, long endOffset) {
            this.beginOffset = beginOffset;
            this.endOffset = endOffset;
        }

        @Override
        public String toString() {
            return "OffsetInfo{" +
                    "beginOffset=" + beginOffset +
                    ", endOffset=" + endOffset +
                    '}';
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            OffsetInfo that = (OffsetInfo) o;
            return beginOffset == that.beginOffset &&
                    endOffset == that.endOffset;
        }

        @Override
        public int hashCode() {
            return Objects.hash(beginOffset, endOffset);
        }
    }
}


1 commentaires

Vous devrez peut-être modifier la ligne if (record.offset() + 1 == offsetInfos.get(currentTp).endOffset) - si vous utilisez un producteur transactionnel car ses décalages ne sont pas linéaires.



0
votes

Ajoutant à la réponse ci-dessus de @arshad, la raison pour laquelle vous n'obtenez pas les enregistrements est que vous les avez déjà lus. Voir cette réponse ici en utilisant la plus ancienne ou la plus récente n'a pas d'importance pour le consommateur une fois que vous avez un décalage engagé pour la partition

J'utiliserais une recherche au début ou le décalage particulier si vous connaissiez le décalage de départ.


0 commentaires