Il existe de nombreuses questions similaires à celle-ci qui posent une question différente en ce qui concerne éviter les colonnes en double dans une jointure ; ce n'est pas ce que je demande ici.
Étant donné que j'ai déjà un DataFrame avec des colonnes ambiguës, comment supprimer une colonne spécifique ?
Par exemple, étant donné:
$ df.dtypes [('id', 'int'), ('shared', 'double'), ('shared', 'string')] $ df.schema StructType(List(StructField(id,IntegerType,true),StructField(shared,DoubleType,true),StructField(shared,StringType,true)))
Je souhaite ne conserver que les colonnes numériques.
Cependant, en essayant de faire quelque chose comme df. select ("id", "shared"). show ()
aboutit à:
raise AnalysisException(s.split(': ', 1)[1], stackTrace) pyspark.sql.utils.AnalysisException: "Reference 'shared' is ambiguous, could be: shared, shared.;"
De nombreuses solutions liées à ce problème consistent simplement à éviter d'entrer dans ce problème situation », par exemple. en utilisant ['joinkey']
au lieu de a.joinkey = b.joinkey
sur la jointure. Je répète que ce n’est pas le cas ici; il s'agit d'un dataframe qui a déjà été converti dans ce formulaire.
Les métadonnées du DF clarifient ces colonnes:
df = spark.createDataFrame( spark.sparkContext.parallelize([ [1, 0.0, "ext-0.0"], [1, 1.0, "ext-1.0"], [2, 1.0, "ext-2.0"], [3, 2.0, "ext-3.0"], [4, 3.0, "ext-4.0"], ]), StructType([ StructField("id", IntegerType(), True), StructField("shared", DoubleType(), True), StructField("shared", StringType(), True), ]) )
Donc les données est conservé en interne ... Je ne vois tout simplement pas comment l'utiliser.
Comment choisir une colonne plutôt qu'une autre?
Je m'attendais à être capable d'utiliser, par exemple. col ('shared # 11')
ou similaire ... mais il n'y a rien de tel que je peux voir?
Est-ce simplement impossible dans Spark?
Pour répondre à cette question, je demanderais, s'il vous plaît poster soit a) un extrait de code fonctionnel qui résout le problème ci-dessus, ou b) un lien vers quelque chose d'officiel des développeurs Spark que cela n'est tout simplement pas pris en charge?
3 Réponses :
Il semble que cela soit possible en remplaçant le schéma en utilisant .rdd.toDf ()
sur le dataframe.
Cependant, j'accepterai toujours toute réponse moins compliquée et ennuyeuse que le un ci-dessous:
+-----------+---------------+---------------+ |id---chjruu|shared---aqboua|shared---ehjxor| +-----------+---------------+---------------+ | 1| 0.0| ext-0.0| | 1| 1.0| ext-1.0| | 2| 1.0| ext-2.0| | 3| 2.0| ext-3.0| | 4| 3.0| ext-4.0| +-----------+---------------+---------------+ +-----------+---------------+ |id---chjruu|shared---aqboua| +-----------+---------------+ | 1| 0.0| | 1| 1.0| | 2| 1.0| | 3| 2.0| | 4| 3.0| +-----------+---------------+ +---+------+ | id|shared| +---+------+ | 1| 0.0| | 1| 1.0| | 2| 1.0| | 3| 2.0| | 4| 3.0| +---+------+
Donne:
import random import string from pyspark.sql.types import DoubleType, LongType def makeId(): return ''.join(random.choice(string.ascii_lowercase) for _ in range(6)) def makeUnique(column): return "%s---%s" % (column.name, makeId()) def makeNormal(column): return column.name.split("---")[0] unique_schema = list(map(makeUnique, df.schema)) df_unique = df.rdd.toDF(schema=unique_schema) df_unique.show() numeric_cols = filter(lambda c: c.dataType.__class__ in [LongType, DoubleType], df_unique.schema) numeric_col_names = list(map(lambda c: c.name, numeric_cols)) df_filtered = df_unique.select(*numeric_col_names) df_filtered.show() normal_schema = list(map(makeNormal, df_filtered.schema)) df_fixed = df_filtered.rdd.toDF(schema=normal_schema) df_fixed.show()
Solution de contournement: renommez simplement les colonnes (dans l'ordre), puis faites ce que vous voulez!
renamed_df = df.toDF("id", "shared_double", "shared_string")
la solution la plus simple à ce problème est de renommer en utilisant df.toDF (...
, mais si vous ne vouliez pas changer la colonne nom puis regroupez les colonnes dupliquées par leur type comme struct
comme ci-dessous -
Veuillez noter que la solution ci-dessous est écrite en scala, mais un code logiquement similaire peut être implémenté en python. Cette solution fonctionnera également pour toutes les colonnes en double dans le dataframe-
// Use the dataframe without losing any columns structDF.selectExpr("id", "shared.double as shared").show(false) /** * +---+------+ * |id |shared| * +---+------+ * |1 |2.0 | * +---+------+ */
// 3. create struct of all cols val structCols = df.schema.map(f => f.name -> f ).groupBy(_._1) .map{case(name, seq) => if (seq.length > 1) struct( seq.map { case (_, StructField(fName, dt, _, _)) => expr(s"`$fName:${dt.simpleString}` as ${dt.simpleString}") }: _* ).as(name) else col(name) }.toSeq val structDF = renamedDF.select(structCols: _*) structDF.show(false) structDF.printSchema() /** * +-------------+---+ * |shared |id | * +-------------+---+ * |[2.0, shared]|1 | * +-------------+---+ * * root * |-- shared: struct (nullable = false) * | |-- double: double (nullable = false) * | |-- string: string (nullable = true) * |-- id: integer (nullable = false) */
shared => shared: string, shared: int
, sans toucher aux autres noms de colonnes val renamedDF = df // 2 rename duplicate cols like shared => shared:string, shared:int .toDF(df.schema .map{case StructField(name, dt, _, _) => if(dupCols.contains(name)) s"$name:${dt.simpleString}" else name}: _*)
// 1. get all the duplicated column names val findDupCols = (cols: Array[String]) => cols.map((_ , 1)).groupBy(_._1).filter(_._2.length > 1).keys.toSeq val dupCols = findDupCols(df.columns) println(dupCols.mkString(", ")) // shared
.
val df = Seq((1, 2.0, "shared")).toDF("id", "shared", "shared") df.show(false) df.printSchema() /** * +---+------+------+ * |id |shared|shared| * +---+------+------+ * |1 |2.0 |shared| * +---+------+------+ * * root * |-- id: integer (nullable = false) * |-- shared: double (nullable = false) * |-- shared: string (nullable = true) */
J'espère que cela sera utile à quelqu'un!