J'essaye de changer le type de données d'une colonne présente dans un dataframe I que je lis à partir d'une base de données RDBMS. Pour ce faire, j'ai obtenu le schéma du dataframe de la manière ci-dessous:
StructType(StructField(je_header_id,LongType,true), StructField(je_line_num,LongType,true), StructField(last_update_date,TimestampType,true), StructField(last_updated_by,DoubleType,true), StructField(creation_date,TimestampType,true), StructField(created_by,DoubleType,true), StructField(created_by_name,StringType,true), StructField(entered_dr,DoubleType,true), StructField(entered_cr,DoubleType,true))
Pour voir le schéma du dataframe, j'ai utilisé l'instruction ci-dessous:
println(dataSchema.schema) Output: StructType(StructField(je_header_id,LongType,true), StructField(je_line_num,LongType,true), StructField(last_update_date,TimestampType,true), StructField(last_updated_by,DecimalType(15,0),true), StructField(creation_date,TimestampType,true), StructField(created_by,DecimalType(15,0),true), StructField(created_by_name,StringType,true), StructField(entered_dr,DecimalType(38,30),true), StructField(entered_cr,DecimalType(38,30),true))
3 Réponses :
Pour modifier un schéma DataFrame spécifique à un type de données donné, vous pouvez faire correspondre un modèle avec dataType
de StructField , comme indiqué ci-dessous:
val pattern = """DecimalType\(\d+,(\d+)\)""".r val df2 = df.dtypes. collect{ case (dn, dt) if pattern.findFirstMatchIn(dt).map(_.group(1)).getOrElse("0") != "0" => dn }. foldLeft(df)((accDF, c) => accDF.withColumn(c, col(c).cast("Double")))
Cependant, en supposant que votre objectif final est de transformer l'ensemble de données avec le changement de type de colonne, il serait plus facile de simplement parcourir les colonnes du type de données ciblé pour les cast
de manière itérative, comme ci-dessous: p >
import org.apache.spark.sql.functions._ val df2 = df.dtypes. collect{ case (dn, dt) if dt.startsWith("DecimalType") => dn }. foldLeft(df)((accDF, c) => accDF.withColumn(c, col(c).cast("Double"))) df2.printSchema // root // |-- c1: long (nullable = false) // |-- c2: double (nullable = true) // |-- c3: string (nullable = true) // |-- c4: double (nullable = true)
[UPDATE]
Par exigence supplémentaire des commentaires, si vous souhaitez modifier le schéma uniquement pour DecimalType
avec une échelle positive, il suffit d'appliquer une correspondance de motif Regex comme condition de guard
dans la méthode collect
:
import org.apache.spark.sql.types._ val df = Seq( (1L, BigDecimal(12.34), "a", BigDecimal(10.001)), (2L, BigDecimal(56.78), "b", BigDecimal(20.002)) ).toDF("c1", "c2", "c3", "c4") val newSchema = df.schema.fields.map{ case StructField(name, _: DecimalType, nullable, _) => StructField(name, DoubleType, nullable) case field => field } // newSchema: Array[org.apache.spark.sql.types.StructField] = Array( // StructField(c1,LongType,false), StructField(c2,DoubleType,true), // StructField(c3,StringType,true), StructField(c4,DoubleType,true) // )
comment le changer si la colonne est à l'intérieur d'une struct
@ stack0114106, évidemment, la solution existante consiste à transformer uniquement les colonnes de niveau supérieur pour le type de données ciblé, et cela devrait être suffisant pour une table SGBDR typique (qui est la source de données d'OP). Pour gérer une structure de colonne imbriquée arbitrairement, j'envisagerais d'utiliser un parcours récursif similaire à cette réponse SO .
Super! .. merci beaucoup pour la solution récursive.
@Metadata, puisque foldLeft
garde l'ordre des éléments traversés et withColumn (c, col (c) ...)
réutilise le même nom pour la nouvelle colonne, l'ordre des colonnes devrait rester le même. Dans tous les cas, si pour une raison quelconque l’ordre des colonnes est modifié, vous pouvez toujours l’appliquer avec un select
explicite (par exemple df2.select (df.columns.map (col): _ * )
).
@LeoC, un dernier doute. Dans le schéma donné, il y a quelques éléments comme: DecimalType (15,0) ce sont essentiellement des types de données INTEGER de la table RDBMS car il n'y a aucune précision pour eux. Comment ignorer ces types de décimales et ne convertir que celles qui ont une précision comme: DECIMAL (38,30)
@LeoC pourriez-vous s'il vous plaît essayer ma question 54445014
@Metadata, veuillez consulter la réponse développée.
Voici une autre façon:
val newDF = data.selectExpr(newSchema.map( col => s"CAST ( ${col.name} As ${col.dataType.sql}) ${col.name}" ): _*) newDF.printSchema root |-- col1: string (nullable = false) |-- col2: double (nullable = true) |-- col3: string (nullable = true) |-- col4: double (nullable = true) newDF.show(false) +----+-----------+----+------+ |col1|col2 |col3|col4 | +----+-----------+----+------+ |1 |0.0032 |a |23.32 | |2 |78787.99003|c |343.32| +----+-----------+----+------+
Créez un schéma que vous voulez:
Exemple:
val newSchema = StructType( Seq( StructField("col1", StringType, true), StructField("col2", DoubleType, true), StructField("col3", StringType, true), StructField("col4", DoubleType, true) ) )
Convertissez les colonnes dans le type de données requis.
data.show(false) data.printSchema +----+------------------------+----+----------------------+ |col1|col2 |col3|col4 | +----+------------------------+----+----------------------+ |1 |0.003200000000000000 |a |23.320000000000000000 | |2 |78787.990030000000000000|c |343.320000000000000000| +----+------------------------+----+----------------------+ root |-- col1: integer (nullable = false) |-- col2: decimal(38,18) (nullable = true) |-- col3: string (nullable = true) |-- col4: decimal(38,18) (nullable = true)
J'ai essayé votre suggestion et cela a abouti à: utilisé par: java.lang.IllegalArgumentException: exigence échouée: La précision décimale 39 dépasse la précision maximale 38 Pourriez-vous me dire ce qui peut être fait ici?
Il existe un problème connu - issues.apache.org/jira/browse/SPARK-20427 < / a>
La solution acceptée fonctionne très bien, mais elle est très coûteuse en raison du coût énorme de withColumn, et l'analyseur doit analyser le DF entier pour chaque withColumn, et avec un grand nombre de colonnes, c'est très coûteux. Je suggérerais plutôt de faire ceci -
val transformedColumns = inputDataDF.dtypes .collect { case (dn, dt) if (dt.startsWith("DecimalType")) => (dn, DoubleType) } val transformedDF = inputDataDF.select(transformedColumns.map { fieldType => inputDataDF(fieldType._1).cast(fieldType._2) }: _*)
Pour un très petit ensemble de données, cela a pris 1 minute et plus avec l'approche withColumn pour moi dans ma machine et 100 ms avec l'approche avec select.
vous pouvez en savoir plus sur le coût de withColumn ici - https: //medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015
Mappez sur le schéma et renvoyez le nom de la colonne ou le nom de la colonne avec cast, vous pouvez ensuite l'utiliser comme expression de sélection pour appliquer le nouveau schéma