0
votes

spark: comment fusionner des lignes dans un tableau de jsons

Input:

{ "name":"EAN","value":"5057723043", "EPID": "1299", "id1": "xxx", "id2": "yyy" }

Je veux:

pi_df.groupBy(col("id1"), col("id2"))
  //.agg(collect_list(to_json(struct(col("name"), col("value"))).alias("item_specifics"))) // => not working
  .agg(collect_list(struct(col("name"),col("value"))).alias("item_specifics"))

J'ai essayé les deux solutions suivantes à partir de Comment agréger des colonnes dans un tableau json? et comment fusionner des lignes dans la colonne de spark dataframe comme vaild json pour l'écrire dans mysql :

{         "id1": "xxx",
          "id2": "yyy",
          "item_specifics": [
            {
              "name": "EAN",
              "value": "5057723043"
            },
            {
              "name": "MPN",
              "value": "EVBD"
            },
            {
              "name": "EPID",
              "value": "1299"
            }
          ]
}

Mais j'ai:

id1   id2    name   value           epid
"xxx" "yyy"  "EAN"  "5057723043"    "1299"
"xxx" "yyy"  "MPN"  "EVBD"          "1299"

Comment résoudre ce problème? Merci


4 commentaires

Pourriez-vous s'il vous plaît ajouter un exemple d'entrée?


La sortie n'est pas json non valide, sauf si vous mettez xxx et yyy entre guillemets.


Quelle est la version Spark que vous utilisez?


étincelle 2.3.1 .....


3 Réponses :


0
votes

Vous êtes assez proche. Je pense que vous cherchez quelque chose comme ceci:

val pi_df2 = pi_df.withColumn("name", lit("EPID")).
withColumnRenamed("epid", "value").
select("id1", "id2", "name","value")

pi_df.select("id1", "id2", "name","value").
union(pi_df2).withColumn("item_specific", struct(col("name"), col("value"))).
groupBy(col("id1"), col("id2")).
agg(collect_list(col("item_specific")).alias("item_specifics")).
write.json(...)

Le syndicat devrait ramener epid dans les spécificités des articles


4 commentaires

Merci. J'ai essayé la première solution et j'ai trouvé qu'il y avait beaucoup de structures name et value dupliquées dans la colonne "item_specific". Je n'ai aucune idée de pourquoi, mais je suis en train de l'examiner. btw, il est possible d'ajouter epid par udf ?


S'il y a beaucoup de doublons, c'est probablement parce que les données elles-mêmes ont des doublons. Vous pouvez également utiliser collect_set si vous n'en voulez pas, ou faire un df.distinct () avant groupBy. Je ne connais pas de moyen simple d'ajouter epid à udf


pourquoi je n'ai pas vu epid ?


J'ai édité la réponse pour plus de clarté. epid devrait être réintégré pour apparaître dans item_specifics



0
votes

Voici ce que vous devez faire

{
    "id1": "xxx",
    "id2": "yyy",
    "item_specifics": [{
        "name": "MPN",
        "value": "EVBD"
    }, {
        "name": "EAN",
        "value": "5057723043"
    }, {
        "name": "epid",
        "value": "1299"
    }]
}

Contenu de la colonne / sortie item_specifics

    import scala.util.parsing.json.JSONObject
    import scala.collection.mutable.WrappedArray

    //Define udf
    val jsonFun = udf((id1 : String, id2 : String, item_specifics: WrappedArray[Map[String, String]], epid: String)=> {
 //Add epid to item_specifics json
val item_withEPID = item_specifics :+ Map("epid" -> epid)

val item_specificsArray = item_withEPID.map(m => ( Array(Map("name" -> m.keys.toSeq(0), "value" -> m.values.toSeq(0))))).map(m => m.map( mi => JSONObject(mi).toString().replace("\\",""))).flatten.mkString("[",",","]")

 //Add id1 and id2 to output json
val m = Map("id1"-> id1, "id2"-> id2, "item_specifics" -> item_specificsArray.toSeq )
JSONObject(m).toString().replace("\\","")
})

val pi_df = Seq( ("xxx","yyy","EAN","5057723043","1299"), ("xxx","yyy","MPN","EVBD","1299")).toDF("id1","id2","name","value","epid")

//Add epid as part of group by column else the column will not be available after group by and aggregation
val df = pi_df.groupBy(col("id1"), col("id2"), col("epid")).agg(collect_list(map(col("name"), col("value")) as "map").as("item_specifics")).withColumn("item_specifics",jsonFun($"id1",$"id2",$"item_specifics",$"epid"))

df.show(false)

scala> df.show(false)
+---+---+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id1|id2|epid|item_specifics                                                                                                                                                      |
+---+---+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|xxx|yyy|1299|{"id1" : "xxx", "id2" : "yyy", "item_specifics" : [{"name" : "MPN", "value" : "EVBD"},{"name" : "EAN", "value" : "5057723043"},{"name" : "epid", "value" : "1299"}]}|
+---+---+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+


0 commentaires

5
votes

Pour Spark Vous pouvez créer 2 dataframes, une avec nom et valeur et l'autre avec epic comme nom et valeur epic comme valeur et les unir ensemble. Puis agrégez-les comme collect_set et créez un json. Le code devrait ressembler à ceci.
val jsonDF = df.withColumn("map1", struct(col("name"), col("value")))
  .withColumn("map2", struct(lit("epid").as("name"), col("epid").as("value")))
  .groupBy("id1", "id2")
    .agg(collect_set("map1").as("item_specifics1"),
      collect_set("map2").as("item_specifics2"))
  .withColumn("item_specifics", array_union(col("item_specifics1"), col("item_specifics2")))
  .withColumn("json", to_json(struct("id1", "id2", "item_specifics2")))

Pour Spark = 2.4

Il fournit une méthode array_union. Cela pourrait être utile de le faire sans union. Cependant, je ne l'ai pas essayé.

//Creating Test Data
val df = Seq(("xxx","yyy" ,"EAN" ,"5057723043","1299"), ("xxx","yyy" ,"MPN" ,"EVBD", "1299") )
  .toDF("id1", "id2", "name", "value", "epid")

df.show(false)

+---+---+----+----------+----+
|id1|id2|name|value     |epid|
+---+---+----+----------+----+
|xxx|yyy|EAN |5057723043|1299|
|xxx|yyy|MPN |EVBD      |1299|
+---+---+----+----------+----+

val df1 = df.withColumn("map", struct(col("name"), col("value")))
  .select("id1", "id2", "map")

val df2 = df.withColumn("map", struct(lit("EPID").as("name"), col("epid").as("value")))
  .select("id1", "id2", "map")

val jsonDF = df1.union(df2).groupBy("id1", "id2")
  .agg(collect_set("map").as("item_specifics"))
  .withColumn("json", to_json(struct("id1", "id2", "item_specifics")))

jsonDF.select("json").show(false)

+---------------------------------------------------------------------------------------------------------------------------------------------+
|json                                                                                                                                         |
+---------------------------------------------------------------------------------------------------------------------------------------------+
|{"id1":"xxx","id2":"yyy","item_specifics":[{"name":"MPN","value":"EVBD"},{"name":"EAN","value":"5057723043"},{"name":"EPID","value":"1299"}]}|
+---------------------------------------------------------------------------------------------------------------------------------------------+

2 commentaires

Merci. mais collect_set ne peut pas supprimer les doublons.


collect_set supprime les doublons et conserve des valeurs uniques. Veuillez vérifier les résultats.