Dans la documentation officielle Spark, il existe un exemple pour un accumulateur utilisé dans un appel code> pour Acheach code> qui est directement sur un RDD: dans un réglage local, cela fonctionne simplement bien. Cependant, si j'exécute ce travail sur un cluster autonome Spark avec plusieurs machines, les travailleurs jettent un p> sur la ligne qui incrémente l'accumulateur Ma question est la suivante: Les accumulateurs peuvent-ils seulement être utilisés uniquement dans des fonctions anonymes "de haut niveau" qui sont appliquées directement sur RDD et non dans des fonctions imbriquées?
Si oui, pourquoi mon appel réussit-il localement et échoue sur un cluster? P> EDIT STROR>: Verbosité accrue de l'exception. P> P> myCounter code> . P>
3 Réponses :
Et si vous définissez la fonction comme ceci: puis appelez-le comme ceci: p> ? P> p>
Cela semble être correct, vous pouvez transmettre l'accumulateur que vous avez créé sur la méthode
Si vous utilisez "platmap", "myCounter" ne sera pas mise à jour car "platmap" est une fonction paresseuse. Vous pouvez utiliser ce code:
myRDD.foreach(line => foo(line)) def foo(line: String) = {myCounter +=1} println(myCounter.value)
Dans mon cas également, l'accumulateur était NULL dans la fermeture lorsque j'ai utilisé "Extends App 'pour créer une application Spark comme indiqué ci-dessous" PRE> XXX PRE>
I remplacé étend l'application avec la méthode principale () et elle Travaillé dans le cluster de fil dans HDP 2.4 p> travaillé p> p>
Merci! Cela sauve mes 2 jours de débogage. Est-ce que quelqu'un sait pourquoi scala.app ne fonctionne pas ici?
Pourriez-vous poster plus de la trace de travailleur?
Avez-vous essayé
sc.broadcast (myCounter) code>?
Ne
pas de diffusion code> renvoie une valeur en lecture seule? De API officiel Docs : "Diffusion Une variable en lecture seule au cluster, renvoyant un objet de diffusion pour la lire dans des fonctions distribuées. La variable sera envoyée à chaque cluster une seule fois. "
Bon point. Peu importe, il semble que Spark n'envoie pas votre accumulateur au cluster, d'où le pointeur NULL. Je ne sais pas comment travailler autour d'cela sauf modifier un peu votre algorithme.
Vous ne devriez pas utiliser SC.Broadcast () avec des accumulateurs.
@Ptikobj: J'ai essayé de courir (essentiellement) ce code dans le mode de cluster local de Spark, que l'unité d'étincelle teste l'utilisation pour créer un cluster autonome multi-processus. Malheureusement, je n'ai pas pu reproduire ce problème. Quelle version de Spark utilisez-vous? Pouvez-vous isoler le problème à un petit échantillon de code et poster un programme complet qui me permettra de reproduire cette exception? La trace de traçabilité contenait-elle d'autres sections (comme "causées par: ...")?
Le problème semble être dans votre méthode FOO. Que se passe-t-il là?
Votre code contenait-il un objet
objet code> ou une classe
code>?