J'ai une fonction qui génère des tâches (tâches liées io):
def run(self) while True: self.semaphore.acquire() # first acquire, then get task t = get_task() self.process_task(t) def process_task(self, task): try: self.execute_task(task) self.mark_as_done(task) except: self.mark_as_failed(task) self.semaphore.release()
Et j'essaie d'écrire un consommateur en asyncio qui traitera au maximum 10 tâches à la fois et une la tâche est terminée puis en prendra une nouvelle. Je ne sais pas si je devrais utiliser des sémaphores ou existe-t-il une sorte d'exécuteur de pool asycio? J'ai commencé à écrire un pseudocode avec des fils:
def get_task(): while True: new_task = _get_task() if new_task is not None: yield new_task else: sleep(1)
Quelqu'un pourrait-il m'aider? Je n'ai aucune idée de l'endroit où placer les mots clés async / wait
3 Réponses :
Async n'est pas des threads. Si, par exemple, vous avez des tâches liées aux E / S de fichiers, écrivez-les de manière asynchrone en utilisant aiofiles
import time from concurrent.futures import ThreadPoolExecutor import asyncio def blocking(x): time.sleep(1) print( x, "is done" ) async def run(loop): futs = [] executor = ThreadPoolExecutor(max_workers=5) for x in range(15): future = loop.run_in_executor(executor, blocking, x) futs.append( future ) await asyncio.sleep(4) res = await asyncio.gather( *futs ) loop = asyncio.get_event_loop() loop.run_until_complete( run(loop) ) loop.close()
Merci pour votre réponse. Le fait est que je ne veux pas faire 10 sur 10 mais commencer la tâche suivante lorsqu'une tâche est terminée. Donc au début je commence 10 tâches, une terminée et j'ajoute la suivante à la piscine. Pour qu'il y ait 10 tâches en cours de traitement à la fois tout le temps
mais le problème dans votre solution est que vous récupérez une nouvelle tâche alors que vous n'avez pas encore acquis de sémaphore, ce qui cause des problèmes dans mon cas
Limite de tâches simple en utilisant asyncio.Sepmaphore Le problème avec cette solution est que les tâches sont extraites du générateur avec gourmandise. Par exemple, si le générateur lit à partir d'une grande base de données, le programme pourrait manquer de mémoire. À part cela, il est idiomatique et se comporte bien. Une solution, qui utilise protocole de générateur asynchrone pour extraire de nouvelles tâches à la demande: Il peut être considéré comme sous-optimal, car il ne commence pas à exécuter des tâches avant que 10 ne soient disponibles . Et voici une solution concise et magique utilisant le modèle de travail : Elle repose sur une propriété quelque peu contre-intuitive de pouvoir avoir plusieurs itérateurs asynchrones sur le même générateur asynchrone, auquel cas chaque élément généré est vu par un seul itérateur. Mon instinct me dit qu'aucune de ces solutions ne se comporte correctement sur annulation . async def max10(task_generator):
async def worker():
async for task in task_generator:
await task
await asyncio.gather(*[worker() for i in range(10)])
async def max10(task_generator):
tasks = set()
gen = task_generator.__aiter__()
try:
while True:
while len(tasks) < 10:
tasks.add(await gen.__anext__())
_done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
except StopAsyncIteration:
await asyncio.gather(*tasks)
async def max10(task_generator):
semaphore = asyncio.Semaphore(10)
async def bounded(task):
async with semaphore:
return await task
async for task in task_generator:
asyncio.ensure_future(bounded(task))
J'aime l'approche didactique de cette réponse, mais le dernier extrait pourrait être beaucoup plus simple. Puisque vous avez un nombre fixe de travailleurs, vous pouvez vous débarrasser du sémaphore. Et sans le sémaphore, les ouvriers peuvent utiliser la boucle async for
ordinaire.
Merci, édité. A également ajouté une explication pourquoi cela fonctionne même :)
Comme l'a souligné Dima Tismek , l'utilisation de sémaphores pour limiter la concurrence est vulnérable à l'épuisement du Il y a deux domaines où le générateur le code pourrait être amélioré: Voici une implémentation qui aborde les deux problèmes: Un cas de test simple: task_generator code > trop empressé, car il n'y a pas de contre-pression entre l'obtention des tâches et leur soumission à la boucle d'événements. Une meilleure option, également explorée par l'autre réponse, n'est pas de générer une tâche dès que le générateur a produit un objet, mais de créer un nombre fixe d'ouvriers qui épuisent le générateur simultanément.
async def mock_task(num):
print('running', num)
await asyncio.sleep(random.uniform(1, 5))
print('done', num)
async def mock_gen():
tnum = 0
while True:
await asyncio.sleep(.1 * random.random())
print('generating', tnum)
yield asyncio.create_task(mock_task(tnum))
tnum += 1
if __name__ == '__main__':
asyncio.run(throttle(mock_gen(), 3))
async def throttle(task_generator, max_tasks):
it = task_generator.__aiter__()
cancelled = False
async def worker():
async for task in it:
try:
await task
except asyncio.CancelledError:
# If a generated task is canceled, let its worker
# proceed with other tasks - except if it's the
# outer coroutine that is cancelling us.
if cancelled:
raise
# other exceptions are propagated to the caller
worker_tasks = [asyncio.create_task(worker())
for i in range(max_tasks)]
try:
await asyncio.gather(*worker_tasks)
except:
# In case of exception in one worker, or in case we're
# being cancelled, cancel all workers and propagate the
# exception.
cancelled = True
for t in worker_tasks:
t.cancel()
raise
p>
Btw., D'où vient l'exigence d'exécuter au plus 10 tâches à la fois?
Question existante avec une solution qui pourrait s'appliquer ici: stackoverflow.com/a/48486557/705086