2
votes

Comment gérer les éléments tardifs dans la fenêtre API Flink's Table?

Dans notre application de streaming, qui utilise Flink 1.55 et son API de table, j'ai besoin de détecter et de gérer les éléments tardifs. Je ne parviens pas à trouver d'alternative à la fonctionnalité de l'API DataStream .sideOutputLateData (...)

J'ai essayé de rechercher dans la documentation Flink https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/tableApi. html et google beaucoup et je n'ai rien trouvé d'utile

Exemple:

table
  .window(Tumble over windowLengthInMinutes.minutes on 'timeStamp as 'timeWindow)
  .groupBy(..fieds list)
  .select(..fields)

Le code fourni fonctionne comme prévu. Le problème est que les éléments qui arrivent en retard, comme défini par la taille de la fenêtre et le retard autorisé, sont rejetés. Existe-t-il un moyen de gérer ces derniers éléments de manière native par l'API Table?


1 commentaires

Il ne semble pas que cette fonctionnalité soit prise en charge directement sur l'API de table. Vous pouvez convertir la table en un DataStream et définir la sortie secondaire à ce sujet.


3 Réponses :


1
votes

Depuis Flink 1.8.0, il ne semble pas que l'API Table le prenne actuellement en charge directement. Une façon de contourner ce problème consiste à convertir votre table en DataStream [Row] et à définir la sortie secondaire sur celle-ci:

val outputTag = OutputTag[String]("side-output")

val flink = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(flink)

// Make sure the source emits data to the selected side output
tableEnv.registerTableSource(...)
val table = tableEnv.sqlQuery("QUERY")

// Can also be toAppendStream, depending on the underlying table output
val dataStream = tableEnv.toRetractStream(table)
val sideOutputStream = dataStream.getSideOutput(outputTag)


0 commentaires

0
votes

J'ai trouvé une solution. J'utilisais actuellement BoundedOutOfOrdernessTimestampExtractor, qui fournit les informations d'horodatage du filigrane. J'ai utilisé ces informations pour diviser le flux d'entrée et traiter le flux tardif séparément.


1 commentaires

WDYM en "divisant le flux d'entrée"? Peut-être fournissez un extrait de code supplémentaire avec votre réponse.



0
votes

Voici l'exemple de code pour le fractionnement:

  val mainSource = env.addSource(someSource)

  val splitted = mainSource.split(
      (x:DataKpi)=> isLate(x.getTimestamp) match {
        case false =>List("regular")
        case true =>List("late")
      }
    )
  val regularSource= splitted select "regular"
  val lateSource=    splitted select "late"

regularSource et lateSource sont de nouveaux flux traités séparément plus tard. Avant d'utiliser cette approche, nous rencontrions des doublons. La fonction isLate () est une fonction personnalisée qui décide si l'élément est probablement en retard ou non. Cette fonction utilise les informations du filigrane actuel fournies par BoundedOutOfOrdernessTimestampExtractor.getCurrentWatermark.getTimestamp dans mon cas.


0 commentaires