8
votes

Mises à jour manquantes avec verrous et ConcurrentHashMap

J'ai un scénario où je dois maintenir une Map qui peut être remplie par plusieurs threads, chacun modifiant leur List respective (l'identifiant / clé unique étant le nom du thread ), et lorsque la taille de la liste d'un thread dépasse une taille de lot fixe, nous devons conserver les enregistrements dans la base de données.

Classe d'agrégateur

if(//Some condition) {
    Thread.sleep(//2 minutes);
    aggregator.getLock().lock();
    List<T> instrumentList = instrumentMap.values().stream().flatMap(x->x.stream()).collect(Collectors.toList());
    if(instrumentList.size() > 0) {
        saver.persist(instrumentList);
        instrumentMap .values().parallelStream().forEach(x -> x.clear());
        aggregator.getLock().unlock();
    }
}

Il y en a un de plus thread séparé s'exécutant toutes les 2 minutes (en utilisant le même verrou) pour conserver tous les enregistrements dans Map (pour nous assurer que quelque chose persiste toutes les 2 minutes et que la taille de la carte ne devient pas trop grande)

private volatile ConcurrentHashMap<String, List<T>>  instrumentMap = new ConcurrentHashMap<String, List<T>>();
private ReentrantLock lock ;

public void addAll(List<T> entityList, String threadName) {
    try {
        lock.lock();
        List<T> instrumentList = instrumentMap.get(threadName);
        if(instrumentList == null) {
            instrumentList = new ArrayList<T>(batchSize);
            instrumentMap.put(threadName, instrumentList);
        }

        if(instrumentList.size() >= batchSize -1){
            instrumentList.addAll(entityList);
            recordSaver.persist(instrumentList); 
            instrumentList.clear();
        } else {
            instrumentList.addAll(entityList);  
        }
    } finally {
        lock.unlock();
    }

}

Cette solution fonctionne très bien dans presque tous les scénarios que nous avons testés, sauf que parfois nous voyons que certains enregistrements ont disparu, c'est-à-dire qu'ils ne sont pas persistants du tout, bien qu'ils aient été bien ajouté à la carte.

Mes questions sont:

  1. Quel est le problème avec ce code?
  2. La ConcurrentHashMap n'est-elle pas la meilleure solution ici?
  3. La List utilisée avec ConcurrentHashMap a-t-elle un problème?
  4. Dois-je utiliser la méthode de calcul de ConcurrentHashMap ici (pas besoin je pense, car ReentrantLock fait déjà le même travail)?


11 commentaires

Je ne suis pas sûr des enregistrements manquants, mais si tous les accès à instrumentMap sont protégés par le verrou , il n'y a aucun avantage à utiliser ConcurrentMap .


@Slaw je suis d'accord que je n'ai pas écrit ceci et que je ne cherche pas à changer cela jusqu'à ce que je comprenne le problème avec le code .. merci pour votre réponse


Règle générale, entre lock () et unlock (), il devrait y avoir un moment (condition) pour vérifier les données partagées (carte) à la place que vous utilisez if (condition). Veuillez vérifier les directives de verrouillage pour le problème du consommateur producteur.


Eh bien, je ne peux pas voir le problème dans le code affiché. Bien que cela ne signifie pas que le problème n'est pas là, un exemple reproductible minimal approprié illustrant le problème serait utile. Une chose à vérifier est tout accès non surveillé. recordSaver.persist passe-t-il jamais la liste à un autre thread sur la ligne d'une manière non bloquante? Je demande parce que vous passez la List elle-même, pas une copie, ce qui signifie qu'un accès non synchronisé peut se produire quelque part. En revanche, votre thread de sauvegarde toutes les deux minutes appelle saver.persist avec une "copie" contenant toutes les valeurs aplaties de la carte.


@Slaw votre point sur l'accès non bloquant - je vérifierai à coup sûr


@Amettez une fois que vous avez compris le problème, postez une réponse.


Je vais certainement @ nits-kk mais cela prendra 15 à 20 jours car ce problème se produit rarement.


@Slaw pouvez-vous s'il vous plaît ajouter votre observation dans les réponses afin que je puisse vous en créditer si cela fonctionne :)


Est-il même nécessaire de stocker les instruments dans une carte? La persistance se fait avec des listes d'instruments, la clé "threadName" semble inutilisée.


juste pour être vraiment sûr - veuillez enregistrer à la fois aggregator.getLock () et lock juste avant de l'acquérir. Avec aggregator.getLock () également lors de la publication :) si les hashcodes (comme la dernière partie de ReentrantLock @ 4682fer34 ou autre) sont exactement les mêmes, au moins nous sommes sûrs que nous synchronisons correctement.


@vanOekel convient que c'est un code mal écrit, ce n'est pas la manière parfaite de le faire.


3 Réponses :


0
votes

Je vois que vous utilisez le parallelStream de ConcurrentHashMap dans un verrou. Je ne connais pas la prise en charge des flux Java 8+, mais une recherche rapide montre que

  1. ConcurrentHashMap est une structure de données complexe, qui avait autrefois des bogues d'accès concurrentiel
  2. Les flux parallèles doivent respecter les restrictions d'utilisation complexes et mal documentées
  3. Vous modifiez vos données dans un flux parallèle

Sur la base de ces informations (et de mon détecteur de bogues de concurrence basé sur l'intestin ™), je parie que la suppression de l'appel à parallelStream pourrait améliorer la robustesse de votre code. De plus, comme mentionné par @Slaw, vous devez utiliser HashMap ordinaire à la place de ConcurrentHashMap si toute l'utilisation de instrumentMap est déjà protégée par un verrou.

Bien sûr, puisque vous ne publiez pas le code de recordSaver , il est possible qu'il comporte également des bogues (et pas nécessairement des bogues liés à la concurrence). En particulier, vous devez vous assurer que le code qui lit les enregistrements à partir du stockage persistant - celui que vous utilisez pour détecter la perte d'enregistrements - est sûr, correct et correctement synchronisé avec le reste de votre système (de préférence en utilisant un , base de données SQL standard).


1 commentaires

Supprimer un appel à parallelStream - peut augmenter la robustesse mais n'est pas lié au bogue auquel nous sommes confrontés (car le bogue n'est jamais observé au moment où nous l'utilisons). Hashmap avec lock ou ConcurrentHashMap.compute l'un d'eux est suffisant et je suis d'accord que la façon dont il est utilisé ici est exagéré ... mais cela n'affecte pas les enregistrements de toute façon. @slaw suggestions - la seule faille que nous avons manquée, c'est-à-dire permettre à la référence de s'échapper vers une méthode (recordSaver.persist) qui n'est pas synchronisée et effacer la liste juste après avoir fait cela ... signifie que les enregistrements peuvent sortir de la portée



0
votes

Il semble que ce soit une tentative d'optimisation là où ce n'était pas nécessaire. Dans ce cas, moins c'est plus et plus simple c'est mieux. Dans le code ci-dessous, seuls deux concepts de concurrence sont utilisés: synchronisé pour garantir qu'une liste partagée est correctement mise à jour et final pour garantir que tous les threads voient la même valeur.

import java.util.ArrayList;
import java.util.List;

public class Aggregator<T> implements Runnable {

    private final List<T> instruments = new ArrayList<>();

    private final RecordSaver recordSaver;
    private final int batchSize;


    public Aggregator(RecordSaver recordSaver, int batchSize) {
        super();
        this.recordSaver = recordSaver;
        this.batchSize = batchSize;
    }

    public synchronized void addAll(List<T> moreInstruments) {

        instruments.addAll(moreInstruments);
        if (instruments.size() >= batchSize) {
            storeInstruments();
        }
    }

    public synchronized void storeInstruments() {

        if (instruments.size() > 0) {
            // in case recordSaver works async
            // recordSaver.persist(new ArrayList<T>(instruments));
            // else just:
            recordSaver.persist(instruments);
            instruments.clear();
        }
    }


    @Override
    public void run() {

        while (true) {
            try { Thread.sleep(1L); } catch (Exception ignored) {
                break;
            }
            storeInstruments();
        }
    }


    class RecordSaver {
        void persist(List<?> l) {}
    }

}


1 commentaires

je vais essayer ça



2
votes

La réponse fournie par @Slaw dans les commentaires a fait l'affaire. Nous laissions l'instance instrumentList s'échapper de manière non synchronisée, c'est-à-dire que l'accès / les opérations se déroulent sur la liste sans aucune synchronisation. Corriger la même chose en passant la copie à d'autres méthodes a fait l'affaire.

La ligne de code suivante est celle où ce problème se produisait

recordSaver.persist (instrumentList); instrumentList.clear ();

Ici, nous permettons à l'instance instrumentList de s'échapper de manière non synchronisée, c'est-à-dire qu'elle est passée à une autre classe (recordSaver.persist) sur laquelle elle devait être actionnée, mais nous effaçons également la liste dans la ligne suivante (dans la classe Aggregator) et tout cela se passe de manière non synchronisée. L'état de la liste ne peut pas être prédit dans l'économiseur d'enregistrement ... une erreur vraiment stupide.

Nous avons résolu le problème en passant une copie clonée de instrumentList à la méthode recordSaver.persist (...). De cette façon, instrumentList.clear () n'a aucun effet sur la liste disponible dans recordSaver pour d'autres opérations.


2 commentaires

vous devez expliquer la réponse en détail, se référer simplement au commentaire ne le coupera pas.


Bien sûr, je vais le faire.