Il existe de nombreuses questions similaires à celle-ci qui posent une question différente en ce qui concerne éviter les colonnes en double dans une jointure ; ce n'est pas ce que je demande ici.
Étant donné que j'ai déjà un DataFrame avec des colonnes ambiguës, comment supprimer une colonne spécifique ?
Par exemple, étant donné:
$ df.dtypes
[('id', 'int'), ('shared', 'double'), ('shared', 'string')]
$ df.schema
StructType(List(StructField(id,IntegerType,true),StructField(shared,DoubleType,true),StructField(shared,StringType,true)))
Je souhaite ne conserver que les colonnes numériques.
Cependant, en essayant de faire quelque chose comme df. select ("id", "shared"). show () aboutit à:
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: "Reference 'shared' is ambiguous, could be: shared, shared.;"
De nombreuses solutions liées à ce problème consistent simplement à éviter d'entrer dans ce problème situation », par exemple. en utilisant ['joinkey'] au lieu de a.joinkey = b.joinkey sur la jointure. Je répète que ce n’est pas le cas ici; il s'agit d'un dataframe qui a déjà été converti dans ce formulaire.
Les métadonnées du DF clarifient ces colonnes:
df = spark.createDataFrame(
spark.sparkContext.parallelize([
[1, 0.0, "ext-0.0"],
[1, 1.0, "ext-1.0"],
[2, 1.0, "ext-2.0"],
[3, 2.0, "ext-3.0"],
[4, 3.0, "ext-4.0"],
]),
StructType([
StructField("id", IntegerType(), True),
StructField("shared", DoubleType(), True),
StructField("shared", StringType(), True),
])
)
Donc les données est conservé en interne ... Je ne vois tout simplement pas comment l'utiliser.
Comment choisir une colonne plutôt qu'une autre?
Je m'attendais à être capable d'utiliser, par exemple. col ('shared # 11') ou similaire ... mais il n'y a rien de tel que je peux voir?
Est-ce simplement impossible dans Spark?
Pour répondre à cette question, je demanderais, s'il vous plaît poster soit a) un extrait de code fonctionnel qui résout le problème ci-dessus, ou b) un lien vers quelque chose d'officiel des développeurs Spark que cela n'est tout simplement pas pris en charge?
3 Réponses :
Il semble que cela soit possible en remplaçant le schéma en utilisant .rdd.toDf () sur le dataframe.
Cependant, j'accepterai toujours toute réponse moins compliquée et ennuyeuse que le un ci-dessous:
+-----------+---------------+---------------+ |id---chjruu|shared---aqboua|shared---ehjxor| +-----------+---------------+---------------+ | 1| 0.0| ext-0.0| | 1| 1.0| ext-1.0| | 2| 1.0| ext-2.0| | 3| 2.0| ext-3.0| | 4| 3.0| ext-4.0| +-----------+---------------+---------------+ +-----------+---------------+ |id---chjruu|shared---aqboua| +-----------+---------------+ | 1| 0.0| | 1| 1.0| | 2| 1.0| | 3| 2.0| | 4| 3.0| +-----------+---------------+ +---+------+ | id|shared| +---+------+ | 1| 0.0| | 1| 1.0| | 2| 1.0| | 3| 2.0| | 4| 3.0| +---+------+
Donne:
import random
import string
from pyspark.sql.types import DoubleType, LongType
def makeId():
return ''.join(random.choice(string.ascii_lowercase) for _ in range(6))
def makeUnique(column):
return "%s---%s" % (column.name, makeId())
def makeNormal(column):
return column.name.split("---")[0]
unique_schema = list(map(makeUnique, df.schema))
df_unique = df.rdd.toDF(schema=unique_schema)
df_unique.show()
numeric_cols = filter(lambda c: c.dataType.__class__ in [LongType, DoubleType], df_unique.schema)
numeric_col_names = list(map(lambda c: c.name, numeric_cols))
df_filtered = df_unique.select(*numeric_col_names)
df_filtered.show()
normal_schema = list(map(makeNormal, df_filtered.schema))
df_fixed = df_filtered.rdd.toDF(schema=normal_schema)
df_fixed.show()
Solution de contournement: renommez simplement les colonnes (dans l'ordre), puis faites ce que vous voulez!
renamed_df = df.toDF("id", "shared_double", "shared_string")
la solution la plus simple à ce problème est de renommer en utilisant df.toDF (... , mais si vous ne vouliez pas changer la colonne nom puis regroupez les colonnes dupliquées par leur type comme struct comme ci-dessous -
Veuillez noter que la solution ci-dessous est écrite en scala, mais un code logiquement similaire peut être implémenté en python. Cette solution fonctionnera également pour toutes les colonnes en double dans le dataframe-
// Use the dataframe without losing any columns
structDF.selectExpr("id", "shared.double as shared").show(false)
/**
* +---+------+
* |id |shared|
* +---+------+
* |1 |2.0 |
* +---+------+
*/
// 3. create struct of all cols
val structCols = df.schema.map(f => f.name -> f ).groupBy(_._1)
.map{case(name, seq) =>
if (seq.length > 1)
struct(
seq.map { case (_, StructField(fName, dt, _, _)) =>
expr(s"`$fName:${dt.simpleString}` as ${dt.simpleString}")
}: _*
).as(name)
else col(name)
}.toSeq
val structDF = renamedDF.select(structCols: _*)
structDF.show(false)
structDF.printSchema()
/**
* +-------------+---+
* |shared |id |
* +-------------+---+
* |[2.0, shared]|1 |
* +-------------+---+
*
* root
* |-- shared: struct (nullable = false)
* | |-- double: double (nullable = false)
* | |-- string: string (nullable = true)
* |-- id: integer (nullable = false)
*/
shared => shared: string, shared: int , sans toucher aux autres noms de colonnes val renamedDF = df
// 2 rename duplicate cols like shared => shared:string, shared:int
.toDF(df.schema
.map{case StructField(name, dt, _, _) =>
if(dupCols.contains(name)) s"$name:${dt.simpleString}" else name}: _*)
// 1. get all the duplicated column names
val findDupCols = (cols: Array[String]) => cols.map((_ , 1)).groupBy(_._1).filter(_._2.length > 1).keys.toSeq
val dupCols = findDupCols(df.columns)
println(dupCols.mkString(", "))
// shared
. val df = Seq((1, 2.0, "shared")).toDF("id", "shared", "shared")
df.show(false)
df.printSchema()
/**
* +---+------+------+
* |id |shared|shared|
* +---+------+------+
* |1 |2.0 |shared|
* +---+------+------+
*
* root
* |-- id: integer (nullable = false)
* |-- shared: double (nullable = false)
* |-- shared: string (nullable = true)
*/
J'espère que cela sera utile à quelqu'un!