J'ai le dataframe pyspark (df) ayant ci-dessous un exemple de tableau (table1): id, col1, col2, col3 1, abc, null, def 2, nul, def, abc 3, def, abc, null
J'essaie d'obtenir une nouvelle colonne (finale) en ajoutant toutes les colonnes en ignorant les valeurs nulles. J'ai essayé le code pyspark et utilisé f.array (col1, col2, col3). Les valeurs sont ajoutées mais n'ignorent pas les valeurs nulles. J'ai également essayé UDF d'ajouter uniquement des colonnes non nulles mais cela ne fonctionne pas.
import pyspark.sql.functions as f df = spark.table('table1') df = df.withColumn('final', f.array(col1,col2,col3)) Actual result: id, col1, col2, col3, final 1, abc, null, def, [abc,,def] 2, null, def, abc, [,def, abc] 3, def, abc, null, [def,abc,,] expected result: id, col1, col2, col3, final 1, abc, null, def, [abc,def] 2, null, def, abc, [def, abc] 3, def, abc, null, [def,abc] my col1, col2, col3 schema are as below: where as col1 name is applications applications: struct (nullable = false) applicationid: string (nullable = true) createdat: string (nullable = true) updatedat: string (nullable = true) source_name: string (nullable = true) status: string (nullable = true) creditbureautypeid: string (nullable = true) score: integer (nullable = true) applicationcreditreportid: string (nullable = true) firstname: string (nullable = false) lastname: string (nullable = false) dateofbirth: string (nullable = false) accounts: array (nullable = true) element: struct (containsNull = true) applicationcreditreportaccountid: string (nullable = true) currentbalance: integer (nullable = true) institutionid: string (nullable = true) accounttypeid: string (nullable = true) dayspastdue: integer (nullable = true) institution_name: string (nullable = true) account_type_name: string (nullable = true)
s'il vous plaît laissez-moi savoir si la question n'est pas claire ou si plus d'informations sont nécessaires. Toute aide serait appréciée. :)
3 Réponses :
Vous pouvez définir votre propre UDF
comme suit:
df = spark.table('table1') df = df.withColumn('final', f.udf(only_not_null)(col1,col2,col3))
Et puis appeler:
def only_not_null(st,nd,rd): return [x for x in locals().values() if x is not None] # Take non empty columns
pourquoi utiliseriez-vous locals (). values ()
?
Utiliser également if x
est incorrect car cela filtrera toutes les valeurs Falsey telles que 0
ou ''
Utilisation d'un UDF
from pyspark.sql.functions import udf, array def join_columns(row_list): return [cell_val for cell_val in row_list if cell_val is not None] join_udf = udf(join_columns) df = spark.table('table1') df = df.withColumn('final', join_udf(array(col1,col2,col3))
Fonctionne pour plusieurs colonnes non seulement 3, modifiez simplement les colonnes à l'intérieur du tableau.
Merci pour votre réponse ! son fonctionnement mais mon schéma de colonnes est complexe. J'ai donné la colonne de chaîne comme exemple. Lorsque je définis la fonction, je dois également donner le type de retour. J'ai mentionné mon schéma de colonne.
Depuis Spark 2.4, vous pouvez utiliser des fonctions d'ordre supérieur pour ce faire (aucun UDF n'est nécessaire). Dans PySpark, la requête peut ressembler à ceci:
result = ( df .withColumn("temp", f.array("col1", "col2", "col3")) .withColumn("final", f.expr("FILTER(temp, x -> x is not null)")) .drop("temp") )
Super @David. Cela fonctionne :) J'ai essayé plusieurs solutions mais cela fonctionne à merveille. Merci beaucoup
En fait, la version de pyspark ne fera probablement aucune différence: Comment supprimer les valeurs NULL avec la fonction intégrée array_remove Spark SQL