J'ai écrit un code de prétraitement de données dans Pandas UDF dans PySpark. J'utilise la fonction lambda pour extraire une partie du texte de tous les enregistrements d'une colonne.
Voici à quoi ressemble mon code:
@pandas_udf("string", PandasUDFType.SCALAR) def get_X(col): return col.apply(lambda x: x.split(',')[-1] if len(x.split(',')) > 0 else x) df = df.withColumn('X', get_first_name(df.Y))
Cela fonctionne bien et donne les résultats souhaités. Mais j'ai besoin d'écrire le même morceau de logique dans un code équivalent Spark. Y a-t-il un moyen de le faire? Merci.
3 Réponses :
Vous pouvez faire de même en utilisant quand
pour implémenter if-then -else logic :
Premier split
la colonne, puis calculez son taille
. Si la taille est supérieure à 0, prenez le dernier élément du tableau divisé . Sinon, renvoyez la colonne d'origine.
df = df.withColumn('First_Name', get_first_name(df.BENF_NME)) df.show() #+---------+----------+ #| BENF_NME|First_Name| #+---------+----------+ #|Doe, John| John| #| Madonna| Madonna| #+---------+----------+
À titre d'exemple, supposons que vous ayez le DataFrame suivant:
df.show() #+---------+ #| BENF_NME| #+---------+ #|Doe, John| #| Madonna| #+---------+
Vous pouvez appelez la nouvelle fonction comme avant:
from pyspark.sql.functions import split, size, when def get_first_name(col): col_split = split(col, ',') split_size = size(col_split) return when(split_size > 0, col_split[split_size-1]).otherwise(col)
Je pense qu'une fonction substring_index est suffisant pour cette tâche particulière:
from pyspark.sql.functions import substring_index df = spark.createDataFrame([(x,) for x in ['f,l', 'g', 'a,b,cd']], ['c1']) df2.withColumn('c2', substring_index('c1', ',', -1)).show() +------+---+ | c1| c2| +------+---+ | f,l| l| | g| g| |a,b,cd| cd| +------+---+
Étant donné le DataFrame suivant df
:
from pyspark.sql.functions import udf from pyspark.sql.types import StringType get_first_name = udf(lambda s: s.split(',')[-1], StringType()) df.withColumn('First_Name', get_first_name(df.BENF_NME)).show() # +-------------+----------+ # | BENF_NME|First_Name| # +-------------+----------+ # | Doe, John| John| # | Foo| Foo| # |Baz, Quux,Bar| Bar| # +-------------+----------+
Vous pouvez simplement utiliser regexp_extract ()
pour sélectionner le prénom:
XXX
Si vous ne vous souciez pas des espaces de début possibles, substring_index ()
fournit une alternative simple à votre logique d'origine:
df.withColumn(...).collect()[0] # Row(BENF_NME=u'Doe, John', First_Name=u' John'
Dans ce cas, le First_Name
de la première ligne est précédé d'un espace:
from pyspark.sql.functions import substring_index df.withColumn('First_Name', substring_index(df.BENF_NME, ',', -1)).show() # +-------------+----------+ # | BENF_NME|First_Name| # +-------------+----------+ # | Doe, John| John| # | Foo| Foo| # |Baz, Quux,Bar| Bar| # +-------------+----------+
Si vous souhaitez toujours utiliser une fonction personnalisée, vous devez créer une fonction définie par l'utilisateur (UDF) en utilisant udf ()
:
from pyspark.sql.functions import regexp_extract df.withColumn('First_Name', regexp_extract(df.BENF_NME, r'(?:.*,\s*)?(.*)', 1)).show() # +-------------+----------+ # | BENF_NME|First_Name| # +-------------+----------+ # | Doe, John| John| # | Foo| Foo| # |Baz, Quux,Bar| Bar| # +-------------+----------+
Notez que les UDF sont plus lents que les b Fonctions Spark intégrées, en particulier les UDF Python.
Cela n'a rien à voir avec votre problème en soi, mais
len (x.split (','))> 0
est toujoursTrue
à moins quex code > vaut
Aucun
, auquel cas vous ne pouvez pas appelersplit ()
dessus.