11
votes

L'accumulateur échoue sur le cluster, fonctionne localement

Dans la documentation officielle Spark, il existe un exemple pour un accumulateur utilisé dans un appel pour Acheach qui est directement sur un RDD: xxx p> i Mise en œuvre Mon propre accumulateur: xxx

dans un réglage local, cela fonctionne simplement bien. Cependant, si j'exécute ce travail sur un cluster autonome Spark avec plusieurs machines, les travailleurs jettent un xxx

sur la ligne qui incrémente l'accumulateur myCounter .

Ma question est la suivante: Les accumulateurs peuvent-ils seulement être utilisés uniquement dans des fonctions anonymes "de haut niveau" qui sont appliquées directement sur RDD et non dans des fonctions imbriquées? Si oui, pourquoi mon appel réussit-il localement et échoue sur un cluster?

EDIT : Verbosité accrue de l'exception.


8 commentaires

Pourriez-vous poster plus de la trace de travailleur?


Avez-vous essayé sc.broadcast (myCounter) ?


Ne pas de diffusion renvoie une valeur en lecture seule? De API officiel Docs : "Diffusion Une variable en lecture seule au cluster, renvoyant un objet de diffusion pour la lire dans des fonctions distribuées. La variable sera envoyée à chaque cluster une seule fois. "


Bon point. Peu importe, il semble que Spark n'envoie pas votre accumulateur au cluster, d'où le pointeur NULL. Je ne sais pas comment travailler autour d'cela sauf modifier un peu votre algorithme.


Vous ne devriez pas utiliser SC.Broadcast () avec des accumulateurs.


@Ptikobj: J'ai essayé de courir (essentiellement) ce code dans le mode de cluster local de Spark, que l'unité d'étincelle teste l'utilisation pour créer un cluster autonome multi-processus. Malheureusement, je n'ai pas pu reproduire ce problème. Quelle version de Spark utilisez-vous? Pouvez-vous isoler le problème à un petit échantillon de code et poster un programme complet qui me permettra de reproduire cette exception? La trace de traçabilité contenait-elle d'autres sections (comme "causées par: ...")?


Le problème semble être dans votre méthode FOO. Que se passe-t-il là?


Votre code contenait-il un objet objet ou une classe ?


3 Réponses :


1
votes

Et si vous définissez la fonction comme ceci: xxx

puis appelez-le comme ceci: xxx

?


1 commentaires

Cela semble être correct, vous pouvez transmettre l'accumulateur que vous avez créé sur la méthode



-1
votes

Si vous utilisez "platmap", "myCounter" ne sera pas mise à jour car "platmap" est une fonction paresseuse. Vous pouvez utiliser ce code:

myRDD.foreach(line => foo(line))
def foo(line: String) = {myCounter +=1}
println(myCounter.value)


0 commentaires

3
votes

Dans mon cas également, l'accumulateur était NULL dans la fermeture lorsque j'ai utilisé "Extends App 'pour créer une application Spark comme indiqué ci-dessous" PRE> XXX

I remplacé étend l'application avec la méthode principale () et elle Travaillé dans le cluster de fil dans HDP 2.4 xxx

travaillé


1 commentaires

Merci! Cela sauve mes 2 jours de débogage. Est-ce que quelqu'un sait pourquoi scala.app ne fonctionne pas ici?