4
votes

Comment compresser deux colonnes de tableau dans Spark SQL

J'ai un dataframe Pandas. J'ai essayé de joindre d'abord deux colonnes contenant des valeurs de chaîne dans une liste, puis en utilisant zip, j'ai joint chaque élément de la liste avec '_'. Mon ensemble de données est comme ci-dessous:

from pyspark.sql.types import ArrayType, IntegerType, StringType
from pyspark.sql.functions import col, split

crash.withColumn("column_1",
    split(col("column_1"), ",\s*").cast(ArrayType(StringType())).alias("column_1")
)
crash.withColumn("column_2",
    split(col("column_2"), ",\s*").cast(ArrayType(StringType())).alias("column_2")
)

Je voulais joindre ces deux colonnes dans une troisième colonne comme ci-dessous pour chaque ligne de mon dataframe.

df['column_3'] = df['column_2']
for index, row in df.iterrows():
  while index < 3:
    if isinstance(row['column_1'], str):      
      row['column_1'] = list(row['column_1'].split(','))
      row['column_2'] = list(row['column_2'].split(','))
      row['column_3'] = ['_'.join(map(str, i)) for i in zip(list(row['column_1']), list(row['column_2']))]

J'ai réussi à le faire en python en utilisant le code ci-dessous mais le dataframe est assez volumineux et il faut beaucoup de temps pour l'exécuter pour l'ensemble du dataframe. Je veux faire la même chose dans PySpark pour plus d'efficacité. J'ai lu les données dans Spark Dataframe avec succès, mais j'ai du mal à déterminer comment répliquer les fonctions Pandas avec des fonctions équivalentes à PySpark. Comment puis-je obtenir le résultat souhaité dans PySpark?

df['column_3']: [abc_1.0, def_2.0, ghi_3.0]

J'ai converti les deux colonnes en tableaux dans PySpark en utilisant le code ci-dessous

df['column_1']: 'abc, def, ghi'
df['column_2']: '1.0, 2.0, 3.0'


5 commentaires

Pourquoi df ['column_1'] et df ['column_2'] sont-ils une seule chaîne au lieu d'une liste d'éléments? Quels étaient-ils à l'origine?


Voilà comment sont les données, que je lis dans le dataframe


@Falconic donc abc , def etc sur une seule ligne ou sur différentes lignes? de même colonne 2 une seule ligne?


@ anky_91 c'est une ligne de dataframe pour la colonne_1 et la colonne_2. Chaque ligne contient plusieurs éléments dans une colonne. C'est la raison pour laquelle j'ai divisé la chaîne et ensuite convertie en liste.


Est-ce que cela répond à votre question? Pyspark: divisez plusieurs colonnes de tableau en lignes


3 Réponses :


4
votes

Vous pouvez également UDF pour compresser les colonnes du tableau fractionné,

df = spark.createDataFrame([('abc,def,ghi','1.0,2.0,3.0')], ['col1','col2']) 
+-----------+-----------+
|col1       |col2       |
+-----------+-----------+
|abc,def,ghi|1.0,2.0,3.0|
+-----------+-----------+ ## Hope this is how your dataframe is

from pyspark.sql import functions as F
from pyspark.sql.types import *

def concat_udf(*args):
    return ['_'.join(x) for x in zip(*args)]

udf1 = F.udf(concat_udf,ArrayType(StringType()))
df = df.withColumn('col3',udf1(F.split(df.col1,','),F.split(df.col2,',')))
df.show(1,False)
+-----------+-----------+---------------------------+
|col1       |col2       |col3                       |
+-----------+-----------+---------------------------+
|abc,def,ghi|1.0,2.0,3.0|[abc_1.0, def_2.0, ghi_3.0]|
+-----------+-----------+---------------------------+


2 commentaires

Merci @suresh. C'est définitivement une solution plus propre. Lorsque je l'applique à mon propre dataframe et que je lance la fonction de collecte, j'obtiens l'erreur TypeError: zip argument # 1 doit prendre en charge l'itération Des idées pourquoi?


L'erreur est due au fait que zip () n'obtient pas d'itération en entrée. pouvez-vous s'il vous plaît poster votre exemple de dataframe d'entrée et son schéma.



11
votes

Un équivalent Spark SQL de Python serait pyspark.sql.functions.arrays_zip :

pyspark.sql.functions.arrays_zip(*cols)

Fonction de collection: renvoie un tableau fusionné de structures dans lequel la N-ième structure contient toutes les N-ièmes valeurs des tableaux d'entrée.

Donc si vous avez déjà deux tableaux:

+---------------------------+
|zipped_concat              |
+---------------------------+
|[abc_1.0, def_2.0, ghi_3.0]|
+---------------------------+

Vous pouvez simplement l'appliquer sur le résultat

df_zipped_concat = df_zipped.withColumn(
    "zipped_concat",
     expr("transform(zipped, x -> concat_ws('_', x.column_1, x.column_2))")
) 

df_zipped_concat.select("zipped_concat").show(truncate=False)
+------------------------------------+
|zipped                              |
+------------------------------------+
|[[abc, 1.0], [def, 2.0], [ghi, 3.0]]|
+------------------------------------+

Maintenant, pour combiner les résultats, vous pouvez transformer ( Comment utiliser transformer une fonction d'ordre supérieur? , TypeError: la colonne ne peut pas être itérée - Comment itérer sur ArrayType ()? ):

from pyspark.sql.functions import arrays_zip

df_zipped = df.withColumn(
  "zipped", arrays_zip("column_1", "column_2")
)

df_zipped.select("zipped").show(truncate=False)
from pyspark.sql.functions import split

df = (spark
    .createDataFrame([('abc, def, ghi', '1.0, 2.0, 3.0')])
    .toDF("column_1", "column_2")
    .withColumn("column_1", split("column_1", "\s*,\s*"))
    .withColumn("column_2", split("column_2", "\s*,\s*")))

Remarque :

Fonctions d'ordre supérieur transform code> et arrays_zip ont été introduits dans Apache Spark 2.4.


1 commentaires

Merci user10465355. Cette solution a fonctionné pour moi mais juste un mot d'avertissement. Il ne gère pas très bien les valeurs nulles dans les listes. J'ai supprimé manuellement les valeurs nulles des deux colonnes avant de les joindre. Deuxièmement, je devais faire chaque étape de mon dataframe d'origine. Avoir plusieurs dataframes avec les mêmes noms de colonnes ne fonctionnait pas très bien avec PySpark. J'ai dû déboguer cela pour voir ce qui n'allait pas dans mon code. Il s'est avéré que j'avais besoin d'utiliser le même dataframe tout au long des différentes opérations.



2
votes

Pour Spark 2.4+, cela peut être fait en utilisant uniquement Fonction zip_with pour compresser une concaténation au même moment:

df.withColumn("column_3", expr("zip_with(column_1, column_2, (x, y) -> concat(x, '_', y))")) 

La fonction d'ordre supérieur prend 2 tableaux pour fusionner, élément par élément, en utilisant un fonction lambda (x, y) -> concat (x, '_', y) .


0 commentaires