2
votes

Je dois ignorer trois lignes du dataframe lors du chargement à partir d'un fichier CSV dans scala

Je charge mon fichier CSV dans un bloc de données et je peux le faire, mais je dois ignorer les trois lignes de départ du fichier.

J'ai essayé la commande .option () en donnant l'en-tête est vrai mais il ignore la seule première ligne.

val df = spark.sqlContext.read
    .schema(Myschema)
    .option("header",true)
    .option("delimiter", "|")
    .csv(path)

J'ai pensé à donner l'en-tête en 3 lignes mais je n'ai pas trouvé le moyen de le faire. p >

Autre pensée: ignorez ces 3 lignes du bloc de données

Veuillez m'aider avec ceci. Merci d'avance.


2 commentaires

Ajoutez un champ de numéro de ligne dans le fichier csv et filtrez les lignes dont le numéro de ligne est supérieur à 3.


Je vois que votre délimiteur est un | . Lisez-vous un fichier qui ressemble à ce que df.show () imprimerait? Si tel est le cas, essayez d'ajouter .option ("header", true) et .option ("comment", "+") . Sinon, y a-t-il quelque chose qui distingue ces trois lignes?


3 Réponses :


-1
votes

Vous pouvez essayer cette option

df.withColumn("Index",monotonically_increasing_id())
        .filter(col("Index") > 2)
        .drop("Index")


2 commentaires

Malheureusement, cela ne fonctionnera que si les 3 premières lignes sont dans la première partition. La plupart du temps, ce sera le cas, mais ce n'est pas très sûr à supposer. Essayez de l'exécuter dans un shell par exemple spark.range (4) .withColumn ("Index", monotone_increasing_i‌ d ()). Show ()


Il n'est même pas garanti de fonctionner sur les versions futures, quelles que soient les partitions. Cela fonctionne aujourd'hui, (javadoc, c'est moi qui souligne: "La implémentation actuelle met l'ID de partition dans les 31 bits supérieurs, et le numéro d'enregistrement dans chaque partition dans les 33 bits inférieurs."), Mais le seul le contrat établi par l'API est le suivant: "L'ID généré est garanti d'être monotone croissant et unique, mais pas consécutif".



1
votes

Une manière générique de gérer votre problème serait d'indexer la trame de données et de filtrer les indices supérieurs à 2.

Approche simple:

Comme suggéré dans un autre réponse, vous pouvez essayer d'ajouter un index avec monotonically_increasing_id.

scala> zipWithIndex(spark.range(4).toDF("id"), "index").show()
+---+-----+
| id|index|
+---+-----+
|  0|    0|
|  1|    1|
|  2|    2|
|  3|    3|
+---+-----+

Pourtant, cela ne fonctionnera que si les 3 premières lignes sont dans la première partition . De plus, comme mentionné dans les commentaires, c'est le cas aujourd'hui mais ce code peut rompre complètement avec d'autres versions ou étincelles et cela serait très difficile à déboguer. En effet, le contrat dans l'API est juste "L'ID généré est garanti d'être monotone croissant et unique, mais pas consécutif". Il n'est donc pas très sage de supposer qu'ils partiront toujours de zéro. Il pourrait même y avoir d'autres cas dans la version actuelle où cela ne fonctionne pas (je ne suis pas sûr cependant).

Pour illustrer ma première préoccupation, jetez un œil à ceci:

def zipWithIndex(df : DataFrame, name : String) : DataFrame = {
  val rdd = df.rdd.zipWithIndex
    .map{ case (row, i) => Row.fromSeq(row.toSeq :+ i) }
  val newSchema = df.schema
    .add(StructField(name, LongType, false))
  df.sparkSession.createDataFrame(rdd, newSchema)
}
zipWithIndex(df, "index").where('index > 2).drop("index")

Nous ne supprimerions que deux lignes ...

Approche sûre:

L'approche précédente fonctionnera la plupart du temps mais pour être sûr, vous pouvez utiliser zipWithIndex de l'API RDD pour obtenir des index consécutifs.

scala> spark.range(4).withColumn("Index",monotonically_increasing_id()).show()
+---+----------+
| id|     Index|
+---+----------+
|  0|         0|
|  1|         1|
|  2|8589934592|
|  3|8589934593|
+---+----------+

Nous pouvons vérifier que c'est plus sûr:

df.withColumn("Index",monotonically_increasing_id)
  .filter('Index > 2)
  .drop("Index")


1 commentaires

Merci beaucoup ... Oli !!. Votre approche Safe a fonctionné pour moi.



-1
votes

Vous pouvez essayer de modifier wrt dans votre schéma.

 import org.apache.spark.sql.Row
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

  //Read CSV
  val file = sc.textFile("csvfilelocation")

  //Remove first 3 lines
  val data = file.mapPartitionsWithIndex{ (idx, iter) => if (idx == 0) iter.drop(3) else iter }

  //Create RowRDD by mapping each line to the required fields 
  val rowRdd = data.map(x=>Row(x(0), x(1)))

  //create dataframe by calling sqlcontext.createDataframe with rowRdd and your schema   
  val df = sqlContext.createDataFrame(rowRdd, schema)


2 commentaires

Merci Deepa ... votre code fonctionne bien mais mon exigence doit également supprimer les deux dernières lignes. Pouvez-vous me suggérer un blog ou des informations concernant la fonction mapPartitionsWithIndex.


val partitions = file.getNumPartitions et ajoutez cette condition après la condition if pour idx = 0, sinon if (idx == partitions - 3) iter.sliding (2) .map (_. head)