2
votes

Traiter les tâches par lots dans Asyncio

J'ai une fonction qui génère des tâches (tâches liées io):

def run(self)
   while True:
       self.semaphore.acquire() # first acquire, then get task
       t = get_task()
       self.process_task(t)

def process_task(self, task):
   try:
       self.execute_task(task)
       self.mark_as_done(task)
   except:
       self.mark_as_failed(task)
   self.semaphore.release()

Et j'essaie d'écrire un consommateur en asyncio qui traitera au maximum 10 tâches à la fois et une la tâche est terminée puis en prendra une nouvelle. Je ne sais pas si je devrais utiliser des sémaphores ou existe-t-il une sorte d'exécuteur de pool asycio? J'ai commencé à écrire un pseudocode avec des fils:

def get_task():
    while True:
        new_task = _get_task()
        if new_task is not None:
            yield new_task
        else:
            sleep(1)

Quelqu'un pourrait-il m'aider? Je n'ai aucune idée de l'endroit où placer les mots clés async / wait


2 commentaires

Btw., D'où vient l'exigence d'exécuter au plus 10 tâches à la fois?


Question existante avec une solution qui pourrait s'appliquer ici: stackoverflow.com/a/48486557/705086


3 Réponses :


1
votes

Async n'est pas des threads. Si, par exemple, vous avez des tâches liées aux E / S de fichiers, écrivez-les de manière asynchrone en utilisant aiofiles

import time
from concurrent.futures import ThreadPoolExecutor
import asyncio

def blocking(x):
  time.sleep(1)
  print( x, "is done" )

async def run(loop):
  futs = []
  executor = ThreadPoolExecutor(max_workers=5)
  for x in range(15):
    future = loop.run_in_executor(executor, blocking, x)
    futs.append( future )

  await asyncio.sleep(4)
  res = await asyncio.gather( *futs )

loop = asyncio.get_event_loop()
loop.run_until_complete( run(loop) )
loop.close()


2 commentaires

Merci pour votre réponse. Le fait est que je ne veux pas faire 10 sur 10 mais commencer la tâche suivante lorsqu'une tâche est terminée. Donc au début je commence 10 tâches, une terminée et j'ajoute la suivante à la piscine. Pour qu'il y ait 10 tâches en cours de traitement à la fois tout le temps


mais le problème dans votre solution est que vous récupérez une nouvelle tâche alors que vous n'avez pas encore acquis de sémaphore, ce qui cause des problèmes dans mon cas



4
votes

Limite de tâches simple en utilisant asyncio.Sepmaphore

async def max10(task_generator):
    async def worker():
        async for task in task_generator:
            await task

    await asyncio.gather(*[worker() for i in range(10)])

Le problème avec cette solution est que les tâches sont extraites du générateur avec gourmandise. Par exemple, si le générateur lit à partir d'une grande base de données, le programme pourrait manquer de mémoire.

À part cela, il est idiomatique et se comporte bien.

Une solution, qui utilise protocole de générateur asynchrone pour extraire de nouvelles tâches à la demande:

async def max10(task_generator):
    tasks = set()
    gen = task_generator.__aiter__()
    try:
        while True:
            while len(tasks) < 10:
                tasks.add(await gen.__anext__())
            _done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    except StopAsyncIteration:
        await asyncio.gather(*tasks)

Il peut être considéré comme sous-optimal, car il ne commence pas à exécuter des tâches avant que 10 ne soient disponibles .

Et voici une solution concise et magique utilisant le modèle de travail :

async def max10(task_generator):
    semaphore = asyncio.Semaphore(10)

    async def bounded(task):
        async with semaphore:
            return await task

    async for task in task_generator:
        asyncio.ensure_future(bounded(task))

Elle repose sur une propriété quelque peu contre-intuitive de pouvoir avoir plusieurs itérateurs asynchrones sur le même générateur asynchrone, auquel cas chaque élément généré est vu par un seul itérateur.

Mon instinct me dit qu'aucune de ces solutions ne se comporte correctement sur annulation .


2 commentaires

J'aime l'approche didactique de cette réponse, mais le dernier extrait pourrait être beaucoup plus simple. Puisque vous avez un nombre fixe de travailleurs, vous pouvez vous débarrasser du sémaphore. Et sans le sémaphore, les ouvriers peuvent utiliser la boucle async for ordinaire.


Merci, édité. A également ajouté une explication pourquoi cela fonctionne même :)



1
votes

Comme l'a souligné Dima Tismek , l'utilisation de sémaphores pour limiter la concurrence est vulnérable à l'épuisement du task_generator code > trop empressé, car il n'y a pas de contre-pression entre l'obtention des tâches et leur soumission à la boucle d'événements. Une meilleure option, également explorée par l'autre réponse, n'est pas de générer une tâche dès que le générateur a produit un objet, mais de créer un nombre fixe d'ouvriers qui épuisent le générateur simultanément.

Il y a deux domaines où le générateur le code pourrait être amélioré:

  • il n'y a pas besoin de sémaphore - c'est superflu quand le nombre de tâches est fixé au départ;
  • gestion de l'annulation des tâches générées et de la tâche de limitation.

Voici une implémentation qui aborde les deux problèmes:

async def mock_task(num):
    print('running', num)
    await asyncio.sleep(random.uniform(1, 5))
    print('done', num)

async def mock_gen():
    tnum = 0
    while True:
        await asyncio.sleep(.1 * random.random())
        print('generating', tnum)
        yield asyncio.create_task(mock_task(tnum))
        tnum += 1

if __name__ == '__main__':
    asyncio.run(throttle(mock_gen(), 3))

Un cas de test simple:

async def throttle(task_generator, max_tasks):
    it = task_generator.__aiter__()
    cancelled = False
    async def worker():
        async for task in it:
            try:
                await task
            except asyncio.CancelledError:
                # If a generated task is canceled, let its worker
                # proceed with other tasks - except if it's the
                # outer coroutine that is cancelling us.
                if cancelled:
                    raise
            # other exceptions are propagated to the caller
    worker_tasks = [asyncio.create_task(worker())
                    for i in range(max_tasks)]
    try:
        await asyncio.gather(*worker_tasks)
    except:
        # In case of exception in one worker, or in case we're
        # being cancelled, cancel all workers and propagate the
        # exception.
        cancelled = True
        for t in worker_tasks:
            t.cancel()
        raise
p>


0 commentaires