J'ai écrit un code de prétraitement de données dans Pandas UDF dans PySpark. J'utilise la fonction lambda pour extraire une partie du texte de tous les enregistrements d'une colonne.
Voici à quoi ressemble mon code:
@pandas_udf("string", PandasUDFType.SCALAR)
def get_X(col):
return col.apply(lambda x: x.split(',')[-1] if len(x.split(',')) > 0 else x)
df = df.withColumn('X', get_first_name(df.Y))
Cela fonctionne bien et donne les résultats souhaités. Mais j'ai besoin d'écrire le même morceau de logique dans un code équivalent Spark. Y a-t-il un moyen de le faire? Merci.
3 Réponses :
Vous pouvez faire de même en utilisant quand pour implémenter if-then -else logic :
Premier split la colonne, puis calculez son taille . Si la taille est supérieure à 0, prenez le dernier élément du tableau divisé . Sinon, renvoyez la colonne d'origine.
df = df.withColumn('First_Name', get_first_name(df.BENF_NME))
df.show()
#+---------+----------+
#| BENF_NME|First_Name|
#+---------+----------+
#|Doe, John| John|
#| Madonna| Madonna|
#+---------+----------+
À titre d'exemple, supposons que vous ayez le DataFrame suivant:
df.show() #+---------+ #| BENF_NME| #+---------+ #|Doe, John| #| Madonna| #+---------+
Vous pouvez appelez la nouvelle fonction comme avant:
from pyspark.sql.functions import split, size, when
def get_first_name(col):
col_split = split(col, ',')
split_size = size(col_split)
return when(split_size > 0, col_split[split_size-1]).otherwise(col)
Je pense qu'une fonction substring_index est suffisant pour cette tâche particulière:
from pyspark.sql.functions import substring_index
df = spark.createDataFrame([(x,) for x in ['f,l', 'g', 'a,b,cd']], ['c1'])
df2.withColumn('c2', substring_index('c1', ',', -1)).show()
+------+---+
| c1| c2|
+------+---+
| f,l| l|
| g| g|
|a,b,cd| cd|
+------+---+
Étant donné le DataFrame suivant df :
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
get_first_name = udf(lambda s: s.split(',')[-1], StringType())
df.withColumn('First_Name', get_first_name(df.BENF_NME)).show()
# +-------------+----------+
# | BENF_NME|First_Name|
# +-------------+----------+
# | Doe, John| John|
# | Foo| Foo|
# |Baz, Quux,Bar| Bar|
# +-------------+----------+
Vous pouvez simplement utiliser regexp_extract () pour sélectionner le prénom:
XXX
Si vous ne vous souciez pas des espaces de début possibles, substring_index () fournit une alternative simple à votre logique d'origine:
df.withColumn(...).collect()[0] # Row(BENF_NME=u'Doe, John', First_Name=u' John'
Dans ce cas, le First_Name de la première ligne est précédé d'un espace:
from pyspark.sql.functions import substring_index
df.withColumn('First_Name', substring_index(df.BENF_NME, ',', -1)).show()
# +-------------+----------+
# | BENF_NME|First_Name|
# +-------------+----------+
# | Doe, John| John|
# | Foo| Foo|
# |Baz, Quux,Bar| Bar|
# +-------------+----------+
Si vous souhaitez toujours utiliser une fonction personnalisée, vous devez créer une fonction définie par l'utilisateur (UDF) en utilisant udf () :
from pyspark.sql.functions import regexp_extract
df.withColumn('First_Name', regexp_extract(df.BENF_NME, r'(?:.*,\s*)?(.*)', 1)).show()
# +-------------+----------+
# | BENF_NME|First_Name|
# +-------------+----------+
# | Doe, John| John|
# | Foo| Foo|
# |Baz, Quux,Bar| Bar|
# +-------------+----------+
Notez que les UDF sont plus lents que les b Fonctions Spark intégrées, en particulier les UDF Python.
Cela n'a rien à voir avec votre problème en soi, mais
len (x.split (','))> 0est toujoursTrueà moins quex code > vautAucun, auquel cas vous ne pouvez pas appelersplit ()dessus.