J'ai récemment travaillé sur des ensembles de données Spark, j'ai un scénario dans lequel Je dois générer un numéro de ligne pour chaque ligne et le stocker dans une colonne nommée "Ids". Ce numéro de ligne commence à 1 , 2, 3 ... et des incréments basés sur le nombre de lignes dans l'ensemble de données. (Dans mon cas, il y a 10000 à 20000 enregistrements)
Considérez, j'ai un ensemble de données 'empDataset' avec des valeurs:
name , dept , project ,Ids -------------------------- Tina, Finance , abc , 1 Leena, Finance , abc , 2 Joe, Marketing , xyz , 1
Maintenant, pour l'ensemble de données ci-dessus, je veux ajouter une Colonne 'Ids' avec des valeurs incrémentées de 1,2,3 .. etc.
Le résultat attendu est le suivant
LongAccumulator accValue = spark.sparkContext().longAccumulator(); long rowNumber = 1; spark.udf().register("randomNumberGenerator", new UDF1<String, Long>() { @Override public Long call(String namCol) throws Exception { accum.add(rowNumber); System.out.println("inside" + accum.value()); return accum.value(); } }, DataTypes.LongType); Dataset<Row> empDatasetWithIds= empDataset.withColumn("Ids",callUDF("randomNumberGenerator", col(name))); Dataset<Row> filterDept = empDatasetWithIds.filter(...here filtering with dept...)
I veulent également stocker cette sortie dans un autre ensemble de données et l'utiliser plus avant pour différentes transformations.
Besoin d'aide pour résoudre ce problème. !!
Mon extrait de code:
name , dept , project ,Ids -------------------------- Tina, Finance , abc , 1 Leena, Finance , abc , 2 Joe, Marketing , xyz , 3
La sortie que j'obtiens est empDatasetWithIds (sortie incorrecte):
name , dept , project --------------------- Tina, Finance , abc Leena, Finance , abc Joe, Marketing , xyz
Le code ci-dessus fonctionne bien lorsqu'il est exécuté sur en mode local mais en mode cluster, les valeurs ne s'incrémentent pas.
J'ai également parcouru les liens ci-dessous: https://community.hortonworks.com/questions/ 36888 / spark-java-accumulator-not-incrementing.html Spark Java Accumulator ne s'incrémente pas
Les accumulateurs d'étincelles nécessitent une action pour déclencher le travail. Dans mon scénario, j'effectue en outre une transformation de filtre sur l'ensemble de données, comment puis-je résoudre ce problème. Besoin d'aide.
3 Réponses :
Pour cette fonctionnalité, vous pouvez utiliser numéro_ligne
import org.apache.spark.sql.expressions.Window import static org.apache.spark.sql.functions.col; import static org.apache.spark.sql.functions.row_number; Dataset<Row> empDatasetWithIds = empDataset.withColumn("Ids", row_number().over(Window.orderBy(col("name"), col("dept"), col("project))) )
Référence: https://stackoverflow.com/a/31077759
Comme indiqué dans les commentaires à l'aide de Window sans partition est très inefficace. et Doit être évité dans le code de production pour traiter des données volumineuses.
Votre approche avec l'accumulateur ne fonctionne pas (comme expliqué dans Pourquoi le nœud worker ne voit-il pas les mises à jour de l'accumulateur sur un autre nœud worker? ) depuis Spark exécute ce code dans différents exécuteurs (différents processus jvm s'exécutant sur différentes machines), et chacun a sa propre copie si l'accumulateur.
Je recommande fortement de ne pas faire ça. En effet, l'utilisation d'une fenêtre sans partitionBy
entraînera le regroupement de toutes les données dans une seule partition qui ne se met pas à l'échelle du tout (vous obtenez cet avertissement dans spark WARN WindowExec: No Partition Defined for Window opération! Le déplacement de toutes les données vers une seule partition peut entraîner une sérieuse dégradation des performances.
)
@Oli Je suis d'accord que c'est inefficace. Je préférerais de loin votre solution en utilisant zipWithIndex
.
Vous pouvez faire simplement comme ci-dessous, si l'aspect ascendant séquentiel n'est pas un problème:
import org.apache.spark.sql.functions.monotonically_increasing_id import spark.implicits._ val ds = sc.parallelize(Seq( ("a", -1.0), ("b", -2.0), ("c", -3.0)),5).toDS // Just a dummy DS val newds = ds.withColumn("uniqueIdColumn", monotonically_increasing_id()) newds.show(false)
Essayez-le et adaptez-vous à votre propre situation.
BTW: Wrong utilisation de l'accumulateur.
Les accumulateurs sont des variables utilisées pour accumuler des données dans les exécuteurs et les renvoyer au pilote. Si vous lisez sa valeur à partir de l'exécuteur, le comportement n'est pas défini (AFAIK). Je pense que vous obtiendriez probablement ce qui a été accumulé jusqu'à présent pour la partition locale. En effet, le but de spark est de faire des calculs parallèles. Par conséquent, lors de l'utilisation d'un accumulateur, les données sont accumulées pour chaque partition dans un accumulateur séparé qui sont ensuite fusionnées et renvoyées au pilote (paradigme de réduction de carte). Vous ne pouvez donc pas utiliser un accumulateur pour partager des informations entre les exécuteurs. Ce n'est tout simplement pas ce à quoi cela sert
Ce que vous pouvez faire, c'est utiliser zipWithIndex
de l'API RDD si vous avez besoin d'index consécutifs, ou monoticallyIncreasingId
de l'API SparkSQL si vous avez juste besoin d'indices croissants. Le premier déclenche un petit travail d'étincelle tandis que le second est presque gratuit (pas de travail d'étincelle).
Option 1 (augmentant mais pas nécessairement les indices consécutifs)
StructType schema = yourDataframe.schema(); schema.add(new StructField("id", DataTypes.LongType, false,null)); JavaRDD<Row> rdd = yourDataframe.toJavaRDD().zipWithIndex() .map(x -> { Collection<Object> row = JavaConverters.asJavaCollection(x._1.toSeq()); Long index = x._2; row.add(index); return RowFactory.create(row); }); Dataset<Row> indexedData = spark.createDataFrame(rdd, schema);
Option 2 (indices consécutifs et croissants)
yourDataframe.withColumn("id", functions.monotonicallyIncreasingId());
Merci !! Option 1: "monotonicallyIncreasingId ()" suggéré a aidé à résoudre le problème :)
Je ne suis pas sûr que ce soit exactement un double. La raison pour laquelle le code proposé ici ne fonctionne pas est expliquée dans le post qui est censé être dupliqué. Pourtant, le problème réel qui est abordé ici (indexation d'une dataframe en java) ne l'est clairement pas. Ce problème est abordé dans cet autre article stackoverflow.com/questions/55160683/... . Cela vaudrait-il la peine d'avoir une solution en java, étant donné que c'est un peu plus compliqué?
@elisah Pas convaincu que ce soit une duplication, une technique plus mal choisie.
@Oli voir commentaire