1
votes

Comment inclure la valeur d'horodatage kafka en tant que colonnes dans le streaming structuré Spark?

Je recherche la solution pour ajouter la valeur d'horodatage de kafka à mon schéma de streaming structuré Spark. J'ai extrait le champ de valeur de kafka et j'ai créé un dataframe. Mon problème est que j'ai besoin d'obtenir le champ d'horodatage (de kafka) également avec les autres colonnes.

Voici mon code actuel:

val kafkaDatademostr = spark
  .readStream 
  .format("kafka")
  .option("kafka.bootstrap.servers","zzzz.xxx.xxx.xxx.com:9002")
  .option("subscribe","csvstream")
  .load

val interval = kafkaDatademostr.select(col("value").cast("string")).alias("csv")
  .select("csv.*")

val xmlData = interval.selectExpr("split(value,',')[0] as ddd" ,
    "split(value,',')[1] as DFW",
    "split(value,',')[2] as DTG",
    "split(value,',')[3] as CDF",
    "split(value,',')[4] as DFO",
    "split(value,',')[5] as SAD",
    "split(value,',')[6] as DER",
    "split(value,',')[7] as time_for",
    "split(value,',')[8] as fort")

Comment puis-je obtenir l'horodatage de kafka et l'ajouter en tant que colonnes avec d'autres colonnes?


0 commentaires

3 Réponses :


2
votes

L'horodatage est inclus dans le schéma source. Ajoutez simplement un "horodatage de sélection" pour obtenir l'horodatage comme ci-dessous.

val interval = kafkaDatademostr.select(col("value").cast("string").alias("csv"), col("timestamp")).select("csv.*", "timestamp")


0 commentaires

1
votes

Sur la page Web officielle d'Apache Spark, vous pouvez trouver le guide: Guide d'intégration Structured Streaming + Kafka (courtier Kafka version 0.10.0 ou supérieure)

Vous y trouverez des informations sur le schéma de DataFrame chargé depuis Kafka.

Chaque La ligne de la source Kafka a les colonnes suivantes:

  • clé - clé de message
  • valeur - valeur du message
  • sujet - nom du sujet du message
  • partition - partitions d'où provient ce message
  • offset - décalage du message
  • horodatage - horodatage
  • type d'horodatage timestampType

Toutes les colonnes ci-dessus peuvent être interrogées. Dans votre exemple, vous n'utilisez que la valeur , donc pour obtenir l'horodatage, il vous suffit d'ajouter horodatage à votre instruction de sélection:

  val allFields = kafkaDatademostr.selectExpr(
    s"CAST(value AS STRING) AS csv",
    s"CAST(key AS STRING) AS key",
    s"topic as topic",
    s"partition as partition",
    s"offset as offset",
    s"timestamp as timestamp",
    s"timestampType as timestampType"
  )

p>


1 commentaires

Que faire si je souhaite ajouter un horodatage à Kafka à partir de Spark au lieu de l'inverse? Je n'ai trouvé aucun guide de ce type dans le lien fourni ici. Y a-t-il une configuration que je devrais vérifier dans Kafka ou Spark?



0
votes

Dans mon cas de Kafka, je recevais les valeurs au format JSON. Qui contient les données réelles avec l'heure de l'événement d'origine et non l'horodatage kafka. Voici le schéma.

import spark.implicits._
val windowedData = df1.withWatermark("time","1 minute")
                      .groupBy(
                          window(col("time"), "1 minute", "30 seconds"),
                          $"close"
                      ).count()

Pour utiliser la fonction filigrane de Spark Structured Streaming, j'ai dû convertir le champ heure dans le champ format d'horodatage.

val df1 = df.selectExpr("CAST(value AS STRING)").as[(String)]
      .select(from_json($"value", mySchema).as("data"))
      .select(col("data.time").cast("timestamp").alias("time"),col("data.close"))

Vous pouvez maintenant utiliser le champ d'heure pour opération de fenêtre ainsi que filigrane objectif.

val mySchema = StructType(Array(
      StructField("time", LongType),
      StructField("close", DoubleType)
    ))

J'espère que cette réponse clarifie.


0 commentaires