3
votes

pourquoi plus d'un worker est-il utilisé dans `multiprocessing.Pool (). apply_async ()`?

Problem

Depuis le multiprocessing.Pool docs :

apply_async (func ...) : Une variante de la méthode apply () qui renvoie un objet résultat. ...

Pour en savoir plus ...

apply (func [ args [ kwds]]) : Appel de func avec des arguments args et des arguments de mot-clé kwds. Il bloque jusqu'à ce que le résultat soit prêt. Compte tenu de ces blocs, apply_async () est mieux adapté pour effectuer un travail en parallèle. De plus, func n'est exécuté que dans l'un des nœuds de calcul du pool.

La dernière ligne en gras suggère qu'un seul worker d'un pool est utilisé. Je trouve que cela n'est vrai que sous certaines conditions.

Étant donné

Voici le code qui exécute Pool.apply_async () dans trois cas similaires. Dans tous les cas, l'identifiant du processus est imprimé.

# Time uncommented
# 1. Docs    : [6780, 6780, 6780, 6780, 6780, 6780, 6780, 6780, 6780, 6780]
# 2. Delay   : [6780, 2112, 6780, 2112, 6780, 2112, 6780, 2112, 6780, 2112]
# 3. No delay: [6780, 2112, 6780, 2112, 6780, 2112, 6780, 2112, 6780, 2112]

Résultats

L'exemple du docs (cas 1) confirme qu'un seul worker est exécuté. Nous étendons cet exemple dans les cas suivants en appliquant blocking_func , qui bloque avec un certain retard.

Commenter la ligne time.sleep () dans blocking_func () met tous les cas en accord.

# Time commented
# 1. Docs    : [8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208]
# 2. Delay   : [8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208]
# 3. No delay: [8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208]


0 commentaires

3 Réponses :


2
votes

Un seul worker est utilisé pour cet appel . Un seul apply_async ne peut pas être exécuté dans deux nœuds de calcul. Cela n'empêche pas l'exécution de plusieurs appels apply_async dans différents nœuds de calcul. Une telle restriction irait complètement à l'encontre du point d'avoir un pool de processus.


1 commentaires

Merci. Je ne savais pas que plus d'un travailleur pouvait être utilisé par appel. Pourquoi un seul processus est-il utilisé dans le Cas 1 ? La tâche n'est-elle pas assez lourde pour mériter d'utiliser plus de processus?



3
votes

Votre confusion semble provenir du fait que [pool.apply_async (...) for i in range (10)] est one appel, quand il y a vraiment dix appels indépendants. Un appel à n'importe quelle méthode de pool est un "travail". Un travail peut généralement conduire à une ou plusieurs tâches distribuées. Les méthodes apply ne produisent toujours qu'une seule tâche sous le capot. Une tâche est une unité de travail indivisible qui sera reçue dans son ensemble par un pool-worker aléatoire.

Il n'y a qu'une seule enquête partagée, tous les travailleurs sont alimentés. Quel travailleur inactif sera réveillé de l'attente à get () une tâche de cette file d'attente revient au système d'exploitation. Votre résultat-entropie pour le cas 1 est encore quelque peu surprenant et probablement très chanceux, du moins à moins que vous ne confirmiez que vous n'avez que deux cœurs.

Et oui, votre observation pour cette exécution est également influencée par le temps de calcul nécessaire pour une tâche, car les threads (l'unité d'exécution planifiée dans un processus) sont généralement planifiés avec des politiques de découpage du temps (par exemple ~ 20 ms pour Windows).

/ p>


2 commentaires

Merci. Non, je me rends compte qu'il y a dix appels de apply_async () . Ce qui n'était pas clair, c'est que chaque appel se rapporte à un processus aléatoire sélectionné dans le pool de travailleurs. Les résultats m'ont amené à croire qu'un seul processus a été utilisé, mais c'est peut-être un hasard puisque, oui, je n'ai que deux cœurs et les travaux n'ont peut-être pas été suffisamment intenses pour déclencher une commutation de processus. - pylang il y a 8 minutes


@pylang Ah je vois. Les processus sont créés lorsque vous instanciez le pool. Il y avait donc deux processus, mais un seul a été planifié en un clin d'œil pour traiter vos travaux.



1
votes

Poussé par le commentaire de @ Darkonaut, j'ai inspecté davantage et j'ai constaté que la fonction de blocage était trop rapide. J'ai testé ce dernier code, modifié, avec une nouvelle fonction de blocage intensive.

Code

La nouvelle fonction de blocage calcule de manière itérative les nombres de Fibonacci. Un argument facultatif peut être passé pour élargir la plage et calculer des nombres plus grands.

# Results
Docs     : [10032, 10032, 10032, 10032, 10032, 10032, 10032, 10032, 10032, 10032]
Offset   : [10032, 8268, 10032, 8268, 10032, 8268, 10032, 8268, 10032, 8268]
Duration : 1.67s

if __name__ == '__main__':        
    start = time.time()
    apply_async()
    end = time.time()
    print(f"Duration : {end - start:.2f}s")

Démo

Passer un grand entier ( 100000 ) au paramètre offset, par exemple ... [pool.apply_async (blocking_func, args = (i, 100000)) ...] et en exécutant le code, nous sommes en mesure de déclencher la commutation de processus de manière plus fiable.

def blocking_func(n, offset=0):
    """Return an intensive result via Fibonacci number."""
    n += offset
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a, os.getpid()


def blocking_func(n, offset=0):
    """Return an intensive result via recursive fibonacci number."""
    func = blocking_func
    n += offset
    if n <= 1:
        return n, os.getpid()
    return func(n-1) + func(n-2)

Il est intéressant de noter que 100k nombres de Fibonacci sont calculés de manière asynchrone 10 fois en moins de 2 secondes. En revanche, l'utilisation d'une implémentation récursive de Fibonacci serait comparativement intensive à ~ 30 itérations (non illustrées).


0 commentaires