7
votes

producteur - consommer; Comment le consommateur s'arrête-t-il?

J'ai donc simulé mon problème de consommateur de producteur et j'ai le code ci-dessous. Ma question est la suivante: comment le consommateur s'arrête-t-il s'il est en constante alors que (vrai).

Dans le code ci-dessous, j'ai ajouté P>

public class ConsumerThread implements Runnable
{
 private BlockingQueue<Integer> queue;
 private String name;
 private boolean isFirstTimeConsuming = true;
 public ConsumerThread(String name, BlockingQueue<Integer> queue)
 {
    this.queue=queue;
    this.name=name;
 }

@Override
public void run()
{
    try
    {       
        while (true)
        {   
            if (isFirstTimeConsuming)
            {
                System.out.println(name+" is initilizing...");
                Thread.sleep(4000);
                isFirstTimeConsuming=false;
            }
            try{

                if (queue.peek()==null)
                    Thread.currentThread().interrupt();

                Integer data = queue.take();

                System.out.println(name+" consumed ------->"+data);
                Thread.sleep(70);    

            }catch(InterruptedException ie)
            {
                System.out.println("InterruptedException!!!!");
                break;
            }
        }

        System.out.println("Comsumer " + this.name + " finished its job; terminating.");

    }catch (InterruptedException e)
    {
        e.printStackTrace();
    } 
}


0 commentaires

5 Réponses :


3
votes

N'utilisez pas d'interruption sur le thread, mais plutôt de casser la boucle lorsqu'il n'est plus nécessaire: xxx

ou vous pouvez également utiliser une variable pour marquer une opération de fermeture en attente puis casser la boucle et fermer la boucle après: xxx


1 commentaires

Merci, ma question est davantage en ce qui concerne «Terminer le fil» et la file d'attente »() == null ne le résoudra pas (cela ne fonctionne pas dans le vrai problème). Merci pour la pointe de ne pas utiliser l'interruption.



0
votes

Dans le monde réel, la plupart des messages sont livrés avec un en-tête d'une sorte qui définit un type de message / sous-type ou peut-être des objets différents.

Vous pouvez créer une commande et un type de commande ou de type de message qui indique au fil de faire quelque chose quand il reçoit le message (comme l'arrêt, recharger une table, ajouter un nouvel auditeur, etc.).

De cette façon, vous pouvez dire qu'une commande et un fil de contrôle envoient des messages dans le flux de messages normal. Vous pouvez avoir le fil CNC parlant à un terminal opérationnel dans un système à grande échelle, etc.


0 commentaires

0
votes

Si votre file d'attente peut vider avant que votre consommateur se termine, vous aurez besoin d'un drapeau pour indiquer le fil quand vous pouvez arrêter. Ajoutez une méthode de setter afin que le producteur puisse dire au consommateur de s'arrêter. Puis modifiez votre code de sorte que, au lieu de: xxx

avoir votre code de code xxx


0 commentaires

10
votes

A: Il n'y a tout simplement aucune garantie que, simplement parce que peek renvoie null , le producteur a cessé de produire. Et si le producteur a simplement été ralentis? Maintenant, le consommateur quitte et le producteur continue de produire. Donc, l'idée de 'Peeck' -> 'Break' échoue essentiellement.

B: Réglage d'un drapeau "Du fait" du consommateur et de la lecture dans le producteur échoue également, si:

  1. Le consommateur vérifie le drapeau, trouve qu'il devrait continuer à courir, puis à "prendre"
  2. Entre-temps, le producteur définissait le drapeau pour "ne pas gérer"
  3. Maintenant, les blocs de consommation attendent toujours pour un paquet de fantômes

    Le contraire peut également se produire, et un paquet est laissé de côté non consommé non consommé.

    puis pour vous déplacer, vous voudrez faire une synchronisation supplémentaire avec des mutiles au-dessus et au-dessus de la " BlockingQueue '.

    C: Je trouve "Code Rosetta" pour bien décider de la bonne pratique, dans des situations telles que ceci:

    http : //rosettacode.org/wiki/synchronous_concurrency#java

    Le producteur et le consommateur doit être d'accord sur un objet (ou un attribut de l'objet) qui représente la fin des entrées. Ensuite, le producteur définit cet attribut dans le dernier paquet et le consommateur cesse de le consommer. C'est-à-dire ce que vous avez mentionné dans votre question comme "poison".

    dans l'exemple de code Rosetta ci-dessus, cet "objet" est simplement une chaîne vide appelée "EOF": xxx


3 commentaires

Oui, c'est ce qu'on appelle «poison» ça marche bien, je me demandais simplement s'il y a une meilleure approche.


@ADHG - Le point de ma réponse était donc: cela semble être la seule technique simple qui fonctionne simplement. Pour moi, cela semble aussi être «propre» (bien que ce soit «propre» est vraiment une question d'opinion personnelle).


Je suis totalement d'accord avec vous. J'ai finalement utilisé cette solution. Merci +1



0
votes

Vous pouvez utiliser ce modèle Typeafe avec des pilules anti-poison:

public sealed interface BaseMessage {

    final class ValidMessage<T> implements BaseMessage {

        @Nonnull
        private final T value;


        public ValidMessage(@Nonnull T value) {
            this.value = value;
        }

        @Nonnull
        public T getValue() {
            return value;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            ValidMessage<?> that = (ValidMessage<?>) o;
            return value.equals(that.value);
        }

        @Override
        public int hashCode() {
            return Objects.hash(value);
        }

        @Override
        public String toString() {
            return "ValidMessage{value=%s}".formatted(value);
        }
    }

    final class PoisonedMessage implements BaseMessage {

        public static final PoisonedMessage INSTANCE = new PoisonedMessage();


        private PoisonedMessage() {
        }

        @Override
        public String toString() {
            return "PoisonedMessage{}";
        }
    }
}

public class Producer implements Callable<Void> {

    @Nonnull
    private final BlockingQueue<BaseMessage> messages;

    Producer(@Nonnull BlockingQueue<BaseMessage> messages) {
        this.messages = messages;
    }

    @Override
    public Void call() throws Exception {
        messages.put(new BaseMessage.ValidMessage<>(1));
        messages.put(new BaseMessage.ValidMessage<>(2));
        messages.put(new BaseMessage.ValidMessage<>(3));
        messages.put(BaseMessage.PoisonedMessage.INSTANCE);
        return null;
    }
}

public class Consumer implements Callable<Void> {

    @Nonnull
    private final BlockingQueue<BaseMessage> messages;

    private final int maxPoisons;


    public Consumer(@Nonnull BlockingQueue<BaseMessage> messages, int maxPoisons) {
        this.messages = messages;
        this.maxPoisons = maxPoisons;
    }

    @Override
    public Void call() throws Exception {
        int poisonsReceived = 0;
        while (poisonsReceived < maxPoisons && !Thread.currentThread().isInterrupted()) {
            BaseMessage message = messages.take();
            if (message instanceof BaseMessage.ValidMessage<?> vm) {
                Integer value = (Integer) vm.getValue();
                System.out.println(value);
            } else if (message instanceof BaseMessage.PoisonedMessage) {
                ++poisonsReceived;
            } else {
                throw new IllegalArgumentException("Invalid BaseMessage type: " + message);
            }
        }
        return null;
    }
}


0 commentaires