1
votes

La valeur de l'accumulateur d'étincelles ne s'incrémente pas

J'ai récemment travaillé sur des ensembles de données Spark, j'ai un scénario dans lequel Je dois générer un numéro de ligne pour chaque ligne et le stocker dans une colonne nommée "Ids". Ce numéro de ligne commence à 1 , 2, 3 ... et des incréments basés sur le nombre de lignes dans l'ensemble de données. (Dans mon cas, il y a 10000 à 20000 enregistrements)

Considérez, j'ai un ensemble de données 'empDataset' avec des valeurs:

name , dept , project ,Ids
--------------------------
Tina, Finance , abc , 1
Leena, Finance , abc , 2
Joe, Marketing , xyz , 1

Maintenant, pour l'ensemble de données ci-dessus, je veux ajouter une Colonne 'Ids' avec des valeurs incrémentées de 1,2,3 .. etc.

Le résultat attendu est le suivant

LongAccumulator  accValue = spark.sparkContext().longAccumulator();
long rowNumber = 1;

spark.udf().register("randomNumberGenerator", new UDF1<String, Long>() {

            @Override
            public Long call(String namCol) throws Exception {
                    accum.add(rowNumber);
                    System.out.println("inside" + accum.value());
                    return accum.value();
                }
        }, DataTypes.LongType);

Dataset<Row> empDatasetWithIds= empDataset.withColumn("Ids",callUDF("randomNumberGenerator",
                col(name)));

Dataset<Row> filterDept = empDatasetWithIds.filter(...here filtering with dept...)

I veulent également stocker cette sortie dans un autre ensemble de données et l'utiliser plus avant pour différentes transformations.

Besoin d'aide pour résoudre ce problème. !!

Mon extrait de code:

name , dept , project ,Ids
--------------------------
Tina, Finance , abc , 1
Leena, Finance , abc , 2
Joe, Marketing , xyz , 3

La sortie que j'obtiens est empDatasetWithIds (sortie incorrecte):

name , dept , project
---------------------
Tina, Finance , abc
Leena, Finance , abc
Joe, Marketing , xyz

Le code ci-dessus fonctionne bien lorsqu'il est exécuté sur en mode local mais en mode cluster, les valeurs ne s'incrémentent pas.

J'ai également parcouru les liens ci-dessous: https://community.hortonworks.com/questions/ 36888 / spark-java-accumulator-not-incrementing.html Spark Java Accumulator ne s'incrémente pas

Les accumulateurs d'étincelles nécessitent une action pour déclencher le travail. Dans mon scénario, j'effectue en outre une transformation de filtre sur l'ensemble de données, comment puis-je résoudre ce problème. Besoin d'aide.


3 commentaires

Je ne suis pas sûr que ce soit exactement un double. La raison pour laquelle le code proposé ici ne fonctionne pas est expliquée dans le post qui est censé être dupliqué. Pourtant, le problème réel qui est abordé ici (indexation d'une dataframe en java) ne l'est clairement pas. Ce problème est abordé dans cet autre article stackoverflow.com/questions/55160683/... . Cela vaudrait-il la peine d'avoir une solution en java, étant donné que c'est un peu plus compliqué?


@elisah Pas convaincu que ce soit une duplication, une technique plus mal choisie.


@Oli voir commentaire


3 Réponses :


-1
votes

Pour cette fonctionnalité, vous pouvez utiliser numéro_ligne

import org.apache.spark.sql.expressions.Window
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.row_number;

Dataset<Row> empDatasetWithIds = empDataset.withColumn("Ids", 
    row_number().over(Window.orderBy(col("name"), col("dept"), col("project)))
)

Référence: https://stackoverflow.com/a/31077759

Comme indiqué dans les commentaires à l'aide de Window sans partition est très inefficace. et Doit être évité dans le code de production pour traiter des données volumineuses.

Votre approche avec l'accumulateur ne fonctionne pas (comme expliqué dans Pourquoi le nœud worker ne voit-il pas les mises à jour de l'accumulateur sur un autre nœud worker? ) depuis Spark exécute ce code dans différents exécuteurs (différents processus jvm s'exécutant sur différentes machines), et chacun a sa propre copie si l'accumulateur.


2 commentaires

Je recommande fortement de ne pas faire ça. En effet, l'utilisation d'une fenêtre sans partitionBy entraînera le regroupement de toutes les données dans une seule partition qui ne se met pas à l'échelle du tout (vous obtenez cet avertissement dans spark WARN WindowExec: No Partition Defined for Window opération! Le déplacement de toutes les données vers une seule partition peut entraîner une sérieuse dégradation des performances. )


@Oli Je suis d'accord que c'est inefficace. Je préférerais de loin votre solution en utilisant zipWithIndex .



0
votes

Vous pouvez faire simplement comme ci-dessous, si l'aspect ascendant séquentiel n'est pas un problème:

import org.apache.spark.sql.functions.monotonically_increasing_id 
import spark.implicits._

val ds = sc.parallelize(Seq(
    ("a", -1.0), ("b", -2.0), ("c", -3.0)),5).toDS   // Just a dummy DS

val newds = ds.withColumn("uniqueIdColumn", monotonically_increasing_id())

newds.show(false)

Essayez-le et adaptez-vous à votre propre situation.

BTW: Wrong utilisation de l'accumulateur.


0 commentaires

2
votes

Les accumulateurs sont des variables utilisées pour accumuler des données dans les exécuteurs et les renvoyer au pilote. Si vous lisez sa valeur à partir de l'exécuteur, le comportement n'est pas défini (AFAIK). Je pense que vous obtiendriez probablement ce qui a été accumulé jusqu'à présent pour la partition locale. En effet, le but de spark est de faire des calculs parallèles. Par conséquent, lors de l'utilisation d'un accumulateur, les données sont accumulées pour chaque partition dans un accumulateur séparé qui sont ensuite fusionnées et renvoyées au pilote (paradigme de réduction de carte). Vous ne pouvez donc pas utiliser un accumulateur pour partager des informations entre les exécuteurs. Ce n'est tout simplement pas ce à quoi cela sert

Ce que vous pouvez faire, c'est utiliser zipWithIndex de l'API RDD si vous avez besoin d'index consécutifs, ou monoticallyIncreasingId de l'API SparkSQL si vous avez juste besoin d'indices croissants. Le premier déclenche un petit travail d'étincelle tandis que le second est presque gratuit (pas de travail d'étincelle).

Option 1 (augmentant mais pas nécessairement les indices consécutifs)

StructType schema = yourDataframe.schema();
schema.add(new StructField("id", DataTypes.LongType, false,null));
JavaRDD<Row> rdd = yourDataframe.toJavaRDD().zipWithIndex()
    .map(x -> {
         Collection<Object> row = JavaConverters.asJavaCollection(x._1.toSeq());
         Long index = x._2;
         row.add(index);
         return RowFactory.create(row);
    });
Dataset<Row> indexedData = spark.createDataFrame(rdd, schema);

Option 2 (indices consécutifs et croissants)

yourDataframe.withColumn("id", functions.monotonicallyIncreasingId());


1 commentaires

Merci !! Option 1: "monotonicallyIncreasingId ()" suggéré a aidé à résoudre le problème :)