1
votes

Comment séparer la lecture d'un fichier de l'ajout de données à la file d'attente

Mon cas est comme ceci: Je construis une application qui peut lire des données à partir d'une source (fichiers ou base de données) et écrire ces données dans une autre source (fichiers ou base de données).

Donc, fondamentalement, j'ai des objets:

InputHandler -> Queue -> OutputHandler

En regardant une situation où l'entrée est des fichiers, InputHandler aurait: 1. Utilisez FilesReader pour lire les données de tous les fichiers ( FilesReader encapsule la logique de lecture des fichiers et renvoie une collection d'objets) 2. Ajoutez les objets à la file d'attente.

(puis il se répète à l'infini puisque InputHandler a une boucle while qui recherche de nouveaux fichiers tout le temps).

Le problème apparaît lorsque les fichiers sont vraiment gros - FilesReader , qui lit tous les fichiers et les analyse, n'est pas la meilleure idée ici. Ce serait beaucoup mieux si je pouvais en quelque sorte lire une partie du fichier, l'analyser et le mettre dans une file d'attente - et le répéter jusqu'à la fin de chaque fichier.

C'est faisable en utilisant Streams, cependant , Je ne veux pas que mon FilesReader sache quoi que ce soit sur la file d'attente - j'ai l'impression que cela enfreint la règle de séparation des préoccupations de la POO.

Pourriez-vous me suggérer une solution pour ce problème?

// MISE À JOUR

Voici un code qui montre (de manière simplifiée) ce que fait InputHandler :

public class InputHandler {
  public Task Start() {
    while(true) {
      var newData = await _filesReader.GetData();
      _queue.Enqueue(newData);
    }
  }
}


9 commentaires

Veuillez partager le code que vous avez jusqu'à présent. Il est difficile de déduire de la question quel est le problème exact et comment vous conseiller au mieux sans quelques exemples de code.


Vous avez mentionné l'analyse des fichiers. De quel genre d'analyse s'agit-il? Parlez-vous de fichiers CSV par exemple, qui peuvent être analysés ligne par ligne?


@TheodorZoulias Eh bien, je voudrais prendre en charge CSV et JSON. Je suppose que j'aurais CsvFilesReader et JsonFilesReader


@PatrickTucci j'ai ajouté du code et plus d'explications


@Loreno comment analyser une partie d'un fichier JSON? Ces fichiers ne peuvent être analysés que dans leur ensemble!


Pourquoi utilisez-vous deux utilisateurs?


@DaleBurrell J'ai changé mon courrier à un moment donné et créé un nouveau compte. Je ne savais même pas que sur l'un de mes ordinateurs, je suis connecté sur l'autre compte :)


Je ne veux pas que mon FilesReader sache quoi que ce soit sur la file d'attente <--- que voulez-vous dire par là? N'est-il pas nécessaire que le FilesReader sache qu'il existe une file d'attente? Sinon, où enverra-t-il sa sortie pour un traitement ultérieur?


@TheodorZoulias Ce que je veux dire, c'est que je ne veux pas que FilesReader soit en quelque sorte codé en dur pour toujours compter sur la file d'attente. Je pourrais vouloir réutiliser FilesReader dans un autre scénario où il n'y a pas de file d'attente.


3 Réponses :


1
votes

Je ne suis pas tout à fait sûr de comprendre pourquoi l'utilisation d'un flux d'E / S quelconque changerait la façon dont vous ajouteriez des objets à la file d'attente.

Cependant, ce que je ferais personnellement est de configurer un événement personnalisé statique dans votre classe FilesReader, comme OnObjectRead. Utilisez un flux pour lire les fichiers et lorsque vous lisez un enregistrement, déclenchez l'événement et transmettez-lui cet objet / enregistrement.

Ensuite, ayez un abonné à l'événement qui prend l'enregistrement et le pousse dans la file d'attente. Ce serait à l'architecture de votre application de déterminer le meilleur endroit pour placer cet abonné.

En passant, vous avez mentionné que votre InputHandler a une boucle while qui recherche de nouveaux fichiers tout le temps. Je vous recommande fortement de ne pas utiliser de boucle while pour cela si vous ne vérifiez que le système de fichiers. C'est le but de FileSystemWatcher - pour vous donner un moyen efficace d'être immédiatement averti des changements dans le système de fichiers sans que vous ayez à faire une boucle. Sinon, vous broyez constamment le système de fichiers et consommez constamment des E / S disque.


2 commentaires

Bon point à propos de FileSystemWatcher - Je vais l'examiner, merci! Concernant votre première phrase - le problème est que FileReader ne sait rien de la file d'attente - il est censé être un service qui lit les fichiers et les renvoie. Quand je commence à utiliser des flux et à lire / analyser des fichiers partiellement, j'ai besoin d'un moyen de les «pousser» d'une manière ou d'une autre - votre idée avec les événements me semble bonne et, honnêtement, j'y ai pensé aussi.


Je suppose que InputHandler serait l'abonné ici. Mon objet de niveau supérieur est Application - il exécute le InputHandler et le OutputHandler pour s'exécuter dans des threads séparés connectés par une file d'attente.



0
votes

Ce code montre à quoi ressemble le code actuellement. Donc, si j'ai 1000 fichiers, chacun contenant beaucoup et beaucoup de données, _filesReader essaiera de lire toutes ces données et de les renvoyer - et la mémoire serait rapidement épuisée.

Concernant le problème de la consommation de mémoire illimitée, une solution simple consiste à remplacer la _queue par un BlockingCollection . Cette classe a des capacités de délimitation prêtes à l'emploi.

public class InputHandler
{
    private readonly BlockingCollection<string> _buffer
        = new BlockingCollection<string>(boundedCapacity: 10);

    public Task Start()
    {
        while (true)
        {
            var newData = await _filesReader.GetData();
            _buffer.Add(newData); // will block until _buffer
                                  // has less than 10 items.
        }
    }
}

6 commentaires

le problème se produit avant que j'ajoute réellement des données à la file d'attente. Je dois d'abord analyser les fichiers pour même obtenir les données. Mon problème principal est de savoir comment analyser les fichiers partiellement (pas tout à la fois) et ajouter des données à la file d'attente de manière itérative.


@Loreno J'ai abordé le point de votre question pour lequel vous avez fourni un exemple de code. Avoir 1000 fichiers est résolu en définissant le boundedCapacity de la BlockingCollection . Pour l'autre point, sur la façon d'analyser les fichiers par portions, j'ai souligné que c'était impossible puisque vous voulez analyser des formats incassables comme JSON.


Vous avez peut-être mal compris le problème que j'ai. Cependant, merci de m'avoir fait savoir que JSON était "incassable". Je ne m'y attendais pas vraiment. Pour l'instant, j'irai avec une solution où j'aurai FileHandlers. CsvFileHandler fonctionnera en utilisant des flux, tandis que JsonFileHandler chargera toutes les données en mémoire à la fois.


@Loreno Je ne comprends pas comment les flux vont vous aider à équilibrer votre charge de travail. Créer un flux ne représente pratiquement aucun travail. Passer des flux à l'analyseur signifie que vous déléguez tout le travail, la lecture et l'analyse du fichier, à l'analyseur. De plus, je ne sais pas ce qui se passe avec les données analysées de l'entrée CSV partielle. Si vous stockez toutes les données analysées de chaque fichier en mémoire, vous avez un problème similaire de consommation de mémoire élevée à l'autre extrémité du flux de travail, exagéré du fait que les données analysées nécessitent généralement plus de mémoire que les données brutes.


@TheodorZoulias Utiliser un flux pour lire un fichier approximativement VS lire l'intégralité du fichier à la fois - bien sûr, les flux sont une meilleure solution. Je ne passe pas le flux à un analyseur - FilesReader récupère une partie du fichier à partir du flux, le transforme en String, puis le transmet à l'analyseur. Les données analysées sont envoyées dans la file d'attente, à partir de laquelle OutputHandler prend les données et les écrit à un endroit. C'est juste une chose de type producteur / consommateur. Donc, en conclusion, si j'analyse juste une partie du fichier, je peux déjà l'envoyer dans la file d'attente pour un traitement ultérieur, tandis que, en même temps, le reste du fichier est lu.


Il est plus facile de lire partiellement un fichier en utilisant ReadLines , que de gérer le flux directement. Les ruisseaux sont toujours utilisés sous les hottes. La seule option est de les gérer directement ou non.



0
votes

Je pense que j'ai eu une idée. Mon objectif principal est d'avoir FilesReader qui ne repose sur aucun moyen spécifique de transfert des données. Tout ce qu'il devrait faire est de lire les données, de les renvoyer et de ne pas se soucier des files d'attente ou de tout ce que je pourrais utiliser. C'est un travail de InputHandler - il connaît la file d'attente et utilise FilesReader pour obtenir des données à mettre dans cette file d'attente.

J'ai changé FilesReader code> interface un peu. Maintenant, il a une méthode comme celle-ci:

await _filesReader.ReadData(file, data => _queue.Enqueue(data), cancellationToken);

Maintenant, InputHandler appelle la méthode comme ceci:

Task ReadData(IFileInfo file, Action<IEnumerable<IDataPoint>> resultHandler, CancellationToken cancellationToken)


0 commentaires