1
votes

Comment convertir une trame de données en JSON et écrire dans un sujet kafka avec une clé

J'essaie d'écrire un dataframe dans kafka au format JSON et d'ajouter une clé au data frame dans Scala, je travaille actuellement avec cet exemple de kafka-spark:

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
       .write
       .format("kafka")
       .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
       .option("topic", "topic1")
       .save()

Existe-t-il une méthode to_json qui peut être utilisée (au lieu de l'option json (path) qui, je crois, écrit dans un fichier au format JSON) et y a-t-il un Option key qui peut être utilisée pour remplacer la valeur null par une clé réelle.


0 commentaires

3 Réponses :


-1
votes

Vous pouvez utiliser la méthode toJSON () sur dataframe pour convertir votre enregistrement en message json.

import org.apache.spark.sql.Column;
someDF.withColumn("key",lit("name")).show() // replace "name" with your variable
 someDF.withColumn("key",lit("name")).toJSON.first() // toJSON is available as variable on dataframe in Scala

 someDF.withColumn("key",lit("name")).toJSON.first()
res5: String = {"number":8,"word":"bat","key":"name"} 

Votre code peut être comme ça strong >

from pyspark.sql.functions import lit
df.withColumn('key', lit(datetime.now())).toJSON()
       .write
       .format("kafka")
       .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
       .option("topic", "topic1")
       .save()

Scala :

df = spark.createDataFrame([('user_first_name','user_last_nmae',100)], ['first_name','last_name','ID'])

import json
from datetime import datetime
from pyspark.sql.functions import lit
json_df = json.loads(df.withColumn('date_as_key', lit(datetime.now().date())).toJSON().first())
print json_df

{u'date_as_key': u'2019-07-29', u'first_name': u'user_first_name', u'last_name': u'user_last_nmae', u'ID': 100}


3 commentaires

Je viens de modifier la question pour ajouter que je fais cela dans Scala. Mais peu importe, pour ajouter la clé, que se passe-t-il si la clé est une variable? puis-je analyser les variables dans selectExpr? selectExpr ("CAST (" + variable + "AS STRING)", "CAST (value AS STRING)"). toJSON ()


Je suppose que vous souhaitez ajouter une colonne clé qui est une variable à votre message, a mis à jour la réponse


C'est ce que je vise à faire, mais dans scala: bigdatums.net/2017/05/21/… De plus, il semble que toJSON () ne soit pas une fonction valide dans scala



0
votes

Vous pouvez utiliser la fonction SQL to_json pour convertir vos colonnes en JSON.

Voir le code Scala ci-dessous qui utilise également le Fonction intégrée Spark SQL struct dans la version Spark 2.4.5. Assurez-vous simplement que vous nommez vos colonnes comme clé et valeur ou que vous appliquez les alias correspondants dans votre selectExpr.

kafka-console-consumer --bootstrap-server localhost:9092 --property print.key=true --property print.value=true --topic test-topic
Alice   {"age":3,"name":"Alice"}
Bob     {"age":5,"name":"Bob"}

Cela renverra les données suivantes dans votre sujet Kafka:

import org.apache.spark.sql.functions.{col, struct, to_json}
import org.apache.spark.sql.SparkSession

object Main extends App {

  val spark = SparkSession.builder()
    .appName("myAppName")
    .master("local[*]")
    .getOrCreate()

  // create DataFrame
  import spark.implicits._
  val df = Seq((3, "Alice"), (5, "Bob")).toDF("age", "name")
  df.show(false)

  // convert columns into json string
  val df2 = df.select(col("name"),to_json(struct($"*"))).toDF("key", "value")
  df2.show(false)

  // +-----+------------------------+
  // |key  |value                   |
  // +-----+------------------------+
  // |Alice|{"age":3,"name":"Alice"}|
  // |Bob  |{"age":5,"name":"Bob"}  |
  // +-----+------------------------+

  // write to Kafka with jsonString as value
  df2.selectExpr("key", "value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "test-topic")
    .save()

}


0 commentaires

0
votes

Ceci est un exemple minimal dans scala. Supposons que vous ayez un dataframe df avec les colonnes x et y . Voici un exemple minimal:

spark-shell --packages "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1"

N'oubliez pas que vous avez besoin de la bibliothèque spark-sql-kafka : par exemple pour spark-shell est chargé avec

val dataDS = (
    df
    .select(
        $"x".cast(StringType),
        $"y".cast(StringType)
    )
    .toJSON
    .withColumn("key", lit("keyname"))
)

(
    dataDS   
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "servername:port")
    .option("topic", "topicname")
    .save()
)


0 commentaires