1
votes

Conversion d'horodatage en époque dans Spark (Java)

J'ai une colonne de type Horodatage au format aaaa-MM-jj HH: mm: ss dans un dataframe.

La colonne est triée par heure où la date antérieure est à la ligne précédente

Quand j'ai exécuté cette commande

[670] : 1550967304 (2019-02-23 04:30:15)
[671] : 1420064100 (2019-02-24 08:15:04)

Je suis confronté à un problème étrange où la valeur de la date ultérieure est plus petite que la date antérieure. Exemple:

List<Row> timeRows = df.withColumn(ts, df.col(ts).cast("long")).select(ts).collectAsList();

Est-ce la bonne façon de convertir en Epoch ou y a-t-il une autre façon?


2 commentaires

Version Spark? Comment chargez-vous df? À partir du fichier?


Spark 2.4.0. Chargement du df à partir de CSV


3 Réponses :


0
votes

Vous devez utiliser la fonction intégrée unix_timestamp () dans org.apache.spark.sql.functions

https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/sql/functions.html#unix_timestamp ()


0 commentaires

2
votes

Essayez d'utiliser unix_timestamp pour convertir la chaîne date / heure en horodatage. Selon le document:

unix_timestamp (Colonne s, Chaîne p) Convertit la chaîne de temps avec motif (voir [ http://docs.oracle.com/javase/tutorial/i18n /format/simpleDateFormat.html ]) à l'horodatage Unix (en secondes), renvoie null en cas d'échec.

import org.apache.spark.functions._  

val format = "yyyy-MM-dd HH:mm:ss"
df.withColumn("epoch_sec", unix_timestamp($"ts", format)).select("epoch_sec").collectAsList()

Voir également https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-functions-datetime.html


0 commentaires

0
votes

Je pense que vous envisagez d'utiliser: unix_timestamp()

À partir duquel vous pouvez importer:

...
[994] : 1551997326 (2019-03-07 14:22:06)
[995] : 1551997329 (2019-03-07 14:22:09)
[996] : 1551997330 (2019-03-07 14:22:10)
[997] : 1551997332 (2019-03-07 14:22:12)
[998] : 1551997333 (2019-03-07 14:22:13)
[999] : 1551997335 (2019-03-07 14:22:15)

Et utiliser comme:

package net.jgp.books.spark.ch12.lab990_others;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_unixtime;
import static org.apache.spark.sql.functions.unix_timestamp;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Use of from_unixtime() and unix_timestamp().
 * 
 * @author jgp
 */
public class EpochTimestampConversionApp {

  /**
   * main() is your entry point to the application.
   * 
   * @param args
   */
  public static void main(String[] args) {
    EpochTimestampConversionApp app = new EpochTimestampConversionApp();
    app.start();
  }

  /**
   * The processing code.
   */
  private void start() {
    // Creates a session on a local master
    SparkSession spark = SparkSession.builder()
        .appName("expr()")
        .master("local")
        .getOrCreate();

    StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField(
            "event",
            DataTypes.IntegerType,
            false),
        DataTypes.createStructField(
            "original_ts",
            DataTypes.StringType,
            false) });

    // Building a df with a sequence of chronological timestamps
    List<Row> rows = new ArrayList<>();
    long now = System.currentTimeMillis() / 1000;
    for (int i = 0; i < 1000; i++) {
      rows.add(RowFactory.create(i, String.valueOf(now)));
      now += new Random().nextInt(3) + 1;
    }
    Dataset<Row> df = spark.createDataFrame(rows, schema);
    df.show();
    df.printSchema();

    // Turning the timestamps to Timestamp datatype
    df = df.withColumn(
        "date",
        from_unixtime(col("original_ts")).cast(DataTypes.TimestampType));
    df.show();
    df.printSchema();

    // Turning back the timestamps to epoch
    df = df.withColumn(
        "epoch",
        unix_timestamp(col("date")));
    df.show();
    df.printSchema();

    // Collecting the result and printing out
    List<Row> timeRows = df.collectAsList();
    for (Row r : timeRows) {
      System.out.printf("[%d] : %s (%s)\n",
          r.getInt(0),
          r.getAs("epoch"),
          r.getAs("date"));
    }
  }
}

Et voici un exemple complet, où j'ai essayé d'imiter votre cas d'utilisation:

df = df.withColumn(
    "epoch",
    unix_timestamp(col("date")));

Et la sortie devrait be:

import static org.apache.spark.sql.functions.unix_timestamp;

J'espère que cela vous aidera.


2 commentaires

cette méthode fonctionnera si la colonne ts est au format d'une chaîne affichant déjà l'époque. Je recherche une solution qui convertit un type Timestamp en un type Long (Epoch)


Hey Yorel, mon mauvais, je suis allé un peu trop vite, regardez l'exemple et dites-moi s'il correspond mieux à vos attentes (et alors votez? :)) - Merci