3
votes

Puis-je exécuter plusieurs processus lents en arrière-plan afin que plusieurs tâches puissent s'exécuter en parallèle?

J'ai une application console écrite en utilisant C # en haut du framework Core .NET 2.2.

Mon application me permet de déclencher des tâches d'administration de longue durée à l'aide du planificateur de tâches Windows. p >

L'une des tâches d'administration effectue un appel d'API Web qui télécharge de nombreux fichiers avant de les télécharger sur le stockage Blob Azure. Voici les étapes logiques que mon code devra effectuer pour faire le travail

  1. Appelez l'API distante qui répond avec un message Mime où chaque message représente un fichier.
  2. Analyser les messages Mime et convertir chaque message en un MemoryStream en créant une collection de MemoryStream

Une fois que j'ai une collection avec plus de 1000 MemoryStream , je souhaite écrire chaque Stream sur le stockage Blob Azure. Étant donné que l'écriture sur le stockage distant est lente, j'espère pouvoir exécuter chaque itération d'écriture en utilisant son propre processus ou thread. Cela me permettra d'avoir potentiellement plus de 1000 threads fonctionnant en même temps en parallèle au lieu d'avoir à attendre le résultat de chaque opération d'écriture. Chaque thread sera responsable de la journalisation de toutes les erreurs susceptibles de se produire pendant le processus d'écriture / téléchargement. Toutes les erreurs enregistrées seront traitées en utilisant un travail différent, donc je n'ai pas à me soucier de réessayer.

D'après ce que je comprends, appeler le code qui écrit / télécharge le flux de manière asynchrone fera exactement cela. En d'autres termes, je dirais "il y a un Stream qui l'exécute et l'exécute aussi longtemps que nécessaire. Je ne me soucie pas vraiment du résultat tant que la tâche est terminée." p>

Lors des tests, j'ai découvert que ma compréhension de l'appel async est quelque peu invalide. J'avais l'impression que lors de l'appel d'une méthode définie avec async , elle sera exécutée dans le thread / worker en arrière-plan jusqu'à ce que ce processus soit terminé. Mais, ma compréhension a échoué lorsque j'ai testé le code. Mon code m'a montré que sans ajouter le mot-clé wait le code async n'est jamais vraiment exécuté. En même temps, lorsque le mot-clé wait est ajouté, le code attendra que le processus se termine avant de continuer. En d'autres termes, ajouter await pour mon besoin ira à l'encontre de l'objectif d'appeler la méthode de manière asynchrone.

Voici une version allégée de mon code dans le but d'expliquer ce que je suis essayer d'accomplir

public async Task Run()
{
    // This gets populated after calling the web-API and parsing out the result
    List<Stream> files = new List<MemoryStream>{.....};

    foreach (Stream file in files)
    {
        // This code should get executed in the background without having to await the result
        await Upload(file);
    }
}

// This method is responsible of upload a stream to a storage and log error if any
private async Task Upload(Stream stream)
{
    try
    {
        await Storage.Create(file, GetUniqueName());
    } 
    catch(Exception e)
    {
        // Log any errors
    }
}

À partir du code ci-dessus, appeler wait Upload (file); fonctionne et téléchargera le fichier comme prévu. Cependant, comme j'utilise await lors de l'appel de la méthode Upload () , ma boucle ne passera PAS à l'itération suivante tant que le code de téléchargement ne sera pas terminé. En même temps, en supprimant le mot-clé await , la boucle n'attend pas le processus de téléchargement, mais le Stream n'écrit jamais réellement dans le stockage comme si je n'avais jamais appelé le code.

Comment puis-je exécuter plusieurs méthodes Upload en parallèle pour avoir un thread en cours d'exécution par téléchargement en arrière-plan?


0 commentaires

5 Réponses :


7
votes

Convertissez la liste en une liste de tâches "Upload" et attendez-les toutes avec Task.WhenAll () :

public async Task Run()
{
    // This gets populated after calling the web-API and parsing out the result
    List<Stream> files = new List<MemoryStream>{.....};
    var tasks = files.Select(Upload);

    await Task.WhenAll(tasks);
}

Voir ce message pour plus d'informations sur les tâches / await.


9 commentaires

Merci pour cette information. alors que fait Task.WhenAll () dans ce cas? Déclenche-t-il toutes les tâches en même temps en parallèle, regroupe-t-il les tâches en groupes et répartit-il des groupes en parallèle ou s'exécute-t-il les unes après les autres dans un pipeline?


@MikeA en bref, le planificateur de tâches décidera du nombre de threads à libérer du pool de threads à exécuter simultanément, il utilise une meilleure estimation basée sur certains facteurs et heuristiques. En bref, il fonctionne en parallèle du mieux qu'il peut


@MichaelRandall: À ma connaissance, ce que vous avez décrit est Parallel.ForEach . Et d'après mon expérience, Task.WhenAll déclenche toutes les tâches en même temps. Est-ce documenté quelque part?


@abatishchev Les tâches se déclenchent à la seconde où vous exécutez, sélectionnez-les. Cependant, votre fonction continue et continue. Le Task.WhenAll est une méthode asynchrone qui sera renvoyée lorsqu'une tâche a échoué ou que toutes se sont terminées avec succès. Si vous attendez cela, l'exécution y est "suspendue" jusqu'à ce que la tâche soit terminée.


@abatishchev ouais, il est documenté dans le pool de tâches ou de threads ou quelque part, c'est exactement ce que fait le planificateur par défaut, mais vous pouvez le tester avec un tas de tâches et de threads. Sleep, cependant avec async et wait et le travail lié à io, il attendra que les ports d'achèvement io se terminent. Cependant, dans l'ensemble, le planificateur ne libérera pas de manière agressive les threads du pool de threads et adoptera une approche de meilleure estimation régie par des facteurs tels que Max threads, le type de travail que vous faites et l'heuristique. Stephan Toub et Stephan Cleary ont également des blogs dessus


@FrankerZ: ouais, et c'est mon argument. wait Task.WhenAll (Enumerable.Range (0, Math.Pow (2,10)). Select (_ => httpClient.GetAsync ("https://google.com"))) a gagné ne planifiez pas la charge de travail correctement. Est-ce que cela va?


@MichaelRandall: J'ai lu les deux blogs. Pouvez-vous fournir un lien plus spécifique? Et s'il vous plaît voir mon commentaire ci-dessus.


@abatishchev je l'admets, les sources pertinentes sur le fonctionnement interne du .net TaskScheduler par défaut ne sont pas faciles à trouver et il n'y a que des commentaires éclairés à ce sujet dans la documentation. De plus, le code source TPL n'est pas pour les âmes sensibles. Cependant, si vous voulez creuser assez loin, vous trouverez des références aux internes de celui-ci écrites par Stehpen Cleary, Stephen Toub, Eric Lippert, et plus encore. Il y a aussi beaucoup de questions sur son comportement sur SO (veuillez noter que certaines d'entre elles sont obsolètes)


@abatishchev Ce que je suggère, c'est que si vous voulez une réponse plus concrète du comportement du Planificateur de tâches par défaut, c'est que vous posiez une question sur StackOverflow. Expliquer pourquoi le TaskScheduler divisera les threads du pool de threads et les threads IO comme il le fait. Je serai heureux de rassembler toutes les sources dont je dispose et de référencer la documentation pertinente (et parfois pas évidente) que je peux trouver. Je remarque également que Stephen Cleary répond à nouveau aux questions et est actif sur les questions de tâche. Alors tu pourrais avoir de la chance là aussi



3
votes

Vous avez probablement besoin de ceci:

var tasks = files.Select(Upload);
await Task.WhenAll(tasks);

Notez simplement que cela engendrera autant de tâches que vous avez de fichiers, ce qui peut faire tomber le processus / la machine s'il y en a trop. Voir Avoir un ensemble de tâches avec seulement X s'exécutant à la fois comme n exemple comment résoudre ce problème.


0 commentaires

4
votes

J'espère pouvoir exécuter chaque itération d'écriture en utilisant son propre processus ou thread.

Ce n'est pas vraiment la meilleure façon de procéder. Les processus et les threads sont des ressources limitées. Votre facteur limitant attend sur le réseau pour effectuer une action.

Ce que vous voudrez faire est juste quelque chose comme:

var tasks = new List<Task>(queue.Count);

while (queue.Count > 0)
{
  var myobject = Queue.Dequeue();
  var task = blockBlob.UploadFromByteArrayAsync(myobject.content, 0, myobject.content.Length);
  tasks.Add(task);
}
await Task.WhenAll(tasks);

1 commentaires

C'est pourquoi les tâches sont mieux adaptées pour cela (ce que l'OP essayait de faire). Laissez le planificateur de tâches déterminer ce qui est le mieux pour optimiser les ressources de l'application / du serveur.



3
votes

Les autres réponses sont correctes, mais une autre approche est de votre TPL DataFlow disponible dans Nuget à partir de https://www.nuget.org/packages/System.Threading.Tasks.Dataflow/

public static async Task DoWorkLoads(List<Something> results)
{
   var options = new ExecutionDataflowBlockOptions
                     {
                        MaxDegreeOfParallelism = 50
                     };

   var block = new ActionBlock<Something>(MyMethodAsync, options);

   foreach (var result in results)
      block.Post(result );

   block.Complete();
   await block.Completion;

}

...

public async Task MyMethodAsync(Something result)
{       
   //  Do async work here
}

L'avantage de dataflow

  1. Est-ce que cela fonctionne naturellement avec async comme le fait les solutions basées sur les tâches WhenAll
  2. il peut également être intégré à un plus grand pipeline de tâches
    • Vous pouvez réessayer les erreurs en les renvoyant.
    • Ajoutez tous les appels de prétraitement aux blocs précédents
  3. Vous pouvez limiter le MaxDegreeOfParallelism si la limitation est un problème
  4. Vous pouvez créer des pipelines plus compliqués, d'où le nom de DataFlow


0 commentaires

0
votes

Vous pouvez convertir votre code en fonction Azure et laissez Azure gérer la majeure partie du parallélisme, de la montée en charge et du téléchargement vers Azure Blob Storage.

Vous pouvez utiliser un déclencheur Http ou un déclencheur Service Bus pour lancer chaque tâche de téléchargement, de traitement et de téléchargement.


0 commentaires