Problem
Depuis le multiprocessing.Pool
docs :
apply_async (func ...)
: Une variante de la méthodeapply ()
qui renvoie un objet résultat. ...
Pour en savoir plus ...
apply (func [ args [ kwds]])
: Appel de func avec des arguments args et des arguments de mot-clé kwds. Il bloque jusqu'à ce que le résultat soit prêt. Compte tenu de ces blocs, apply_async () est mieux adapté pour effectuer un travail en parallèle. De plus, func n'est exécuté que dans l'un des nœuds de calcul du pool.
La dernière ligne en gras suggère qu'un seul worker d'un pool est utilisé. Je trouve que cela n'est vrai que sous certaines conditions.
Étant donné
Voici le code qui exécute Pool.apply_async ()
dans trois cas similaires. Dans tous les cas, l'identifiant du processus est imprimé.
# Time uncommented # 1. Docs : [6780, 6780, 6780, 6780, 6780, 6780, 6780, 6780, 6780, 6780] # 2. Delay : [6780, 2112, 6780, 2112, 6780, 2112, 6780, 2112, 6780, 2112] # 3. No delay: [6780, 2112, 6780, 2112, 6780, 2112, 6780, 2112, 6780, 2112]
Résultats
L'exemple du docs (cas 1) confirme qu'un seul worker est exécuté. Nous étendons cet exemple dans les cas suivants en appliquant blocking_func
, qui bloque avec un certain retard.
Commenter la ligne time.sleep ()
dans blocking_func ()
met tous les cas en accord.
# Time commented # 1. Docs : [8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208] # 2. Delay : [8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208] # 3. No delay: [8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208]
3 Réponses :
Un seul worker est utilisé pour cet appel . Un seul apply_async
ne peut pas être exécuté dans deux nœuds de calcul. Cela n'empêche pas l'exécution de plusieurs appels apply_async
dans différents nœuds de calcul. Une telle restriction irait complètement à l'encontre du point d'avoir un pool de processus.
Merci. Je ne savais pas que plus d'un travailleur pouvait être utilisé par appel. Pourquoi un seul processus est-il utilisé dans le Cas 1
? La tâche n'est-elle pas assez lourde pour mériter d'utiliser plus de processus?
Votre confusion semble provenir du fait que [pool.apply_async (...) for i in range (10)]
est one appel, quand il y a vraiment dix appels indépendants. Un appel à n'importe quelle méthode de pool est un "travail". Un travail peut généralement conduire à une ou plusieurs tâches distribuées. Les méthodes apply
ne produisent toujours qu'une seule tâche sous le capot. Une tâche est une unité de travail indivisible qui sera reçue dans son ensemble par un pool-worker aléatoire.
Il n'y a qu'une seule enquête
partagée, tous les travailleurs sont alimentés. Quel travailleur inactif sera réveillé de l'attente à get ()
une tâche de cette file d'attente revient au système d'exploitation. Votre résultat-entropie pour le cas 1 est encore quelque peu surprenant et probablement très chanceux, du moins à moins que vous ne confirmiez que vous n'avez que deux cœurs.
Et oui, votre observation pour cette exécution est également influencée par le temps de calcul nécessaire pour une tâche, car les threads (l'unité d'exécution planifiée dans un processus) sont généralement planifiés avec des politiques de découpage du temps (par exemple ~ 20 ms pour Windows).
/ p>
Merci. Non, je me rends compte qu'il y a dix appels de apply_async ()
. Ce qui n'était pas clair, c'est que chaque appel se rapporte à un processus aléatoire sélectionné dans le pool de travailleurs. Les résultats m'ont amené à croire qu'un seul processus a été utilisé, mais c'est peut-être un hasard puisque, oui, je n'ai que deux cœurs et les travaux n'ont peut-être pas été suffisamment intenses pour déclencher une commutation de processus. - pylang il y a 8 minutes
@pylang Ah je vois. Les processus sont créés lorsque vous instanciez le pool. Il y avait donc deux processus, mais un seul a été planifié en un clin d'œil pour traiter vos travaux.
Poussé par le commentaire de @ Darkonaut, j'ai inspecté davantage et j'ai constaté que la fonction de blocage était trop rapide. J'ai testé ce dernier code, modifié, avec une nouvelle fonction de blocage intensive.
Code
La nouvelle fonction de blocage calcule de manière itérative les nombres de Fibonacci. Un argument facultatif peut être passé pour élargir la plage et calculer des nombres plus grands.
# Results Docs : [10032, 10032, 10032, 10032, 10032, 10032, 10032, 10032, 10032, 10032] Offset : [10032, 8268, 10032, 8268, 10032, 8268, 10032, 8268, 10032, 8268] Duration : 1.67s
if __name__ == '__main__': start = time.time() apply_async() end = time.time() print(f"Duration : {end - start:.2f}s")
Démo
Passer un grand entier ( 100000
) au paramètre offset, par exemple ... [pool.apply_async (blocking_func, args = (i, 100000)) ...]
et en exécutant le code, nous sommes en mesure de déclencher la commutation de processus de manière plus fiable.
def blocking_func(n, offset=0): """Return an intensive result via Fibonacci number.""" n += offset a, b = 0, 1 for _ in range(n): a, b = b, a + b return a, os.getpid() def blocking_func(n, offset=0): """Return an intensive result via recursive fibonacci number.""" func = blocking_func n += offset if n <= 1: return n, os.getpid() return func(n-1) + func(n-2)
Il est intéressant de noter que 100k nombres de Fibonacci sont calculés de manière asynchrone 10 fois en moins de 2 secondes. En revanche, l'utilisation d'une implémentation récursive de Fibonacci serait comparativement intensive à ~ 30 itérations (non illustrées).