Je voudrais transformer certaines colonnes de mon dataframe en fonction de la configuration représentée par les cartes Scala.
J'ai 2 cas:
Map [String, Seq [String]]
et des colonnes col1, col2, pour transformer col3 s'il y a une entité dans une carte avec key = col1, et col2 est dans cette entité liste de valeurs. Map [String, (Long, Long)
et col1, col2, pour transformer col3 si
il y a une entité dans une carte avec key = col1 et col2 est dans une plage décrite par le tuple de Longs comme (début, fin). exemples :
cas 1 ayant cette table, et une carte Map (u1-> Seq (w1, w11), u2 -> Seq (w2, w22))
+------+------+------+ | col1 | col2 | col3 | +------+------+------+ | u1 | 2 | x-v1 | +------+------+------+ | u1 | 6 | v11 | +------+------+------+ | u2 | 3 | x-v3 | +------+------+------+ | u3 | 4 | v3 | +------+------+------+
Je voudrais ajouter " x- "préfixe à col3, uniquement s'il correspond au terme
+------+------+------+ | col1 | col2 | col3 | +------+------+------+ | u1 | 2 | v1 | +------+------+------+ | u1 | 6 | v11 | +------+------+------+ | u2 | 3 | v3 | +------+------+------+ | u3 | 4 | v3 | +------+------+------+
cas 2: Cette table et carte Map ("u1" -> (1,5), u2 -> (2, 4))
+------+------+------+ | col1 | col2 | col3 | +------+------+------+ | u1 | w1 | x-v1 | +------+------+------+ | u2 | w2 | x-v2 | +------+------+------+ | u3 | w3 | v3 | +------+------+------+
La sortie attendue doit être:
+------+------+------+ | col1 | col2 | col3 | +------+------+------+ | u1 | w1 | v1 | +------+------+------+ | u2 | w2 | v2 | +------+------+------+ | u3 | w3 | v3 | +------+------+------+
Cela peut facilement être fait par les UDF, mais pour les performances concernées, je ne voudrais pas les utiliser.
Y a-t-il un moyen d'y parvenir sans cela dans Spark 2.4.2?
Merci
3 Réponses :
Vérifiez le code ci-dessous.
Remarque -
Map ("u1" -> Seq (1,5), u2 -> Seq (2, 4))
carte json
, ajout de la carte json en tant que valeurs de colonne à DataFrame, puis application de la logique sur DataFrame Importez les bibliothèques requises.
scala> caseTwoDF .withColumn("data",caseTwoExpr) .withColumn("col3",when(expr("array_contains(sequence(data[col1][0],data[col1][1]),col2)"), concat(lit("x-"),$"col3")).otherwise($"col3")) .drop("data") .show(false) +----+----+----+ |col1|col2|col3| +----+----+----+ |u1 |2 |x-v1| |u1 |6 |v11 | |u2 |3 |x-v3| |u3 |4 |v3 | +----+----+----+
Cas-1 logique
scala> val caseTwoExpr = from_json(caseTwoJsonMap,caseTwoSchema) caseTwoExpr: org.apache.spark.sql.Column = entries
XXX
scala> val caseTwoSchema = MapType(StringType,ArrayType(IntegerType)) caseTwoSchema: org.apache.spark.sql.types.MapType = MapType(StringType,ArrayType(IntegerType,true),true)
scala> val caseTwoJsonMap = lit(compact(render(caseTwoMap))) caseTwoJsonMap: org.apache.spark.sql.Column = {"u1":[1,5],"u2":[2,4]}
scala> val caseTwoMap = Map("u1" -> Seq(1,5),"u2" -> Seq(2,4)) caseTwoMap: scala.collection.immutable.Map[String,Seq[Int]] = Map(u1 -> List(1, 5), u2 -> List(2, 4))
Sortie finale du cas 1
scala> val caseTwoDF = Seq(("u1",2,"v1"),("u1",6,"v11"),("u2",3,"v3"),("u3",4,"v3")).toDF("col1","col2","col3") caseTwoDF: org.apache.spark.sql.DataFrame = [col1: string, col2: int ... 1 more field]
Logique du cas 2
scala> dfa .withColumn("data",caseOneExpr) .withColumn("col3",when(expr("array_contains(data[col1],col2)"),concat(lit("x-"),$"col3")).otherwise($"col3")) .drop("data") .show(false) +----+----+----+ |col1|col2|col3| +----+----+----+ |u1 |w1 |x-v1| |u2 |w2 |x-v2| |u3 |w3 |v3 | +----+----+----+
scala> val caseOneExpr = from_json(caseOneJsonMap,caseOneSchema) caseOneExpr: org.apache.spark.sql.Column = entries
scala> val caseOneSchema = MapType(StringType,ArrayType(StringType)) caseOneSchema: org.apache.spark.sql.types.MapType = MapType(StringType,ArrayType(StringType,true),true)
scala> val caseOneJsonMap = lit(compact(render(caseOneMap))) caseOneJsonMap: org.apache.spark.sql.Column = {"u1":["w1","w11"],"u2":["w2","w22"]}
scala> val caseOneMap = Map("u1" -> Seq("w1","w11"),"u2" -> Seq("w2","w22")) caseOneMap: scala.collection.immutable.Map[String,Seq[String]] = Map(u1 -> List(w1, w11), u2 -> List(w2, w22))
Sortie finale du cas 2
scala> val caseOneDF = Seq(("u1","w1","v1"),("u2","w2","v2"),("u3","w3","v3")).toDF("col1","col2","col3") caseOneDF: org.apache.spark.sql.DataFrame = [col1: string, col2: string ... 1 more field]
Cela fonctionne pour moi!!! J'essaie maintenant de me convaincre que cela améliorera en fait mes performances au travail. Merci pour la suggestion incroyable !!!
Une autre alternative -
import org.apache.spark.sql.functions.typedLit
val case2 = Map("u1" -> (1,5), "u2" -> (2, 4)) val p = df2.withColumn("case2", typedLit(case2)) .withColumn("col3", when(expr("col2 between case2[col1]._1 and case2[col1]._2"), concat(lit("x-"), $"col3")) .otherwise($"col3") ) p.show(false) p.printSchema() /** * +----+----+----+----------------------------+ * |col1|col2|col3|case2 | * +----+----+----+----------------------------+ * |u1 |2 |x-v1|[u1 -> [1, 5], u2 -> [2, 4]]| * |u1 |6 |v11 |[u1 -> [1, 5], u2 -> [2, 4]]| * |u2 |3 |x-v3|[u1 -> [1, 5], u2 -> [2, 4]]| * |u3 |4 |v3 |[u1 -> [1, 5], u2 -> [2, 4]]| * +----+----+----+----------------------------+ * * root * |-- col1: string (nullable = true) * |-- col2: integer (nullable = true) * |-- col3: string (nullable = true) * |-- case2: map (nullable = false) * | |-- key: string * | |-- value: struct (valueContainsNull = true) * | | |-- _1: integer (nullable = false) * | | |-- _2: integer (nullable = false) */
df2.show(false) df2.printSchema() /** * +----+----+----+ * |col1|col2|col3| * +----+----+----+ * |u1 |2 |v1 | * |u1 |6 |v11 | * |u2 |3 |v3 | * |u3 |4 |v3 | * +----+----+----+ * * root * |-- col1: string (nullable = true) * |-- col2: integer (nullable = true) * |-- col3: string (nullable = true) */
val case1 = Map("u1" -> Seq("w1","w11"), "u2" -> Seq("w2","w22")) val p1 = df1.withColumn("case1", typedLit(case1)) .withColumn("col3", when(array_contains(expr("case1[col1]"), $"col2"), concat(lit("x-"), $"col3")) .otherwise($"col3") ) p1.show(false) p1.printSchema() /** * +----+----+----+----------------------------------+ * |col1|col2|col3|case1 | * +----+----+----+----------------------------------+ * |u1 |w1 |x-v1|[u1 -> [w1, w11], u2 -> [w2, w22]]| * |u2 |w2 |x-v2|[u1 -> [w1, w11], u2 -> [w2, w22]]| * |u3 |w3 |v3 |[u1 -> [w1, w11], u2 -> [w2, w22]]| * +----+----+----+----------------------------------+ * * root * |-- col1: string (nullable = true) * |-- col2: string (nullable = true) * |-- col3: string (nullable = true) * |-- case1: map (nullable = false) * | |-- key: string * | |-- value: array (valueContainsNull = true) * | | |-- element: string (containsNull = true) */
df1.show(false) df1.printSchema() /** * +----+----+----+ * |col1|col2|col3| * +----+----+----+ * |u1 |w1 |v1 | * |u2 |w2 |v2 | * |u3 |w3 |v3 | * +----+----+----+ * * root * |-- col1: string (nullable = true) * |-- col2: string (nullable = true) * |-- col3: string (nullable = true) */
Solution bonne et simple .. nouvelle fonction typedLit apprise .. :)
+----+----+----+ |col1|col2|col3| +----+----+----+ |u1 |2 |x-v1| |u1 |6 |v11 | |u2 |3 |x-v3| |u3 |4 |v3 | +----+----+----+
pouvez-vous également ajouter un échantillon d'entrée et une sortie attendue? et quelle version d'étincelle ??
@Srinivas exemples ajoutés, merci
version étincelle ??
@Srinivas Spark 2.4.2
est-ce que ça va, si je convertis cette
Map ("u1" -> (1,5), u2 -> (2, 4))
enMap ("u1" -> Seq ( 1,5), u2 -> Seq (2, 4))
?sûr @Srinivas, notez simplement que dans ce cas, ce sont des plages. donc la tête est la limite inférieure et la dernière est la limite supérieure