1
votes

Équivalent PySpark pour la fonction lambda dans Pandas UDF

J'ai écrit un code de prétraitement de données dans Pandas UDF dans PySpark. J'utilise la fonction lambda pour extraire une partie du texte de tous les enregistrements d'une colonne.

Voici à quoi ressemble mon code:

@pandas_udf("string", PandasUDFType.SCALAR)
def get_X(col):
      return col.apply(lambda x: x.split(',')[-1] if len(x.split(',')) > 0 else x)

df = df.withColumn('X', get_first_name(df.Y))

Cela fonctionne bien et donne les résultats souhaités. Mais j'ai besoin d'écrire le même morceau de logique dans un code équivalent Spark. Y a-t-il un moyen de le faire? Merci.


1 commentaires

Cela n'a rien à voir avec votre problème en soi, mais len (x.split (','))> 0 est toujours True à moins que x vaut Aucun , auquel cas vous ne pouvez pas appeler split () dessus.


3 Réponses :


0
votes

Vous pouvez faire de même en utilisant quand pour implémenter if-then -else logic :

Premier split la colonne, puis calculez son taille . Si la taille est supérieure à 0, prenez le dernier élément du tableau divisé . Sinon, renvoyez la colonne d'origine.

df = df.withColumn('First_Name', get_first_name(df.BENF_NME))
df.show()
#+---------+----------+
#| BENF_NME|First_Name|
#+---------+----------+
#|Doe, John|      John|
#|  Madonna|   Madonna|
#+---------+----------+

À titre d'exemple, supposons que vous ayez le DataFrame suivant:

df.show()
#+---------+
#| BENF_NME|
#+---------+
#|Doe, John|
#|  Madonna|
#+---------+

Vous pouvez appelez la nouvelle fonction comme avant:

from pyspark.sql.functions import split, size, when

def get_first_name(col):
    col_split = split(col, ',')
    split_size = size(col_split)
    return when(split_size > 0, col_split[split_size-1]).otherwise(col)


0 commentaires

2
votes

Je pense qu'une fonction substring_index est suffisant pour cette tâche particulière:

from pyspark.sql.functions import substring_index

df = spark.createDataFrame([(x,) for x in ['f,l', 'g', 'a,b,cd']], ['c1'])

df2.withColumn('c2', substring_index('c1', ',', -1)).show()                                                                 
+------+---+
|    c1| c2|
+------+---+
|   f,l|  l|
|     g|  g|
|a,b,cd| cd|
+------+---+


0 commentaires

1
votes

Étant donné le DataFrame suivant df :

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
get_first_name = udf(lambda s: s.split(',')[-1], StringType())
df.withColumn('First_Name', get_first_name(df.BENF_NME)).show()
# +-------------+----------+
# |     BENF_NME|First_Name|
# +-------------+----------+
# |    Doe, John|      John|
# |          Foo|       Foo|
# |Baz, Quux,Bar|       Bar|
# +-------------+----------+

Vous pouvez simplement utiliser regexp_extract () pour sélectionner le prénom:

XXX

Si vous ne vous souciez pas des espaces de début possibles, substring_index () fournit une alternative simple à votre logique d'origine:

df.withColumn(...).collect()[0]
# Row(BENF_NME=u'Doe, John', First_Name=u' John'

Dans ce cas, le First_Name de la première ligne est précédé d'un espace:

from pyspark.sql.functions import substring_index
df.withColumn('First_Name', substring_index(df.BENF_NME, ',', -1)).show()
# +-------------+----------+
# |     BENF_NME|First_Name|
# +-------------+----------+
# |    Doe, John|      John|
# |          Foo|       Foo|
# |Baz, Quux,Bar|       Bar|
# +-------------+----------+

Si vous souhaitez toujours utiliser une fonction personnalisée, vous devez créer une fonction définie par l'utilisateur (UDF) en utilisant udf () :

from pyspark.sql.functions import regexp_extract
df.withColumn('First_Name', regexp_extract(df.BENF_NME, r'(?:.*,\s*)?(.*)', 1)).show()
# +-------------+----------+
# |     BENF_NME|First_Name|
# +-------------+----------+
# |    Doe, John|      John|
# |          Foo|       Foo|
# |Baz, Quux,Bar|       Bar|
# +-------------+----------+

Notez que les UDF sont plus lents que les b Fonctions Spark intégrées, en particulier les UDF Python.


0 commentaires