6
votes

Java: get + clear atomic pour la carte

Je voudrais implémenter la logique suivante:

-la structure suivante doit être utilisée

 final Map<String, List<Update>> toBeProcessed = new HashMap<>(updatesPerId);
 updatesPerId.clear();
 // iterate over toBeProcessed and process them

-n les producteurs ajouteront des mises à jour à la carte updatesPerId (pour le même id, 2 mises à jour peuvent être ajoutées en même temps)

-one TimerThread fonctionnera de temps en temps et doit traiter les mises à jour reçues. Quelque chose comme:

//Map<String, CopyOnWriteArrayList> keeping the pending updates 
//grouped by the id of the updated object
final Map<String, List<Update>> updatesPerId = new ConcurrentHashMap<>();

Existe-t-il un moyen de sécuriser ce thread logique sans synchroniser la logique d'ajout des producteurs et la logique de timerThread (consommateur)? Je pense à un clear + get atomique mais il semble que ConcurrentMap ne fournit pas quelque chose comme ça. De plus, je dois mentionner que les mises à jour doivent être conservées par un identifiant d'objet mis à jour afin que je ne puisse pas remplacer la carte par une file d'attente ou autre chose.

Des idées? Merci!


2 commentaires

Cela pourrait fonctionner correctement avec un RWLock. Les producteurs acquerront le verrou pour la lecture et le consommateur celui pour l'écriture. Cependant, l'acquisition du verrou pour la lecture lorsque vous écrivez réellement quelque chose n'est pas intuitive.


Pouvez-vous clarifier ce que vous entendez par «sans synchronisation»?


3 Réponses :


4
votes

Vous pouvez tirer parti du fait que ConcurrentHashMap.compute s'exécute de manière atomique .

Vous pouvez mettre dans le updatesPerId comme ceci: p>

Map<String, List<Update>> newMap = new HashMap<>();
for (String key : updatesPerId.keySet()) {
  newMap.put(key, updatesPerId.remove(key));
}
// ... Process entries in newMap.

Il ne s'agit pas d'utiliser computeIfAbsent puis d'ajouter à la valeur de retour, qui ne serait pas atomique.

Ensuite dans votre fil pour supprimer des choses:

for (String key : updatesPerId.keySet()) {
  List<Update> list = updatesPerId.put(key, null);
  updatesPerId.compute(key, (k, list) -> {
    // ... Process the contents of the list.

    // Removes the key/value pair from the map.
    return null;
  });
}

Donc, l'ajout d'une clé à la liste (ou le traitement de toutes les valeurs pour cette clé) peut bloquer si vous le faites essayez de traiter la clé aux deux endroits à la fois; sinon, il ne sera pas bloqué.


Edit: comme indiqué par @StuartMarks, il serait peut-être préférable de simplement tout extraire de la carte d'abord, puis de les traiter plus tard, dans l'ordre pour éviter de bloquer d'autres threads en essayant d'ajouter:

updatesPerId.compute(k, (k, list) -> {
  if (list == null) list = new ArrayList<>();
  // ... add to the list

  // Return a non-null list, so the key/value pair is stored in the map.
  return list;
});


8 commentaires

Mais, puisque vous devez de toute façon changer les deux endroits, vous pouvez aussi bien envelopper la Carte dans une classe distincte de votre choix.


Je copierais le contenu de la liste ailleurs et retournerais null à partir de la fonction de remappage, et traiterais le contenu plus tard. Le traitement au sein de la fonction de remappage peut prendre un certain temps, ce qui peut bloquer d'autres threads souhaitant mettre à jour la carte.


@StuartMarks bien sûr. Est-ce que List thingsToProcess = updatesPerId.remove (key); serait une autre façon?


Vous appelez remove () au lieu de compute () ? Pourrait fonctionner. Je suis préoccupé par la modification simultanée lors de l'itération du jeu de clés. Cela ne provoquera pas CME, mais cela pourrait faire sauter des entrées. J'envisagerais d'itérer le jeu d'entrée et d'appeler remove () sur l'itérateur du jeu d'entrée. De cette façon, vous avez déjà la valeur (liste des choses à traiter) en main sans avoir à faire une autre recherche. Semble un peu plus propre, mais plus verbeux.


@StuartMarks était d'accord. J'ai joué avec le modifier dans ma réponse; pas encore tout à fait compris.


@StuartMarks "sans avoir à faire une autre recherche" s'applique lorsque vous effectuez une itération sur HashMap.entrySet () . En cas de ConcurrentHashMap , il n'y a aucune différence entre appeler remove () sur l'itérateur du jeu d'entrées ou appeler remove (key) sur la carte. Ce n’est même pas plus sûr en ce qui concerne les mises à jour simultanées. Considérez ConcurrentHashMap m = new ConcurrentHashMap <> (); m.put ("toto", 42); Iterator > it = m.entrySet (). Iterator (); Map.Entry E = it.next (); m.put ("toto", 100); System.out.println ("supprimer" + e + "de" + m); it.remove (); System.out.println (m);


@Holger Huh. C'est certainement vrai pour la mise en œuvre actuelle du CHM. Il semble cependant peu judicieux de se fier à cette hypothèse. Je recommanderais toujours la modification via l'itérateur, au cas où l'implémentation CHM changerait, ou au cas où le code appelant serait modifié pour utiliser une implémentation de carte différente.


@Holger Aussi, vous devriez peut-être envisager d'utiliser var lorsque vous mettez un tas de code dans un commentaire. :-)



1
votes

Existe-t-il un moyen de sécuriser ce thread logique sans synchroniser la logique d'ajout des producteurs et la logique de timerThread (consommateur)?

En bref, non - selon ce que vous entendez par «synchronisation».

Le moyen le plus simple est d'envelopper votre Map dans une classe à vous. >

class UpdateManager {
    Map<String,List<Update>> updates = new HashMap<>();
    public void add(Update update) {
        synchronized (updates) {
            updates.computeIfAbsent(update.getKey(), k -> new ArrayList<>()).add(update);
        }
    }
    public Map<String,List<Update>> getUpdatesAndClear() {
        synchronized (updates) {
            Map<String,List<Update>> copy = new HashMap<>(updates);
            updates.clear();
            return copy;
        }
    }
}

4 commentaires

"Y a-t-il un moyen de rendre ce thread logique sûr sans synchroniser la logique d'ajout des producteurs et la logique de timerThread (consommateur)?"


@AndyTurner Oui, j'ai répondu à cela.


Vous ne l'avez pas fait. Vous venez de dire "ça dépend", puis vous avez utilisé la synchronisation.


@AndyTurner J'ai dit "non, ce n'est pas possible". Le "dépend" se réfère à "si vous entendez une synchronisation explicite par le client", par exemple via un verrou. Notez que votre réponse utilise également la synchronisation, simplement masquée.



2
votes

Je suggère d'utiliser LinkedBlockingQueue au lieu de CopyOnWriteArrayList comme valeur de la carte. Avec COWAL, les ajouts deviennent de plus en plus chers, donc l'ajout de N éléments entraîne des performances N ^ 2. L'addition de LBQ est O (1). De plus, LBQ a drainTo qui peut être utilisé efficacement ici. Vous pouvez faire ceci:

updatesPerId.forEach((id, queue) -> {
    List<Update> updates = new ArrayList<>();
    queue.drainTo(updates);
    processUpdates(id, updates);
});

Producteur:

updatesPerId.computeIfAbsent(id, LinkedBlockingQueue::new).add(update);

Consommateur:

final Map<String, Queue<Update>> updatesPerId = new ConcurrentHashMap<>();

C'est quelque peu différent de ce que vous aviez suggéré. Cette technique traite les mises à jour pour chaque identifiant, mais permet aux producteurs de continuer à ajouter des mises à jour à la carte pendant que cela se passe. Cela laisse des entrées de mappe et des files d'attente dans la mappe pour chaque identifiant. Si les identifiants finissent par être beaucoup réutilisés, le nombre d'entrées sur la carte plafonnera à la limite des hautes eaux.

Si de nouveaux identifiants arrivent continuellement et que les anciens identifiants deviennent désaffectés, la carte augmentera continuellement. , ce qui n'est probablement pas ce que vous voulez. Si tel est le cas, vous pouvez utiliser la technique de la réponse d'Andy Turner .

Si le consommateur a vraiment besoin de prendre un instantané et d'effacer toute la carte, je pense que vous devez utiliser le verrouillage, ce que vous vouliez éviter.


0 commentaires