1
votes

Est-il possible et / ou conseillé d'utiliser plusieurs System.Threading.Channels dans un seul objet?

Je travaille sur une application web .net core 3.0 et j'ai décidé d'utiliser System.Threading.Channels dans un service singleton. Le niveau supérieur de mes services de requêtes étendues injecte ce singleton pour accéder à ses canaux.

J'ai décidé d'utiliser ce modèle pour découpler les requêtes (qui produisent des mises à jour en direct pour d'autres clients connectés) de l'exécution de ces mises à jour. P >

L'implémentation d'UN canal dans un objet a de nombreux exemples.

Quelqu'un peut-il me dire s'il est possible / conseillé d'utiliser plusieurs canaux dans mon singleton?

Je n'ai pas encore rencontré de problèmes pour créer plusieurs canaux et les «démarrer» lorsque le singleton est créé. Je ne suis tout simplement pas arrivé à un point où je peux tester avec plusieurs demandes de clients frappant différents canaux sur le singleton pour voir si cela fonctionne bien. (Ou pas du tout? ...)

Ma principale motivation pour utiliser plusieurs canaux est que je veux que le singleton fasse différentes choses en fonction du type d'élément dans le canal.

public class MyChannelSingleton 
{
    public Channel<MyType> TypeOneChannel = Channel.CreateUnbounded<MyType>();
    public Channel<MyOtherType> TypeTwoChannel = Channel.CreateUnbounded<MyOtherType>();

    public MyChannelSingleton() 
    {
        StartChannels();
    }

    private void StartChannels() 
    {
        // discarded async tasks due to calling in ctor
        _ = StartTypeOneChannel();
        _ = StartTypeTwoChannel();
    }

    private async Task StartTypeOneChannel()
    {
        var reader = TypeOneChannel.Reader;

        while (await reader.WaitToReadAsync())
        {
            if (reader.TryRead(out MyType item))
            {
                // item is sucessfully read from channel
            }
        }
    }

    private async Task StartTypeTwoChannel()
    {
        var reader = TypeTwoChannel.Reader;

        while (await reader.WaitToReadAsync())
        {
            if (reader.TryRead(out MyOtherType item))
            {
                // item is sucessfully read from channel
            }
        }
    }
}

Je prévois également de ne jamais "compléter" les chaînes et de les rendre disponibles pendant toute la durée de vie de l'application.


7 commentaires

Le plus gros problème avec le démarrage de nombreux threads - en particulier ceux de traitement - est que vous pourriez avoir plus de threads en cours d'exécution qui sont utiles. Souvent, vous pouvez confier ce travail à un Threadpool. | Je ne me souviens pas avoir eu des cours de ce cours auparavant, alors je dois le regarder.


Le manque d'exemple de code là-dedans est vraiment décourageant. Mais cela semble être une Queue avec quelques considérations spéciales pour le multitâche. Particulièrement FullMode et SingleReader / SingleWriter semblent utiles. Je dois me souvenir de celui-là.


Il semble que le principal problème avec la chaîne soit l'équilibre entre le producteur et le consommateur lorsque vous utilisez des chaînes illimitées. Si vos consommateurs sont suffisamment rapides pour traiter les données des canaux afin de suivre le rythme des producteurs, ils peuvent les utiliser.


Je pense que le démarrage de plusieurs canaux va à l'encontre de l'objectif même de cette classe. Il prend par défaut SingleReader et Wrtier = false, car vous êtes censé le partager une instance entre plusieurs lecteurs et écrivains sans avoir à vous soucier de la synchronisation. TryRead et TryWrite se chargeront de la synchronisation pour vous (si ces deux propriétés restent fausses). | Je suppose que ces fonctions utilisent un mécanisme de verrouillage simple. Les mêmes types que j'écrirais ou que les collections simultanées ont.


Je m'attends à avoir plusieurs écrivains sur chacun de ces canaux avec des sursauts / pics d'écriture occasionnels à long terme. J'ai besoin de différents canaux pour différents types pour mon cas d'utilisation. Comme vous, j'ai parcouru la documentation clairsemée et je ne peux pas déterminer si l'utilisation de plusieurs canaux dans un objet est bonne ou mauvaise ...


Différents canaux à des fins différentes semblent tout à fait corrects. L'essentiel est que vous n'essayiez pas plusieurs canaux dans le même but. Une façon de gérer les pics est de choisir une capacité élevée. | Le gros problème sera d'équilibrer les parties écrivain et lecteur. Vous voulez généralement avoir autant de lecteurs que possible / nécessaire, le reste étant des lecteurs. À moins bien sûr que la capacité ne soit complètement utilisée - alors vous avez besoin de lecteurs plus que d'écrivains. Ou plus grande capacité. Ou éventuellement plus de puissance de serveur, y compris un cluster.


D'accord, j'ai finalement compris à quoi servent les chaînes illimitées. Aucune limite de capacité fixe, mais peut en conséquence se heurter à un MOO. Si vous devez gérer des pics et que votre capacité maximale ne fonctionne pas, cela semble être la voie à suivre. Si le serveur est toujours à court de mémoire, il s'agit d'un problème matériel. | Considérez cependant que .NET a une limite stricte pour la mémoire virtuelle. docs.microsoft.com/en- us / windows / win32 / memory /… ? | Je suppose qu'après cela, le GC devient incapable de gérer cela et vous devez utiliser la gestion directe de la mémoire.


3 Réponses :


0
votes

Malheureusement, je ne peux ne pas trouver le code source . Et appeler la Documentation clairsemée serait un euphémisme. Je peux donc au mieux vous dire "si c'était ma classe, comment je le ferais".

Le gros problème lié à la présence de plusieurs canaux en mémoire - en particulier illimités - serait la fragmentation de la mémoire provoquant un MOO précoce. En effet, même avec une seule illimitée, un gros problème serait de devoir agrandir la collection. List n'est guère plus qu'un wrapper autour d'un T [] avec un support de croissance automatique. Un autre problème avec les listes illimitées est que tôt ou tard vous êtes à court d'index .

Comment résoudre ce problème? Une liste liée . Dans environ 90% des cas, une liste liée serait la dernière collection que je considérerais même. Les 10% restants sont des files d'attente et des constructions semblables à des files d'attente. Et les chaînes ressemblent très à une file d’attente. Sur ces 10% de cas, dans 9%, j'utiliserais simplement ce que fait l'implémentation de la file d'attente. Il s’agit du 1% restant.

Pour un accès aléatoire, la liste liée est la pire collection possible. Pour les files d'attente, c'est faisable. Mais pour éviter les MOO liés à la fragmentation dans .NET? Pour minimiser le coût de la croissance? Pour contourner la limite du hard array? Là, la liste liée est absolument imbattable .

Et si ça ne fait pas ça? Il devrait être possible de créer votre propre version de chaîne qui fait cela et de simplement la remplacer.


7 commentaires

Les canaux ne sont pas des tampons (et n'ont pas de problèmes de réallocation), les listes liées sont plus chères pour chaque opération que les listes et bien pire, elles ne sont pas sûres pour les threads. Un ConcurrentQueue serait mieux mais cela ne fournit pas d'écriture en lecture asynchrone. Le code source est sur Github, pas sur la source de référence


Les listes BTW sont moins chères à l'insertion que les listes liées car l'insertion coûte 0 jusqu'à ce que vous deviez redimensionner la liste. Quelque chose qui peut facilement être évité en spécifiant simplement une capacité. Cela n'a même pas besoin d'être précis. Les listes liées, en revanche, doivent allouer un autre nœud pour chaque insertion.


@PanagiotisKanavos Je ne me vois pas appeler Channel a Buffer. Je l'ai comparé à une file d'attente, qui est l'équivalent le plus proche. Au mieux, je ne sais pas si un ConcurrentQueue serait meilleur. | Les listes ne sont moins chères lors de l'insertion que si elles ne sont pas trop longues. Bien que la croissance se produise rarement, le coût de toute opération de croissance augmente le niveau de remplissage de la ligne. Ce petit impact de devoir rechercher et réaffecter une référence est assez comparable à celui de copier 100 éléments dans la liste nouvellement développée. Et nous parlions explicitement d'une chaîne illimitée. Alors bébé va grandir.


Le problème du MOO s'applique uniquement à List, qui utilise un tableau comme tampon. Les canaux n'utilisent pas de tampons et n'ont donc pas de tels problèmes. Et les benchmarks réels montrent que les listes chaînées sont beaucoup plus chères que le coût absolu 0 de la définition d'une valeur dans un tableau, surtout une fois que vous prenez en compte le coût des allocations et du garbage collection. Peu importe la mise en cache du processeur et les problèmes de localité qui rendent l'utilisation d'un tampon beaucoup plus rapide que de devoir retourner à la mémoire principale pour trouver le nœud suivant.


Je ne sais pas au mieux si un ConcurrentQueue serait meilleur . Infiniment mieux. Les listes liées ne sont pas sûres pour les threads, point final. Les files d'attente simultanées sont.


Que cette réponse est fausse et même pas pertinente pour la question - les listes liées ne sont pas thread-safe. Le code source est disponible dans le référentiel CoreFX sur Github avec l'ensemble de la source .NET Core


@PanagiotisKanavos: "les listes liées ne sont pas sûres pour les threads" C'est pourquoi je n'ai jamais rien dit de près . Le mécanisme de Thread Safety n'était pas pertinent pour cette discussion. Il s'agissait entièrement de l'utilisation de la mémoire et de problèmes similaires dans les cas de demande élevée, avec plusieurs canaux. | TL; DR: Vous n'êtes pas d'accord avec des choses que je n'ai jamais dites, jamais impliquées et honnêtement, je ne peux pas comprendre comment vous les lisez dans mon message.



1
votes

Un Channel est juste une file d'attente asynchrone thread-safe. Il n'effectue aucun traitement par lui-même, il s'agit simplement d'un stockage FIFO en mémoire passif. Vous pouvez en avoir autant que vous le souhaitez.

Vous pourriez profiter du fait qu'un Channel expose séparément un Reader et un Writer , pour limiter l'accès des clients de votre classe à la fonctionnalité minimale dont elle a besoin. En d'autres termes, au lieu d'exposer des propriétés de type Channel , vous pouvez envisager d'exposer des propriétés de type ChannelWriter ou ChannelReader .

La création de chaînes illimitées doit également être effectuée avec prudence. Un seul canal mal utilisé pourrait faire de votre application une victime de OutOfMemoryException assez facilement.

Une alternative à l'exposition des propriétés de type ChannelReader pourrait être d'exposer IAsyncEnumerable s.


0 commentaires

6
votes

Vous pouvez en utiliser autant que vous le souhaitez, à condition de les utiliser correctement. En fait, utiliser un service d'arrière-plan (essentiellement un singleton) qui expose un pipeline de traitement est une manière très courante de les utiliser dans .NET Core.

Les canaux ne sont pas que des files d'attente asynchrones. Ils sont similaires aux blocs DataFlow - ils peuvent être utilisés pour créer des pipelines de traitement, chaque bloc / travailleur traitant les données d'un tampon d'entrée / ChannelReader et transmettant les résultats à un tampon de sortie / ChannelWriter. Les blocs DataFlow gèrent le traitement asynchrone via les tâches elles-mêmes. Avec les canaux, nous devons gérer nous-mêmes les tâches des travailleurs.

Un concept très important que nous devons garder à l'esprit est que les chaînes ne sont pas accessibles directement. En fait, dans presque tous les cas, ils ne devraient même pas être exposés en tant que champs ou propriétés. Dans la plupart des cas, seul un ChannelReader est nécessaire. Dans certains cas, par exemple en tête d'un pipeline, un ChannelWriter peut être exposé. Ou non.

Travailleurs / étapes individuels

Une étape de travail typique ressemblerait à ceci

private ChannelReader<MyType> ActiveOnly(ChannelReader<MyType> reader,CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<MyType>();
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
        var inpStream=reader.ReadAllAsync(token)
                            .Where(it=>it.IsActive);
        await foreach(var item from inpStream)
        {
             await writer.WriteAsync(item);
        }
    },token).ContinueWith(t=>channel.TryComplete(t));

    return channel.Reader;    
}

Certaines choses à noter:

  • Vous pouvez créer plusieurs tâches si vous le souhaitez et utiliser Task.WhenAll pour attendre que tous les collaborateurs se terminent avant de fermer le canal.
  • Vous pouvez utiliser un canal borné pour empêcher l'accumulation de nombreux messages si le pipeline n'est pas assez rapide.
  • Si l'annulation est signalée, la lecture du canal d'entrée et de la tâche de travail sera annulée.
  • Une fois la tâche de travail terminée, que ce soit parce qu'elle a été annulée ou rejetée, le canal sera fermé.
  • Une fois le canal "principal" terminé, la fin des étapes passera d'une étape à l'autre.

Combinaison d'étapes

Plusieurs étapes peuvent être combinées en passant son lecteur de sortie au lecteur d'entrée d'un autre, par exemple:

interface IQueuedService<T>
{
    Task PostAsync(T input);
}


0 commentaires