7
votes

Pourquoi les futurs sont-ils dans les futurs exécutés séquentiellement lorsqu'ils ont commencé sur Akka Dispatcher

Nous avons observé un comportement étrange lorsque nous avons essayé de démarrer un certain nombre de contrats à terme dans la méthode de réception d'un acteur. Si nous utilisons nos répartiteurs configurés comme exécutionContext, l'avenir exécuté sur le même thread et de manière séquentielle. Si nous utilisons exécutionContext.implicits.global, l'avenir exécuté parallèlement comme prévu.

Nous avons fait bouillir le code à l'exemple suivant (un exemple plus complet est ci-dessous): P>

import akka.actor.ActorSystem
import scala.concurrent.{ExecutionContext, Future}

object WhyNotParallelExperiment extends App {

  val actorSystem = ActorSystem(s"Experimental")   

  // Futures not started in future: running in parallel
  startFutures(runInFuture = false)(actorSystem.dispatcher)
  Thread.sleep(5000)

  // Futures started in future: running in sequentially. Why????
  startFutures(runInFuture = true)(actorSystem.dispatcher)
  Thread.sleep(5000)

  actorSystem.terminate()

  private def startFutures(runInFuture: Boolean)(implicit executionContext: ExecutionContext): Unit = {
    if (runInFuture) {
      Future{
        println(s"Start Futures on thread ${Thread.currentThread().getName()}")
        (1 to 9).foreach(startFuture)
        println(s"Started Futures on thread ${Thread.currentThread().getName()}")
      }
    } else {
      (11 to 19).foreach(startFuture)
    }
  }

  private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future{
    println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
    Thread.sleep(500)
    println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
  }


}


0 commentaires

3 Réponses :


0
votes

Cela a à voir avec le paramètre "débit" pour le répartiteur. J'ai ajouté un "fant-réveil" à appliquer.conf pour démontrer ceci: xxx

Voici votre exemple avec quelques modifications pour utiliser le répartiteur équitable pour les contrats à terme et imprimer la valeur actuelle du paramètre de débit: xxx

sortie: xxx

Comme vous pouvez le constater, le répartiteur équitable utilise des threads différents pour la plupart des futurs. < / p>

Le répartiteur par défaut est optimisé pour les acteurs afin que le débit soit défini sur 5 pour minimiser les commutateurs de contexte pour améliorer le débit de traitement des messages tout en maintenant une certaine équité.

Le seul changement de mon répartiteur équitable est le débit: 1, c'est-à-dire que chaque demande d'exécution ASYNC est donnée son propre fil si possible (jusqu'à parallélisme-max).

Je voudrais Recommander de créer des dispatchers distincts pour les futurs utilisés à des fins différentes. Par exemple. Un répartiteur (piscine thread) pour appeler des services Web, un autre pour bloquer l'accès à la base de données DB, etc. Cela vous donnerait un contrôle plus précis sur celui-ci en modifiant les paramètres de répartiteur personnalisés.

regardez un coup d'oeil à https://doc.akka.io/docs/akka/current/dispatchers.html , il est vraiment utile de comprendre les détails.

Découvrez également les paramètres de référence AKKA (Dispatchers par défaut en particulier), il existe un tas de commentaires utiles là-bas: https://github.com/akka/akka/blob/master/akka-actor/src/main/resources/ référence.conf


1 commentaires

Merci pour votre réponse. Malheureusement, cela ne résout pas notre problème. Dans votre correctif proposé, les futurs fonctionnent en parallèle parce qu'ils utilisent un répartiteur différent. Si vous utilisez le même «répartiteur équitable» pour le futur ci-joint, ils fonctionnent toujours séquentiellement.



0
votes

Après quelques recherches, j'ai découvert que le Dispatcher Classe Implememts akka.dispatch.batchingexecutor . Pour des raisons de performances, cette classe vérifie que les tâches doivent être lâchées sur le même thread. futur.map crée en interne un scala.concurrent.oncompleterunnable qui est lotré dans le BatchingExecutor .

Ceci semble être raisonnable pour Carte () / Flatmap () Lorsqu'une tâche génère une tâche ultérieure, mais pas pour de nouveaux contrats à terme explicites utilisés pour travaux de fourche. Interne, futur.apply est implémenté par futur.succassé (). Mapper et est donc lotré. Ma solution de contournement est maintenant de créer des contrats à terme d'une manière différente: xxx

le futureestarter -Runnables ne sont pas lots et fonctionnent ainsi en parallèle.

Quelqu'un peut-il confirmer que cette solution va bien? Y a-t-il de meilleurs moyens de résoudre ce problème? Est la mise en œuvre actuelle de futur / BatchingExecutor recherché ou est-ce un bug?


0 commentaires

3
votes

de la description de l'interne d'Akka BatchingExecutor code> (emphasis mine):

trait de mixine pour un exécuteur exécutant qui Groupes multiples imbriqués Runnable.run () code> appelle à un seul exécutable transmis à l'exécuteur exécutif d'origine forte>. Cela peut être une optimisation utile car elle contourne la file d'attente du contexte original et conserve le code associé (imbriqué) sur un seul fil qui peut améliorer l'affinité de la CPU. Toutefois, si les tâches transmises à l'exécuteur de l'exécuteur bloquent ou coûteuses, cette optimisation peut empêcher le vol-oeuvre et la performance pire ... Un exécuteur de commandes peut créer des blocages si le code n'utilise pas scala.concurrent.blocking code > Lorsque cela devrait, car les tâches créées dans d'autres tâches bloqueront la tâche externe terminée. P> blockQuote>

Si vous utilisez un répartiteur qui se mélange dans BatchingExecutor code> - à savoir une sous-classe de MessageDisPatcher code> peut utiliser le scala.concurrent.blocking code> construire sur Activer le parallélisme avec des contrats à terme imbriqués: p> xxx pré>

dans votre exemple, vous ajouteriez bloquant code> dans la méthode Startfuture CODE>: P>

Start Futures on thread Experimental-akka.actor.default-dispatcher-2
Started Futures on thread Experimental-akka.actor.default-dispatcher-2
Future 1 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-2
Future 3 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-3
Future 5 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-6
Future 7 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-7
Future 4 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-5
Future 9 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-10
Future 6 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-8
Future 8 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-9
Future 2 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-4
Future 1 finished on thread Experimental-akka.actor.default-dispatcher-2
Future 3 finished on thread Experimental-akka.actor.default-dispatcher-3
Future 5 finished on thread Experimental-akka.actor.default-dispatcher-6
Future 4 finished on thread Experimental-akka.actor.default-dispatcher-5
Future 8 finished on thread Experimental-akka.actor.default-dispatcher-9
Future 7 finished on thread Experimental-akka.actor.default-dispatcher-7
Future 9 finished on thread Experimental-akka.actor.default-dispatcher-10
Future 6 finished on thread Experimental-akka.actor.default-dispatcher-8
Future 2 finished on thread Experimental-akka.actor.default-dispatcher-4


1 commentaires

N'a pas eu scala.concurrent.blocking sur le radar - cela semble faire le travail. Merci encore pour vos commentaires!