J'ai un dataframe df comme le suivant
val te = df.select("path").as[String].collect() te.foreach(executeRowCount)
Une remarque: le / file / dir diffère. Tous les fichiers ne sont pas stockés dans le même répertoire. En fait, il y a des centaines de fichiers dans différents répertoires.
Ce que je veux accomplir ici est de lire le fichier dans le chemin de la colonne et de compter les enregistrements dans les fichiers et d'écrire le résultat du nombre de lignes dans un nouvelle colonne d'un dataframe.
J'ai essayé la fonction suivante et udf:
org.apache.spark.SparkException: Failed to execute user defined fu nction($anonfun$1: (string) => bigint) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.NullPointerException at $line39.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:28) at $line39.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25) ... 19 more
Cela entraîne l'erreur suivante
def executeRowCount(fileCount: String): Long = { val rowCount = spark.read.format("csv").option("header", "false").load(fileCount).count rowCount } val execUdf = udf(executeRowCount _) df.withColumn("row_count", execUdf (col("path"))).show()
J'ai essayé de parcourir la colonne une fois collectée comme
+--------+--------------------+--------+------+ | id| path|somestff| hash1| +--------+--------------------+--------+------+ | 1|/file/dirA/fileA.txt| 58| 65161| | 2|/file/dirB/fileB.txt| 52| 65913| | 3|/file/dirC/fileC.txt| 99|131073| | 4|/file/dirF/fileD.txt| 46|196233| +--------+--------------------+--------+------+
et ici cela fonctionne très bien, mais je veux stocker le résultat dans le df. ..
J'ai essayé plusieurs solutions, mais je suis dans une impasse ici.
4 Réponses :
Cela ne fonctionne pas car les trames de données ne peuvent être créées que dans la JVM du pilote, mais le code UDF est exécuté dans les JVM exécuteurs. Ce que vous pouvez faire est de charger les CSV dans un bloc de données séparé et d'enrichir les données avec une colonne de nom de fichier:
val csvs = spark .read .format("csv") .load("/file/dir/") .withColumn("filename", input_file_name())
, puis de joindre le df
d'origine sur colonne de nom de fichier
Hey ollik1, le / fichier / dir diffère. Tous les fichiers ne sont pas stockés dans le même répertoire. En fait, il existe des centaines de fichiers dans divers répertoires. Je ne peux pas tout charger à partir de hdfs en une seule étape. les fichiers diffèrent en taille de certains hundret mbs à certains gbs ..
@datanin Il est possible de définir plusieurs emplacements stackoverflow.com/questions/24029873/... . Le chargement du fichier est paresseux, il semble donc au moins intéressant d'essayer s'il fonctionne bien
un dataframe est une collection distribuée de données, donc je ne comprends pas pourquoi je ne peux pas utiliser l'UDF sur le df. Néanmoins, la solution de contournement avec une colonne de nom de fichier est une bonne idée et je vais essayer celle-là. Si cela fonctionne, ce sera ma solution. Merci
Salut ollik1, maintenant je comprends le problème avec le conducteur / travailleur. Ce n'est pas le dataframe mais l'UDF qui n'est pas sérialisable.
Et? :
def executeRowCount = udf((fileCount: String) => { spark.read.format("csv").option("header", "false").load(fileCount).count }) df.withColumn("row_count", executeRowCount(col("path"))).show()
Peut-être quelque chose comme ça?
sqlContext .read .format("csv") .load("/tmp/input/") .withColumn("filename", input_file_name()) .groupBy("filename") .agg(count("filename").as("record_count")) .show()
J'ai résolu ce problème de la manière suivante:
val queue = df.select("path").as[String].collect() val countResult = for (item <- queue) yield { val rowCount = (item, spark.read.format("csv").option("header", "false").load(item).count) rowCount } val df2 = spark.createDataFrame(countResult)
Ensuite, j'ai rejoint le df avec df2 ...
Le problème ici est comme @ ollik1 mentionné dans le architecture pilote / travailleur sur udfs. L'UDF n'est pas sérialisable, ce dont j'aurais besoin avec la fonction spark.read.