3
votes

Spark Dataframe: sélectionnez des lignes distinctes

J'ai essayé deux méthodes pour trouver des rangées distinctes dans le parquet, mais cela ne semble pas fonctionner.
Attemp 1: Dataset df = sqlContext.read (). parquet ("location.parquet"). distinct ();
Mais lance

= SQL ==
SELECT distinct on timestamp * from df
-----------------------------^^^

Attemp 2: J'ai essayé d'exécuter des requêtes SQL:

Dataset<Row> df = sqlContext.read().parquet("location.parquet");
    rawLandingDS.createOrReplaceTempView("df");
    Dataset<Row> landingDF = sqlContext.sql("SELECT distinct on timestamp * from df");

erreur que j'obtiens:

Cannot have map type columns in DataFrame which calls set operations
(intersect, except, etc.), 
but the type of column canvasHashes is map<string,string>;;

Existe-t-il un moyen d'obtenir des enregistrements distincts lors de la lecture de parquet des dossiers? Toute option de lecture que je peux utiliser.


0 commentaires

3 Réponses :


2
votes

Oui, la syntaxe est incorrecte, elle devrait être:

Dataset<Row> landingDF = sqlContext.sql("SELECT distinct * from df");


0 commentaires

5
votes

Le problème auquel vous êtes confronté est explicitement indiqué dans le message d'exception - car les colonnes MapType ne sont ni hachables ni triables ne peuvent pas être utilisées dans le cadre d'une expression de regroupement ou de partitionnement.

Votre vision de la solution SQL n'est pas logiquement équivalent à distinct sur Dataset . Si vous souhaitez dédupliquer des données en fonction d'un ensemble de colonnes compatibles, vous devez utiliser dropDuplicates:

SELECT 
  timestamp, c1, c2, ..., cn,   -- All columns excluding canvasHashes
  first(canvasHashes) AS canvasHashes
FROM df GROUP BY
  timestamp, c1, c2, ..., cn    -- All columns excluding canvasHashes

qui équivaudrait à

XXX

Malheureusement, si votre objectif est réel DISTINCT , ce ne sera pas si facile. Une solution possible consiste à tirer parti du hachage Scala * Map . Vous pouvez définir Scala udf comme ceci:

 df
  .selectExpr("*", "scalaHash(canvasHashes) AS hash_of_canvas_hashes")
  .dropDuplicates(
    // All columns excluding canvasHashes / hash_of_canvas_hashes
    "timestamp",  "c1", "c2", ..., "cn" 
    // Hash used as surrogate of canvasHashes
    "hash_of_canvas_hashes"         
  )

puis l'utiliser dans votre code Java pour dériver une colonne qui peut être utilisé pour dropDuplicates:

spark.udf.register("scalaHash", (x: Map[String, String]) => x.##)

avec l'équivalent SQL

SELECT timestamp, first(c1) AS c1, first(c2) AS c2,  ..., first(cn) AS cn,
       first(canvasHashes) AS canvasHashes
FROM df GROUP BY timestamp

* S'il vous plaît notez que java.util.Map avec son hashCode ne fonctionnera pas, car hashCode n'est pas cohérent.


0 commentaires

2
votes

1) Si vous voulez différencier en fonction des colonnes, vous pouvez l'utiliser

scala> val df = sc.parallelize(Array((1, 2), (3, 4),(3, 4), (1, 6))).toDF("no", "age")



scala> df.show

+---+---+
| no|age|
+---+---+
|  1|  2|
|  3|  4|
|  3|  4|
|  1|  6|
+---+---+


scala> df.dropDuplicates().show()
+---+---+
| no|age|
+---+---+
|  1|  2|
|  3|  4|
|  1|  6|
+---+---+

2) Si vous voulez un unique sur toutes les colonnes, utilisez dropduplicate

val df = sc.parallelize(Array((1, 2), (3, 4), (1, 6))).toDF("no", "age")


scala> df.show
+---+---+
| no|age|
+---+---+
|  1|  2|
|  3|  4|
|  1|  6|
+---+---+

val distinctValuesDF = df.select(df("no")).distinct

scala> distinctValuesDF.show
+---+
| no|
+---+
|  1|
|  3|
+---+


0 commentaires