-1
votes

Comment assurer l'ordre de traitement dans des flux parallèles?

Je suis en train de faire l'ordre dans des flux parallèles basés sur une valeur de champ. Peut-être que je vais introduire un exemple abstrait:

disons que nous avons des utilisateurs de classe xxx

et nous avons la liste de ces utilisateurs: xxx

Je veux forcer le flux Parellel pour traiter chaque utilisateur par priorité, disons que nous avons 100 utilisateurs avec 0 priorité, 100 utilisateurs avec 1 priorité et 100 utilisateurs avec 2 priorités et 100 utilisateurs avec 2 priorités.

Je veux Démarrez les utilisateurs de processus avec priorité 2 uniquement lorsque la priorité 1 est effectuée et lorsque la priorité 0 est terminée.

Je pense

mvn install -t 4

pourrait être l'approche que je recherche (les premiers modules indépendants de la construction). Est-il possible de le faire dans les flux Java? Utilisez également des alternatives sont possibles.

Mon approche consiste à diviser vers une liste spécifique par priorité, puis à traiter la liste par liste


1 commentaires

Pourquoi voulez-vous contrôler l'ordre de traitement lorsque l'opération n'est terminée que lorsque tous les éléments ont été traités de toute façon?


3 Réponses :


0
votes

séquentiel vs parallèle n'est pas identique que commande .

Si vous avez un flux commandé et effectuez des opérations garantissant la maintenance de la commande, il n'est pas important si le flux est traité en parallèle ou séquentiel; La mise en œuvre doit conserver la commande.

Dans votre cas (seulement si votre liste ne respecte pas la commande que vous souhaitez ..) Vous pouvez utiliser un comparateur spécifique ou laisser utilisateur implémente comparable basé dans la priorité . Puis triez votre liste avant de commencer à effectuer d'autres opérations de flux. Parallèle ou séquentiel ne donnera pas de résultat différent

ou à l'aide de types de collecte spécifiques tels que triède ou PriorityQueue

Une certaine attention sur Prioritorkeuee L'itérateur fourni dans la méthode itérateur () n'est pas garanti de traverser les éléments de la prioritéBlockingQueue dans un ordre particulier.

Vous devez donc trier l'élément pendant le streaming pour garder la commande -> flux (). Tri () ou simplement utiliser méthode de sondage


0 commentaires

1
votes

Pour traiter les utilisateurs en blocs, par priorité, mais traiter les utilisateurs de la même priorité en parallèle, premier groupe des utilisateurs par priorité, puis traiter chaque groupe séparément.

// Group users by priority
TreeMap<Integer, List<User>> usersByPriority = users.stream()
        .collect(Collectors.groupingBy(User::getPriority, TreeMap::new, Collectors.toList()));

// Process groups in priority order
for (List<User> list : usersByPriority.values()) {

    // Process users of current priority in parallel
    list.parallelStream().forEach(user -> {
        // process user here
    });

    // We won't loop to next priority until all users with current priority has been processed
}


2 commentaires

@CODECALE Vous pouvez remplacer le foreachordedededededededededededededededed () avec un URD> pour en boucle, si vous ne voulez pas de flux imbriqués.


Le problème n'est pas le moyen fonctionnel ou impératif de faire cela, mais la verbosité du code en chaîne. Je dirais qu'une approche plus appropriée consiste à extraire la deuxième boucle / flux dans une autre méthode (refactorisation de la méthode d'extraction). Cela améliorera la lisibilité du flux ci-dessus.



0
votes

Si vous souhaitez traiter les 100 premiers avec la priorité 1 code>, alors ce dernier 100, etc., il n'est ni parallèle ni simultané, mais en réalité la séquence. Ces sous-listes partielles peuvent être traitées en parallèle. PRIOTYQUESUE CODE> A > ou tritedmap code> sont des façons d'aller.

triedmap: strong> p>

Utilisez le Treeet code> Mise en œuvre à l'intérieur du Collecteurs.GroupingBy Code> Méthode: P>

for (List<User> users : queue) {
    users.stream().parallel()...
}


0 commentaires