Input:
{ "name":"EAN","value":"5057723043", "EPID": "1299", "id1": "xxx", "id2": "yyy" }
Je veux:
pi_df.groupBy(col("id1"), col("id2")) //.agg(collect_list(to_json(struct(col("name"), col("value"))).alias("item_specifics"))) // => not working .agg(collect_list(struct(col("name"),col("value"))).alias("item_specifics"))
J'ai essayé les deux solutions suivantes à partir de Comment agréger des colonnes dans un tableau json? et comment fusionner des lignes dans la colonne de spark dataframe comme vaild json pour l'écrire dans mysql :
{ "id1": "xxx", "id2": "yyy", "item_specifics": [ { "name": "EAN", "value": "5057723043" }, { "name": "MPN", "value": "EVBD" }, { "name": "EPID", "value": "1299" } ] }
Mais j'ai:
id1 id2 name value epid "xxx" "yyy" "EAN" "5057723043" "1299" "xxx" "yyy" "MPN" "EVBD" "1299"
Comment résoudre ce problème? Merci
3 Réponses :
Vous êtes assez proche. Je pense que vous cherchez quelque chose comme ceci:
val pi_df2 = pi_df.withColumn("name", lit("EPID")). withColumnRenamed("epid", "value"). select("id1", "id2", "name","value") pi_df.select("id1", "id2", "name","value"). union(pi_df2).withColumn("item_specific", struct(col("name"), col("value"))). groupBy(col("id1"), col("id2")). agg(collect_list(col("item_specific")).alias("item_specifics")). write.json(...)
Le syndicat devrait ramener epid dans les spécificités des articles
Merci. J'ai essayé la première solution et j'ai trouvé qu'il y avait beaucoup de structures name
et value
dupliquées dans la colonne "item_specific". Je n'ai aucune idée de pourquoi, mais je suis en train de l'examiner. btw, il est possible d'ajouter epid
par udf
?
S'il y a beaucoup de doublons, c'est probablement parce que les données elles-mêmes ont des doublons. Vous pouvez également utiliser collect_set si vous n'en voulez pas, ou faire un df.distinct () avant groupBy. Je ne connais pas de moyen simple d'ajouter epid à udf
pourquoi je n'ai pas vu epid
?
J'ai édité la réponse pour plus de clarté. epid devrait être réintégré pour apparaître dans item_specifics
Voici ce que vous devez faire
{ "id1": "xxx", "id2": "yyy", "item_specifics": [{ "name": "MPN", "value": "EVBD" }, { "name": "EAN", "value": "5057723043" }, { "name": "epid", "value": "1299" }] }
Contenu de la colonne / sortie item_specifics
import scala.util.parsing.json.JSONObject import scala.collection.mutable.WrappedArray //Define udf val jsonFun = udf((id1 : String, id2 : String, item_specifics: WrappedArray[Map[String, String]], epid: String)=> { //Add epid to item_specifics json val item_withEPID = item_specifics :+ Map("epid" -> epid) val item_specificsArray = item_withEPID.map(m => ( Array(Map("name" -> m.keys.toSeq(0), "value" -> m.values.toSeq(0))))).map(m => m.map( mi => JSONObject(mi).toString().replace("\\",""))).flatten.mkString("[",",","]") //Add id1 and id2 to output json val m = Map("id1"-> id1, "id2"-> id2, "item_specifics" -> item_specificsArray.toSeq ) JSONObject(m).toString().replace("\\","") }) val pi_df = Seq( ("xxx","yyy","EAN","5057723043","1299"), ("xxx","yyy","MPN","EVBD","1299")).toDF("id1","id2","name","value","epid") //Add epid as part of group by column else the column will not be available after group by and aggregation val df = pi_df.groupBy(col("id1"), col("id2"), col("epid")).agg(collect_list(map(col("name"), col("value")) as "map").as("item_specifics")).withColumn("item_specifics",jsonFun($"id1",$"id2",$"item_specifics",$"epid")) df.show(false) scala> df.show(false) +---+---+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+ |id1|id2|epid|item_specifics | +---+---+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+ |xxx|yyy|1299|{"id1" : "xxx", "id2" : "yyy", "item_specifics" : [{"name" : "MPN", "value" : "EVBD"},{"name" : "EAN", "value" : "5057723043"},{"name" : "epid", "value" : "1299"}]}| +---+---+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
val jsonDF = df.withColumn("map1", struct(col("name"), col("value"))) .withColumn("map2", struct(lit("epid").as("name"), col("epid").as("value"))) .groupBy("id1", "id2") .agg(collect_set("map1").as("item_specifics1"), collect_set("map2").as("item_specifics2")) .withColumn("item_specifics", array_union(col("item_specifics1"), col("item_specifics2"))) .withColumn("json", to_json(struct("id1", "id2", "item_specifics2")))
Il fournit une méthode array_union. Cela pourrait être utile de le faire sans union. Cependant, je ne l'ai pas essayé.
//Creating Test Data val df = Seq(("xxx","yyy" ,"EAN" ,"5057723043","1299"), ("xxx","yyy" ,"MPN" ,"EVBD", "1299") ) .toDF("id1", "id2", "name", "value", "epid") df.show(false) +---+---+----+----------+----+ |id1|id2|name|value |epid| +---+---+----+----------+----+ |xxx|yyy|EAN |5057723043|1299| |xxx|yyy|MPN |EVBD |1299| +---+---+----+----------+----+ val df1 = df.withColumn("map", struct(col("name"), col("value"))) .select("id1", "id2", "map") val df2 = df.withColumn("map", struct(lit("EPID").as("name"), col("epid").as("value"))) .select("id1", "id2", "map") val jsonDF = df1.union(df2).groupBy("id1", "id2") .agg(collect_set("map").as("item_specifics")) .withColumn("json", to_json(struct("id1", "id2", "item_specifics"))) jsonDF.select("json").show(false) +---------------------------------------------------------------------------------------------------------------------------------------------+ |json | +---------------------------------------------------------------------------------------------------------------------------------------------+ |{"id1":"xxx","id2":"yyy","item_specifics":[{"name":"MPN","value":"EVBD"},{"name":"EAN","value":"5057723043"},{"name":"EPID","value":"1299"}]}| +---------------------------------------------------------------------------------------------------------------------------------------------+
Merci. mais collect_set
ne peut pas supprimer les doublons.
collect_set supprime les doublons et conserve des valeurs uniques. Veuillez vérifier les résultats.
Pourriez-vous s'il vous plaît ajouter un exemple d'entrée?
La sortie n'est pas json non valide, sauf si vous mettez xxx et yyy entre guillemets.
Quelle est la version Spark que vous utilisez?
étincelle 2.3.1 .....