8
votes

Dans quelles conditions bloquerezQuuerez-vous. Jetez une exception interrompue?

Supposons que j'ai un fil qui consomme des objets produits par un autre fil. Sa méthode de course est la suivante, avec Inqueue étant un blocageQueue xxx

en outre, un thread différent signalera qu'il n'y a plus d'éléments de travail en interrompant ce fil d'exécution. Prendra () lancer une exception interrompue s'il n'a pas besoin de bloquer pour récupérer le prochain élément de travail. C'est-à-dire que si le producteur signale qu'il est fait de remplir la file d'attente de travail, est-il possible de laisser accidentellement des articles en inquée ou de manquer l'interruption?


1 commentaires

Vous avez presque compris cela. Plutôt que d'avoir le consommateur définir "arrêt" sur true sur une interruption, le producteur définissez-le trop avant d'interrompre le consommateur. Notez que cela a) Gardie les choses jolies en évitant une valeur sentinelle («Poison pilule»), B) gère correctement les réveilles parasites, etc.) est plus générale pour que vous puissiez arrêter délibérément le consommateur si la file d'attente est vide ou non.


5 Réponses :


5
votes

Un bon moyen de signaler la résiliation d'une file d'attente de blocage consiste à soumettre une valeur «poison» dans la file d'attente indiquant qu'un arrêt s'est produit. Cela garantit que le comportement attendu de la file d'attente est honoré. Appelant Thread.interrupt () n'est probablement pas une bonne idée si vous vous souciez d'éliminer la file d'attente.

Pour fournir du code: P>

boolean shutdown = false;
while (!shutdown) {
    try {
        WorkItem w = inQueue.take();
        if (w == QUEUE_IS_DEAD)
          shutdown = true;
        else
          w.consume();
    } catch (InterruptedException e) { 
        // possibly submit QUEUE_IS_DEAD to the queue
    }
}


1 commentaires

Cela semble fonctionner mieux. D'après ce que je peux dire dans de rares cas, l'interruption «fait» peut être livrée avant de prendre () les réveilles en raison de plus de choses dans la file d'attente. Se prévenir, je devais drainer la file d'attente avec une seconde boucle de toute façon.



2
votes

selon Javadoc , la méthode prenez () lancera interrompteException si interrompu en attendant.


1 commentaires

Qu'est-ce que ça veut dire exactement? "" Si interrompu en attendant "" Modifier: Nevermind. J'ai compris la réponse de F fkykko ci-dessous.



-1
votes

Le paquet Java.ConCurrency.Utils a été conçu et mis en œuvre par certains des meilleurs esprits de la programmation simultanée. En outre, interrompre les threads comme moyen de les terminer est explicitement approuvé par leur livre "Concurrence Java dans la pratique". Par conséquent, je serais extrêmement surpris si des articles étaient laissés dans la file d'attente en raison d'une interruption.


2 commentaires

Essayez-le dans un test et soyez "extrêmement surpris" ;-)


"Les gens intelligents ont fait" plus "interruption () interruption () interrompt les threads" n'entraîne pas "les files d'attente de blocage ne vérifient pas l'état d'interruption tant qu'elles sont vides". Par exemple, au moins une implémentation (cf. grepcode.com/file/repository.grepcode.com/java/root/jdk/open jdk / ... ) de ArrayBlockingQueue appelle retrantlock.lockinterruptimativement () , qui vérifie le statut d'interruption du fil (lancant une interruptionException s'il est interrompu) avant d'essayer d'obtenir l'élément suivant.



2
votes

Je me suis demandé à propos de la même chose et de lire le Javadoc pour prendre () je croyais que cela lancerait une exception interrompue seulement après avoir pris tous les éléments de la file d'attente, car si la file d'attente avait des articles, Cela n'aurait pas à "attendre". Mais j'ai fait un petit test:

package se.fkykko.slask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class BlockingQueueTakeTest {

public static void main(String[] args) throws Exception {
    Runner t = new Runner();
    Thread t1 = new Thread(t);
    for (int i = 0; i < 50; i++) {
        t.queue.add(i);
    }
    System.out.println(("Number of items in queue: " + t.queue.size()));
    t1.start();
    Thread.sleep(1000);
    t1.interrupt();
    t1.join();
    System.out.println(("Number of items in queue: " + t.queue.size()));
    System.out.println(("Joined t1. Finished"));

}

private static final class Runner implements Runnable {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(100);
    AtomicLong m_count = new AtomicLong(0);

    @Override
    public void run() {
        try {
            while (true) {
                queue.take();
                System.out.println("Took item " + m_count.incrementAndGet());
                final long start = System.currentTimeMillis();
                while ((System.currentTimeMillis() - start) < 100) {
                    Thread.yield(); //Spin wait
                }
            }
        }
        catch (InterruptedException ex) {
            System.out.println("Interrupted. Count: " + m_count.get());
        }
    }
}

}


0 commentaires

1
votes

Vous ne pouvez pas en général d'interruption les fils d'un ExecutorService code> à partir du code externe si vous avez utilisé ExecutorService :: execute (Runnable) code> pour commencer les discussions, parce que le code externe ne dispose pas d'une référence au discussion code> objets de chacun des fils en cours d'exécution (voir la fin de cette réponse pour une solution si, si vous avez besoin ExecutorService :: execute code>). Toutefois, si vous utilisez à la place ExecutorService :: soumettre (appelable ) code> pour soumettre les emplois, vous revenez d'un Future code>, qui maintient en interne une référence à le fil conducteur une fois appelable :: call () code> commence l'exécution. Ce fil peut être interrompu en appelant Future :: annuler (true) code>. Tout code dans (ou appelé par) appelable code> que vérifie l'état de thread courant d'interruption peut donc être interrompue par le Future code> référence. Cela inclut BlockingQueue :: take () code>, qui, même bloqué, répondra à une interruption du fil. (. Les méthodes de blocage généralement JRE se réveiller si elle est interrompue alors bloqué, se rendent compte qu'ils ont été interrompus, et jeter un InterruptedException code>)

Pour résumer: Future :: cancel () code > et future :: annuler (true) code> deux annuler les travaux futurs em>, tandis que future :: cancel (true) code> aussi interruptions des travaux en cours em> (tant que les travaux en cours de répond à la discussion d'interruption). Aucun des deux Annuler code> invocations affecte le travail qui a déjà réalisé avec succès. P>

Notez qu'une fois qu'un thread est interrompu par l'annulation, un InterruptException code> sera jeté dans le thread (par exemple par BlockingQueue :: take () code> dans ce cas). Cependant, vous a CancellationException code> sera catapultée dans le thread principal la prochaine fois que vous appelez Future :: get () code> sur annulé avec succès futur code> (par exemple un Future code> qui a été annulé avant de se terminer). Ceci est différent de ce que vous attendriez normalement: si un non-annulé appelable code> jette InterruptedException code>, le prochain appel à Future :: get () code> jetteront InterruptedException code>, mais si un annulé appelable code> jette InterruptedException code>, le prochain appel à Future :: get () code> sera à CancellationException code> p>

Voici un exemple qui illustre ceci:. p>

Future 1 was cancelled
Future 0 was cancelled
Thread 2 got value 100
Thread 3 got value 101
Thread 1 got interrupted


0 commentaires