2
votes

Échec de l'exécution de la fonction définie par l'utilisateur sur une trame de données dans Spark (Scala)

J'ai un dataframe df comme le suivant

val te = df.select("path").as[String].collect()
te.foreach(executeRowCount)

Une remarque: le / file / dir diffère. Tous les fichiers ne sont pas stockés dans le même répertoire. En fait, il y a des centaines de fichiers dans différents répertoires.

Ce que je veux accomplir ici est de lire le fichier dans le chemin de la colonne et de compter les enregistrements dans les fichiers et d'écrire le résultat du nombre de lignes dans un nouvelle colonne d'un dataframe.

J'ai essayé la fonction suivante et udf:

org.apache.spark.SparkException: Failed to execute user defined fu
nction($anonfun$1: (string) => bigint)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at $line39.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:28)
        at $line39.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)
        ... 19 more

Cela entraîne l'erreur suivante

def executeRowCount(fileCount: String): Long = {
  val rowCount = spark.read.format("csv").option("header", "false").load(fileCount).count
  rowCount
}

val execUdf = udf(executeRowCount _)

df.withColumn("row_count", execUdf (col("path"))).show()

J'ai essayé de parcourir la colonne une fois collectée comme

+--------+--------------------+--------+------+
|      id|                path|somestff| hash1|
+--------+--------------------+--------+------+
|       1|/file/dirA/fileA.txt|      58| 65161|
|       2|/file/dirB/fileB.txt|      52| 65913|
|       3|/file/dirC/fileC.txt|      99|131073|
|       4|/file/dirF/fileD.txt|      46|196233|
+--------+--------------------+--------+------+

et ici cela fonctionne très bien, mais je veux stocker le résultat dans le df. ..

J'ai essayé plusieurs solutions, mais je suis dans une impasse ici.


0 commentaires

4 Réponses :


2
votes

Cela ne fonctionne pas car les trames de données ne peuvent être créées que dans la JVM du pilote, mais le code UDF est exécuté dans les JVM exécuteurs. Ce que vous pouvez faire est de charger les CSV dans un bloc de données séparé et d'enrichir les données avec une colonne de nom de fichier:

val csvs = spark
 .read
 .format("csv")
 .load("/file/dir/")
 .withColumn("filename", input_file_name())

, puis de joindre le df d'origine sur colonne de nom de fichier


4 commentaires

Hey ollik1, le / fichier / dir diffère. Tous les fichiers ne sont pas stockés dans le même répertoire. En fait, il existe des centaines de fichiers dans divers répertoires. Je ne peux pas tout charger à partir de hdfs en une seule étape. les fichiers diffèrent en taille de certains hundret mbs à certains gbs ..


@datanin Il est possible de définir plusieurs emplacements stackoverflow.com/questions/24029873/... . Le chargement du fichier est paresseux, il semble donc au moins intéressant d'essayer s'il fonctionne bien


un dataframe est une collection distribuée de données, donc je ne comprends pas pourquoi je ne peux pas utiliser l'UDF sur le df. Néanmoins, la solution de contournement avec une colonne de nom de fichier est une bonne idée et je vais essayer celle-là. Si cela fonctionne, ce sera ma solution. Merci


Salut ollik1, maintenant je comprends le problème avec le conducteur / travailleur. Ce n'est pas le dataframe mais l'UDF qui n'est pas sérialisable.



0
votes

Et? :

def executeRowCount = udf((fileCount: String) => {
  spark.read.format("csv").option("header", "false").load(fileCount).count
})

df.withColumn("row_count", executeRowCount(col("path"))).show()


0 commentaires

0
votes

Peut-être quelque chose comme ça?

  sqlContext
    .read
    .format("csv")
    .load("/tmp/input/")
    .withColumn("filename", input_file_name())
    .groupBy("filename")
    .agg(count("filename").as("record_count"))
    .show()


0 commentaires

0
votes

J'ai résolu ce problème de la manière suivante:

val queue = df.select("path").as[String].collect()
val countResult = for (item <- queue) yield {
    val rowCount = (item, spark.read.format("csv").option("header", "false").load(item).count)
    rowCount
}

val df2 = spark.createDataFrame(countResult)

Ensuite, j'ai rejoint le df avec df2 ...

Le problème ici est comme @ ollik1 mentionné dans le architecture pilote / travailleur sur udfs. L'UDF n'est pas sérialisable, ce dont j'aurais besoin avec la fonction spark.read.


0 commentaires