J'ai le cadre de données d'entrée au format désagréable suivant:
(input_df .withColumn('splits', F.split(F.col('data'), ';')) .drop('data') ).show() # +-------------------+ # | splits| # +-------------------+ # |[Alice, Bob, Carol]| # | [12, 13, 14]| # | [5, , 7]| # | [1, , 3]| # | [, , 3]| # +-------------------+
L'entrée réelle est un fichier CSV séparé par des points-virgules, avec une colonne contenant les valeurs pour une personne. Chaque personne peut avoir un nombre différent de valeurs. Ici, Alice a 3 valeurs, Bob n'en a qu'une et Carol a quatre valeurs.
Je voudrais le transformer dans PySpark en une trame de données de sortie contenant un tableau par personne, dans cet exemple, la sortie serait:
result = spark.createDataFrame( [ ("Alice", [12, 5, 1]), ("Bob", [13,]), ("Carol", [14, 7, 3, 3]) ], ['name', 'values'] ) result.show() # +-----+-------------+ # | name| values| # +-----+-------------+ # |Alice| [12, 5, 1]| # | Bob| [13]| # |Carol|[14, 7, 3, 3]| # +-----+-------------+
Comment pourrais-je faire cela? Je pense que ce sera une combinaison de F.arrays_zip ()
, F.split ()
et / ou F.explode ()
, mais je n'arrive pas à comprendre.
Je suis actuellement bloqué ici, c'est ma tentative pour le moment:
from pyspark.sql import SparkSession import pyspark.sql.functions as F spark = SparkSession.builder.master("local").getOrCreate() input_df = spark.createDataFrame( [ ('Alice;Bob;Carol',), ('12;13;14',), ('5;;7',), ('1;;3',), (';;3',) ], ['data'] ) input_df.show() # +---------------+ # | data| # +---------------+ # |Alice;Bob;Carol| # | 12;13;14| # | 5;;7| # | 1;;3| # | ;;3| # +---------------+
3 Réponses :
Solution pour Spark-2.4 +:
Utilisez groupBy
pour rassembler toutes les lignes en une seule ligne en utilisant collect_list
, puis fractionner pour créer une nouvelle colonne.
arrays_zip
pour compresser les tableaux et créer un tableau imbriqué [key,[values
explode
le tableau imbriqué. Example:
df.show() #+---------------+ #| data| #+---------------+ #|Alice;Bob;Carol| #| 12;13;14| #| 5;;7| #| 1;;3| #| ;;3| #+---------------+ from pyspark.sql.functions import * df.agg(split(concat_ws("|",collect_list(col("data"))),"\\|").alias("tmp")).\ withColumn("col1",split(element_at(col("tmp"),1),";")).\ withColumn("col2",split(element_at(col("tmp"),2),";")).\ withColumn("col3",split(element_at(col("tmp"),3),";")).\ withColumn("col4",split(element_at(col("tmp"),4),";")).\ withColumn("zip",arrays_zip(col("col1"),arrays_zip(col("col2"),col("col3"),col("col4")))).\ selectExpr("explode(zip)as tmp").\ selectExpr("tmp.*").\ toDF("name","values").\ show(10,False) #+-----+----------+ #|name |values | #+-----+----------+ #|Alice|[12, 5, 1]| #|Bob |[13, , ] | #|Carol|[14, 7, 3]| #+-----+----------+
Pour spark
utilisez udf pour arrays_zip et utilisez getItem (
au lieu de la fonction element_at
.
Merci beaucoup! Dans mon ensemble de données d'origine, j'ai des centaines de colonnes, c'est-à-dire des centaines de noms comme Alice, Bob et Carol. Existe-t-il un moyen de boucler toutes les lignes qui créent "col1", "col2" etc. dans votre réponse?
@AlexanderEngelhardt, Pour créer dynamiquement des colonnes element_at , vous pouvez utiliser la méthode similaire mentionnée ici: stackoverflow.com/questions/61757408/… stackoverflow.com/questions/48134478/...
Je suggérerais de lire les données comme ;
csv séparé, puis de traiter pour obtenir les colonnes name
et values
comme ci-dessous -
Veuillez noter que ce code est écrit en scala mais un code similaire peut être implémenté dans pyspark avec un minimum de modifications
;
csv séparé val columns = df.columns.map(c => expr(s"named_struct('name', '$c', 'values', collect_list($c))")) df.select(array(columns: _*).as("array")) .selectExpr("inline_outer(array)") .show(false) /** * +-----+-------------+ * |name |values | * +-----+-------------+ * |Alice|[12, 5, 1] | * |Bob |[13] | * |Carol|[14, 7, 3, 3]| * +-----+-------------+ */
nom
et code> colonne
val data = """ |Alice;Bob;Carol | 12;13;14 | 5;;7 | 1;;3 | ;;3 """.stripMargin val stringDS = data.split(System.lineSeparator()) .map(_.split("\\;").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(";")) .toSeq.toDS() val df = spark.read .option("sep", ";") .option("inferSchema", "true") .option("header", "true") .option("nullValue", "null") .csv(stringDS) df.printSchema() df.show(false) /** * root * |-- Alice: integer (nullable = true) * |-- Bob: integer (nullable = true) * |-- Carol: integer (nullable = true) * * +-----+----+-----+ * |Alice|Bob |Carol| * +-----+----+-----+ * |12 |13 |14 | * |5 |null|7 | * |1 |null|3 | * |null |null|3 | * +-----+----+-----+ */
Une approche peut être de lire la première ligne comme en-tête puis de décomposer les données
cols = ','.join([f"'{i[0]}',{i[1]}" for i in zip(df1.columns,df1.columns)]) df1.select(f.expr(f'''stack(3,{cols}) as (Name,Value)''')).groupBy('Name').agg(f.collect_list('value').alias('Value')).orderBy('Name').show() +-----+-------------+ | Name| Value| +-----+-------------+ |Alice| [12, 5, 1]| | Bob| [13]| |Carol|[14, 7, 3, 3]| +-----+-------------+
Pour passer dynamiquement les colonnes, utilisez le code ci-dessous
df1 = spark.createDataFrame([(12,13,14),(5,None,7),(1,None,3),(None,None,3)], ['Alice','Bob','Carol']) df1.show() +-----+----+-----+ |Alice| Bob|Carol| +-----+----+-----+ | 12| 13| 14| | 5|null| 7| | 1|null| 3| | null|null| 3| +-----+----+-----+ df1.select(f.expr('''stack(3,'Alice',Alice,'Bob',Bob,'Carol',Carol) as (Name,Value)'''))\ .groupBy('Name').agg(f.collect_list('value').alias('Value')).orderBy('Name').show() +-----+-------------+ | Name| Value| +-----+-------------+ |Alice| [12, 5, 1]| | Bob| [13]| |Carol|[14, 7, 3, 3]| +-----+-------------+