J'ai une liste de commandes dans lesquelles j'essaie de traiter 2000 à la fois. Ce que je veux faire est de traiter chacun des 2000 lots de commande en même temps et une fois que tous sont terminés, retournez. Dans l'exemple de code ci-dessous, je prends une liste de commandes et j'envoie le lot à CreateOrders lorsque vous l'ajoutez aux commandes terminées, car j'ai besoin de toutes les renvoyer. Comment puis-je traiter en parallèle ces lots de 2000 dans ce cas?
public List<Order> BatchOrders(List<Order> orders) { var completedOrders = new List<Order>(); int batchSize = 2000; //Create orders in batch for (int i = 0; i < orders.Count(); i += batchSize ) { var batchOrders = orders.Skip(i).Take(batchSize).ToList(); completedOrders.AddRange(CreateOrders(batchOrders)); } return completedOrders; }
4 Réponses :
Essayez une méthode d'aide comme celle-ci:
public List<Order> BatchOrders(List<Order> orders) { var completedOrders = new ConcurrentBag<List<Order>>(); var opts = new ParallelOptions() { MaxDegreeOfParallelism = 4 }; int batchSize = 2000; var batches = orders.Batch(batchSize).ToList(); Parallel.For(0,batches.Count,opts, batchNum => { var batch = batches[batchNum]; var startId = batchNum * batchSize; completedOrders.Add(CreateOrders(batch, startId)); }); return completedOrders.SelectMany(c => c).ToList(); }
Ensuite, pour gérer chaque lot en parallèle, quelque chose comme:
public List<Order> BatchOrders(List<Order> orders) { var completedOrderBatches = new ConcurrentBag<List<Order>>(); var opts = new ParallelOptions() { MaxDegreeOfParallelism = 4 }; Parallel.ForEach(orders.Batch(),opts, batch => { completedOrderBatches.Add(CreateOrders(batch)); }); return completedOrderBatches.SelectMany(c => c).ToList(); }
Si vous avez besoin le numéro de lot dans CreateOrders, vous pouvez alors utiliser Parallel.For au lieu de Paralle.ForEach. Tout comme utiliser pour
au lieu de foreach
.
public static class BatchExtensions { public static IEnumerable<List<T>> Batch<T>(this IEnumerable<T> col, int batchSize = 2000) { var batch = new List<T>(batchSize); foreach (var o in col) { batch.Add(o); if (batch.Count == batchSize) { var rc = batch; batch = new List<T>(batchSize); yield return rc; } } if (batch.Count > 0) { yield return batch; } } }
super tx je vais vérifier cela l'apprécier
List
n'est pas thread-safe. Vous ne devez pas utiliser AddRange
de cette manière.
@AlexanderPetrov bonne prise. Réponse mise à jour avec accumulateur thread-safe.
génial merci beaucoup @DavidBrowne c'est super appréciez tous les commentaires et l'aide
Quelque chose comme ça?
public List<Order> BatchOrders(List<Order> orders) { int batchSize = 2000; List <Task<IEnumerable<Order>>> tasks = new List<Task<IEnumerable<Order>>>(); //Create orders in batch for (int i = 0; i < orders.Count; i += batchSize) { var batchOrders = orders.Skip(i).Take(batchSize).ToList(); // Run CreateOrders as a task and store the task tasks.Add(Task.Run(() => CreateOrders(batchOrders))); } var allTasks = tasks.ToArray(); // Wait till all the tasks are complete Task.WaitAll(allTasks); var completedOrders = new List<Order>(); //Merge the results foreach (var task in allTasks) completedOrders.AddRange(task.Result); return completedOrders; }
merci Jasper, c'est aussi une bonne option en utilisant Task dans ce cas. Merci tout cela est une aide énorme
Une amélioration mineure, un cas de coin, vraiment - il n'y a aucune garantie que vous pouvez toujours Take (batchSize)
, disons qu'il y a 2001 éléments et que la taille du lot est de 2000
De plus, vous utilisez .WaitAll qui est une opération de blocage
WaitAll
implémente l'exigence de une fois que tout est terminé
.
Oui, c'est correct, mais nous pourrions utiliser Task.WhenAll
puis renvoyer une Task
qui peut être wait
ed et rendre tout le code asynchrone
@ ironstone13 Quelle amélioration suggérez-vous? S'il n'y a que 1
élément dans la collection, .Take (2000)
renverra simplement cet élément.
Je lis cela comme une simple question d'équilibre. Si vous avez 2001, pourquoi ne pas le diviser en 1000 et 1001?
@RufusL, oui, vous avez raison, c'est exactement ce qui se passe, mon erreur. Confirmé cela avec des documents et le code source. Merci!
Merci beaucoup pour cette contribution.
Je pense que les réponses qui sont déjà fournies sont excellentes!
Je voudrais partager les miennes, car j'adore le code déclaratif et concis.
Veuillez noter que bien que ce code soit meilleur lisibilité , elle a une efficacité d'exécution moins bonne , car la collection source est énumérée autant de fois qu'il y a de lots.
Les méthodes d'extension sont définies sur IReadonlyCollection
et pas sur IEnumerable
pour exclure les énumérations évaluées paresseusement, qui chargent les données à partir de sources de données externes.
Premièrement, quelques assistants d'extension
async Task<IEnumerable<CreatedOrder>> BatchOrders(List<Order> orders, int batchSize) { var batches = orders .Batch(batchSize) .Select(batch => Task.Run(() => CreateOrders(batch))) .ToArray(); var result = (await Task.WhenAll(batches)).SelectMany(x=>x); return result; }
Maintenant, votre méthode est plutôt courte et concise
public static class BatchExtensions { public static IEnumerable<T> TakePart<T>(this IReadOnlyCollection<T> data, int batchNumber, int batchSize) => data .Skip(batchNumber * batchSize) .Take(batchSize); public static IEnumerable<IEnumerable<T>> Batch<T>(this IReadOnlyCollection<T> data, int batchSize) => Enumerable .Range(0, data.Count / batchSize) .Select(index => TakePart(data, index, batchSize)); }
Pour info, vous devriez pouvoir simplement .Take (batchSize)
Oui, @RufusL, merci, vous avez raison. Super d'apprendre à connaître quelque chose que je ne savais pas avant. J'ai mis à jour mon message. Merci!
Merci. J'ai pu utiliser ParallelFor et cela a semblé bien fonctionner dans ce cas, je vais essayer celui-ci plus tard et vérifier les performances sur tout cela. Je dois également obtenir la position de départ dans CreateOrders, comme mon message d'origine avec skip and take "i", j'en ai besoin, car dans CreateOrders, je l'utilise pour créer un nom de fichier. La réponse avec ParallelFor ci-dessus a ajouté cela et cela semble bien fonctionner. Merci encore à tous.
Voici comment vous pouvez le faire avec l'aide de la Bibliothèque TPL Dataflow . Vous auriez besoin de deux blocs, un BatchBlock pour effectuer le traitement par lots et un TransformManyBlock
pour traiter chaque lot.
List<Order> BatchOrders(List<Order> orders, CancellationToken cancellationToken = default) { var batchBlock = new BatchBlock<Order>(batchSize: 2000); var tranformBlock = new TransformManyBlock<Order[], Order>(batch => { return CreateOrders(batch.ToList()); }, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 5, CancellationToken = cancellationToken }); batchBlock.LinkTo(tranformBlock, new DataflowLinkOptions() { PropagateCompletion = true }); foreach (var order in orders) { batchBlock.Post(order); } batchBlock.Complete(); var results = new List<Order>(); while (tranformBlock.OutputAvailableAsync().Result) { while (tranformBlock.TryReceive(out var order)) { results.Add(order); } } tranformBlock.Completion.GetAwaiter().GetResult(); return results; }
Cette bibliothèque est intégrée à .NET Core. Les avantages sont:
Task.WaitAll
par exemple) Et les inconvénients:
Pourquoi voulez-vous choisir le numéro que vous traitez à la fois? Pourquoi ne pas laisser au système d'exécution, qui sait quelles ressources sont disponibles?
@JasperKent, c'est une bonne question! Cela pourrait être dû à un système en aval qui a un débit limité, mais je ne fais que deviner.
Ils sont transmis à cette API externe et le problème est que le système après un certain montant ne fonctionne pas bien, nous avons donc la possibilité de définir un numéro à envoyer dans le lot à la fois.
«Ce que je veux faire, c'est traiter chacun des 2000 lots de commandes en même temps» et «le système après un certain montant ne fonctionne pas bien» sont contradictoires. Dans ce cas, vous enverrez peut-être des dizaines de milliers de commandes simultanément. Que fera alors le backend?
Ou est-ce que 2.000 s'exécutent séquentiellement? :)
@Oguzozgul Le système externe peut gérer plusieurs lots, mais chaque lot après une certaine taille a des problèmes :).
Maintenant c'est le problème de quelqu'un d'autre je suppose :) Bonne chance