6
votes

KafkaStreams: Obtenir les résultats finaux de la fenêtre

Est-il possible d'obtenir fenêtre résultat final dans Kafka Streams en supprimant les résultats intermédiaires.

Je ne peux pas atteindre cet objectif. Quel est le problème avec mon code?

Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000001: 
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String

Cela conduit à cette erreur:

    val builder = StreamsBuilder()
    builder.stream<String,Double>(inputTopic)
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
            .count()
            .suppress(Suppressed.untilWindowCloses(unbounded())) // not working)
            .toStream()
            .print(Printed.toSysOut())

Détails du code / erreur: https://gist.github.com/robie2011/1caa4772b60b5a6f993e6f98e792a380


0 commentaires

3 Réponses :


0
votes

Ajoutez Consommé , lorsque vous créez un flux: builder.stream (inputTopic, Consumé. avec (Serdes.String (), Serdes .Double ())


2 commentaires

Cela n'aidera pas car le problème vient de la méthode .suppress . Pour cette clé, Serde doit être défini sur State Store


@NishuTayal Dans mon cas, c'était suffisant. Vous pouvez également lire ici: stackoverflow.com/questions/54036328/...



5
votes

Le problème vient du KeySerde. Étant donné que l'opération WindowedBy entraîne une clé de type Windowed mais .suppress () utilise un type de clé par défaut.

Par conséquent, vous avez besoin pour définir KeySerde sur le magasin d'état en appelant la méthode de comptage comme indiqué ci-dessous:

      builder.stream<String,Double>inputTopic)
      .groupByKey()
      .windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
      .count(Materialized.<String, Long, WindowStore<Bytes,byte[]>>as("count").withCachingDisabled().withKeySerde(Serdes.String()))
      .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
      .toStream()
      . print(Printed.toSysOut());


0 commentaires

18
votes

Le problème est une asymétrie déroutante dans la façon dont Streams encapsule automatiquement les serdes explicites pendant le fenêtrage, mais n'enveloppe pas automatiquement le serde par défaut. À mon humble avis, c'est un oubli qui devrait être corrigé, j'ai donc déposé: https: // issues.apache.org/jira/browse/KAFKA-7806

Comme d'autres l'ont noté, la solution est de définir explicitement le serde de clé en amont et de ne pas compter sur le serde de clé par défaut. Vous pouvez soit:

Définir les serdes sur l'agrégation fenêtrée avec Materialized

properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);

(comme Nishu recommandé) p >

(notez qu'il n'est pas nécessaire de nommer l'opération count , ce qui a pour effet secondaire de la rendre interrogeable)

Ou définir les serdes plus en amont, par exemple sur l'entrée:

.windowedBy(TimeWindows.of(Duration.ofSeconds(15)).grace(Duration.ZERO))

(comme le recommande wardziniak)

Le choix vous appartient; Je pense que dans ce cas, ce n'est pas trop différent dans les deux cas. Si vous faisiez une agrégation différente de count , vous définiriez probablement la valeur serde via Materialized de toute façon, alors peut-être que le premier serait un style plus uniforme. p>

J'ai également remarqué que votre définition de fenêtre n'a pas de période de grâce définie. L'heure de fermeture de la fenêtre est définie comme fin de fenêtre + période de grâce , et la valeur par défaut est de 24 heures, vous ne verrez donc rien émis de la suppression tant que 24 heures de données n'auront pas été exécutées via l'application .

Pour vos efforts de test, je vous recommande d'essayer:

val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic, Consumed.with(Serdes.String(), Serdes.Double()))
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
        .count()
        .suppress(Suppressed.untilWindowCloses(unbounded())))
        .toStream()
        .print(Printed.toSysOut())

En production, vous voudrez sélectionner une période de grâce qui équilibre le montant du retard d'événement que vous attendez dans votre flux avec la quantité de promptitude d'émission que vous souhaitez voir de la suppression.

Une dernière note, j'ai remarqué dans votre résumé que vous n'avez pas changé l'intervalle de mise en cache ou de validation par défaut . En conséquence, vous remarquerez que l'opérateur count lui-même mettra en mémoire tampon les mises à jour pendant 30 secondes par défaut avant de les passer en suppression. C'est une bonne configuration pour la production afin de ne pas créer de goulot d'étranglement sur votre disque local ou sur le courtier Kafka. Mais cela pourrait vous surprendre pendant que vous testez.

En règle générale, pour les tests (ou pour essayer des choses de manière interactive), je désactiverai la mise en cache et définirai un intervalle de validation court pour une santé optimale du développeur:

XXX

Désolé pour la surveillance du serde. J'espère que le KAFKA-7806 sera bientôt traité.

J'espère que cela vous aidera!


1 commentaires

Explication incroyable! Merci d'avoir inclus les informations sur les intervalles de validation, je viens de faire ma journée!