Je charge mon fichier CSV dans un bloc de données et je peux le faire, mais je dois ignorer les trois lignes de départ du fichier.
J'ai essayé la commande .option ()
en donnant l'en-tête est vrai mais il ignore la seule première ligne.
val df = spark.sqlContext.read .schema(Myschema) .option("header",true) .option("delimiter", "|") .csv(path)
J'ai pensé à donner l'en-tête en 3 lignes mais je n'ai pas trouvé le moyen de le faire. p >
Autre pensée: ignorez ces 3 lignes du bloc de données
Veuillez m'aider avec ceci. Merci d'avance.
3 Réponses :
Vous pouvez essayer cette option
df.withColumn("Index",monotonically_increasing_id()) .filter(col("Index") > 2) .drop("Index")
Malheureusement, cela ne fonctionnera que si les 3 premières lignes sont dans la première partition. La plupart du temps, ce sera le cas, mais ce n'est pas très sûr à supposer. Essayez de l'exécuter dans un shell par exemple spark.range (4) .withColumn ("Index", monotone_increasing_i d ()). Show ()
Il n'est même pas garanti de fonctionner sur les versions futures, quelles que soient les partitions. Cela fonctionne aujourd'hui, (javadoc, c'est moi qui souligne: "La implémentation actuelle met l'ID de partition dans les 31 bits supérieurs, et le numéro d'enregistrement dans chaque partition dans les 33 bits inférieurs."), Mais le seul le contrat établi par l'API est le suivant: "L'ID généré est garanti d'être monotone croissant et unique, mais pas consécutif".
Une manière générique de gérer votre problème serait d'indexer la trame de données et de filtrer les indices supérieurs à 2.
Approche simple:
Comme suggéré dans un autre réponse, vous pouvez essayer d'ajouter un index avec monotonically_increasing_id
.
scala> zipWithIndex(spark.range(4).toDF("id"), "index").show() +---+-----+ | id|index| +---+-----+ | 0| 0| | 1| 1| | 2| 2| | 3| 3| +---+-----+
Pourtant, cela ne fonctionnera que si les 3 premières lignes sont dans la première partition . De plus, comme mentionné dans les commentaires, c'est le cas aujourd'hui mais ce code peut rompre complètement avec d'autres versions ou étincelles et cela serait très difficile à déboguer. En effet, le contrat dans l'API est juste "L'ID généré est garanti d'être monotone croissant et unique, mais pas consécutif". Il n'est donc pas très sage de supposer qu'ils partiront toujours de zéro. Il pourrait même y avoir d'autres cas dans la version actuelle où cela ne fonctionne pas (je ne suis pas sûr cependant).
Pour illustrer ma première préoccupation, jetez un œil à ceci:
def zipWithIndex(df : DataFrame, name : String) : DataFrame = { val rdd = df.rdd.zipWithIndex .map{ case (row, i) => Row.fromSeq(row.toSeq :+ i) } val newSchema = df.schema .add(StructField(name, LongType, false)) df.sparkSession.createDataFrame(rdd, newSchema) } zipWithIndex(df, "index").where('index > 2).drop("index")
Nous ne supprimerions que deux lignes ...
Approche sûre:
L'approche précédente fonctionnera la plupart du temps mais pour être sûr, vous pouvez utiliser zipWithIndex
de l'API RDD pour obtenir des index consécutifs.
scala> spark.range(4).withColumn("Index",monotonically_increasing_id()).show() +---+----------+ | id| Index| +---+----------+ | 0| 0| | 1| 1| | 2|8589934592| | 3|8589934593| +---+----------+
Nous pouvons vérifier que c'est plus sûr:
df.withColumn("Index",monotonically_increasing_id) .filter('Index > 2) .drop("Index")
Merci beaucoup ... Oli !!. Votre approche Safe a fonctionné pour moi.
Vous pouvez essayer de modifier wrt dans votre schéma.
import org.apache.spark.sql.Row val sqlContext = new org.apache.spark.sql.SQLContext(sc) //Read CSV val file = sc.textFile("csvfilelocation") //Remove first 3 lines val data = file.mapPartitionsWithIndex{ (idx, iter) => if (idx == 0) iter.drop(3) else iter } //Create RowRDD by mapping each line to the required fields val rowRdd = data.map(x=>Row(x(0), x(1))) //create dataframe by calling sqlcontext.createDataframe with rowRdd and your schema val df = sqlContext.createDataFrame(rowRdd, schema)
Merci Deepa ... votre code fonctionne bien mais mon exigence doit également supprimer les deux dernières lignes. Pouvez-vous me suggérer un blog ou des informations concernant la fonction mapPartitionsWithIndex.
val partitions = file.getNumPartitions et ajoutez cette condition après la condition if pour idx = 0, sinon if (idx == partitions - 3) iter.sliding (2) .map (_. head)
Ajoutez un champ de numéro de ligne dans le fichier csv et filtrez les lignes dont le numéro de ligne est supérieur à 3.
Je vois que votre délimiteur est un
|
. Lisez-vous un fichier qui ressemble à ce quedf.show ()
imprimerait? Si tel est le cas, essayez d'ajouter.option ("header", true)
et.option ("comment", "+")
. Sinon, y a-t-il quelque chose qui distingue ces trois lignes?