Je souhaite normaliser mon bloc de données dans pyspark par groupe. La solution proposée ici n'aide pas, car je souhaite transformer chaque colonne en ma trame de données. Le code que j'ai utilisé en python sur un pandas df est le suivant:
columns = ['group','sensor1', 'sensor2', 'sensor3']
vals = [
(a, 0.8, 0, 0.4118),
(a, 0.5, 0.1026, 1),
(a, 1, 0.615, 0.11),
(a, 0, 1, 0)
(b, 0.333, 0, 0)
(b, 1, 0.333, 1)
(b, 0, 1, 0.333)
]
Comment puis-je faire cela dans pyspark avec un df ou avec RDD?
Exemple: entrée:
columns = ['group', 'sensor1', 'sensor2', 'sensor3']
vals = [
(a, 0.8, 0.02, 100),
(a, 0.5, 0.1, 200),
(a, 1, 0.5, 50),
(a, 0, 0.8, 30)
(b, 10, 1, 0)
(b, 20, 2, 3)
(b, 5, 4, 1)
]
sortie souhaitée:
df_norm = (X_df
.groupby('group')
.transform(lambda x: (x - x.min())/(x.max() - x.min()))
.fillna(0))
3 Réponses :
J'utilise spark 2.3.0 . Vous pouvez effectuer les opérations suivantes:
from pyspark.sql.types import *
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
# group function will use this schema
schema = StructType([
StructField("group", StringType()),
StructField("sensor1", DoubleType()),
StructField("sensor2", DoubleType()),
StructField("sensor3", DoubleType()),
])
@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def func(df):
# you don't need to do this if sensor columns already are float
df.iloc[:,1:] = df.iloc[:,1:].astype(float)
# select column to normalize
cols = df.columns.difference(['group'])
# do groupby
result = df.groupby('group')[cols].apply(lambda x: (x - x.min())/(x.max() - x.min()))
return pd.concat([df['group'], result], axis=1)
# apply the function
df.groupby('group').apply(func)
+-----+------------------+-------------------+-------------------+
|group| sensor1| sensor2| sensor3|
+-----+------------------+-------------------+-------------------+
| b|0.3333333333333333| 0.0| 0.0|
| b| 1.0| 0.3333333333333333| 1.0|
| b| 0.0| 1.0| 0.3333333333333333|
| a| 0.8| 0.0| 0.4117647058823529|
| a| 0.5|0.10256410256410256| 1.0|
| a| 1.0| 0.6153846153846153|0.11764705882352941|
| a| 0.0| 1.0| 0.0|
+-----+------------------+-------------------+-------------------+
YOLO, pouvons-nous utiliser les fonctions pandas de cette façon à partir de spark 2.3 ?
oui, vous pouvez le faire: spark .apache.org / docs / latest /…
from pyspark.sql.functions import min, max
from pyspark.sql.window import Window
vals = [('a',0.8,0.02,100),('a',0.5,0.1,200),('a',1.0,0.5,50),('a',0.0,0.8,30),
('b',10.0,1.0,0),('b',20.0,2.0,3),('b',5.0,4.0,1)]
df = sqlContext.createDataFrame(vals,['group', 'sensor1', 'sensor2', 'sensor3'])
df.show()
+-----+-------+-------+-------+
|group|sensor1|sensor2|sensor3|
+-----+-------+-------+-------+
| a| 0.8| 0.02| 100|
| a| 0.5| 0.1| 200|
| a| 1.0| 0.5| 50|
| a| 0.0| 0.8| 30|
| b| 10.0| 1.0| 0|
| b| 20.0| 2.0| 3|
| b| 5.0| 4.0| 1|
+-----+-------+-------+-------+
w = Window().partitionBy('group')
df = df.withColumn('min_sensor1',min(col('sensor1')).over(w))\
.withColumn('max_sensor1',max(col('sensor1')).over(w))\
.withColumn('min_sensor2',min(col('sensor2')).over(w))\
.withColumn('max_sensor2',max(col('sensor2')).over(w))\
.withColumn('min_sensor3',min(col('sensor3')).over(w))\
.withColumn('max_sensor3',max(col('sensor3')).over(w))\
.withColumn('sensor1',((col('sensor1')-col('min_sensor1'))/(col('max_sensor1')-col('min_sensor1'))))\
.withColumn('sensor2',((col('sensor2')-col('min_sensor2'))/(col('max_sensor2')-col('min_sensor2'))))\
.withColumn('sensor3',((col('sensor3')-col('min_sensor3'))/(col('max_sensor3')-col('min_sensor3'))))\
.drop('min_sensor1','max_sensor1','min_sensor2','max_sensor2','min_sensor3','max_sensor3')
df.show()
+-----+------------------+-------------------+-------------------+
|group| sensor1| sensor2| sensor3|
+-----+------------------+-------------------+-------------------+
| b|0.3333333333333333| 0.0| 0.0|
| b| 1.0| 0.3333333333333333| 1.0|
| b| 0.0| 1.0| 0.3333333333333333|
| a| 0.8| 0.0| 0.4117647058823529|
| a| 0.5|0.10256410256410256| 1.0|
| a| 1.0| 0.6153846153846153|0.11764705882352941|
| a| 0.0| 1.0| 0.0|
+-----+------------------+-------------------+-------------------+
J'ai 100 colonnes. Pas une solution très pratique.
J'ai fini par le faire de cette façon:
w = Window.partitionBy('group')
for c in cols_to_normalize:
df = (df.withColumn('mini', F.min(c).over(w))
.withColumn('maxi', F.max(c).over(w))
.withColumn(c, ((F.col(c) - F.col('mini')) / (F.col('maxi') - F.col('mini'))))
.drop('mini')
.drop('maxi'))
Êtes-vous prêt à utiliser
MinMaxScalera >?Oui! pouvez-vous le combiner avec une fonction de fenêtre?
pourriez-vous fournir un exemple reproductible avec de petits exemples de données et la sortie souhaitée?
done :) Je veux juste faire une transformation min-max pour chaque colonne de mon df par groupe