J'essaie d'écrire un dataframe dans kafka au format JSON et d'ajouter une clé au data frame dans Scala, je travaille actuellement avec cet exemple de kafka-spark:
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save()
Existe-t-il une méthode to_json qui peut être utilisée (au lieu de l'option json (path) qui, je crois, écrit dans un fichier au format JSON) et y a-t-il un Option key qui peut être utilisée pour remplacer la valeur null par une clé réelle.
3 Réponses :
Vous pouvez utiliser la méthode toJSON () sur dataframe pour convertir votre enregistrement en message json.
import org.apache.spark.sql.Column;
someDF.withColumn("key",lit("name")).show() // replace "name" with your variable
someDF.withColumn("key",lit("name")).toJSON.first() // toJSON is available as variable on dataframe in Scala
someDF.withColumn("key",lit("name")).toJSON.first()
res5: String = {"number":8,"word":"bat","key":"name"}
Votre code peut être comme ça strong >
from pyspark.sql.functions import lit
df.withColumn('key', lit(datetime.now())).toJSON()
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save()
Scala :
df = spark.createDataFrame([('user_first_name','user_last_nmae',100)], ['first_name','last_name','ID'])
import json
from datetime import datetime
from pyspark.sql.functions import lit
json_df = json.loads(df.withColumn('date_as_key', lit(datetime.now().date())).toJSON().first())
print json_df
{u'date_as_key': u'2019-07-29', u'first_name': u'user_first_name', u'last_name': u'user_last_nmae', u'ID': 100}
Je viens de modifier la question pour ajouter que je fais cela dans Scala. Mais peu importe, pour ajouter la clé, que se passe-t-il si la clé est une variable? puis-je analyser les variables dans selectExpr? selectExpr ("CAST (" + variable + "AS STRING)", "CAST (value AS STRING)"). toJSON ()
Je suppose que vous souhaitez ajouter une colonne clé qui est une variable à votre message, a mis à jour la réponse
C'est ce que je vise à faire, mais dans scala: bigdatums.net/2017/05/21/… De plus, il semble que toJSON () ne soit pas une fonction valide dans scala
Vous pouvez utiliser la fonction SQL to_json pour convertir vos colonnes en JSON.
Voir le code Scala ci-dessous qui utilise également le Fonction intégrée Spark SQL struct dans la version Spark 2.4.5. Assurez-vous simplement que vous nommez vos colonnes comme clé et valeur ou que vous appliquez les alias correspondants dans votre selectExpr.
kafka-console-consumer --bootstrap-server localhost:9092 --property print.key=true --property print.value=true --topic test-topic
Alice {"age":3,"name":"Alice"}
Bob {"age":5,"name":"Bob"}
Cela renverra les données suivantes dans votre sujet Kafka:
import org.apache.spark.sql.functions.{col, struct, to_json}
import org.apache.spark.sql.SparkSession
object Main extends App {
val spark = SparkSession.builder()
.appName("myAppName")
.master("local[*]")
.getOrCreate()
// create DataFrame
import spark.implicits._
val df = Seq((3, "Alice"), (5, "Bob")).toDF("age", "name")
df.show(false)
// convert columns into json string
val df2 = df.select(col("name"),to_json(struct($"*"))).toDF("key", "value")
df2.show(false)
// +-----+------------------------+
// |key |value |
// +-----+------------------------+
// |Alice|{"age":3,"name":"Alice"}|
// |Bob |{"age":5,"name":"Bob"} |
// +-----+------------------------+
// write to Kafka with jsonString as value
df2.selectExpr("key", "value")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "test-topic")
.save()
}
Ceci est un exemple minimal dans scala. Supposons que vous ayez un dataframe df avec les colonnes x et y . Voici un exemple minimal:
spark-shell --packages "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1"
N'oubliez pas que vous avez besoin de la bibliothèque spark-sql-kafka : par exemple pour spark-shell est chargé avec
val dataDS = (
df
.select(
$"x".cast(StringType),
$"y".cast(StringType)
)
.toJSON
.withColumn("key", lit("keyname"))
)
(
dataDS
.write
.format("kafka")
.option("kafka.bootstrap.servers", "servername:port")
.option("topic", "topicname")
.save()
)