Je voudrais transformer certaines colonnes de mon dataframe en fonction de la configuration représentée par les cartes Scala.
J'ai 2 cas:
Map [String, Seq [String]] et des colonnes col1, col2, pour transformer col3 s'il y a une entité dans une carte avec key = col1, et col2 est dans cette entité liste de valeurs. Map [String, (Long, Long) et col1, col2, pour transformer col3 si
il y a une entité dans une carte avec key = col1 et col2 est dans une plage décrite par le tuple de Longs comme (début, fin). exemples :
cas 1 ayant cette table, et une carte Map (u1-> Seq (w1, w11), u2 -> Seq (w2, w22))
+------+------+------+ | col1 | col2 | col3 | +------+------+------+ | u1 | 2 | x-v1 | +------+------+------+ | u1 | 6 | v11 | +------+------+------+ | u2 | 3 | x-v3 | +------+------+------+ | u3 | 4 | v3 | +------+------+------+
Je voudrais ajouter " x- "préfixe à col3, uniquement s'il correspond au terme
+------+------+------+ | col1 | col2 | col3 | +------+------+------+ | u1 | 2 | v1 | +------+------+------+ | u1 | 6 | v11 | +------+------+------+ | u2 | 3 | v3 | +------+------+------+ | u3 | 4 | v3 | +------+------+------+
cas 2: Cette table et carte Map ("u1" -> (1,5), u2 -> (2, 4))
+------+------+------+ | col1 | col2 | col3 | +------+------+------+ | u1 | w1 | x-v1 | +------+------+------+ | u2 | w2 | x-v2 | +------+------+------+ | u3 | w3 | v3 | +------+------+------+
La sortie attendue doit être:
+------+------+------+ | col1 | col2 | col3 | +------+------+------+ | u1 | w1 | v1 | +------+------+------+ | u2 | w2 | v2 | +------+------+------+ | u3 | w3 | v3 | +------+------+------+
Cela peut facilement être fait par les UDF, mais pour les performances concernées, je ne voudrais pas les utiliser.
Y a-t-il un moyen d'y parvenir sans cela dans Spark 2.4.2?
Merci
3 Réponses :
Vérifiez le code ci-dessous.
Remarque -
Map ("u1" -> Seq (1,5), u2 -> Seq (2, 4)) carte json , ajout de la carte json en tant que valeurs de colonne à DataFrame, puis application de la logique sur DataFrame Importez les bibliothèques requises.
scala> caseTwoDF
.withColumn("data",caseTwoExpr)
.withColumn("col3",when(expr("array_contains(sequence(data[col1][0],data[col1][1]),col2)"), concat(lit("x-"),$"col3")).otherwise($"col3"))
.drop("data")
.show(false)
+----+----+----+
|col1|col2|col3|
+----+----+----+
|u1 |2 |x-v1|
|u1 |6 |v11 |
|u2 |3 |x-v3|
|u3 |4 |v3 |
+----+----+----+
Cas-1 logique
scala> val caseTwoExpr = from_json(caseTwoJsonMap,caseTwoSchema) caseTwoExpr: org.apache.spark.sql.Column = entries
XXX
scala> val caseTwoSchema = MapType(StringType,ArrayType(IntegerType)) caseTwoSchema: org.apache.spark.sql.types.MapType = MapType(StringType,ArrayType(IntegerType,true),true)
scala> val caseTwoJsonMap = lit(compact(render(caseTwoMap)))
caseTwoJsonMap: org.apache.spark.sql.Column = {"u1":[1,5],"u2":[2,4]}
scala> val caseTwoMap = Map("u1" -> Seq(1,5),"u2" -> Seq(2,4))
caseTwoMap: scala.collection.immutable.Map[String,Seq[Int]] = Map(u1 -> List(1, 5), u2 -> List(2, 4))
Sortie finale du cas 1
scala> val caseTwoDF = Seq(("u1",2,"v1"),("u1",6,"v11"),("u2",3,"v3"),("u3",4,"v3")).toDF("col1","col2","col3")
caseTwoDF: org.apache.spark.sql.DataFrame = [col1: string, col2: int ... 1 more field]
Logique du cas 2
scala> dfa
.withColumn("data",caseOneExpr)
.withColumn("col3",when(expr("array_contains(data[col1],col2)"),concat(lit("x-"),$"col3")).otherwise($"col3"))
.drop("data")
.show(false)
+----+----+----+
|col1|col2|col3|
+----+----+----+
|u1 |w1 |x-v1|
|u2 |w2 |x-v2|
|u3 |w3 |v3 |
+----+----+----+
scala> val caseOneExpr = from_json(caseOneJsonMap,caseOneSchema) caseOneExpr: org.apache.spark.sql.Column = entries
scala> val caseOneSchema = MapType(StringType,ArrayType(StringType)) caseOneSchema: org.apache.spark.sql.types.MapType = MapType(StringType,ArrayType(StringType,true),true)
scala> val caseOneJsonMap = lit(compact(render(caseOneMap)))
caseOneJsonMap: org.apache.spark.sql.Column = {"u1":["w1","w11"],"u2":["w2","w22"]}
scala> val caseOneMap = Map("u1" -> Seq("w1","w11"),"u2" -> Seq("w2","w22"))
caseOneMap: scala.collection.immutable.Map[String,Seq[String]] = Map(u1 -> List(w1, w11), u2 -> List(w2, w22))
Sortie finale du cas 2
scala> val caseOneDF = Seq(("u1","w1","v1"),("u2","w2","v2"),("u3","w3","v3")).toDF("col1","col2","col3")
caseOneDF: org.apache.spark.sql.DataFrame = [col1: string, col2: string ... 1 more field]
Cela fonctionne pour moi!!! J'essaie maintenant de me convaincre que cela améliorera en fait mes performances au travail. Merci pour la suggestion incroyable !!!
Une autre alternative -
import org.apache.spark.sql.functions.typedLit
val case2 = Map("u1" -> (1,5), "u2" -> (2, 4))
val p = df2.withColumn("case2", typedLit(case2))
.withColumn("col3",
when(expr("col2 between case2[col1]._1 and case2[col1]._2"), concat(lit("x-"), $"col3"))
.otherwise($"col3")
)
p.show(false)
p.printSchema()
/**
* +----+----+----+----------------------------+
* |col1|col2|col3|case2 |
* +----+----+----+----------------------------+
* |u1 |2 |x-v1|[u1 -> [1, 5], u2 -> [2, 4]]|
* |u1 |6 |v11 |[u1 -> [1, 5], u2 -> [2, 4]]|
* |u2 |3 |x-v3|[u1 -> [1, 5], u2 -> [2, 4]]|
* |u3 |4 |v3 |[u1 -> [1, 5], u2 -> [2, 4]]|
* +----+----+----+----------------------------+
*
* root
* |-- col1: string (nullable = true)
* |-- col2: integer (nullable = true)
* |-- col3: string (nullable = true)
* |-- case2: map (nullable = false)
* | |-- key: string
* | |-- value: struct (valueContainsNull = true)
* | | |-- _1: integer (nullable = false)
* | | |-- _2: integer (nullable = false)
*/
df2.show(false)
df2.printSchema()
/**
* +----+----+----+
* |col1|col2|col3|
* +----+----+----+
* |u1 |2 |v1 |
* |u1 |6 |v11 |
* |u2 |3 |v3 |
* |u3 |4 |v3 |
* +----+----+----+
*
* root
* |-- col1: string (nullable = true)
* |-- col2: integer (nullable = true)
* |-- col3: string (nullable = true)
*/
val case1 = Map("u1" -> Seq("w1","w11"), "u2" -> Seq("w2","w22"))
val p1 = df1.withColumn("case1", typedLit(case1))
.withColumn("col3",
when(array_contains(expr("case1[col1]"), $"col2"), concat(lit("x-"), $"col3"))
.otherwise($"col3")
)
p1.show(false)
p1.printSchema()
/**
* +----+----+----+----------------------------------+
* |col1|col2|col3|case1 |
* +----+----+----+----------------------------------+
* |u1 |w1 |x-v1|[u1 -> [w1, w11], u2 -> [w2, w22]]|
* |u2 |w2 |x-v2|[u1 -> [w1, w11], u2 -> [w2, w22]]|
* |u3 |w3 |v3 |[u1 -> [w1, w11], u2 -> [w2, w22]]|
* +----+----+----+----------------------------------+
*
* root
* |-- col1: string (nullable = true)
* |-- col2: string (nullable = true)
* |-- col3: string (nullable = true)
* |-- case1: map (nullable = false)
* | |-- key: string
* | |-- value: array (valueContainsNull = true)
* | | |-- element: string (containsNull = true)
*/
df1.show(false)
df1.printSchema()
/**
* +----+----+----+
* |col1|col2|col3|
* +----+----+----+
* |u1 |w1 |v1 |
* |u2 |w2 |v2 |
* |u3 |w3 |v3 |
* +----+----+----+
*
* root
* |-- col1: string (nullable = true)
* |-- col2: string (nullable = true)
* |-- col3: string (nullable = true)
*/
Solution bonne et simple .. nouvelle fonction typedLit apprise .. :)
+----+----+----+ |col1|col2|col3| +----+----+----+ |u1 |2 |x-v1| |u1 |6 |v11 | |u2 |3 |x-v3| |u3 |4 |v3 | +----+----+----+
pouvez-vous également ajouter un échantillon d'entrée et une sortie attendue? et quelle version d'étincelle ??
@Srinivas exemples ajoutés, merci
version étincelle ??
@Srinivas Spark 2.4.2
est-ce que ça va, si je convertis cette
Map ("u1" -> (1,5), u2 -> (2, 4))enMap ("u1" -> Seq ( 1,5), u2 -> Seq (2, 4))?sûr @Srinivas, notez simplement que dans ce cas, ce sont des plages. donc la tête est la limite inférieure et la dernière est la limite supérieure