Dans notre application de streaming, qui utilise Flink 1.55 et son API de table, j'ai besoin de détecter et de gérer les éléments tardifs. Je ne parviens pas à trouver d'alternative à la fonctionnalité de l'API DataStream .sideOutputLateData (...)
J'ai essayé de rechercher dans la documentation Flink https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/tableApi. html et google beaucoup et je n'ai rien trouvé d'utile
Exemple:
table .window(Tumble over windowLengthInMinutes.minutes on 'timeStamp as 'timeWindow) .groupBy(..fieds list) .select(..fields)
Le code fourni fonctionne comme prévu. Le problème est que les éléments qui arrivent en retard, comme défini par la taille de la fenêtre et le retard autorisé, sont rejetés. Existe-t-il un moyen de gérer ces derniers éléments de manière native par l'API Table?
3 Réponses :
Depuis Flink 1.8.0, il ne semble pas que l'API Table le prenne actuellement en charge directement. Une façon de contourner ce problème consiste à convertir votre table en DataStream [Row]
et à définir la sortie secondaire sur celle-ci:
val outputTag = OutputTag[String]("side-output") val flink = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = StreamTableEnvironment.create(flink) // Make sure the source emits data to the selected side output tableEnv.registerTableSource(...) val table = tableEnv.sqlQuery("QUERY") // Can also be toAppendStream, depending on the underlying table output val dataStream = tableEnv.toRetractStream(table) val sideOutputStream = dataStream.getSideOutput(outputTag)
J'ai trouvé une solution. J'utilisais actuellement BoundedOutOfOrdernessTimestampExtractor, qui fournit les informations d'horodatage du filigrane. J'ai utilisé ces informations pour diviser le flux d'entrée et traiter le flux tardif séparément.
WDYM en "divisant le flux d'entrée"? Peut-être fournissez un extrait de code supplémentaire avec votre réponse.
Voici l'exemple de code pour le fractionnement:
val mainSource = env.addSource(someSource) val splitted = mainSource.split( (x:DataKpi)=> isLate(x.getTimestamp) match { case false =>List("regular") case true =>List("late") } ) val regularSource= splitted select "regular" val lateSource= splitted select "late"
regularSource
et lateSource
sont de nouveaux flux traités séparément plus tard. Avant d'utiliser cette approche, nous rencontrions des doublons. La fonction isLate ()
est une fonction personnalisée qui décide si l'élément est probablement en retard ou non. Cette fonction utilise les informations du filigrane actuel fournies par BoundedOutOfOrdernessTimestampExtractor.getCurrentWatermark.getTimestamp
dans mon cas.
Il ne semble pas que cette fonctionnalité soit prise en charge directement sur l'API de table. Vous pouvez convertir la table en un
DataStream
et définir la sortie secondaire à ce sujet.