J'essaie d'écrire un dataframe dans kafka au format JSON et d'ajouter une clé au data frame dans Scala, je travaille actuellement avec cet exemple de kafka-spark:
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") .write .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("topic", "topic1") .save()
Existe-t-il une méthode to_json
qui peut être utilisée (au lieu de l'option json (path)
qui, je crois, écrit dans un fichier au format JSON) et y a-t-il un Option key
qui peut être utilisée pour remplacer la valeur null
par une clé réelle.
3 Réponses :
Vous pouvez utiliser la méthode toJSON ()
sur dataframe pour convertir votre enregistrement en message json.
import org.apache.spark.sql.Column; someDF.withColumn("key",lit("name")).show() // replace "name" with your variable someDF.withColumn("key",lit("name")).toJSON.first() // toJSON is available as variable on dataframe in Scala someDF.withColumn("key",lit("name")).toJSON.first() res5: String = {"number":8,"word":"bat","key":"name"}
Votre code peut être comme ça strong >
from pyspark.sql.functions import lit df.withColumn('key', lit(datetime.now())).toJSON() .write .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("topic", "topic1") .save()
Scala :
df = spark.createDataFrame([('user_first_name','user_last_nmae',100)], ['first_name','last_name','ID']) import json from datetime import datetime from pyspark.sql.functions import lit json_df = json.loads(df.withColumn('date_as_key', lit(datetime.now().date())).toJSON().first()) print json_df {u'date_as_key': u'2019-07-29', u'first_name': u'user_first_name', u'last_name': u'user_last_nmae', u'ID': 100}
Je viens de modifier la question pour ajouter que je fais cela dans Scala. Mais peu importe, pour ajouter la clé, que se passe-t-il si la clé est une variable? puis-je analyser les variables dans selectExpr? selectExpr ("CAST (" + variable + "AS STRING)", "CAST (value AS STRING)"). toJSON ()
Je suppose que vous souhaitez ajouter une colonne clé qui est une variable à votre message, a mis à jour la réponse
C'est ce que je vise à faire, mais dans scala: bigdatums.net/2017/05/21/… De plus, il semble que toJSON ()
ne soit pas une fonction valide dans scala
Vous pouvez utiliser la fonction SQL to_json
pour convertir vos colonnes en JSON.
Voir le code Scala ci-dessous qui utilise également le Fonction intégrée Spark SQL struct
dans la version Spark 2.4.5. Assurez-vous simplement que vous nommez vos colonnes comme clé
et valeur
ou que vous appliquez les alias correspondants dans votre selectExpr
.
kafka-console-consumer --bootstrap-server localhost:9092 --property print.key=true --property print.value=true --topic test-topic Alice {"age":3,"name":"Alice"} Bob {"age":5,"name":"Bob"}
Cela renverra les données suivantes dans votre sujet Kafka:
import org.apache.spark.sql.functions.{col, struct, to_json} import org.apache.spark.sql.SparkSession object Main extends App { val spark = SparkSession.builder() .appName("myAppName") .master("local[*]") .getOrCreate() // create DataFrame import spark.implicits._ val df = Seq((3, "Alice"), (5, "Bob")).toDF("age", "name") df.show(false) // convert columns into json string val df2 = df.select(col("name"),to_json(struct($"*"))).toDF("key", "value") df2.show(false) // +-----+------------------------+ // |key |value | // +-----+------------------------+ // |Alice|{"age":3,"name":"Alice"}| // |Bob |{"age":5,"name":"Bob"} | // +-----+------------------------+ // write to Kafka with jsonString as value df2.selectExpr("key", "value") .write .format("kafka") .option("kafka.bootstrap.servers", "localhost:9092") .option("topic", "test-topic") .save() }
Ceci est un exemple minimal dans scala. Supposons que vous ayez un dataframe df
avec les colonnes x
et y
. Voici un exemple minimal:
spark-shell --packages "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1"
N'oubliez pas que vous avez besoin de la bibliothèque spark-sql-kafka
: par exemple pour spark-shell
est chargé avec
val dataDS = ( df .select( $"x".cast(StringType), $"y".cast(StringType) ) .toJSON .withColumn("key", lit("keyname")) ) ( dataDS .write .format("kafka") .option("kafka.bootstrap.servers", "servername:port") .option("topic", "topicname") .save() )