Nous avons observé un comportement étrange lorsque nous avons essayé de démarrer un certain nombre de contrats à terme dans la méthode de réception d'un acteur. Si nous utilisons nos répartiteurs configurés comme exécutionContext, l'avenir exécuté sur le même thread et de manière séquentielle. Si nous utilisons exécutionContext.implicits.global, l'avenir exécuté parallèlement comme prévu.
Nous avons fait bouillir le code à l'exemple suivant (un exemple plus complet est ci-dessous): P>
import akka.actor.ActorSystem import scala.concurrent.{ExecutionContext, Future} object WhyNotParallelExperiment extends App { val actorSystem = ActorSystem(s"Experimental") // Futures not started in future: running in parallel startFutures(runInFuture = false)(actorSystem.dispatcher) Thread.sleep(5000) // Futures started in future: running in sequentially. Why???? startFutures(runInFuture = true)(actorSystem.dispatcher) Thread.sleep(5000) actorSystem.terminate() private def startFutures(runInFuture: Boolean)(implicit executionContext: ExecutionContext): Unit = { if (runInFuture) { Future{ println(s"Start Futures on thread ${Thread.currentThread().getName()}") (1 to 9).foreach(startFuture) println(s"Started Futures on thread ${Thread.currentThread().getName()}") } } else { (11 to 19).foreach(startFuture) } } private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future{ println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}") Thread.sleep(500) println(s"Future $id finished on thread ${Thread.currentThread().getName()}") } }
3 Réponses :
Cela a à voir avec le paramètre "débit" pour le répartiteur. J'ai ajouté un "fant-réveil" à appliquer.conf pour démontrer ceci: Voici votre exemple avec quelques modifications pour utiliser le répartiteur équitable pour les contrats à terme et imprimer la valeur actuelle du paramètre de débit: p> sortie: p> Comme vous pouvez le constater, le répartiteur équitable utilise des threads différents pour la plupart des futurs. < / p> Le répartiteur par défaut est optimisé pour les acteurs afin que le débit soit défini sur 5 pour minimiser les commutateurs de contexte pour améliorer le débit de traitement des messages tout en maintenant une certaine équité. p> Le seul changement de mon répartiteur équitable est le débit: 1, c'est-à-dire que chaque demande d'exécution ASYNC est donnée son propre fil si possible (jusqu'à parallélisme-max). P> Je voudrais Recommander de créer des dispatchers distincts pour les futurs utilisés à des fins différentes. Par exemple. Un répartiteur (piscine thread) pour appeler des services Web, un autre pour bloquer l'accès à la base de données DB, etc. Cela vous donnerait un contrôle plus précis sur celui-ci en modifiant les paramètres de répartiteur personnalisés. P> regardez un coup d'oeil à Découvrez également les paramètres de référence AKKA (Dispatchers par défaut en particulier), il existe un tas de commentaires utiles là-bas: https://github.com/akka/akka/blob/master/akka-actor/src/main/resources/ référence.conf p> p>
Merci pour votre réponse. Malheureusement, cela ne résout pas notre problème. Dans votre correctif proposé, les futurs fonctionnent en parallèle parce qu'ils utilisent un répartiteur différent. Si vous utilisez le même «répartiteur équitable» pour le futur ci-joint, ils fonctionnent toujours séquentiellement.
Après quelques recherches, j'ai découvert que le Ceci semble être raisonnable pour le Quelqu'un peut-il confirmer que cette solution va bien?
Y a-t-il de meilleurs moyens de résoudre ce problème?
Est la mise en œuvre actuelle de Dispatcher Code> Classe Implememts
akka.dispatch.batchingexecutor code>. Pour des raisons de performances, cette classe vérifie que les tâches doivent être lâchées sur le même thread.
futur.map code> crée en interne un
scala.concurrent.oncompleterunnable code> qui est lotré dans le
BatchingExecutor code>.
Carte () Code> /
Flatmap () Code> Lorsqu'une tâche génère une tâche ultérieure, mais pas pour de nouveaux contrats à terme explicites utilisés pour travaux de fourche.
Interne,
futur.apply code> est implémenté par
futur.succassé (). Mapper code> et est donc lotré. Ma solution de contournement est maintenant de créer des contrats à terme d'une manière différente: p>
futureestarter code> -Runnables ne sont pas lots et fonctionnent ainsi en parallèle. P >
futur code> /
BatchingExecutor code> recherché ou est-ce un bug? P> p>
de la description de l'interne d'Akka BatchingExecutor code>
(emphasis mine):
trait de mixine pour un exécuteur exécutant qui
Groupes multiples imbriqués Runnable.run () code> appelle à un seul exécutable transmis à l'exécuteur exécutif d'origine forte>. Cela peut être une optimisation utile car elle contourne la file d'attente du contexte original et conserve le code associé (imbriqué) sur un seul fil qui peut améliorer l'affinité de la CPU. Toutefois, si les tâches transmises à l'exécuteur de l'exécuteur bloquent ou coûteuses, cette optimisation peut empêcher le vol-oeuvre et la performance pire ... Un exécuteur de commandes peut créer des blocages si le code n'utilise pas
scala.concurrent.blocking code > Lorsque cela devrait, car les tâches créées dans d'autres tâches bloqueront la tâche externe terminée. P> blockQuote>
Si vous utilisez un répartiteur qui se mélange dans
BatchingExecutor code> - à savoir une sous-classe de
MessageDisPatcher code>
peut utiliser lescala.concurrent.blocking code> construire sur Activer le parallélisme avec des contrats à terme imbriqués: p>
xxx pré> dans votre exemple, vous ajouteriez
bloquant code> dans la méthode
Startfuture CODE>: P>
Start Futures on thread Experimental-akka.actor.default-dispatcher-2 Started Futures on thread Experimental-akka.actor.default-dispatcher-2 Future 1 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-2 Future 3 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-3 Future 5 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-6 Future 7 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-7 Future 4 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-5 Future 9 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-10 Future 6 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-8 Future 8 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-9 Future 2 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-4 Future 1 finished on thread Experimental-akka.actor.default-dispatcher-2 Future 3 finished on thread Experimental-akka.actor.default-dispatcher-3 Future 5 finished on thread Experimental-akka.actor.default-dispatcher-6 Future 4 finished on thread Experimental-akka.actor.default-dispatcher-5 Future 8 finished on thread Experimental-akka.actor.default-dispatcher-9 Future 7 finished on thread Experimental-akka.actor.default-dispatcher-7 Future 9 finished on thread Experimental-akka.actor.default-dispatcher-10 Future 6 finished on thread Experimental-akka.actor.default-dispatcher-8 Future 2 finished on thread Experimental-akka.actor.default-dispatcher-4
N'a pas eu scala.concurrent.blocking code> sur le radar - cela semble faire le travail. Merci encore pour vos commentaires!