1
votes

pyspark function.lag sous condition

J'essaie de résoudre un problème avec pyspark,

J'ai un ensemble de données tel que:

Condition | Date | Lag

0 | 2019/01/10 | NaN
1 | 2019/01/11 | NaN
0 | 2019/01/15 | 2019/01/11
1 | 2019/01/16 | 2019/01/11
1 | 2019/01/19 | 2019/01/16
0 | 2019/01/23 | 2019/01/19
0 | 2019/01/25 | 2019/01/19
1 | 2019/01/29 | 2019/01/19
1 | 2019/01/30 | 2019/01/29

Je voudrais obtenir la dernière valeur de décalage de la date colonne lorsque la condition == 1 était remplie

La sortie souhaitée serait quelque chose comme:

Condition | Date

0 | 2019/01/10
1 | 2019/01/11
0 | 2019/01/15
1 | 2019/01/16
1 | 2019/01/19
0 | 2019/01/23
0 | 2019/01/25
1 | 2019/01/29
1 | 2019/01/30

Comment puis-je effectuer cela?

Veuillez garder à l'esprit qu'il s'agit d'un très grand ensemble de données - que je devrai partitionner et regrouper par un UUID pour que la solution soit quelque peu performante.

Merci,


0 commentaires

3 Réponses :


1
votes

En SQL, vous pouvez utiliser une exécution conditionnelle max():

select t.*,
       max(case when condition = 1 then date end) over (order by date
                                                        rows between unbounded preceding and 1 preceding
                                                       ) as prev_condition_1_date
from t;


1 commentaires

Salut! Merci pour la réponse rapide, j'essaie de l'implémenter sur pyspark mais j'ai un peu de mal.



1
votes

Voici une solution avec Pyspark. La logique reste la même que la solution de @ GordonLinoff avec requête SQL.

+---------+----------+----------+
|Condition|      Date|       Lag|
+---------+----------+----------+
|        0|2019/01/10|      null|
|        1|2019/01/11|      null|
|        0|2019/01/15|2019/01/11|
|        1|2019/01/16|2019/01/11|
|        1|2019/01/19|2019/01/16|
|        0|2019/01/23|2019/01/19|
|        0|2019/01/25|2019/01/19|
|        1|2019/01/29|2019/01/19|
|        1|2019/01/30|2019/01/29|
+---------+----------+----------+

Donne :

w = Window.orderBy("Date").rowsBetween(Window.unboundedPreceding, Window.currentRow - 1)
df.withColumn("Lag", max(when(col("Condition") == lit(1), col("Date"))).over(w)).show()


0 commentaires

1
votes

J'aime utiliser SQL pour résoudre ce problème:

from pyspark.sql.functions import expr

display(
  df.withColumn(
    'lag', 
    expr('max(case when Condition == 1 then Date end) over (order by Date rows between unbounded preceding and 1 preceding)'
  )
)


0 commentaires