Je recherche la solution pour ajouter la valeur d'horodatage de kafka à mon schéma de streaming structuré Spark. J'ai extrait le champ de valeur de kafka et j'ai créé un dataframe. Mon problème est que j'ai besoin d'obtenir le champ d'horodatage (de kafka) également avec les autres colonnes.
Voici mon code actuel:
val kafkaDatademostr = spark .readStream .format("kafka") .option("kafka.bootstrap.servers","zzzz.xxx.xxx.xxx.com:9002") .option("subscribe","csvstream") .load val interval = kafkaDatademostr.select(col("value").cast("string")).alias("csv") .select("csv.*") val xmlData = interval.selectExpr("split(value,',')[0] as ddd" , "split(value,',')[1] as DFW", "split(value,',')[2] as DTG", "split(value,',')[3] as CDF", "split(value,',')[4] as DFO", "split(value,',')[5] as SAD", "split(value,',')[6] as DER", "split(value,',')[7] as time_for", "split(value,',')[8] as fort")
Comment puis-je obtenir l'horodatage de kafka et l'ajouter en tant que colonnes avec d'autres colonnes?
3 Réponses :
L'horodatage est inclus dans le schéma source. Ajoutez simplement un "horodatage de sélection" pour obtenir l'horodatage comme ci-dessous.
val interval = kafkaDatademostr.select(col("value").cast("string").alias("csv"), col("timestamp")).select("csv.*", "timestamp")
Sur la page Web officielle d'Apache Spark, vous pouvez trouver le guide: Guide d'intégration Structured Streaming + Kafka (courtier Kafka version 0.10.0 ou supérieure)
Vous y trouverez des informations sur le schéma de DataFrame chargé depuis Kafka.
Chaque La ligne de la source Kafka a les colonnes suivantes:
Toutes les colonnes ci-dessus peuvent être interrogées.
Dans votre exemple, vous n'utilisez que la valeur
, donc pour obtenir l'horodatage, il vous suffit d'ajouter horodatage
à votre instruction de sélection:
val allFields = kafkaDatademostr.selectExpr( s"CAST(value AS STRING) AS csv", s"CAST(key AS STRING) AS key", s"topic as topic", s"partition as partition", s"offset as offset", s"timestamp as timestamp", s"timestampType as timestampType" )
p>
Que faire si je souhaite ajouter un horodatage à Kafka à partir de Spark au lieu de l'inverse? Je n'ai trouvé aucun guide de ce type dans le lien fourni ici. Y a-t-il une configuration que je devrais vérifier dans Kafka ou Spark?
Dans mon cas de Kafka, je recevais les valeurs au format JSON. Qui contient les données réelles avec l'heure de l'événement d'origine et non l'horodatage kafka. Voici le schéma.
import spark.implicits._ val windowedData = df1.withWatermark("time","1 minute") .groupBy( window(col("time"), "1 minute", "30 seconds"), $"close" ).count()
Pour utiliser la fonction filigrane de Spark Structured Streaming, j'ai dû convertir le champ heure dans le champ format d'horodatage.
val df1 = df.selectExpr("CAST(value AS STRING)").as[(String)] .select(from_json($"value", mySchema).as("data")) .select(col("data.time").cast("timestamp").alias("time"),col("data.close"))
Vous pouvez maintenant utiliser le champ d'heure pour opération de fenêtre ainsi que filigrane objectif.
val mySchema = StructType(Array( StructField("time", LongType), StructField("close", DoubleType) ))
J'espère que cette réponse clarifie.