Je souhaite normaliser mon bloc de données dans pyspark par groupe. La solution proposée ici n'aide pas, car je souhaite transformer chaque colonne en ma trame de données. Le code que j'ai utilisé en python sur un pandas df est le suivant:
columns = ['group','sensor1', 'sensor2', 'sensor3'] vals = [ (a, 0.8, 0, 0.4118), (a, 0.5, 0.1026, 1), (a, 1, 0.615, 0.11), (a, 0, 1, 0) (b, 0.333, 0, 0) (b, 1, 0.333, 1) (b, 0, 1, 0.333) ]
Comment puis-je faire cela dans pyspark avec un df ou avec RDD?
Exemple: entrée:
columns = ['group', 'sensor1', 'sensor2', 'sensor3'] vals = [ (a, 0.8, 0.02, 100), (a, 0.5, 0.1, 200), (a, 1, 0.5, 50), (a, 0, 0.8, 30) (b, 10, 1, 0) (b, 20, 2, 3) (b, 5, 4, 1) ]
sortie souhaitée:
df_norm = (X_df .groupby('group') .transform(lambda x: (x - x.min())/(x.max() - x.min())) .fillna(0))
3 Réponses :
J'utilise spark 2.3.0
. Vous pouvez effectuer les opérations suivantes:
from pyspark.sql.types import * from pyspark.sql.functions import pandas_udf from pyspark.sql.functions import PandasUDFType # group function will use this schema schema = StructType([ StructField("group", StringType()), StructField("sensor1", DoubleType()), StructField("sensor2", DoubleType()), StructField("sensor3", DoubleType()), ]) @pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP) def func(df): # you don't need to do this if sensor columns already are float df.iloc[:,1:] = df.iloc[:,1:].astype(float) # select column to normalize cols = df.columns.difference(['group']) # do groupby result = df.groupby('group')[cols].apply(lambda x: (x - x.min())/(x.max() - x.min())) return pd.concat([df['group'], result], axis=1) # apply the function df.groupby('group').apply(func) +-----+------------------+-------------------+-------------------+ |group| sensor1| sensor2| sensor3| +-----+------------------+-------------------+-------------------+ | b|0.3333333333333333| 0.0| 0.0| | b| 1.0| 0.3333333333333333| 1.0| | b| 0.0| 1.0| 0.3333333333333333| | a| 0.8| 0.0| 0.4117647058823529| | a| 0.5|0.10256410256410256| 1.0| | a| 1.0| 0.6153846153846153|0.11764705882352941| | a| 0.0| 1.0| 0.0| +-----+------------------+-------------------+-------------------+
YOLO, pouvons-nous utiliser les fonctions pandas
de cette façon à partir de spark 2.3
?
oui, vous pouvez le faire: spark .apache.org / docs / latest /…
from pyspark.sql.functions import min, max from pyspark.sql.window import Window vals = [('a',0.8,0.02,100),('a',0.5,0.1,200),('a',1.0,0.5,50),('a',0.0,0.8,30), ('b',10.0,1.0,0),('b',20.0,2.0,3),('b',5.0,4.0,1)] df = sqlContext.createDataFrame(vals,['group', 'sensor1', 'sensor2', 'sensor3']) df.show() +-----+-------+-------+-------+ |group|sensor1|sensor2|sensor3| +-----+-------+-------+-------+ | a| 0.8| 0.02| 100| | a| 0.5| 0.1| 200| | a| 1.0| 0.5| 50| | a| 0.0| 0.8| 30| | b| 10.0| 1.0| 0| | b| 20.0| 2.0| 3| | b| 5.0| 4.0| 1| +-----+-------+-------+-------+ w = Window().partitionBy('group') df = df.withColumn('min_sensor1',min(col('sensor1')).over(w))\ .withColumn('max_sensor1',max(col('sensor1')).over(w))\ .withColumn('min_sensor2',min(col('sensor2')).over(w))\ .withColumn('max_sensor2',max(col('sensor2')).over(w))\ .withColumn('min_sensor3',min(col('sensor3')).over(w))\ .withColumn('max_sensor3',max(col('sensor3')).over(w))\ .withColumn('sensor1',((col('sensor1')-col('min_sensor1'))/(col('max_sensor1')-col('min_sensor1'))))\ .withColumn('sensor2',((col('sensor2')-col('min_sensor2'))/(col('max_sensor2')-col('min_sensor2'))))\ .withColumn('sensor3',((col('sensor3')-col('min_sensor3'))/(col('max_sensor3')-col('min_sensor3'))))\ .drop('min_sensor1','max_sensor1','min_sensor2','max_sensor2','min_sensor3','max_sensor3') df.show() +-----+------------------+-------------------+-------------------+ |group| sensor1| sensor2| sensor3| +-----+------------------+-------------------+-------------------+ | b|0.3333333333333333| 0.0| 0.0| | b| 1.0| 0.3333333333333333| 1.0| | b| 0.0| 1.0| 0.3333333333333333| | a| 0.8| 0.0| 0.4117647058823529| | a| 0.5|0.10256410256410256| 1.0| | a| 1.0| 0.6153846153846153|0.11764705882352941| | a| 0.0| 1.0| 0.0| +-----+------------------+-------------------+-------------------+
J'ai 100 colonnes. Pas une solution très pratique.
J'ai fini par le faire de cette façon:
w = Window.partitionBy('group') for c in cols_to_normalize: df = (df.withColumn('mini', F.min(c).over(w)) .withColumn('maxi', F.max(c).over(w)) .withColumn(c, ((F.col(c) - F.col('mini')) / (F.col('maxi') - F.col('mini')))) .drop('mini') .drop('maxi'))
Êtes-vous prêt à utiliser
MinMaxScaler
a >?Oui! pouvez-vous le combiner avec une fonction de fenêtre?
pourriez-vous fournir un exemple reproductible avec de petits exemples de données et la sortie souhaitée?
done :) Je veux juste faire une transformation min-max pour chaque colonne de mon df par groupe