1
votes

Parallel.ForEach plus rapide que Task.WaitAll pour les tâches liées aux E / S?

J'ai deux versions de mon programme qui soumettent ~ 3000 requêtes HTTP GET à un serveur Web.

La première version est basée sur ce que j'ai lu ici . Cette solution a du sens pour moi, car faire des requêtes Web est un travail lié aux E / S, et l'utilisation de async / await avec Task.WhenAll ou Task.WaitAll signifie que vous pouvez soumettre 100 requêtes en même temps, puis attendre qu'elles toutes soient terminez avant de soumettre les 100 demandes suivantes afin de ne pas enliser le serveur Web. J'ai été surpris de voir que cette version a terminé tout le travail en ~ 12 minutes - bien plus lentement que prévu.

La deuxième version soumet les 3000 requêtes HTTP GET dans un parallèle .Pour chaque boucle. J'utilise .Result pour attendre que chaque requête se termine avant que le reste de la logique dans cette itération de la boucle puisse s'exécuter. Je pensais que ce serait une solution beaucoup moins efficace, car l'utilisation de threads pour effectuer des tâches en parallèle est généralement mieux adaptée pour effectuer un travail lié au processeur, mais j'ai été surpris de voir que cette version a terminé tout le travail en ~ 3 minutes!

Ma question est pourquoi la version Parallel.ForEach est-elle plus rapide? Cela a été une surprise supplémentaire car lorsque j'ai appliqué les deux mêmes techniques à une API / un serveur Web différent , la version 1 de mon code était en fait plus rapide que la version 2 d'environ 6 minutes - c'est ce à quoi je m'attendais. Les performances des deux versions différentes pourraient-elles avoir quelque chose à voir avec la façon dont le serveur Web gère le trafic?

Vous pouvez voir une version simplifiée de mon code ci-dessous:

private async Task<ObjectDetails> TryDeserializeResponse(HttpResponseMessage response)
{
    try
    {
        using (Stream stream = await response.Content.ReadAsStreamAsync())
        using (StreamReader readStream = new StreamReader(stream, Encoding.UTF8))
        using (JsonTextReader jsonTextReader = new JsonTextReader(readStream))
        {
            JsonSerializer serializer = new JsonSerializer();
            ObjectDetails objectDetails = serializer.Deserialize<ObjectDetails>(
                jsonTextReader);
            return objectDetails;
        }
    }
    catch (Exception e)
    {
        // Log exception
        return null;
    }
}

private async Task<HttpResponseMessage> TryGetResponse(string urlStr)
{
    try
    {
        HttpResponseMessage response = await httpClient.GetAsync(urlStr)
            .ConfigureAwait(false);
        if (response.StatusCode != HttpStatusCode.OK)
        {
            throw new WebException("Response code is "
                + response.StatusCode.ToString() + "... not 200 OK.");
        }
        return response;
    }
    catch (Exception e)
    {
        // Log exception
        return null;
    }
}

private async Task<ListOfObjects> GetObjectDetailsAsync(string baseUrl, int id)
{
    string urlStr = baseUrl + @"objects/id/" + id + "/details";

    HttpResponseMessage response = await TryGetResponse(urlStr);

    ObjectDetails objectDetails = await TryDeserializeResponse(response);

    return objectDetails;
}

// With ~3000 objects to retrieve, this code will create 100 API calls
// in parallel, wait for all 100 to finish, and then repeat that process
// ~30 times. In other words, there will be ~30 batches of 100 parallel
// API calls.
private Dictionary<int, Task<ObjectDetails>> GetAllObjectDetailsInBatches(
    string baseUrl, Dictionary<int, MyObject> incompleteObjects)
{
    int batchSize = 100;
    int numberOfBatches = (int)Math.Ceiling(
        (double)incompleteObjects.Count / batchSize);
    Dictionary<int, Task<ObjectDetails>> objectTaskDict
        = new Dictionary<int, Task<ObjectDetails>>(incompleteObjects.Count);

    var orderedIncompleteObjects = incompleteObjects.OrderBy(pair => pair.Key);

    for (int i = 0; i < 1; i++)
    {
        var batchOfObjects = orderedIncompleteObjects.Skip(i * batchSize)
            .Take(batchSize);
        var batchObjectsTaskList = batchOfObjects.Select(
            pair => GetObjectDetailsAsync(baseUrl, pair.Key));
        Task.WaitAll(batchObjectsTaskList.ToArray());
        foreach (var objTask in batchObjectsTaskList)
            objectTaskDict.Add(objTask.Result.id, objTask);
    }

    return objectTaskDict;
}

public void GetObjectsVersion1()
{
    string baseUrl = @"https://mywebserver.com:/api";

    // GetIncompleteObjects is not shown, but it is not relevant to
    // the question
    Dictionary<int, MyObject> incompleteObjects = GetIncompleteObjects();

    Dictionary<int, Task<ObjectDetails>> objectTaskDict
        = GetAllObjectDetailsInBatches(baseUrl, incompleteObjects);

    foreach (KeyValuePair<int, MyObject> pair in incompleteObjects)
    {
        ObjectDetails objectDetails = objectTaskDict[pair.Key].Result
            .objectDetails;

        // Code here that copies fields from objectDetails to pair.Value
        // (the incompleteObject)

        AllObjects.Add(pair.Value);
    };
}

public void GetObjectsVersion2()
{
    string baseUrl = @"https://mywebserver.com:/api";

    // GetIncompleteObjects is not shown, but it is not relevant to
    // the question
    Dictionary<int, MyObject> incompleteObjects = GetIncompleteObjects();

    Parallel.ForEach(incompleteHosts, pair =>
    {
        ObjectDetails objectDetails = GetObjectDetailsAsync(
            baseUrl, pair.Key).Result.objectDetails;

        // Code here that copies fields from objectDetails to pair.Value
        // (the incompleteObject)

        AllObjects.Add(pair.Value);
    });
}


10 commentaires

À un moment donné, vous n'avez pas utilisé ConfigureAwait (false) (voir GetObjectDetailsAsync ) et cela aura un grand impact sur les performances car le code attend la synchronisation.


Il serait également intéressant de savoir dans quel contexte ce code s'exécute. WinForms / WPF / asp.net / Console / ...?


@SirRufo Ah oui, j'ajouterai ConfigureAwait (false) pour voir comment cela affecte les performances. C'est une application console (.Net Framework 4.6.1)


@SirRufo L'ajout de ConfigureAwait (false) dans GetObjectDetailsAsync n'a pas semblé affecter les performances.


Oui, une application console n'a pas du tout de contexte de synchronisation et il n'y a donc aucune synchronisation qui peut avoir un impact sur les performances. C'est la raison pour laquelle je voulais savoir dans quel type d'application le code s'exécute


La version asynchrone de votre code bloque beaucoup. Est-ce intentionnel? Votre GetAllObjectDetailsInBatches doit être une méthode async . Vous ne devez pas Task.WaitAll là-dedans, mais plutôt attendre Task.WhenAll . Votre TryDeserializeResponse doit être la méthode async et la lecture à partir du flux doit être wait response.Content.ReadAsStreamAsync () sans le .Result . Obtenez ces changements et voyez ce que les chiffres disent après cela.


@JohanP J'ai fait intentionnellement rendre GetAllObjectDetailsInBatches synchrone et utiliser Task.WaitAll car il n'y a pas d'autre travail qui peut être fait pendant que la fonction est en cours d'exécution (c'est une console application sans interface utilisateur). Mais bonne prise sur TryDeserializeResponse . C'était une erreur de copier / coller. J'ai fait une modification pour ajouter la version mise à jour.


@davekats il y a beaucoup de travail à faire, toutes les réponses 3k reprendront sur les threads du pool de threads et vous les bloquerez toutes. Votre pool commence avec le moins de threads possible et de nouveaux sont injectés à 2 par seconde, de sorte que votre accès concurrentiel et votre débit sont très faibles et il semble que vous rencontriez une famine de threadpool.


@JohanP J'ai du mal à comprendre comment le blocage du thread principal sur Task.WaitAll affecte le travail en cours sur les threads du pool de threads.


Je parlais plus du .Result sur ReadStreamAsync mais je vois que vous avez changé cela.


3 Réponses :


0
votes

https : //docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.parallel.foreach? view = netframework-4.8

Fondamentalement, le foreach parallèle permet aux itérations de s'exécuter en parallèle afin de ne pas contraindre l'itération à s'exécuter en série, sur un hôte qui n'est pas contraint par le thread, cela aura tendance à améliorer le débit


0 commentaires

1
votes

En bref:

  • Parallel.Foreach () est le plus utile pour les tâches liées au processeur.
  • Task.WaitAll () est plus utile pour les tâches liées aux E / S.

Donc, dans votre cas, vous obtenez des informations de serveurs Web, c'est-à-dire IO. Si les méthodes asynchrones sont correctement implémentées, elles ne bloquent aucun thread. (Il utilisera les ports IO Completion pour attendre) De cette façon, les threads peuvent faire d'autres choses.

En exécutant les méthodes asynchrones GetObjectDetailsAsync (baseUrl, pair.Key) .Result synchronisées, cela bloquera un thread. Ainsi, le pool de threads sera inondé par les threads en attente.

Je pense donc que la solution Task sera mieux adaptée.


1 commentaires

Merci Jeroen, je suis d'accord avec vous mais ma question n'est pas de savoir quelle solution convient le mieux. Je sais que la solution Task est la meilleure solution, mais ce que je vois, c'est que la solution Task est en fait plus lente - un résultat inattendu. Ma question est donc pourquoi, dans ce cas, la solution Parallel est-elle plus rapide?



0
votes

Une raison possible pour laquelle Parallel.ForEach peut s'exécuter plus rapidement parce qu'il crée l'effet secondaire de la limitation. Initialement, x threads traitent les x premiers éléments (où x dans le nombre de cœurs disponibles), et progressivement plus de threads peuvent être ajoutés en fonction de l'heuristique interne. La limitation des opérations d'E / S est une bonne chose, car elle protège le réseau et le serveur qui gère les requêtes contre une surcharge. Votre méthode alternative improvisée de limitation, en effectuant des requêtes par lots de 100, est loin d'être idéale pour de nombreuses raisons, l'une d'entre elles étant que 100 requêtes simultanées représentent beaucoup de requêtes! Une autre est qu'une seule opération de longue durée peut retarder l'achèvement du lot longtemps après l'achèvement des 99 autres opérations.

Notez que Parallel.ForEach n'est pas non plus idéal pour la parallélisation des opérations d'E / S. Il s'est avéré que cela fonctionnait mieux que l'alternative, gaspillant de la mémoire tout du long. Pour de meilleures approches, regardez ici: Comment limiter la quantité de opérations d'E / S asynchrones simultanées?


2 commentaires

Merci pour la bonne réponse! Ainsi, la différence entre la solution par lots et la solution utilisant SemaphoreSlim est que la solution SemaphoreSlim aura toujours 20 requêtes en cours d'exécution en même temps - en commençant une nouvelle requête dès que l'une des 20 se termine, alors que la méthode par lots attendra de lot suivant jusqu'à ce que toutes les demandes soient terminées dans le lot actuel. Je l'aime bien.


@davekats oui, vous avez le concept. :-)