2
votes

Normaliser la trame de données pyspark par groupe

Je souhaite normaliser mon bloc de données dans pyspark par groupe. La solution proposée ici n'aide pas, car je souhaite transformer chaque colonne en ma trame de données. Le code que j'ai utilisé en python sur un pandas df est le suivant:

columns = ['group','sensor1', 'sensor2', 'sensor3']
vals = [
    (a, 0.8, 0, 0.4118),
    (a, 0.5, 0.1026, 1),
    (a, 1, 0.615, 0.11),
    (a, 0, 1, 0)
    (b, 0.333, 0, 0)
    (b, 1, 0.333, 1)
    (b, 0, 1, 0.333)
]

Comment puis-je faire cela dans pyspark avec un df ou avec RDD?

Exemple: entrée:

columns = ['group', 'sensor1', 'sensor2', 'sensor3']
vals = [
    (a, 0.8, 0.02, 100),
    (a, 0.5, 0.1, 200),
    (a, 1, 0.5, 50),
    (a, 0, 0.8, 30)
    (b, 10, 1, 0)
    (b, 20, 2, 3)
    (b, 5, 4, 1)
]

sortie souhaitée:

df_norm = (X_df
.groupby('group')
.transform(lambda x: (x - x.min())/(x.max() - x.min()))
.fillna(0))


4 commentaires

Êtes-vous prêt à utiliser MinMaxScaler ?


Oui! pouvez-vous le combiner avec une fonction de fenêtre?


pourriez-vous fournir un exemple reproductible avec de petits exemples de données et la sortie souhaitée?


done :) Je veux juste faire une transformation min-max pour chaque colonne de mon df par groupe


3 Réponses :


0
votes

J'utilise spark 2.3.0 . Vous pouvez effectuer les opérations suivantes:

from pyspark.sql.types import *
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType

# group function will use this schema
schema = StructType([
    StructField("group", StringType()),
    StructField("sensor1", DoubleType()),
    StructField("sensor2", DoubleType()),
    StructField("sensor3", DoubleType()),
])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def func(df):

    # you don't need to do this if sensor columns already are float
    df.iloc[:,1:] = df.iloc[:,1:].astype(float)

    # select column to normalize
    cols = df.columns.difference(['group'])

    # do groupby
    result = df.groupby('group')[cols].apply(lambda x: (x - x.min())/(x.max() - x.min()))

    return pd.concat([df['group'], result], axis=1)

# apply the function
df.groupby('group').apply(func)

+-----+------------------+-------------------+-------------------+
|group|           sensor1|            sensor2|            sensor3|
+-----+------------------+-------------------+-------------------+
|    b|0.3333333333333333|                0.0|                0.0|
|    b|               1.0| 0.3333333333333333|                1.0|
|    b|               0.0|                1.0| 0.3333333333333333|
|    a|               0.8|                0.0| 0.4117647058823529|
|    a|               0.5|0.10256410256410256|                1.0|
|    a|               1.0| 0.6153846153846153|0.11764705882352941|
|    a|               0.0|                1.0|                0.0|
+-----+------------------+-------------------+-------------------+


2 commentaires

YOLO, pouvons-nous utiliser les fonctions pandas de cette façon à partir de spark 2.3 ?


oui, vous pouvez le faire: spark .apache.org / docs / latest /…



0
votes
from pyspark.sql.functions import min, max
from pyspark.sql.window import Window
vals = [('a',0.8,0.02,100),('a',0.5,0.1,200),('a',1.0,0.5,50),('a',0.0,0.8,30),
        ('b',10.0,1.0,0),('b',20.0,2.0,3),('b',5.0,4.0,1)]
df = sqlContext.createDataFrame(vals,['group', 'sensor1', 'sensor2', 'sensor3'])
df.show()
+-----+-------+-------+-------+
|group|sensor1|sensor2|sensor3|
+-----+-------+-------+-------+
|    a|    0.8|   0.02|    100|
|    a|    0.5|    0.1|    200|
|    a|    1.0|    0.5|     50|
|    a|    0.0|    0.8|     30|
|    b|   10.0|    1.0|      0|
|    b|   20.0|    2.0|      3|
|    b|    5.0|    4.0|      1|
+-----+-------+-------+-------+

w = Window().partitionBy('group')
df = df.withColumn('min_sensor1',min(col('sensor1')).over(w))\
       .withColumn('max_sensor1',max(col('sensor1')).over(w))\
       .withColumn('min_sensor2',min(col('sensor2')).over(w))\
       .withColumn('max_sensor2',max(col('sensor2')).over(w))\
       .withColumn('min_sensor3',min(col('sensor3')).over(w))\
       .withColumn('max_sensor3',max(col('sensor3')).over(w))\
       .withColumn('sensor1',((col('sensor1')-col('min_sensor1'))/(col('max_sensor1')-col('min_sensor1'))))\
       .withColumn('sensor2',((col('sensor2')-col('min_sensor2'))/(col('max_sensor2')-col('min_sensor2'))))\
       .withColumn('sensor3',((col('sensor3')-col('min_sensor3'))/(col('max_sensor3')-col('min_sensor3'))))\
       .drop('min_sensor1','max_sensor1','min_sensor2','max_sensor2','min_sensor3','max_sensor3')

df.show()    
+-----+------------------+-------------------+-------------------+
|group|           sensor1|            sensor2|            sensor3|
+-----+------------------+-------------------+-------------------+
|    b|0.3333333333333333|                0.0|                0.0|
|    b|               1.0| 0.3333333333333333|                1.0|
|    b|               0.0|                1.0| 0.3333333333333333|
|    a|               0.8|                0.0| 0.4117647058823529|
|    a|               0.5|0.10256410256410256|                1.0|
|    a|               1.0| 0.6153846153846153|0.11764705882352941|
|    a|               0.0|                1.0|                0.0|
+-----+------------------+-------------------+-------------------+

1 commentaires

J'ai 100 colonnes. Pas une solution très pratique.



2
votes

J'ai fini par le faire de cette façon:

w = Window.partitionBy('group')
for c in cols_to_normalize:
    df = (df.withColumn('mini', F.min(c).over(w))
        .withColumn('maxi', F.max(c).over(w))
        .withColumn(c, ((F.col(c) - F.col('mini')) / (F.col('maxi') - F.col('mini'))))
        .drop('mini')
        .drop('maxi'))


0 commentaires