Quelle est la bonne façon de spécifier l'intervalle de fenêtre dans Spark SQL, en utilisant deux limites prédéfinies?
J'essaie de résumer les valeurs de ma table sur une fenêtre de "il y a entre 3 heures et il y a 2 heures".
/ p>
Quand j'exécute cette requête:
select *, sum(value) over ( partition by a, b order by cast(time_value as timestamp) range between interval 3 hours preceding and 2 preceding ) as sum_value from my_temp_table;
Cela fonctionne. J'obtiens des résultats que j'attends, c'est-à-dire des sommes de valeurs qui tombent dans une fenêtre glissante de 2 heures.
Maintenant, ce dont j'ai besoin, c'est que cette fenêtre déroulante ne soit pas liée à la ligne courante mais qu'elle prenne en compte les lignes entre il y a 3 heures et il y a 2 heures. J'ai essayé avec:
select *, sum(value) over ( partition by a, b order by cast(time_value as timestamp) range between interval 3 hours preceding and interval 2 hours preceding ) as sum_value from my_temp_table;
Mais j'obtiens une entrée superflue 'heures' en attendant une erreur {'PRECEDING', 'FOLLOWING'}
.
J'ai aussi essayé avec:
select *, sum(value) over ( partition by a, b order by cast(time_value as timestamp) range between interval 3 hours preceding and 2 hours preceding ) as sum_value from my_temp_table;
mais j'obtiens une erreur différente scala.MatchError: CalendarIntervalType (de classe org.apache.spark.sql.types. CalendarIntervalType $)
La troisième option que j'ai essayée est:
select *, sum(value) over ( partition by a, b order by cast(time_value as timestamp) range between interval 2 hours preceding and current row ) as sum_value from my_temp_table;
et cela ne fonctionne pas comme prévu: ne peut pas résoudre "RANGE BETWEEN interval 3 hours PRECEDING AND 2 PRECEDING" en raison d'une incompatibilité de type de données
J'ai des difficultés à trouver les documents pour le type d'intervalle comme ce lien n'en dit pas assez et d'autres informations sont un peu à moitié cuites. Au moins ce que j'ai trouvé.
3 Réponses :
Étant donné que les intervalles de distance ne fonctionnaient pas, j'ai dû me tourner vers une approche alternative. Cela ressemble à quelque chose comme ceci:
Dans mon cas, je devais exécuter des calculs pour chaque heure de la journée et combiner ces résultats "horaires", c'est-à-dire une liste de 24 blocs de données, en un seul bloc de données "quotidien". p>
Le code, du point de vue de très haut niveau, ressemble à ceci:
val hourlyDFs = for ((hourStart, hourEnd) <- (hoursToStart, hoursToEnd).zipped) yield { val data = data.where($"hour" <= lit(hourEnd) && $"hour" >= lit(hourStart)) // do stuff // return a data frame } hourlyDFs.toSeq().reduce(_.union(_))
J'ai eu le même problème et trouvé une solution simple. Voilà:
select unix_timestamp(date_format(current_timestamp, 'HH:mm:ss'), 'HH:mm:ss') < unix_timestamp('03:00:00', 'HH:mm:ss') --Used timestamp for readibility
Vous pouvez également utiliser l'horodatage pour la lisibilité. (Je me demande si nécessaire):
unix_timestamp(datestamp) - unix_timestamp(datestamp) < 10800 --3 hours in seconds
Une solution de contournement pour obtenir le même résultat serait de calculer la somme de la valeur au cours des 3 dernières heures, puis de soustraire la somme de la valeur au cours des 2 dernières heures:
select *, sum(value) over ( partition by a, b order by cast(time_value as timestamp) range between interval 3 hours preceding and current row) - sum(value) over ( partition by a, b order by cast(time_value as timestamp) range between interval 2 hours preceding and current row) as sum_value from my_temp_table;
Les intervalles de plage AFAIK ne fonctionnent pas correctement dans SparkSQL pour le moment, seuls les intervalles basés sur le nombre de lignes sont robustes. Voir ce ticket JIRA issues.apache.org/jira/browse/SPARK-25842. La dépréciation est également signalée dans l'API Scala github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/ org /…
Je vois. Ok, donc je suis parti pour trouver une approche alternative.