8
votes

"Atomiquement" mettre à jour un tableau entier

J'ai un fil d'écriture unique et un fil de lecture unique pour mettre à jour et traiter un pool de tableaux (références stockées sur la carte). Le rapport des écrivies à lire est presque 5: 1 (la latence des écrivies est une préoccupation).

Le fil de l'écrivain doit mettre à jour peu d'éléments d'un tableau dans la piscine en fonction de certains événements. Toute l'opération d'écriture (tous les éléments) doit être atomique.

Je tiens à vous assurer que le thread de lecteur lit le tableau mis à jour précédé si le thread de l'écrivain est la mise à jour (quelque chose comme volatile mais à l'ensemble de la matrice plutôt que des champs individuels). Fondamentalement, je peux me permettre de lire des valeurs obsolètes mais non bloquez.

En outre, étant donné que les écritures sont si fréquentes, il serait vraiment coûteux de créer de nouveaux objets ou de verrouiller l'ensemble de la matrice en lecture / écriture.

existe-t-il une structure de données plus efficace qui pourrait être utilisée ou utiliser des serrures moins chères?


17 commentaires

Peut-être un Readwritelock ?


Que diriez-vous d'une base de données en mémoire?


Une mise à jour atomique s'applique-t-elle à un seul tableau dans la piscine?


Combien de mémoire supplémentaire êtes-vous prêt à réserver? Double?


Quelle est la taille des matrices?


Les tableaux sont de longueur 20 (surtout doubles). Tous les tableaux dans la piscine sont initialisés au démarrage. Et la taille de la piscine peut être jusqu'à 60 000 matrices.


20 doubles? C'est assez petit.


Dans ce cas, la solution la plus facile est probablement de copier le tableau, de la muter et de remettre le nouveau tableau vers la carte. Une copie de groupe de 20 items est très rapide. Il vous suffit de vous assurer que la nouvelle référence est visible (c'est-à-dire la concurrence ou l'atomériférations par exemple).


@OldcurMudgeon Oui Double La taille va bien tant qu'elle est au démarrage.Je souhaiterait limiter la création d'objets après l'exécution du temps.


@ PatiriciaShanahan Oui Mettez à jour atomique un tableau dans la piscine.


Oui, essayez de copier-ecriver d'abord et voyez si c'est assez rapide.


@assylie actuellement, j'ai une référence volatile. Mais cela impliquerait de créer de nouveaux objets sur chaque écriture. La fréquence d'écriture est très élevée et je souhaite que je crée aussi peu d'objets que possible, même si les objets sont petits.


Je suppose que la création et la copie d'un objet de 20 articles vont prendre 100s de nanosecondes. Pas beaucoup plus. Et GC pour des objets de courte durée est pratiquement libre. Vous devriez profiler et comparer des solutions alternatives avant de prendre une décision.


La question principale est que vous pouvez vous permettre d'attendre en lisant? Ou la lecture devrait être aussi rapide que possible?


@Trequartista ma question est-ce même un vrai problème? Je veux dire, je ne peux pas simplement utiliser de bonnes ooles synchronisées. Je suis tellement fatigué de voir "Je pense que je pourrais ou préoccupé par la possible ...". Si ce n'est pas un problème et que vous n'avez pas de mesures, arrêtez de gaspiller le temps de tout le monde.


@assylia: Pourquoi postez-vous votre réponse dans la zone de commentaire? Je voudrais upvouvert ça ...


@meriton j'ai fait ça.


8 Réponses :


2
votes

Du de cette idée: le thread de l'écrivain ne mute pas le tableau. Il faut simplement files les mises à jour.

Le fil de lecteur, chaque fois qu'il entre dans une session de lecture nécessitant un instantané stable de la matrice, applique les mises à jour en file d'attente sur le tableau, puis lit le tableau. P>

class Update
{
    int position;
    Object value;
}

ArrayBlockingQueue<Update> updates = new ArrayBlockingQueue<>(Integer.MAX_VALUE);

void write()
{
    updates.put(new Update(...));
}

Object[] read()
{
    Update update;
    while((update=updates.poll())!=null)
        array[update.position] = update.value;

    return array;
}


1 commentaires

@Vemv Bon point. Lire () peut appeler mises à jour.Tearray () Pour gérer uniquement des mises à jour limitées. Mais si l'écrivain est si rapide que le lecteur ne peut pas suivre, nous devrons manifester du côté écriture.



2
votes

Une autre idée, étant donné que la matrice ne contient que 20 doubles.

avoir deux matrices, une pour écrire, une pour la lecture. p>

lecteur verrouille le tableau de lecture lors de la lecture. P>

write()
    update write array
    if tryLock()
        copy write array to read array
        unlock()


5 commentaires

Ma seule plainte est que si le lecteur va verrer beaucoup de temps, l'écrivain sera en attente au lieu de mettre à jour le "tableau d'écriture" en réponse à des "événements" éventuels (comme décrit dans la question). Ainsi, déterminer une stratégie optimale pour l'auteur n'est pas simple.


@Vemv Writer ne bloque jamais. S'il ne peut pas verrouiller le tableau de lecture pour le mettre à jour, oubliez-le.


Si trylock échoue et étant donné que l'auteur ne reçoit pas de nouveaux événements à écrire, le lecteur ne pourra jamais observer la dernière écriture.


vrai. Bien que l'hypothèse soit que l'écriture est très occupée.


Vrai aussi :) Mais de toute façon, ma deuxième réponse (qui est inspirée par la tienne) Stackoverflow.com/a/15445130/569050 est sans mutex, non bloquante et coffre-fort. Il n'y a que le coût de la copie et le coût de la duplication de la quantité de tableaux (lecture de versions + écriture)



2
votes

Y a-t-il une structure de données plus efficace? P>

Oui, absolument! Ils sont appelés structures de données persistantes. Ils sont capables de représenter une nouvelle version d'un vecteur / carte / etc simplement en stockant les em> les différences em> par rapport à une version précédente. Toutes les versions sont immuables, ce qui les rend appropriés pour la concurrence (écrivains n'interférer pas / bloquer les lecteurs, et vice versa). P>

Afin d'exprimer des modifications, on stocke des références à une structure de données persistante dans une référence Tapez tel que atomicreference code> et modifie ce que ces références pointaient - pas les structures elles-mêmes em>. p>

Clojure fournit une implémentation de la bande de haut niveau de structures de données persistantes. Ils sont écrits en Java pur et efficace. P>

Le programme suivant expose comment on aborderait votre problème décrit à l'aide de structures de données persistantes. P>

import clojure.lang.IPersistentVector;
import clojure.lang.PersistentVector;

public class AtomicArrayUpdates {

    public static Map<Integer, AtomicReference<IPersistentVector>> pool
        = new HashMap<>();
    public static Random rnd = new Random();
    public static final int SIZE = 60000;
    // For simulating the reads/writes ratio
    public static final int SLEEP_TIMÉ = 5;

    static {        
        for (int i = 0; i < SIZE; i++) {
            pool.put(i, new AtomicReference(PersistentVector.EMPTY));
        }
    }

    public static class Writer implements Runnable {   
        @Override public void run() {
            while (true) {
                try {
                    Thread.sleep(SLEEP_TIMÉ);
                } catch (InterruptedException e) {}

                int index = rnd.nextInt(SIZE);
                IPersistentVector vec = pool.get(index).get();

                // note how we repeatedly assign vec to a new value
                // cons() means "append a value".
                vec = vec.cons(rnd.nextInt(SIZE + 1)); 
                // assocN(): "update" at index 0
                vec = vec.assocN(0, 42); 
                // appended values are nonsense, just an example!
                vec = vec.cons(rnd.nextInt(SIZE + 1)); 

                pool.get(index).set(vec);

            }
        }
    }

    public static class Reader implements Runnable {
        @Override public void run() {
            while (true) {
                try {
                    Thread.sleep(SLEEP_TIMÉ * 5);
                } catch (InterruptedException e) {}

                IPersistentVector vec = pool.get(rnd.nextInt(SIZE)).get();
                // Now you can do whatever you want with vec.
                // nothing can mutate it, and reading it doesn't block writers!
            }
        } 
    }

    public static void main(String[] args) {
        new Thread(new Writer()).start();
        new Thread(new Reader()).start();
    }
}


0 commentaires

0
votes

La variation suivante est inspirée par Ma réponse précédente et L'un des de Zhong.j.yu.

Les écrivains n'interfèrent pas / bloquent les lecteurs et vice-versa, et il n'y a pas de problèmes de sécurité / de visibilité à fil ou de raisonnement délicat en cours . P>

public class V2 {

    static Map<Integer, AtomicReference<Double[]>> commited = new HashMap<>();
    static Random rnd = new Random();

    static class Writer {
        private Map<Integer, Double[]> writeable = new HashMap<>();
        void write() {        
            int i = rnd.nextInt(writeable.size());   
            // manipulate writeable.get(i)...
            commited.get(i).set(writeable.get(i).clone());
        }
    }

    static class Reader{
        void read() {
            double[] arr = commited.get(rnd.nextInt(commited.size())).get();
            // do something useful with arr...
        } 
    }

}


0 commentaires

0
votes
  • Vous avez besoin de deux références statiques: readarray code> et Writerarray code> et un simple mutex à suivre lorsque l'écriture a été modifiée. P> LI>

  • avoir une fonction verrouillée appelée ChangewriRraRay apporter des modifications à une cycopie de Writerarray: P>

    chaîne synchronisée [] ChangewriRraRay (chaîne [] WritearrayCopyCopyCopy, autres paramètres vont ici) { // Ici, apportez des modifications à DeepCopy of Writerarray P>

          //then return deepCopy
          return writeArrayCopy;
    }
    
  • Notez que ChangewriRraRay code> est une programmation fonctionnelle avec efficacement aucun effet secondaire car il renvoie une copie qui n'est ni ReaderAray code> Writerarray code> . p> li>

  • Celui qui calcule ChangewriRraRay code> doit l'appeler comme Writerarray = Changewrayarraray (Writerarray.deepcopy ()) Code>. P> LI>

  • Le mutex est modifié par Changementwriearraray code> et updatreaderraray code> mais est uniquement vérifié par updatreaderray code>. Si le mutex est défini, updateReadArray code> signalerai simplement la référence de ReadArray code> au bloc réel de WriteArray code> p> li>

    EDIT: P>

    @Vemv concernant la réponse que vous avez mentionnée. Tandis que les idées sont identiques, la différence est significative: les deux références statiques sont statique code> de sorte que pas de temps consacré à la copie des modifications dans le Readarray code>; Le pointeur de lavearray code> est déplacé sur le point sur writarray code>. Effectivement, nous échangeons au moyen d'une matrice TMP que ChangewriRraRay code> génère si nécessaire. De plus, le verrouillage ici est minimal car la lecture ne nécessite pas de verrouillage dans le sens où vous pouvez avoir plus d'un lecteur à tout moment. P>

    En fait, avec cette approche, vous pouvez conserver un nombre de lecteurs simultanés et vérifier le compteur pour être zéro pour pouvoir mettre à jour readarray code> avec writearray code> ; Encore une fois, promouvoir que la lecture ne nécessite aucun verrou. p> ul>


2 commentaires

Exactement la même chose que l'une des réponses de Zhong Stackoverflow.com/a/15444421/569050


@Vemv J'ai la modification à "Ajouter" un commentaire qui devait être long à ajouter ici.



1
votes

Je ferais comme suit:

  • Synchronisez le tout et voyez si la performance est suffisamment bonne. Considérant que vous n'avez qu'un seul fil écrivain et un fil de lecteur, la conflit sera faible et cela pourrait fonctionner assez bien p>

    //If all the keys and arrays are constructed before the writer/reader threads 
    //start, no need for a ConcurrentMap - otherwise use a ConcurrentMap
    private final Map<Key, AtomicReference<double[]>> map = new HashMap<> ();
    
    public void write(Key key, double value, int index) {
        AtomicReference<double[]> ref = map.get(key);
        double[] oldArray = ref.get();
        double[] newArray = oldArray.clone();
        newArray[index] = value;
        //you might want to check the return value to see if it worked
        //or you might just skip the update if another writes was performed
        //in the meantime
        ref.compareAndSet(oldArray, newArray);
    }
    
    public double[] read(Key key) {
        return map.get(key).get(); //check for null
    }
    
  • S'il est trop lent, j'aurais l'écrivain de faire une copie de la matrice, de modifier certaines valeurs et de remettre le nouveau tableau vers la carte. Notez que Les copies de tableau sont très rapides - généralement, un réseau de 20 éléments tiendrait probablement moins de 100 nanosecondes p>

    private final Map<Key, double[]> map = new HashMap<> ();
    
    public synchronized void write(Key key, double value, int index) {
        double[] array = map.get(key);
        array[index] = value;
    }
    
    public synchronized double[] read(Key key) {
        return map.get(key);
    }
    


1 commentaires

Un point mineur sur les objets de courte durée libres d'allouer: alors qu'ils n'augmentent pas le coût d'une collecte de déchets, ils augmentent leur fréquence et augmentent ainsi la collecte globale des déchets. Bien sûr, que les frais généraux sont si petits qu'il est très peu probable d'importer.



0
votes

Améliorer la réponse de @ zhong.j.yu, c'est vraiment une bonne idée de faire la queue des écritures au lieu d'essayer de les exécuter quand elles se produisent. Cependant, nous devons nous attaquer au problème lorsque les mises à jour se présentent si vite que le lecteur s'éteindrait sur des mises à jour entrant en permanence. Mon idée est que si la lecture n'effectue que si la lecture ne fait que faire la queue avant la lecture et ignorer les écritures suivantes (celles-ci seraient abordé par la prochaine lecture).

Vous devrez écrire votre propre file d'attente synchronisée. Il sera basé sur une liste liée et ne contiendrait que deux méthodes: P>

public synchronised Element cut();


0 commentaires

0
votes

J'ai une solution amusante en utilisant trois tableaux et une bascule booléenne volatil. Fondamentalement, les deux threads ont sa propre matrice. En outre, il existe un tableau partagé contrôlé via la bascule.

Lorsque l'auteur se termine et que la bascule le permet, elle copie le tableau nouvellement écrit dans le tableau partagé et retourne la bascule. P>

De même, avant le Le lecteur démarre, lorsque la bascule le permet, elle copie le tableau partagé dans son propre tableau et retourne la bascule. P>

public class MolecularArray {
    private final double[] writeArray;
    private final double[] sharedArray;
    private final double[] readArray;

    private volatile boolean writerOwnsShared;

    MolecularArray(int length) {
        writeArray = new double[length];
        sharedArray = new double[length];
        readArray = new double[length];
    }

    void read(Consumer<double[]> reader) {
        if (!writerOwnsShared) {
            copyFromTo(sharedArray, readArray);
            writerOwnsShared = true;
        }
        reader.accept(readArray);
    }

    void write(Consumer<double[]> writer) {
        writer.accept(writeArray);
        if (writerOwnsShared) {
            copyFromTo(writeArray, sharedArray);
            writerOwnsShared = false;
        }
    }

    private void copyFromTo(double[] from, double[] to) {
        System.arraycopy(from, 0, to, 0, from.length);
    }
}
  • Cela dépend du "thread de l'écrivain unique et du lecteur unique". Li>
  • il ne bloque jamais. Li>
  • Il utilise une quantité de mémoire constante (bien que énorme). Li>
  • Appels répétés vers Lire Code> sans intervenant écrire code> ne copie pas et vice versa. li>
  • Le lecteur ne voit pas nécessairement les données les plus récentes, mais elle voit les données du premier écriture code> démarré après le précédent lire code>, le cas échéant. li> ul>

    Je suppose que cela pourrait être amélioré à l'aide de deux tableaux partagés. p> p>


0 commentaires