Je veux créer plusieurs threads en C ++ qui continuent de fonctionner et attendent les commandes principales des threads et les exécutent en conséquence. Voici le code que j'ai écrit et je sais que cela cause le problème de rotation
. La question est donc de savoir comment puis-je laisser le processeur arrêter d'exécuter le thread de travail jusqu'à ce que je change la commande. Je sais qu'il y a avenir et promesse
, mais ils ne semblent pas adaptés à cette situation.
[modifier] Je suis un noob C ++ et c'est tellement compliqué! Si quelqu'un peut me partager des tutoriels ou des bibliothèques pour résoudre ce problème, ce serait très apprécié!
#include <iostream> #include <thread> #include <mutex> class empty { public: empty() {} void work1() { std::cout << "work1" << std::endl; } void work2() { std::cout << "work2" << std::endl; } }; enum CMD { WAIT, CMD1, CMD2, DONE }; void worker(CMD &cmd, empty &e) { std::mutex mutex; while (cmd != DONE) { switch (cmd) { case WAIT: break; case CMD1: e.work1(); // excute cmd 1 mutex.lock(); cmd = WAIT; // change cmd to WAIT mutex.unlock(); break; case CMD2: e.work2(); mutex.lock(); cmd = WAIT; mutex.unlock(); break; default: break; } } } int main(int argc, const char * argv[]) { empty e1 = empty(); empty e2 = empty(); CMD cmd = WAIT; // mutilple thread working on mutilple empty object std::thread wokerThread1 = std::thread(worker, std::ref(cmd), std::ref(e1)); std::thread wokerThread2 = std::thread(worker, std::ref(cmd), std::ref(e2)); ... //some other code cmd = CMD1; ... cmd = CMD2; ... cmd = CMD1; ... cmd = DONE; wokerThread1.join(); wokerThread2.join(); return 0; }
3 Réponses :
Une des raisons de faire cela est d'utiliser une concurrent_bounded_queue
. Vous pouvez utiliser l'implémentation TBB pour cela ou l'implémenter en utilisant std: : queue
et std :: condition_variable
.
Implémentation en utilisant uniquement std
;
#include <chrono> #include <queue> #include <thread> #include <mutex> #include <iostream> #include <condition_variable> enum CMD { CMD1, CMD2, DONE }; void executeCmd(int cmd) { printf("exec %u: cmd[%d]\n", std::this_thread::get_id(), cmd); std::this_thread::sleep_for(std::chrono::seconds(cmd)); } class Worker { public: Worker() : _thread(std::thread(&Worker::work, this)) { } void pushCmd(CMD cmd) { printf("push %u: cmd[%d]\n", std::this_thread::get_id(), cmd); const std::lock_guard<std::mutex> lock(_m); _tasks.push(cmd); _cv.notify_one(); } void finish() { pushCmd(DONE); _thread.join(); } private: std::thread _thread; std::mutex _m; std::queue<CMD> _tasks; std::condition_variable _cv; CMD popCmd() { std::unique_lock<std::mutex> lk(_m); _cv.wait(lk, [&]{ return !_tasks.empty(); }); CMD cmd = _tasks.front(); printf("pop %u: cmd[%d]\n", std::this_thread::get_id(), cmd); _tasks.pop(); return cmd; } void work() { while (true) { CMD cmd = popCmd(); switch (cmd) { case CMD1: executeCmd(1); break; case CMD2: executeCmd(2); break; case DONE: default: return; } } } }; int main(int argc, const char * argv[]) { Worker w1, w2; w1.pushCmd(CMD1); w2.pushCmd(CMD1); w1.pushCmd(CMD2); w2.pushCmd(CMD2); w1.finish(); w2.finish(); return 0; }
Implémentation en utilisant tbb::concurrent_bounded_queue
;
#include <tbb/concurrent_queue.h> void worker(tbb::concurrent_bounded_queue<CMD>& tasks) { while (true) { CMD cmd; tasks.pop(cmd); switch (cmd) { case CMD1: // excute cmd 1 break; case CMD2: // excute cmd 2 break; case DONE: default: return; } } } int main(int argc, const char * argv[]) { tbb::concurrent_bounded_queue<CMD> tasks; std::thread wokerThread1 = std::thread(worker, std::ref(tasks)); std::thread wokerThread2 = std::thread(worker, std::ref(tasks)); ... tasks.push(CMD1); tasks.push(CMD2); ... }
Notez que vous voulez exécuter la même tâche plusieurs fois, vous pouvez créer un Worker
qui résume tout comme suit;
#include <queue> #include <chrono> #include <thread> #include <mutex> #include <iostream> #include <condition_variable> std::mutex g_m; std::condition_variable g_cv; enum CMD { CMD1, CMD2, DONE }; void push_cmd(std::queue<CMD>& tasks, CMD cmd) { const std::lock_guard<std::mutex> lock(g_m); tasks.push(cmd); g_cv.notify_one(); } CMD pop_cmd(std::queue<CMD>& tasks) { std::unique_lock<std::mutex> lk(g_m); g_cv.wait(lk, [&tasks]{ return !tasks.empty(); }); CMD cmd = tasks.front(); tasks.pop(); return cmd; } void execute_cmd(int cmd) { std::cout << std::this_thread::get_id() << ": cmd [" << cmd << "]" << std::endl; std::this_thread::sleep_for(std::chrono::seconds(cmd)); } void worker(std::queue<CMD>& tasks) { CMD cmd = pop_cmd(tasks); while (true) { switch (cmd) { case CMD1: execute_cmd(1); break; case CMD2: execute_cmd(2); break; case DONE: default: return; } } } int main(int argc, const char * argv[]) { std::queue<CMD> tasks; std::thread wokerThread1 = std::thread(worker, std::ref(tasks)); std::thread wokerThread2 = std::thread(worker, std::ref(tasks)); push_cmd(tasks, CMD1); push_cmd(tasks, CMD2); // push `DONE` for each worker push_cmd(tasks, DONE); push_cmd(tasks, DONE); wokerThread1.join(); wokerThread2.join(); return 0; }
TBB pourrait apparaître comme MT Swiss-Army-Knife. Cependant, pour apprendre, je recommanderais de corriger le code avec ce que fournit std
C ++. À mon humble avis, cela vaut la peine d'acquérir de l'expérience sur ce qui se passe "sous le capot".
N'avez-vous pas besoin de mettre la commande DONE
deux fois (c'est-à-dire une pour chaque thread)?
@scheff vous pouvez toujours écrire votre propre concurrent_thread_queue: P
vous pouvez toujours écrire votre propre concurrent_thread_queue C'est ce que j'ai déjà fait (et échange de tampons via des variables de pointeur atomique et communication inter-thread via des drapeaux atomiques et ... - je ne me souviens pas quoi d'autre.): - )
@bertubezz Merci pour votre réponse! Laissez-moi essayer de voir si cela fonctionne.
@Scheff Merci de votre aide! Je pense que cette question ne peut pas être résolue par des implémentations faciles, j'aurai un long chemin à parcourir pour apprendre ces concepts.
@bertubezz Désolé si je ne l'ai pas précisé, mais ce que je veux faire, c'est que j'ai besoin des deux (ou de plusieurs) threads pour faire la même tâche en parallèle pendant que je peux changer la tâche quand je veux, pas un seul thread une tâche et l'autre en font une autre. ils prendront des données différentes. Dois-je utiliser plusieurs files d'attente (une file pour un thread) pour faire cela?
@bertubezz s'il vous plaît passez en revue ma modification de votre code, j'ajoute une boucle while dans worker, cela fonctionne très bien maintenant, merci encore
Comme vous l'avez mentionné, vous êtes un noob C ++, assurez-vous simplement que vous ne faites aucune de ces erreurs " https://www.acodersjourney.com/top-20-cplusplus-multithreading-mistakes/ "
Super, maintenant je peux vérifier avec votre lien.
Il semble que vous pourriez utiliser des observateurs
avec des opérations asynchrones ici. L'idée est que votre thread de contrôle principal met à jour le CMD
pour tous les observateurs intéressés, puis les observateurs exécutent une opération particulière en fonction du CMD
. Dans cet exemple, j'ai fait un blocage d'opération de "mise à jour" (le travail précédent doit être terminé avant d'en commencer un nouveau) et le retour de void
. Cependant, vous pouvez penser à d'autres possibilités, comme renvoyer false
si l'opération précédente est toujours en cours.
#include <chrono> #include <future> #include <iostream> #include <memory> #include <thread> #include <vector> enum CMD { WAIT, CMD1, CMD2, DONE }; class SomeSystem { public: SomeSystem() = default; void work1() { // let's pretend this work takes some time std::this_thread::sleep_for(std::chrono::milliseconds(1)); std::cout << "work1" << std::endl; } void work2() { std::this_thread::sleep_for(std::chrono::milliseconds(1)); std::cout << "work2" << std::endl; } }; class CmdObserver { public: CmdObserver(std::shared_ptr<SomeSystem> system, int id): system_(system), id_(id) { std::cout << "observer[" << id_ << "] CTor" << std::endl; } void update(CMD cmd) { if (work_.valid() && work_.wait_for(std::chrono::seconds(0)) == std::future_status::timeout) { std::cout << "observer[" << id_ << "] blocking until previous work is finished" << std::endl; } work_ = std::async(std::launch::async, [this, cmd]() { doTheJob(cmd); }); } private: void doTheJob(CMD cmd) { std::cout << "observer[" << id_ << "] going to execute cmd " << cmd << std::endl; switch (cmd) { case CMD1: system_->work1(); break; case CMD2: system_->work2(); break; default: std::cout << cmd << std::endl; } } std::shared_ptr<SomeSystem> system_; // id_ is just for demonstration purposes int id_; std::future<void> work_; }; int main() { int observerId = 0; std::vector<std::shared_ptr<SomeSystem> > systems({ std::make_shared<SomeSystem>(), std::make_shared<SomeSystem>(), std::make_shared<SomeSystem>(), std::make_shared<SomeSystem>(), std::make_shared<SomeSystem>() }); std::vector<CmdObserver> observers; for (auto system : systems) { observers.push_back(CmdObserver(system, observerId)); observerId++; } for (auto& observer : observers) { observer.update(CMD1); } for (auto& observer : observers) { observer.update(CMD2); } // let's pretend we do some long operation here std::this_thread::sleep_for(std::chrono::seconds(1)); for (auto& observer : observers) { observer.update(CMD1); } }
Merci! Votre réponse fonctionne exactement ce que je voulais!
Jetez un œil à
std :: condition_variable
.Vous devriez envisager d'utiliser un
std :: atomic
. Cela sent les courses de données partout.De plus, pour que les mutex fonctionnent, ils doivent être partagés entre les threads. Comme vous l'avez actuellement, chaque thread obtient son propre mutex, il n'y a donc pas de synchronisation.
Vous pouvez utiliser des tubes pour envoyer une commande au thread qui bloquera (mettez read inside while (true)) si le tube est vide et se réveillera quand ce n'est pas le cas.
En plus de @Frank: Chaque accès à une variable partagée doit être protégé contre les mutex et, bien sûr, la même variable au mieux toujours avec le même mutex. Cela inclut également le fil "principal". Toutes les occurrences de
cmd =
... dansmain ()
ne sont pas gardées -> U.B.Une autre réponse SO: Alors que la boucle du thread principal est bloquée lors de l'utilisation de std :: thread
Avec quelques fautes de frappe mineures mais, en général, (IMHO) pas si mal: Multithreading C ++ 11 (avec 10 parties).
@Frank Je vois où est le problème, merci pour votre aide!