7
votes

Java comment éviter d'utiliser Thread.sleep () dans une boucle

De mon côté, je démarre deux threads appelés producteur et consommateur. Les deux contiennent la boucle while (true) . La boucle du producteur est un serveur UDP et ne nécessite donc pas de veille. Mon problème est dans la boucle du consommateur. La boucle consommateur supprime les objets de la file d'attente liée et la transmet à une fonction pour un traitement ultérieur. D'après ce qui a été recherché, il n'est pas recommandé d'utiliser le sommeil de thread dans une boucle car parfois O / S ne sera pas libéré à la fin du temps défini. Si je supprime la mise en veille des threads lorsque l'application est idéale, cela fait glisser le processeur de 20 à 30%.

class Consumer implements Runnable {

    String str;  
    ConcurrentLinkedQueue<String> queue;

    Consumer(ConcurrentLinkedQueue<String> queue) {
        this.queue = queue;  
    }

    @Override
    public void run() {

        while (true) {
            try {

                while ((str = queue.poll()) != null) {

                    call(str);  // do further processing

                   }
            } catch (IOException e) {
                ferpt.felog("svr class", "consumer", "consumer thread", e.getClass().getName() + ": " + e.getMessage());
                break;
            }

            try {
                Thread.sleep(500);
            } catch (InterruptedException ex) {

                ferpt.felog("svr class", "consumer","sleep", ex.getClass().getName() + ": " + ex.getMessage());
            }

        }

    }

}

class Producer implements Runnable {
    private DatagramSocket dsocket;
    FError fer = new FError();

    int port =1548;
    ConcurrentLinkedQueue<String> queue;

    Producer(ConcurrentLinkedQueue<String> queue){
        this.queue = queue; 
    }

    @Override
    public void run() {

        try {

            // Create a socket to listen on the port.
            dsocket = new DatagramSocket(port);
            // Create a buffer to read datagrams into.
            byte[] buffer = new byte[30000];
            // Create a packet to receive data into the buffer
            DatagramPacket packet = new DatagramPacket(buffer,
            buffer.length);

            while (true) {
                try {

                   // Wait to receive a datagram
                    dsocket.receive(packet);
                    //Convert the contents to a string,
                    String msg = new String(buffer, 0, packet.getLength());

                    int ltr = msg.length();
                     // System.out.println("MSG =" + msg);

                    if(ltr>4)
                    {

                        SimpleDateFormat sdfDate = new SimpleDateFormat  ("yyyy-MM-dd HH:mm:ss");//dd/MM/yyyy

                        Date now = new Date();
                        String strDate = sdfDate.format(now);

                        //System.out.println(strDate);

                        queue.add(msg + "&" + strDate);

                     // System.out.println("MSG =" + msg);
                    }

                  // Reset the length of the packet before reusing it.
                   packet.setLength(buffer.length);

                } catch (IOException e) {
                    fer.felog("svr class", "producer", "producer thread",e.getClass().getName() + ": " + e.getMessage());
                    dsocket.close();
                    break; 
                }
            }

        } catch (SocketException e) {
          fer.felog("svr class", "producer","Another App using the udp port " + port, e.getClass().getName() + ": " + e.getMessage()); 

        }

    }

}


5 commentaires

Il peut être bon de lire les différentes classes disponibles dans le package java.util.concurrent.


Un modèle traditionnel produit / consommateur utiliserait un verrou de moniteur pour bloquer le consommateur jusqu'à ce que le producteur ait quelque chose à offrir, vous pouvez utiliser une file d'attente de blocage pour obtenir le même résultat


Pourquoi ne pas utiliser BlockingQueue


Merci pour toutes vos réponses. Dans ma situation, je ne peux pas utiliser la file d'attente de blocage. Le producteur recevra à tout moment un msg sur le port UDP. La partie consommateur doit supprimer un par un pour faire un traitement supplémentaire sur le msg, ce qui prend un certain temps, donc je ne peux pas utiliser le blocage.


En relation: NetBeans / Java / Nouvel indice: Thread.sleep appelé boucle


3 Réponses :


8
votes

Au lieu de rendre Consumer étendre Runnable , vous pouvez modifier votre code pour incorporer un ScheduledExecutorService qui exécute l'interrogation de la file d'attente toutes les demi-secondes au lieu de mettre le thread en veille. Un exemple de ceci serait

public void schedule() {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    executor.scheduleAtFixedRate(() -> {
        String str;
        try {
            while ((str = queue.poll()) != null) {
                call(str);  // do further processing
            }
        } catch (IOException e) {
            ferpt.felog("svr class", "consumer", "consumer thread", e.getClass().getName() + ": " + e.getMessage());
        }
    }, 0, 500, TimeUnit.MILLISECONDS);
}


1 commentaires

Merci pour la réponse rapide et je vais essayer ceci et vous faire savoir - comment cela s'est passé.



0
votes

Laisser le consommateur attendre () sur un objet auquel les deux ont accès, et laisser le producteur notifier () aux écouteurs sur cet objet lorsqu'il y a de nouveaux messages. Le consommateur doit alors supprimer tous les messages, pas seulement un seul comme dans l'exemple.


1 commentaires

Cela revient au même qu'une file d'attente bloquante (ce qui est tout à fait préférable aux réponses conseillant d'utiliser une tâche planifiée). Inutile de le réinventer.



9
votes

La bonne solution à votre problème est d'utiliser une file d'attente de blocage. Cela vous donne plusieurs avantages:

  • ne gaspille pas le processeur occupé à attendre
  • peut avoir une capacité limitée - imaginez que vous avez un producteur rapide, mais un consommateur lent -> si la taille de la file d'attente n'est pas limitée, votre application peut facilement atteindre la condition OutOfMemory

Voici une petite démo, avec laquelle vous pouvez jouer:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProdConsTest {
    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        final Runnable producer = () -> {
            for (int i = 0; i < 1000; i++) {
                try {
                    System.out.println("Producing: " + i);
                    queue.put(i);

                    //Adjust production speed by modifying the sleep time
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    //someone signaled us to terminate
                    break;
                }
            }
        };

        final Runnable consumer = () -> {
            while (true) {
                final Integer integer;
                try {
                    //Uncomment to simulate slow consumer:
                    //Thread.sleep(1000);

                    integer = queue.take();
                } catch (InterruptedException e) {
                    //someone signaled us to terminate
                    break;
                }
                System.out.println("Consumed: " + integer);
            }
        };


        final Thread consumerThread = new Thread(consumer);
        consumerThread.start();

        final Thread producerThread = new Thread(producer);
        producerThread.start();

        producerThread.join();
        consumerThread.interrupt();
        consumerThread.join();
    }
}

Décommentez maintenant le sleep () dans le consommateur et observez ce qui se passe avec l'application. Si vous utilisiez une solution basée sur une minuterie telle que le ScheduledExecutorService proposé ou si vous étiez occupé à attendre, alors avec un producteur rapide, la file d'attente se développerait de manière incontrôlable et finirait par planter votre application


1 commentaires

Merci pour votre réponse. J'ai implémenté le ScheduledExectorService et cela fonctionne très bien. Comme je l'ai mentionné, le producteur écoute un port UDP et il peut recevoir des paquets à tout moment et je ne peux pas utiliser la file d'attente de blocage. Le consommateur est beaucoup plus rapide que le producteur. La méthode d'appel remet l'objet à la classe multi thread. Quoi qu'il en soit, j'essaierai aussi votre méthode