J'ai essayé deux méthodes pour trouver des rangées distinctes dans le parquet, mais cela ne semble pas fonctionner.
Attemp 1:
Dataset
Mais lance
= SQL == SELECT distinct on timestamp * from df -----------------------------^^^
Attemp 2: J'ai essayé d'exécuter des requêtes SQL:
Dataset<Row> df = sqlContext.read().parquet("location.parquet"); rawLandingDS.createOrReplaceTempView("df"); Dataset<Row> landingDF = sqlContext.sql("SELECT distinct on timestamp * from df");
erreur que j'obtiens:
Cannot have map type columns in DataFrame which calls set operations (intersect, except, etc.), but the type of column canvasHashes is map<string,string>;;
Existe-t-il un moyen d'obtenir des enregistrements distincts lors de la lecture de parquet des dossiers? Toute option de lecture que je peux utiliser.
3 Réponses :
Oui, la syntaxe est incorrecte, elle devrait être:
Dataset<Row> landingDF = sqlContext.sql("SELECT distinct * from df");
Le problème auquel vous êtes confronté est explicitement indiqué dans le message d'exception - car les colonnes MapType
ne sont ni hachables ni triables ne peuvent pas être utilisées dans le cadre d'une expression de regroupement ou de partitionnement.
Votre vision de la solution SQL n'est pas logiquement équivalent à distinct
sur Dataset
. Si vous souhaitez dédupliquer des données en fonction d'un ensemble de colonnes compatibles, vous devez utiliser dropDuplicates
:
SELECT timestamp, c1, c2, ..., cn, -- All columns excluding canvasHashes first(canvasHashes) AS canvasHashes FROM df GROUP BY timestamp, c1, c2, ..., cn -- All columns excluding canvasHashes
qui équivaudrait à
XXX
Malheureusement, si votre objectif est réel DISTINCT
, ce ne sera pas si facile. Une solution possible consiste à tirer parti du hachage Scala * Map
. Vous pouvez définir Scala udf
comme ceci:
df .selectExpr("*", "scalaHash(canvasHashes) AS hash_of_canvas_hashes") .dropDuplicates( // All columns excluding canvasHashes / hash_of_canvas_hashes "timestamp", "c1", "c2", ..., "cn" // Hash used as surrogate of canvasHashes "hash_of_canvas_hashes" )
puis l'utiliser dans votre code Java pour dériver une colonne qui peut être utilisé pour dropDuplicates
:
spark.udf.register("scalaHash", (x: Map[String, String]) => x.##)
avec l'équivalent SQL
SELECT timestamp, first(c1) AS c1, first(c2) AS c2, ..., first(cn) AS cn, first(canvasHashes) AS canvasHashes FROM df GROUP BY timestamp
* S'il vous plaît notez que java.util.Map
avec son hashCode
ne fonctionnera pas, car hashCode
n'est pas cohérent.
1) Si vous voulez différencier en fonction des colonnes, vous pouvez l'utiliser
scala> val df = sc.parallelize(Array((1, 2), (3, 4),(3, 4), (1, 6))).toDF("no", "age") scala> df.show +---+---+ | no|age| +---+---+ | 1| 2| | 3| 4| | 3| 4| | 1| 6| +---+---+ scala> df.dropDuplicates().show() +---+---+ | no|age| +---+---+ | 1| 2| | 3| 4| | 1| 6| +---+---+
2) Si vous voulez un unique sur toutes les colonnes, utilisez dropduplicate
val df = sc.parallelize(Array((1, 2), (3, 4), (1, 6))).toDF("no", "age") scala> df.show +---+---+ | no|age| +---+---+ | 1| 2| | 3| 4| | 1| 6| +---+---+ val distinctValuesDF = df.select(df("no")).distinct scala> distinctValuesDF.show +---+ | no| +---+ | 1| | 3| +---+