6
votes

Thread dormant dans un Thread-Pool

Disons que nous avons un pool de threads avec un nombre limité de threads.

executor.execute(() -> {
    try {
        Thread.sleep(3000L);
    } catch (InterruptedException ignore) {}
});

Maintenant, disons qu'une des tâches actives doit dormir pendant 3 secondes (pour une raison quelconque). p>

Executor executor = Executors.newFixedThreadPool(3);

Comment pouvons-nous implémenter un tel pool de threads de manière à ce que, lorsqu'une tâche dort (ou attend sur un moniteur / condition ), le thread 1 peut être utilisé efficacement pour exécuter une autre tâche?

1 Par thread Je ne parle pas du thread Java "physique", car ce serait impossible tant que le thread est en veille. Ce que je veux dire, c'est que le pool de threads doit avoir une implémentation abstraite qui virtuellement semble permettre à un thread d'exécuter une autre tâche pendant le sommeil. Le point clé est qu'il y a toujours N tâches en cours d'exécution simultanément (non en veille).

Un peu similaire à la façon dont un moniteur gère l'accès à une région critique:

  • Si un thread attend sur une ressource, la ressource peut être utilisée par un autre thread.
  • Si le thread est notifié, il est placé dans l'ensemble d'attente pour (ré) accéder à cette ressource.


8 commentaires

Faites-lui faire un travail au lieu de dormir, je suppose.


@PavelSmirnov Pouvez-vous construire une abstraction fonctionnelle à ce sujet? Un qui tient compte des cas d'attente sur une ressource? Que faire si le travail exécuté se met également en veille (exécute un autre travail), puis un autre, puis un autre, etc.


eh bien, si vous avez un travail à faire qui nécessite un verrou, un thread peut essayer d'acquérir le verrou et ne faire le travail que s'il réussit. Sinon, il peut basculer vers un autre travail en attente dans une file d'attente et essayer de le faire, ou revenir à la tâche du thread d'origine. Le point que je voulais montrer est que vous feriez mieux de faire un travail, si c'est possible, au lieu de simplement dormir.


Cela me rappelle les coroutines dans lua - cherchez-le. Cela peut être une source d'inspiration pour vous.


Question interessante. Je ne pense pas que vous obtiendrez une réponse satisfaisante car vous essayez essentiellement d'implémenter un programmateur de threads sur le système d'exploitation. Vous pouvez obtenir en utilisant des verrous et des threads d'arrière-plan P> K, avec des méthodes personnalisées de "veille".


@PavelSmirnov Semble trop de responsabilité à mettre sur l'utilisateur de l'API du pool de threads pour s'assurer qu'il exécute une autre tâche si un verrou ne peut pas être obtenu, et pour répéter ce code dans chaque tâche. Sinon, votre argument est valable: ne pas écrire de tâches en sommeil.


@ me-dev.io Lecture très inspirante en effet. Cependant, comment suspendre une tâche et lui permettre d’être notifiée sur une ressource qu’elle attend?


J'ai mis à jour ma réponse - je pense que c'est la plus proche que vous obtiendrez ... mais je pense toujours à une meilleure solution


3 Réponses :


1
votes

Utilisation de ScheduledExecutorService code> pour exécuter la tâche après un délai serait préférable à l'utilisation de Thread.sleep () dans un thread.

ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
executor.schedule(() -> { ... }, 3000, TimeUnit.MILLISECONDS);

Cependant, dans votre exemple, le Le pool de threads n'a que 3 threads. Jusqu'à ce que vous atteigniez des dizaines, voire des centaines de threads, le fait d'avoir des threads en veille sur une machine de classe serveur ne devrait pas avoir beaucoup d'impact.


4 commentaires

Que faire si la tâche doit s'endormir au milieu de l'opération, pas au début?


@anddero, il devrait appeler la méthode de planification seule avec le délai approprié


@anddero Peut-être diviser la tâche en deux en renvoyant Future à partir de la première tâche et en planifiant la deuxième tâche uniquement lorsque get () renvoie la valeur. Il est difficile de dire ce que vous faites exactement.


@KarolDowbecki Disons qu'il y a des cas où les threads obtiennent un accès exclusif à une ressource. Ensuite, une tâche d'un autre thread tente d'accéder à cette ressource, mais doit attendre. Pendant le temps d'attente, une autre tâche pourrait être exécutée. Mais il y a ce petit problème que la tâche d'attente doit se poursuivre dès que cette ressource devient disponible pour elle.



0
votes

J'ai implémenté un exemple de travail minimal qui fait essentiellement ce que je pense que vous voulez.

Une interface de tâche (un peu comme l'interface exécutable, juste avec un contexte passé pour effectuer l'attente)

package io.medev.stackoverflow;

import java.util.Comparator;
import java.util.concurrent.*;
import java.util.function.BooleanSupplier;

public class TimeEfficientExecutor implements Executor {

    private final BlockingQueue<Task> taskQueue;
    private final CountDownLatch latch;
    private volatile boolean alive;

    public TimeEfficientExecutor(int threads) {
        this.taskQueue = new PriorityBlockingQueue<>(10, Comparator.comparingLong(Task::guessExecutionTimeMillis));
        this.latch = new CountDownLatch(threads);
        this.alive = true;

        for (int i = 0; i < threads; i++) {
            Thread thread = new Thread(new TimeEfficientExecutorRunnable());
            thread.start();
        }
    }

    @Override
    public void execute(Runnable runnable) {
        execute(Task.wrap(runnable));
    }

    public void execute(Runnable runnable, long guessedExecutionTimeMillis) {
        execute(Task.wrap(runnable, guessedExecutionTimeMillis));
    }

    public void execute(Task task) {
        this.taskQueue.offer(task);
    }

    public void shutdown() {
        this.alive = false;
    }

    public void awaitShutdown() throws InterruptedException {
        this.latch.await();
    }

    public void awaitShutdown(long timeout, TimeUnit timeUnit) throws InterruptedException {
        this.latch.await(timeout, timeUnit);
    }

    private class TimeEfficientExecutorRunnable implements Runnable {

        @Override
        public void run() {
            try {
                while (TimeEfficientExecutor.this.alive) {
                    Task task = TimeEfficientExecutor.this.taskQueue.poll();

                    if (task != null) {
                        try {
                            task.run(new IdleTaskContext());
                        } catch (Exception e) {
                            // TODO: logging
                        }
                    }
                }
            } finally {
                TimeEfficientExecutor.this.latch.countDown();
            }
        }
    }

    private class IdleTaskContext implements Task.Context {

        @Override
        public void idle(BooleanSupplier condition) throws InterruptedException {
            idle(condition, Long.MAX_VALUE);
        }

        @Override
        public void idle(long timeout, TimeUnit timeUnit) throws InterruptedException {
            idle(() -> false, timeout, timeUnit);
        }

        @Override
        public void idle(BooleanSupplier condition, long timeout, TimeUnit timeUnit) throws InterruptedException {
            idle(condition, System.currentTimeMillis() + timeUnit.toMillis(timeout));
        }

        private void idle(BooleanSupplier condition, long idleUntilTs) throws InterruptedException {
            long leftMillis = idleUntilTs - System.currentTimeMillis();

            while (TimeEfficientExecutor.this.alive && !condition.getAsBoolean() && leftMillis >= 1L) {
                Task task = TimeEfficientExecutor.this.taskQueue.poll(leftMillis, TimeUnit.MILLISECONDS);
                leftMillis = idleUntilTs - System.currentTimeMillis();

                if (task != null) {
                    if (leftMillis >= 1L && task.guessExecutionTimeMillis() < leftMillis) {
                        task.run(new IdleTaskContext());
                    } else {
                        TimeEfficientExecutor.this.taskQueue.offer(task);
                    }
                }
            }
        }
    }
}


0 commentaires

0
votes

Ce que vous demandez, c'est essentiellement l'implémentation de coroutines / fibres par-dessus le thread JVM / OS. Une belle discussion a été donnée par Sanhong Li sur la façon dont les ingénieurs d'Alibaba ont mis en œuvre une telle construction - l'idée est au lieu de compter sur le planificateur de threads du système d'exploitation, vous devez vous fier à votre propre sélecteur.

Voir aussi projet Loom pour les fibres (fils verts utilisateur).


0 commentaires