4
votes

Spark SQL Window sur un intervalle entre deux limites de temps spécifiées - il y a entre 3 heures et 2 heures

Quelle est la bonne façon de spécifier l'intervalle de fenêtre dans Spark SQL, en utilisant deux limites prédéfinies?

J'essaie de résumer les valeurs de ma table sur une fenêtre de "il y a entre 3 heures et il y a 2 heures".

/ p>

Quand j'exécute cette requête:

select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and 2 preceding
) as sum_value
from my_temp_table;

Cela fonctionne. J'obtiens des résultats que j'attends, c'est-à-dire des sommes de valeurs qui tombent dans une fenêtre glissante de 2 heures.

Maintenant, ce dont j'ai besoin, c'est que cette fenêtre déroulante ne soit pas liée à la ligne courante mais qu'elle prenne en compte les lignes entre il y a 3 heures et il y a 2 heures. J'ai essayé avec:

select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and interval 2 hours preceding
) as sum_value
from my_temp_table;

Mais j'obtiens une entrée superflue 'heures' en attendant une erreur {'PRECEDING', 'FOLLOWING'} .

J'ai aussi essayé avec:

select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and 2 hours preceding
) as sum_value
from my_temp_table;

mais j'obtiens une erreur différente scala.MatchError: CalendarIntervalType (de classe org.apache.spark.sql.types. CalendarIntervalType $)

La troisième option que j'ai essayée est:

select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 2 hours preceding and current row
) as sum_value
from my_temp_table;

et cela ne fonctionne pas comme prévu: ne peut pas résoudre "RANGE BETWEEN interval 3 hours PRECEDING AND 2 PRECEDING" en raison d'une incompatibilité de type de données

J'ai des difficultés à trouver les documents pour le type d'intervalle comme ce lien n'en dit pas assez et d'autres informations sont un peu à moitié cuites. Au moins ce que j'ai trouvé.


2 commentaires

Les intervalles de plage AFAIK ne fonctionnent pas correctement dans SparkSQL pour le moment, seuls les intervalles basés sur le nombre de lignes sont robustes. Voir ce ticket JIRA issues.apache.org/jira/browse/SPARK-25842. La dépréciation est également signalée dans l'API Scala github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/‌ org /…


Je vois. Ok, donc je suis parti pour trouver une approche alternative.


3 Réponses :


0
votes

Étant donné que les intervalles de distance ne fonctionnaient pas, j'ai dû me tourner vers une approche alternative. Cela ressemble à quelque chose comme ceci:

  • préparer une liste d'intervalles pour lesquels le calcul doit être effectué
  • pour chacun des intervalles, lancez le calcul
    • chacune de ces itérations produit un bloc de données
  • après les itérations, nous avons une liste de blocs de données
  • union des blocs de données de la liste en un seul bloc de données plus grand
  • rédigez les résultats

Dans mon cas, je devais exécuter des calculs pour chaque heure de la journée et combiner ces résultats "horaires", c'est-à-dire une liste de 24 blocs de données, en un seul bloc de données "quotidien". p>

Le code, du point de vue de très haut niveau, ressemble à ceci:

val hourlyDFs = for ((hourStart, hourEnd) <- (hoursToStart, hoursToEnd).zipped) yield {
    val data = data.where($"hour" <= lit(hourEnd) && $"hour" >= lit(hourStart))
    // do stuff
    // return a data frame
}
hourlyDFs.toSeq().reduce(_.union(_))


0 commentaires

0
votes

J'ai eu le même problème et trouvé une solution simple. Voilà:

select unix_timestamp(date_format(current_timestamp, 'HH:mm:ss'), 'HH:mm:ss') <
       unix_timestamp('03:00:00', 'HH:mm:ss') --Used timestamp for readibility

Vous pouvez également utiliser l'horodatage pour la lisibilité. (Je me demande si nécessaire):

unix_timestamp(datestamp) - unix_timestamp(datestamp) < 10800 --3 hours in seconds 


0 commentaires

0
votes

Une solution de contournement pour obtenir le même résultat serait de calculer la somme de la valeur au cours des 3 dernières heures, puis de soustraire la somme de la valeur au cours des 2 dernières heures:

select *, 
sum(value) over (
     partition by a, b
     order by cast(time_value as timestamp)
     range between interval 3 hours preceding and current row) 
- 
sum(value) over (
     partition by a, b
     order by cast(time_value as timestamp)
     range between interval 2 hours preceding and current row) 
as sum_value
from my_temp_table;


0 commentaires