8
votes

Comment mettre en œuvre le consommateur-producteur avec plusieurs consommateurs et plusieurs files d'attente

Supposons qu'il y a 1 producteur P et 2 consommateurs C1 et C2. Et il y a 2 files d'attente Q1 et Q2, à la fois avec une capacité spécifique.

p produira des éléments et le mettrea dans Q1 et Q2 alternativement. L'article est produit pour un consommateur spécifique et ne peut être consommé par d'autres consommateurs. Comment puis-je implémenter ce qui suit en Java: Après avoir démarré les 3 threads, si Q1 est vide, le fil C1 est bloqué jusqu'à ce qu'il soit notifié lorsqu'il y a quelque chose dans Q1. Il en va de même q2. Et P sera bloqué lorsque les deux Q1 et Q2 sont pleins jusqu'à ce qu'il soit notifié lorsque Q1 ou Q2 n'est pas plein.

Je pensais utiliser BlockingQueue, qui bloquera un consommateur lorsque sa file d'attente est vide. Mais le problème est que lorsque l'une des files d'attente est pleine, le producteur sera bloqué. Y a-t-il une structure de données en Java que nous pouvons utiliser pour résoudre ce problème?

mise à jour

J'ai moi-même une solution moi-même, mais je ne suis pas sûr que ce soit efficace. Nous pouvons toujours avoir 2 blocs de blocage. Et lorsque le consommateur prend un article de sa file d'attente, il utilise blockingqueue.take () , il sera donc bloqué lorsqu'il n'y a pas d'élément dans la file d'attente. Lorsque le producteur ajoutez un élément à la file d'attente, il utilise blockingqueue.offer () . Afin qu'il ne soit jamais bloqué par cette opération et sera «faux» si la file d'attente est pleine. De plus, nous gardons un atomicinteger pour indiquer le nombre de files d'attente qui n'est pas pleine. Chaque fois que le producteur p voulait mettre un élément dans une file d'attente, s'il obtienne un faux retour, nous réduisons l'atomicinteger par 1. Lorsqu'il atteint 0, le producteur appelle atomicinteger.wait () . Chaque fois qu'un consommateur prend un article de sa file d'attente, il examine également l'atomicinteger. Quand il est 0, le consommateur l'augmente par 1 et appelez atomicinteger.notify () .

S'il vous plaît laissez-moi savoir si cette solution a du sens.

Merci beaucoup!


11 commentaires

P sera bloqué lorsque Q1 et Q2 sont pleins bloqués sur Que , q1, q2 ou un autre synchroniseur?


Vous n'avez peut-être pas besoin de deux files d'attente. Utilisez un seul blockingQuingQueue, appelez offre (E E, délai d'attente longue, unité TimeUnit) du producteur et appelez prenez des consommateurs, puis vous n'avez pas besoin de vous inquiéter sur les tailles de la file d'attente inégale


Bonjour Zim-Zam O'PootTertoot, merci. J'ai modifié la question: "L'article est produit pour un consommateur spécifique et ne peut être consommé par d'autres consommateurs." Donc, je ne sais pas si dans ce cas, nous pouvons utiliser un seul blocagequeur. C'est en fait un vrai problème que j'ai rencontré.


Salut John Vint, merci. P est censé être bloqué util sur l'une de la file d'attente n'est pas plein. Peu importe ce qu'il est bloqué.


Dans ce cas, il est logique d'utiliser des files d'attente distinctes. Utilisez Offre (E E, Timeout, Unité TimeUnit) à partir du producteur et Prenez des consommateurs afin que les trois threads bloquent jusqu'à ce que les espaces / éléments soient disponibles. Si vous souhaitez toujours pouvoir produire des articles pour la queue1 lorsque la file d'attente2 est pleine ou vice versa, vous voulez probablement deux producteurs au lieu d'une.


Si Q1 est complet est-ce correct pour p pour continuer à mettre les éléments dans q2 ?


Salut la variable misérable, oui.


Bonjour Zim-Zam O'Pootheroot, j'ai ajouté une solution possible dans le poteau. Je ne sais pas si c'est efficace.


Lorsque vous appelez blockingqueue.offer et il renvoie false, que faites-vous avec l'élément que vous avez produit? C'est à dire. Êtes-vous jeté? Si vous ne pouvez pas le jeter, vous devez toujours avoir le bloc producteur (sinon vous aurez besoin de stocker l'article ailleurs). Le problème avec votre atomicinteger.wait () est que attendre ne peut être appelé que dans un bloc synchronisé bloc / méthode, donc si vous êtes Pas prudent, cela causera beaucoup de conflit de verrouillage; Cependant, si vous faites le bloc synchronisé suffisamment fin-grain, votre solution devrait fonctionner correctement


Une autre option consiste à avoir le producteur dormir lorsque atomicinteger atteint 0, et avoir un consommateur interruption le producteur lorsqu'il incrémente atomicinteger - Assurez-vous simplement d'avoir un Catch (InterruptException) Bloc dans le producteur qui défait l'exception. Il y a une course de données potentielle ici si atomicinteger décréments à 0, suivi d'un consommateur incrémentant atomicinteger et interrompt le producteur, suivi du dormeur du producteur; vous aurez besoin de quelque chose comme un Atomicboolcan exécutant qui est vrai lorsque le producteur est en cours d'exécution et false quand il dort, alors votre


Le consommateur appellera interruption si atomicinteger> 0 &&! Exécution qui prend soin de la course de données


3 Réponses :


0
votes

2 commentaires

Merci. Y a-t-il un moyen d'utiliser la simultanéité Java sans autres bibliothèques?


Eh bien, si vous souhaitez mettre en œuvre le modèle de conception du producteur / consommateur (du JMS standard), vous devez utiliser une bibliothèque tierce partie.



1
votes

Avez-vous considéré comme un Service d'exécutif rayé . Cela vous permettra de résoudre votre problème et mettre vos consommateurs dans une piscine qui serait beaucoup plus efficace.


2 commentaires

Merci. Y a-t-il un moyen d'utiliser la simultanéité Java sans autres bibliothèques?


Ce n'est pas une autre bibliothèque, c'est une classe qui utilise la fonctionnalité de service exécutant standard.



0
votes

Peu importe la structure de données / le serveur de messagivers que vous choisissez, vous pouvez sortir des ressources avec l'une d'entre elles. Il y a toujours une limite sur la mémoire ou l'espace disque.

En fait, il n'est pas mal que le producteur soit arrêté.

Si vos files d'attente se remplissent, vous devriez essayer de restaurer la balance : vous pouvez ajouter plus de consommateurs. Vous pouvez améliorer la performance du consommateur. Si cela n'est pas possible, quelque chose devrait vraiment manifester le producteur. C'est un moyen d'éviter des erreurs de mémoire ou aucun espace laissé sur le périphérique .

Enfin, c'est le devoir du centre de données de surveiller les files d'attente de toute façon. Ils devraient vous informer si le degré de remplissage de vos files d'attente atteint une limite, par exemple> 80%.

mise à jour

Si le producteur ne peut pas envoyer sur toutes les files d'attente, car l'une de ses files d'attente est pleine, il est à la hauteur de lui de tamponner, mais la mise en mémoire tampon est quelque chose que les files d'attente devraient faire.


0 commentaires