1
votes

Transformer les colonnes dans Spark DataFrame en fonction de la carte sans utiliser les UDF

Je voudrais transformer certaines colonnes de mon dataframe en fonction de la configuration représentée par les cartes Scala.

J'ai 2 cas:

  1. Réception d'une carte Map [String, Seq [String]] et des colonnes col1, col2, pour transformer col3 s'il y a une entité dans une carte avec key = col1, et col2 est dans cette entité liste de valeurs.
  2. Réception d'une carte Map [String, (Long, Long) et col1, col2, pour transformer col3 si il y a une entité dans une carte avec key = col1 et col2 est dans une plage décrite par le tuple de Longs comme (début, fin).

exemples :

cas 1 ayant cette table, et une carte Map (u1-> Seq (w1, w11), u2 -> Seq (w2, w22))

+------+------+------+
| col1 | col2 | col3 |
+------+------+------+
| u1   | 2    | x-v1 |
+------+------+------+
| u1   | 6    | v11  |
+------+------+------+
| u2   | 3    | x-v3 |
+------+------+------+
| u3   | 4    | v3   |
+------+------+------+

Je voudrais ajouter " x- "préfixe à col3, uniquement s'il correspond au terme

+------+------+------+
| col1 | col2 | col3 |
+------+------+------+
| u1   | 2    | v1   |
+------+------+------+
| u1   | 6    | v11  |
+------+------+------+
| u2   | 3    | v3   |
+------+------+------+
| u3   | 4    | v3   |
+------+------+------+

cas 2: Cette table et carte Map ("u1" -> (1,5), u2 -> (2, 4))

+------+------+------+
| col1 | col2 | col3 |
+------+------+------+
| u1   | w1   | x-v1 |
+------+------+------+
| u2   | w2   | x-v2 |
+------+------+------+
| u3   | w3   | v3   |
+------+------+------+

La sortie attendue doit être:

+------+------+------+
| col1 | col2 | col3 | 
+------+------+------+
| u1   | w1   | v1   |
+------+------+------+
| u2   | w2   | v2   |
+------+------+------+
| u3   | w3   | v3   |
+------+------+------+

Cela peut facilement être fait par les UDF, mais pour les performances concernées, je ne voudrais pas les utiliser.

Y a-t-il un moyen d'y parvenir sans cela dans Spark 2.4.2?

Merci


6 commentaires

pouvez-vous également ajouter un échantillon d'entrée et une sortie attendue? et quelle version d'étincelle ??


@Srinivas exemples ajoutés, merci


version étincelle ??


@Srinivas Spark 2.4.2


est-ce que ça va, si je convertis cette Map ("u1" -> (1,5), u2 -> (2, 4)) en Map ("u1" -> Seq ( 1,5), u2 -> Seq (2, 4)) ?


sûr @Srinivas, notez simplement que dans ce cas, ce sont des plages. donc la tête est la limite inférieure et la dernière est la limite supérieure


3 Réponses :


2
votes

Vérifiez le code ci-dessous.

Remarque -

  • J'ai changé la valeur de votre deuxième carte de cas en Map ("u1" -> Seq (1,5), u2 -> Seq (2, 4))
  • Conversion des valeurs de la carte en carte json , ajout de la carte json en tant que valeurs de colonne à DataFrame, puis application de la logique sur DataFrame
  • Si possible, vous pouvez directement ajouter des valeurs dans la carte json afin d'éviter la conversion de la carte en carte json.

Importez les bibliothèques requises.

scala> caseTwoDF
.withColumn("data",caseTwoExpr)
.withColumn("col3",when(expr("array_contains(sequence(data[col1][0],data[col1][1]),col2)"), concat(lit("x-"),$"col3")).otherwise($"col3"))
.drop("data")
.show(false)

+----+----+----+
|col1|col2|col3|
+----+----+----+
|u1  |2   |x-v1|
|u1  |6   |v11 |
|u2  |3   |x-v3|
|u3  |4   |v3  |
+----+----+----+

Cas-1 logique

scala> val caseTwoExpr = from_json(caseTwoJsonMap,caseTwoSchema)
caseTwoExpr: org.apache.spark.sql.Column = entries

XXX

scala> val caseTwoSchema = MapType(StringType,ArrayType(IntegerType))
caseTwoSchema: org.apache.spark.sql.types.MapType = MapType(StringType,ArrayType(IntegerType,true),true)
scala> val caseTwoJsonMap = lit(compact(render(caseTwoMap)))
caseTwoJsonMap: org.apache.spark.sql.Column = {"u1":[1,5],"u2":[2,4]}
scala> val caseTwoMap = Map("u1" -> Seq(1,5),"u2" -> Seq(2,4))
caseTwoMap: scala.collection.immutable.Map[String,Seq[Int]] = Map(u1 -> List(1, 5), u2 -> List(2, 4))

Sortie finale du cas 1

scala> val caseTwoDF = Seq(("u1",2,"v1"),("u1",6,"v11"),("u2",3,"v3"),("u3",4,"v3")).toDF("col1","col2","col3")
caseTwoDF: org.apache.spark.sql.DataFrame = [col1: string, col2: int ... 1 more field]

Logique du cas 2

scala> dfa
.withColumn("data",caseOneExpr)
.withColumn("col3",when(expr("array_contains(data[col1],col2)"),concat(lit("x-"),$"col3")).otherwise($"col3"))
.drop("data")
.show(false)

+----+----+----+
|col1|col2|col3|
+----+----+----+
|u1  |w1  |x-v1|
|u2  |w2  |x-v2|
|u3  |w3  |v3  |
+----+----+----+
scala> val caseOneExpr = from_json(caseOneJsonMap,caseOneSchema)
caseOneExpr: org.apache.spark.sql.Column = entries
scala> val caseOneSchema = MapType(StringType,ArrayType(StringType))
caseOneSchema: org.apache.spark.sql.types.MapType = MapType(StringType,ArrayType(StringType,true),true)
scala> val caseOneJsonMap = lit(compact(render(caseOneMap)))
caseOneJsonMap: org.apache.spark.sql.Column = {"u1":["w1","w11"],"u2":["w2","w22"]}
scala> val caseOneMap = Map("u1" -> Seq("w1","w11"),"u2" -> Seq("w2","w22"))
caseOneMap: scala.collection.immutable.Map[String,Seq[String]] = Map(u1 -> List(w1, w11), u2 -> List(w2, w22))

Sortie finale du cas 2

scala> val caseOneDF = Seq(("u1","w1","v1"),("u2","w2","v2"),("u3","w3","v3")).toDF("col1","col2","col3")
caseOneDF: org.apache.spark.sql.DataFrame = [col1: string, col2: string ... 1 more field]


1 commentaires

Cela fonctionne pour moi!!! J'essaie maintenant de me convaincre que cela améliorera en fait mes performances au travail. Merci pour la suggestion incroyable !!!



2
votes

Une autre alternative -

import org.apache.spark.sql.functions.typedLit

Case-1

val case2 = Map("u1" -> (1,5), "u2" -> (2, 4))
    val p = df2.withColumn("case2", typedLit(case2))
      .withColumn("col3",
        when(expr("col2 between case2[col1]._1 and case2[col1]._2"), concat(lit("x-"), $"col3"))
          .otherwise($"col3")
      )
    p.show(false)
    p.printSchema()

    /**
      * +----+----+----+----------------------------+
      * |col1|col2|col3|case2                       |
      * +----+----+----+----------------------------+
      * |u1  |2   |x-v1|[u1 -> [1, 5], u2 -> [2, 4]]|
      * |u1  |6   |v11 |[u1 -> [1, 5], u2 -> [2, 4]]|
      * |u2  |3   |x-v3|[u1 -> [1, 5], u2 -> [2, 4]]|
      * |u3  |4   |v3  |[u1 -> [1, 5], u2 -> [2, 4]]|
      * +----+----+----+----------------------------+
      *
      * root
      * |-- col1: string (nullable = true)
      * |-- col2: integer (nullable = true)
      * |-- col3: string (nullable = true)
      * |-- case2: map (nullable = false)
      * |    |-- key: string
      * |    |-- value: struct (valueContainsNull = true)
      * |    |    |-- _1: integer (nullable = false)
      * |    |    |-- _2: integer (nullable = false)
      */
df2.show(false)
    df2.printSchema()
    /**
      * +----+----+----+
      * |col1|col2|col3|
      * +----+----+----+
      * |u1  |2   |v1  |
      * |u1  |6   |v11 |
      * |u2  |3   |v3  |
      * |u3  |4   |v3  |
      * +----+----+----+
      *
      * root
      * |-- col1: string (nullable = true)
      * |-- col2: integer (nullable = true)
      * |-- col3: string (nullable = true)
      */

Case-2

 val case1 = Map("u1" -> Seq("w1","w11"), "u2" -> Seq("w2","w22"))

    val p1 = df1.withColumn("case1", typedLit(case1))
      .withColumn("col3",
        when(array_contains(expr("case1[col1]"), $"col2"), concat(lit("x-"), $"col3"))
          .otherwise($"col3")
      )
    p1.show(false)
    p1.printSchema()
    /**
      * +----+----+----+----------------------------------+
      * |col1|col2|col3|case1                             |
      * +----+----+----+----------------------------------+
      * |u1  |w1  |x-v1|[u1 -> [w1, w11], u2 -> [w2, w22]]|
      * |u2  |w2  |x-v2|[u1 -> [w1, w11], u2 -> [w2, w22]]|
      * |u3  |w3  |v3  |[u1 -> [w1, w11], u2 -> [w2, w22]]|
      * +----+----+----+----------------------------------+
      *
      * root
      * |-- col1: string (nullable = true)
      * |-- col2: string (nullable = true)
      * |-- col3: string (nullable = true)
      * |-- case1: map (nullable = false)
      * |    |-- key: string
      * |    |-- value: array (valueContainsNull = true)
      * |    |    |-- element: string (containsNull = true)
      */
df1.show(false)
    df1.printSchema()
    /**
      * +----+----+----+
      * |col1|col2|col3|
      * +----+----+----+
      * |u1  |w1  |v1  |
      * |u2  |w2  |v2  |
      * |u3  |w3  |v3  |
      * +----+----+----+
      *
      * root
      * |-- col1: string (nullable = true)
      * |-- col2: string (nullable = true)
      * |-- col3: string (nullable = true)
      */


1 commentaires

Solution bonne et simple .. nouvelle fonction typedLit apprise .. :)



0
votes
+----+----+----+
|col1|col2|col3|
+----+----+----+
|u1  |2   |x-v1|
|u1  |6   |v11 |
|u2  |3   |x-v3|
|u3  |4   |v3  |
+----+----+----+

0 commentaires