0
votes

Quel est le meilleur modèle pour un cache d'écriture thread-safe dans la base de données?

J'ai une méthode qui pourrait être appelée par plusieurs threads, pour écrire des données dans une base de données. Pour réduire le trafic de la base de données, je cache les données et les écris en masse.

Maintenant, je voulais savoir, y a-t-il un meilleur (par exemple, un modèle sans verrouillage) à utiliser?

Voici un exemple comment je le fais en ce moment?

    public class WriteToDatabase : IWriter, IDisposable
    {
        public WriteToDatabase(PLCProtocolServiceConfig currentConfig)
        {
            writeTimer = new System.Threading.Timer(Writer);
            writeTimer.Change((int)currentConfig.WriteToDatabaseTimer.TotalMilliseconds, Timeout.Infinite);
            this.currentConfig = currentConfig;
        }

        private System.Threading.Timer writeTimer;
        private List<PlcProtocolDTO> writeChache = new List<PlcProtocolDTO>();
        private readonly PLCProtocolServiceConfig currentConfig;
        private bool disposed;

        public void Write(PlcProtocolDTO row)
        {
            lock (this)
            {
                writeChache.Add(row);
            }
        }

        private void Writer(object state)
        {
            List<PlcProtocolDTO> oldCachce = null;
            lock (this)
            {
                if (writeChache.Count > 0)
                {
                    oldCachce = writeChache;
                    writeChache = new List<PlcProtocolDTO>();
                }
            }

            if (oldCachce != null)
            {
                    using (var s = VisuDL.CreateSession())
                    {
                        s.Insert(oldCachce);
                    }
            }

            if (!this.disposed)
                writeTimer.Change((int)currentConfig.WriteToDatabaseTimer.TotalMilliseconds, Timeout.Infinite);
        }

        public void Dispose()
        {
            this.disposed = true;
            writeTimer.Dispose();
            Writer(null);
        }
    }


1 commentaires

Vous voudrez peut-être jeter un œil à ceci: Pourquoi est-ce que lock (this) {...} est mauvais?


3 Réponses :


0
votes

Pas vraiment. Vous pourriez trouver cette réponse utile: quel est le coût de l'instruction de verrouillage?

Le meilleur commentaire est celui de Henk Holterman:

lock (Monitor.Enter / Exit) est très bon marché, moins cher que des alternatives comme un Waithandle ou Mutex.

Mais que faire si c'était (un peu) lent, préféreriez-vous avoir un programme rapide avec des résultats incorrects?

D'autres sur ce thread font l'observation très valable qu'un verrou ne vous ralentit pas autant qu'une condition de concurrence ou un problème de multi-threading difficile à trouver.

Je dirais que votre idée générale de regrouper les informations puis de les écrire en masse dans la base de données est juste - si vous vous attendez à beaucoup de hits sur votre base de données. Si ce n'est qu'occasionnel, je ne m'inquiéterais pas du tout - il suffit d'avoir 1 contexte de base de données par thread et de laisser la bibliothèque de base de données le régler. (Si vous avez tellement de threads que vous devez vous soucier du nombre de contextes de base de données que vous créez, je vous conseille de reconsidérer votre conception.)


2 commentaires

J'ai besoin de lots, il y a peut-être des centaines d'inserts par seconde


Ensuite, d'une manière ou d'une autre, vous aurez besoin d'une serrure. À un moment donné, vous devez décider «seul ce fil peut accéder à ces données». Il n'y a pas moyen de contourner cela. Vous en aurez besoin lors de l'ajout à votre lot et lorsque vous déciderez «J'ai un lot à envoyer et je dois réinitialiser ma collection». Il peut aider à « pré-batch » - à savoir former une collection d'une taille donnée sur vos fils (qui n'a pas besoin de verrouillage) et, après avoir atteint une taille donnée, puis l' ajouter au résultat agrégé (qui aurait besoin d' un verrou ). Dans le meilleur des cas, il y aurait une contestation limitée; le pire des cas est que chaque thread essaie d'ajouter un lot en même temps.



0
votes

Au lieu d'utiliser une List mutable et de la protéger à l'aide de verrous, vous pouvez utiliser une ImmutableList et cesser de vous inquiéter de la possibilité que la liste soit mutée par le mauvais thread au mauvais moment. Avec des collections immuables, il est bon marché et facile de transmettre des instantanés de vos données, car vous n'avez pas besoin de bloquer les rédacteurs (et peut-être aussi les lecteurs) lors de la création de copies des données. Une collection immuable est un instantané en soi.

Bien que vous n'ayez pas à vous soucier du contenu de la collection, vous devez toujours vous soucier de sa référence. En effet, la mise à jour d'une collection immuable signifie le remplacement de la référence à l'ancienne collection par une nouvelle collection. Vous ne voulez pas que plusieurs threads échangent des références de manière incontrôlable, vous avez donc toujours besoin d'une sorte de synchronisation. Vous pouvez toujours utiliser des lock , mais il est assez facile d'éviter complètement le verrouillage en utilisant des opérations interverrouillées. L'exemple ci-dessous utilise la méthode pratique ImmutableInterlocked.Update , qui permet de faire une mise à jour et un échange atomiques en une seule ligne:

private ImmutableList<PlcProtocolDTO> writeCache
    = ImmutableList<PlcProtocolDTO>.Empty;

public void Write(PlcProtocolDTO row)
{
    ImmutableInterlocked.Update(ref writeCache, x => x.Add(row));
}

private void Writer(object state)
{
    IList<PlcProtocolDTO> oldCache = Interlocked.Exchange(
        ref writeCache, ImmutableList<PlcProtocolDTO>.Empty);

    using (var s = VisuDL.CreateSession())
        s.Insert(oldCache);
}

private void Dump()
{
    foreach (var row in Volatile.Read(ref writeCache))
        Console.WriteLine(row);
}

Voici la description de la méthode ImmutableInterlocked.Update :

Mute une valeur sur place avec une sémantique de transaction de verrouillage optimiste via une fonction de transformation spécifiée. La transformation est réessayée autant de fois que nécessaire pour gagner la course au verrouillage optimiste.

Cette méthode peut être utilisée pour mettre à jour tout type de variables de type référence. Son utilisation peut être augmentée avec l'avènement des nouveaux types d'enregistrement C # 9 , qui sont immuables par défaut et sont destinés à être utilisés en tant que tels.


4 commentaires

Pensez-vous qu'une ImmutableList est plus performante qu'un verrou? Je ne sais pas, mais avec ça, la liste est recréée à chaque fois ... Peut-être que j'ai besoin de faire des tests avec le benchmark dotnet


et est-il sûr qu'aucune écriture n'est perdue? pourquoi ai-je alors besoin de l'ImmutableList?


@ user1237393 la création de nouvelles listes immuables est souvent abordable car elles sont implémentées en interne sous forme d'arbres binaires, constitués de nœuds pour la plupart réutilisables. Les performances des opérations de base sur une collection immuable ne sont pas excellentes, comparées aux mêmes opérations sur des collections normales ou simultanées (elles sont généralement au moins 10 fois plus lentes). Le gain vient chaque fois que vous avez besoin de prendre des instantanés des données. Si vous n'avez jamais besoin de snapshots, il n'y a aucun avantage en termes de performances et le seul avantage qui reste est celui de l'architecture (qui est bien entendu subjectif).


@ user1237393 si vous utilisez correctement les opérations verrouillées ou les verrous, aucune mise à jour ne sera perdue. Avec un lock (pessimiste) lock vous devez protéger à la fois l'ajout à la collection et l'échange de références. Avec le (optimiste) ImmutableInterlocked.Update , vous pouvez être sûr qu'en cas de course avec un autre thread, l'opération de mise à jour sera répétée sur la version renvoyée précédemment par le thread qui a remporté la course. Le prix de l'élimination de la serrure est la possibilité de tourner un peu (et de créer des déchets pour le GC). Dans tous les cas, il n'y a aucune possibilité pour une mise à jour de s'échapper.



0
votes

Il y a quelques problèmes que je peux voir avec le code basé sur la minuterie.

  • Même dans la nouvelle version du code, il est toujours possible de perdre des écritures au redémarrage ou à l'arrêt. La méthode Dispose n'attend pas l'achèvement du dernier rappel de minuterie qui peut être actuellement en cours. Étant donné que les rappels de minuterie s'exécutent sur les threads de pool de threads, qui sont des threads d'arrière-plan, ils seront abandonnés lorsque le thread principal se termine.
  • Il n'y a pas de limite sur la taille des lots, cela va casser lorsque vous atteignez une limite de l'API de stockage sous-jacente (par exemple, les bases de données sql ont une limite sur la longueur des requêtes et le nombre de paramètres utilisés).
  • puisque vous effectuez des E / S, l'implémentation devrait probablement être asynchrone
  • Cela se comportera mal sous charge. en particulier, comme la charge ne cesse d'augmenter, les lots deviendront plus gros et donc plus lents à exécuter, une exécution plus lente du lot à son tour donnera au suivant un temps supplémentaire pour accumuler les éléments les rendant encore plus lents, etc ... en fin de compte soit l'écriture du lot échouera (si vous atteignez une limite sql ou que la requête expire) ou l'application va simplement manquer de mémoire. Pour gérer une charge élevée, vous n'avez en réalité que deux choix: appliquer une contre-pression (c'est-à-dire ralentir les producteurs) ou réduire les écritures.
  • vous souhaiterez peut-être autoriser un nombre limité d'écrivains simultanés si la base de données peut le gérer.
  • Il existe une condition de writeTimer.Change sur le champ disposed qui peut entraîner une ObjectDisposedException dans writeTimer.Change .

Je pense qu'un meilleur modèle qui résout les problèmes ci-dessus est le modèle consommateur-producteur, vous pouvez l'implémenter dans .net avec un ConcurrentQueue ou avec la nouvelle api System.Threading.Channels.

Gardez également à l'esprit que si votre application se bloque pour une raison quelconque, vous perdrez les enregistrements qui sont encore mis en mémoire tampon.

Voici un exemple d'implémentation utilisant des canaux:

public interface IWriter<in T>
{
    ValueTask WriteAsync(IEnumerable<T> items);
}

public sealed record Options(int BatchSize, TimeSpan Interval, int MaxPendingWrites, int Concurrency);

public class BatchWriter<T> : IWriter<T>, IAsyncDisposable
{
    readonly IWriter<T> writer;
    readonly Options options;
    readonly Channel<T> channel;
    readonly Task[] consumers;

    public BatchWriter(IWriter<T> writer, Options options)
    {
        this.writer = writer;
        this.options = options;

        channel = Channel.CreateBounded<T>(new BoundedChannelOptions(options.MaxPendingWrites)
        {
            // Choose between backpressure (Wait) or
            // various ways to drop writes (DropNewest, DropOldest, DropWrite).
            FullMode = BoundedChannelFullMode.Wait,

            SingleWriter = false,
            SingleReader = options.Concurrency == 1
        });

        consumers = Enumerable.Range(start: 0, options.Concurrency)
            .Select(_ => Task.Run(Start))
            .ToArray();
    }

    async Task Start()
    {
        var batch = new List<T>(options.BatchSize);

        var timer = Task.Delay(options.Interval);
        var canRead = channel.Reader.WaitToReadAsync().AsTask();

        while (true)
        {
            if (await Task.WhenAny(timer, canRead) == timer)
            {
                timer = Task.Delay(options.Interval);
                await Flush(batch);
            }
            else if (await canRead)
            {
                while (channel.Reader.TryRead(out var item))
                {
                    batch.Add(item);

                    if (batch.Count == options.BatchSize)
                    {
                        await Flush(batch);
                    }
                }

                canRead = channel.Reader.WaitToReadAsync().AsTask();
            }
            else
            {
                await Flush(batch);
                return;
            }
        }

        async Task Flush(ICollection<T> items)
        {
            if (items.Count > 0)
            {
                await writer.WriteAsync(items);
                items.Clear();
            }
        }
    }

    public async ValueTask WriteAsync(IEnumerable<T> items)
    {
        foreach (var item in items)
        {
            await channel.Writer.WriteAsync(item);
        }
    }

    public async ValueTask DisposeAsync()
    {
        channel.Writer.Complete();
        await Task.WhenAll(consumers);
    }
}


2 commentaires

Ce n'était qu'un échantillon rayé, pour obtenir l'idée :-) Nous avons plusieurs emplacements dans notre code où nous collectons certaines modifications et les traitons dans un lot. En réalité, je redémarre la minuterie, lorsque le rappel de la minuterie est terminé. Et aussi j'arrête la minuterie avec une mise au rebut et j'écris le cache dans la base de données.


@ user1237393 limitez-vous également la taille des lots et la limitation des producteurs?