1
votes

Dans Spark, parcourez chaque colonne et trouvez la longueur maximale

Je suis nouveau pour déclencher scala et j'ai la situation suivante comme ci-dessous J'ai une table "TEST_TABLE" sur le cluster (peut être une table de ruche) Je convertis cela en dataframe comme:

COLUMN_NAME|MAX_LENGTH
       COL1|3
       COL2|8
       COL3|6

Maintenant, le DF peut être vu comme

scala> testDF.show()

COL1|COL2|COL3  
----------------
abc|abcd|abcdef 
a|BCBDFG|qddfde 
MN|1234B678|sd

Je veux une sortie comme ci-dessous

scala> val testDF = spark.sql("select * from TEST_TABLE limit 10")


0 commentaires

3 Réponses :


7
votes

Clair et simple:

import org.apache.spark.sql.functions._

val df = spark.table("TEST_TABLE")
df.select(df.columns.map(c => max(length(col(c)))): _*)


1 commentaires

pouvez-vous également le convertir en PySpark? Merci



4
votes

Vous pouvez essayer de la manière suivante:

import org.apache.spark.sql.functions.{length, max}
import spark.implicits._

val df = Seq(("abc","abcd","abcdef"),
          ("a","BCBDFG","qddfde"),
          ("MN","1234B678","sd"),
          (null,"","sd")).toDF("COL1","COL2","COL3")
df.cache()
val output = df.columns.map(c => (c, df.agg(max(length(df(s"$c")))).as[Int].first())).toSeq.toDF("COLUMN_NAME", "MAX_LENGTH")
        +-----------+----------+
        |COLUMN_NAME|MAX_LENGTH|
        +-----------+----------+
        |       COL1|         3|
        |       COL2|         8|
        |       COL3|         6|
        +-----------+----------+

Je pense que c'est une bonne idée de mettre en cache le dataframe d'entrée df pour accélérer le calcul.


4 commentaires

Excellente solution, cependant, mes données réelles ont des valeurs nulles et des blancs, donc ".as [Int]" génère une erreur, si je supprime le ".as [Int]", alors il demande une erreur "Encoder": java.lang.UnsupportedOperationException: Aucun encodeur trouvé pour org.apache.spark.sql.Row - champ (classe: "org.apache.spark.sql.Row", nom: "_2") - classe racine: "scala.Tuple2"


Merci, j'essaie de régénérer le problème auquel vous êtes confronté. J'ai modifié ma réponse et j'ai inséré une ligne avec null et vide, mais cela ne montre aucune erreur. Pouvez-vous s'il vous plaît donner des exemples de lignes afin que je puisse régénérer le problème? .


Salut, j'ai simplement changé le .as [Int] en .as [String] qui fonctionnait pour le moment


Pouvez-vous accepter la réponse si cela vous aide? :)



3
votes

Voici une autre façon d'obtenir le rapport des noms de colonnes dans la verticale

scala> val result = df3.flatMap( r => { (0 until r.length).map( i => (columns(i),r.getInt(i)) ) } ).as[(String,Int)].collect.toMap
result: scala.collection.immutable.Map[String,Int] = Map(COL1 -> 3, COL2 -> 8, COL3 -> 6)

scala> result
res47: scala.collection.immutable.Map[String,Int] = Map(COL1 -> 3, COL2 -> 8, COL3 -> 6)

scala>

Pour obtenir les résultats dans les collections Scala, dites Map ()

scala> val df = Seq(("abc","abcd","abcdef"),("a","BCBDFG","qddfde"),("MN","1234B678","sd")).toDF("COL1","COL2","COL3")
df: org.apache.spark.sql.DataFrame = [COL1: string, COL2: string ... 1 more field]

scala> df.show(false)
+----+--------+------+
|COL1|COL2    |COL3  |
+----+--------+------+
|abc |abcd    |abcdef|
|a   |BCBDFG  |qddfde|
|MN  |1234B678|sd    |
+----+--------+------+

scala> val columns = df.columns
columns: Array[String] = Array(COL1, COL2, COL3)

scala> val df2 = columns.foldLeft(df) { (acc,x) => acc.withColumn(x,length(col(x))) }
df2: org.apache.spark.sql.DataFrame = [COL1: int, COL2: int ... 1 more field]

scala> df2.select( columns.map(x => max(col(x))):_* ).show(false)
+---------+---------+---------+
|max(COL1)|max(COL2)|max(COL3)|
+---------+---------+---------+
|3        |8        |6        |
+---------+---------+---------+


scala> df3.flatMap( r => { (0 until r.length).map( i => (columns(i),r.getInt(i)) ) } ).show(false)
+----+---+
|_1  |_2 |
+----+---+
|COL1|3  |
|COL2|8  |
|COL3|6  |
+----+---+


scala>


0 commentaires