1
votes

Comment supprimer une colonne ambiguë dans pyspark?

Il existe de nombreuses questions similaires à celle-ci qui posent une question différente en ce qui concerne éviter les colonnes en double dans une jointure ; ce n'est pas ce que je demande ici.

Étant donné que j'ai déjà un DataFrame avec des colonnes ambiguës, comment supprimer une colonne spécifique ?

Par exemple, étant donné:

$ df.dtypes
[('id', 'int'), ('shared', 'double'), ('shared', 'string')]

$ df.schema
StructType(List(StructField(id,IntegerType,true),StructField(shared,DoubleType,true),StructField(shared,StringType,true)))

Je souhaite ne conserver que les colonnes numériques.

Cependant, en essayant de faire quelque chose comme df. select ("id", "shared"). show () aboutit à:

raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: "Reference 'shared' is ambiguous, could be: shared, shared.;"

De nombreuses solutions liées à ce problème consistent simplement à éviter d'entrer dans ce problème situation », par exemple. en utilisant ['joinkey'] au lieu de a.joinkey = b.joinkey sur la jointure. Je répète que ce n’est pas le cas ici; il s'agit d'un dataframe qui a déjà été converti dans ce formulaire.

Les métadonnées du DF clarifient ces colonnes:

df = spark.createDataFrame(
    spark.sparkContext.parallelize([
        [1, 0.0, "ext-0.0"],
        [1, 1.0, "ext-1.0"],
        [2, 1.0, "ext-2.0"],
        [3, 2.0, "ext-3.0"],
        [4, 3.0, "ext-4.0"],
    ]),
    StructType([
        StructField("id", IntegerType(), True),
        StructField("shared", DoubleType(), True),
        StructField("shared", StringType(), True),
    ])
)

Donc les données est conservé en interne ... Je ne vois tout simplement pas comment l'utiliser.

Comment choisir une colonne plutôt qu'une autre?

Je m'attendais à être capable d'utiliser, par exemple. col ('shared # 11') ou similaire ... mais il n'y a rien de tel que je peux voir?

Est-ce simplement impossible dans Spark?

Pour répondre à cette question, je demanderais, s'il vous plaît poster soit a) un extrait de code fonctionnel qui résout le problème ci-dessus, ou b) un lien vers quelque chose d'officiel des développeurs Spark que cela n'est tout simplement pas pris en charge?


0 commentaires

3 Réponses :


0
votes

Il semble que cela soit possible en remplaçant le schéma en utilisant .rdd.toDf () sur le dataframe.

Cependant, j'accepterai toujours toute réponse moins compliquée et ennuyeuse que le un ci-dessous:

+-----------+---------------+---------------+
|id---chjruu|shared---aqboua|shared---ehjxor|
+-----------+---------------+---------------+
|          1|            0.0|        ext-0.0|
|          1|            1.0|        ext-1.0|
|          2|            1.0|        ext-2.0|
|          3|            2.0|        ext-3.0|
|          4|            3.0|        ext-4.0|
+-----------+---------------+---------------+

+-----------+---------------+
|id---chjruu|shared---aqboua|
+-----------+---------------+
|          1|            0.0|
|          1|            1.0|
|          2|            1.0|
|          3|            2.0|
|          4|            3.0|
+-----------+---------------+

+---+------+
| id|shared|
+---+------+
|  1|   0.0|
|  1|   1.0|
|  2|   1.0|
|  3|   2.0|
|  4|   3.0|
+---+------+

Donne:

import random
import string
from pyspark.sql.types import DoubleType, LongType

def makeId():
    return ''.join(random.choice(string.ascii_lowercase) for _ in range(6))

def makeUnique(column):
    return "%s---%s" % (column.name, makeId())

def makeNormal(column):
    return column.name.split("---")[0]

unique_schema = list(map(makeUnique, df.schema))
df_unique = df.rdd.toDF(schema=unique_schema)
df_unique.show()

numeric_cols = filter(lambda c: c.dataType.__class__ in [LongType, DoubleType], df_unique.schema)
numeric_col_names = list(map(lambda c: c.name, numeric_cols))
df_filtered = df_unique.select(*numeric_col_names)
df_filtered.show()

normal_schema = list(map(makeNormal, df_filtered.schema))
df_fixed = df_filtered.rdd.toDF(schema=normal_schema)
df_fixed.show()


0 commentaires

0
votes

Solution de contournement: renommez simplement les colonnes (dans l'ordre), puis faites ce que vous voulez!

renamed_df = df.toDF("id", "shared_double", "shared_string")


0 commentaires

1
votes

la solution la plus simple à ce problème est de renommer en utilisant df.toDF (... ...) , mais si vous ne vouliez pas changer la colonne nom puis regroupez les colonnes dupliquées par leur type comme struct comme ci-dessous -

Veuillez noter que la solution ci-dessous est écrite en scala, mais un code logiquement similaire peut être implémenté en python. Cette solution fonctionnera également pour toutes les colonnes en double dans le dataframe-

1. Chargez les données de test

    // Use the dataframe without losing any columns
    structDF.selectExpr("id", "shared.double as shared").show(false)
    /**
      * +---+------+
      * |id |shared|
      * +---+------+
      * |1  |2.0   |
      * +---+------+
      */

2. récupère tous les noms de colonnes dupliqués

    // 3. create struct of all cols
    val structCols = df.schema.map(f => f.name -> f  ).groupBy(_._1)
      .map{case(name, seq) =>
        if (seq.length > 1)
          struct(
            seq.map { case (_, StructField(fName, dt, _, _)) =>
              expr(s"`$fName:${dt.simpleString}` as ${dt.simpleString}")
            }: _*
          ).as(name)
        else col(name)
      }.toSeq
     val structDF = renamedDF.select(structCols: _*)

    structDF.show(false)
    structDF.printSchema()

    /**
      * +-------------+---+
      * |shared       |id |
      * +-------------+---+
      * |[2.0, shared]|1  |
      * +-------------+---+
      *
      * root
      * |-- shared: struct (nullable = false)
      * |    |-- double: double (nullable = false)
      * |    |-- string: string (nullable = true)
      * |-- id: integer (nullable = false)
      */

3. renommer les cols en double comme shared => shared: string, shared: int , sans toucher aux autres noms de colonnes

    val renamedDF = df
      // 2 rename duplicate cols like shared => shared:string, shared:int
      .toDF(df.schema
        .map{case StructField(name, dt, _, _) =>
          if(dupCols.contains(name)) s"$name:${dt.simpleString}" else name}: _*)

3. créer une structure de tous les cols

    // 1. get all the duplicated column names
    val findDupCols = (cols: Array[String]) => cols.map((_ , 1)).groupBy(_._1).filter(_._2.length > 1).keys.toSeq
    val dupCols = findDupCols(df.columns)
    println(dupCols.mkString(", "))
    // shared

4. obtenir la colonne par son type en utilisant .

    val df = Seq((1, 2.0, "shared")).toDF("id", "shared", "shared")
    df.show(false)
    df.printSchema()
    /**
      * +---+------+------+
      * |id |shared|shared|
      * +---+------+------+
      * |1  |2.0   |shared|
      * +---+------+------+
      *
      * root
      * |-- id: integer (nullable = false)
      * |-- shared: double (nullable = false)
      * |-- shared: string (nullable = true)
      */

J'espère que cela sera utile à quelqu'un!


0 commentaires