J'ai donc simulé mon problème de consommateur de producteur et j'ai le code ci-dessous. Ma question est la suivante: comment le consommateur s'arrête-t-il s'il est en constante alors que (vrai).
Dans le code ci-dessous, j'ai ajouté P>
public class ConsumerThread implements Runnable { private BlockingQueue<Integer> queue; private String name; private boolean isFirstTimeConsuming = true; public ConsumerThread(String name, BlockingQueue<Integer> queue) { this.queue=queue; this.name=name; } @Override public void run() { try { while (true) { if (isFirstTimeConsuming) { System.out.println(name+" is initilizing..."); Thread.sleep(4000); isFirstTimeConsuming=false; } try{ if (queue.peek()==null) Thread.currentThread().interrupt(); Integer data = queue.take(); System.out.println(name+" consumed ------->"+data); Thread.sleep(70); }catch(InterruptedException ie) { System.out.println("InterruptedException!!!!"); break; } } System.out.println("Comsumer " + this.name + " finished its job; terminating."); }catch (InterruptedException e) { e.printStackTrace(); } }
5 Réponses :
ou vous pouvez également utiliser une variable pour marquer une opération de fermeture en attente puis casser la boucle et fermer la boucle après: p>
Merci, ma question est davantage en ce qui concerne «Terminer le fil» et la file d'attente »() == null ne le résoudra pas (cela ne fonctionne pas dans le vrai problème). Merci pour la pointe de ne pas utiliser l'interruption.
Dans le monde réel, la plupart des messages sont livrés avec un en-tête d'une sorte qui définit un type de message / sous-type ou peut-être des objets différents. p>
Vous pouvez créer une commande et un type de commande ou de type de message qui indique au fil de faire quelque chose quand il reçoit le message (comme l'arrêt, recharger une table, ajouter un nouvel auditeur, etc.). p>
De cette façon, vous pouvez dire qu'une commande et un fil de contrôle envoient des messages dans le flux de messages normal. Vous pouvez avoir le fil CNC parlant à un terminal opérationnel dans un système à grande échelle, etc. P>
Si votre file d'attente peut vider avant que votre consommateur se termine, vous aurez besoin d'un drapeau pour indiquer le fil quand vous pouvez arrêter. Ajoutez une méthode de setter afin que le producteur puisse dire au consommateur de s'arrêter. Puis modifiez votre code de sorte que, au lieu de: avoir votre code de code p>
B: strong> Réglage d'un drapeau "Du fait" du consommateur et de la lecture dans le producteur échoue également, si: P> Le contraire peut également se produire, et un paquet est laissé de côté non consommé non consommé. p> puis pour vous déplacer, vous voudrez faire une synchronisation supplémentaire avec des mutiles au-dessus et au-dessus de la " BlockingQueue '. P> http : //rosettacode.org/wiki/synchronous_concurrency#java p> Le producteur et le consommateur doit être d'accord sur un objet (ou un attribut de l'objet) qui représente la fin des entrées. Ensuite, le producteur définit cet attribut dans le dernier paquet et le consommateur cesse de le consommer. C'est-à-dire ce que vous avez mentionné dans votre question comme "poison". p> dans l'exemple de code Rosetta ci-dessus, cet "objet" est simplement une chaîne peek code> renvoie
null code>, le producteur a cessé de produire. Et si le producteur a simplement été ralentis? Maintenant, le consommateur quitte et le producteur continue de produire. Donc, l'idée de 'Peeck' -> 'Break' échoue essentiellement.
vide code> appelée "EOF": p>
Oui, c'est ce qu'on appelle «poison» ça marche bien, je me demandais simplement s'il y a une meilleure approche.
@ADHG - Le point de ma réponse était donc: cela semble être la seule technique simple qui fonctionne simplement. Pour moi, cela semble aussi être «propre» (bien que ce soit «propre» est vraiment une question d'opinion personnelle).
Je suis totalement d'accord avec vous. J'ai finalement utilisé cette solution. Merci +1
Vous pouvez utiliser ce modèle Typeafe avec des pilules anti-poison:
public sealed interface BaseMessage { final class ValidMessage<T> implements BaseMessage { @Nonnull private final T value; public ValidMessage(@Nonnull T value) { this.value = value; } @Nonnull public T getValue() { return value; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ValidMessage<?> that = (ValidMessage<?>) o; return value.equals(that.value); } @Override public int hashCode() { return Objects.hash(value); } @Override public String toString() { return "ValidMessage{value=%s}".formatted(value); } } final class PoisonedMessage implements BaseMessage { public static final PoisonedMessage INSTANCE = new PoisonedMessage(); private PoisonedMessage() { } @Override public String toString() { return "PoisonedMessage{}"; } } } public class Producer implements Callable<Void> { @Nonnull private final BlockingQueue<BaseMessage> messages; Producer(@Nonnull BlockingQueue<BaseMessage> messages) { this.messages = messages; } @Override public Void call() throws Exception { messages.put(new BaseMessage.ValidMessage<>(1)); messages.put(new BaseMessage.ValidMessage<>(2)); messages.put(new BaseMessage.ValidMessage<>(3)); messages.put(BaseMessage.PoisonedMessage.INSTANCE); return null; } } public class Consumer implements Callable<Void> { @Nonnull private final BlockingQueue<BaseMessage> messages; private final int maxPoisons; public Consumer(@Nonnull BlockingQueue<BaseMessage> messages, int maxPoisons) { this.messages = messages; this.maxPoisons = maxPoisons; } @Override public Void call() throws Exception { int poisonsReceived = 0; while (poisonsReceived < maxPoisons && !Thread.currentThread().isInterrupted()) { BaseMessage message = messages.take(); if (message instanceof BaseMessage.ValidMessage<?> vm) { Integer value = (Integer) vm.getValue(); System.out.println(value); } else if (message instanceof BaseMessage.PoisonedMessage) { ++poisonsReceived; } else { throw new IllegalArgumentException("Invalid BaseMessage type: " + message); } } return null; } }