1
votes

Pourquoi le threading des calculs en virgule flottante sur le CPU les rend-ils beaucoup plus longs?

Je travaille actuellement sur une simulation scientifique (Gravitational nbody). Je l'ai d'abord écrit avec un algorithme naïf à thread unique, et cela a fonctionné de manière acceptable pour un petit nombre de particules. J'ai ensuite multi-threadé cet algorithme (il est parallèlement embarrassant), et le programme a pris environ 3 fois plus de temps. Ce qui suit est un exemple minimal, complet et vérifiable d'un algorithme trivial avec des propriétés similaires et une sortie dans un fichier dans / tmp (il est conçu pour fonctionner sous Linux, mais le C ++ est également standard). Soyez averti que si vous décidez d'exécuter ce code, il produira un fichier de 152,62 Mo. Les données sont générées pour empêcher le compilateur d'optimiser le calcul hors du programme.

real    14m32.856s
user    216m58.063s
sys     0m4.492s

Quand je lance ceci (compilé avec clang 7.0.1 sous Linux), j'obtiens les heures suivantes du Commande Linux time . La différence entre ceux-ci est similaire à ce que je vois dans mon programme réel. L'entrée intitulée «réel» est ce qui est pertinent pour cette question, car il s'agit de l'heure d'horloge nécessaire au programme pour s'exécuter.

À un seul thread:

real    6m27.261s
user    6m27.081s
sys     0m0.051s

Multi-thread :

#include <iostream>
#include <functional>
#include <thread>
#include <vector>
#include <atomic>
#include <random>
#include <fstream>
#include <chrono>

constexpr unsigned ITERATION_COUNT = 2000;
constexpr unsigned NUMBER_COUNT = 10000;

void runThreaded(unsigned count, unsigned batchSize, std::function<void(unsigned)> callback){
    unsigned threadCount = std::thread::hardware_concurrency();
    std::vector<std::thread> threads;
    threads.reserve(threadCount);

    std::atomic<unsigned> currentIndex(0);

    for(unsigned i=0;i<threadCount;++i){
        threads.emplace_back([&currentIndex, batchSize, count, callback]{
            unsigned startAt = currentIndex.fetch_add(batchSize);

            if(startAt >= count){
                return;
            }else{
                for(unsigned i=0;i<count;++i){
                    unsigned index = startAt+i;
                    if(index >= count){
                        return;
                    }
                    callback(index);
                }
            }
        });
    }

    for(std::thread &thread : threads){
        thread.join();
    }
}

void threadedTest(){
    std::mt19937_64 rnd(0);
    std::vector<double> numbers;

    numbers.reserve(NUMBER_COUNT);
    for(unsigned i=0;i<NUMBER_COUNT;++i){
        numbers.push_back(rnd());
    }

    std::vector<double> newNumbers = numbers;

    std::ofstream fout("/tmp/test-data.bin");

    for(unsigned i=0;i<ITERATION_COUNT;++i) {
        std::cout << "Iteration: " << i << "/" << ITERATION_COUNT << std::endl;
        runThreaded(NUMBER_COUNT, 100, [&numbers, &newNumbers](unsigned x){
            double total = 0;
            for(unsigned y=0;y<NUMBER_COUNT;++y){
                total += numbers[y]*(y-x)*(y-x);
            }
            newNumbers[x] = total;
        });
        fout.write(reinterpret_cast<char*>(newNumbers.data()), newNumbers.size()*sizeof(double));
        std::swap(numbers, newNumbers);
    }
}

void unThreadedTest(){
    std::mt19937_64 rnd(0);
    std::vector<double> numbers;

    numbers.reserve(NUMBER_COUNT);
    for(unsigned i=0;i<NUMBER_COUNT;++i){
        numbers.push_back(rnd());
    }

    std::vector<double> newNumbers = numbers;

    std::ofstream fout("/tmp/test-data.bin");

    for(unsigned i=0;i<ITERATION_COUNT;++i){
        std::cout << "Iteration: " << i << "/" << ITERATION_COUNT << std::endl;
        for(unsigned x=0;x<NUMBER_COUNT;++x){
            double total = 0;
            for(unsigned y=0;y<NUMBER_COUNT;++y){
                total += numbers[y]*(y-x)*(y-x);
            }
            newNumbers[x] = total;
        }
        fout.write(reinterpret_cast<char*>(newNumbers.data()), newNumbers.size()*sizeof(double));
        std::swap(numbers, newNumbers);
    }
}

int main(int argc, char *argv[]) {
    if(argv[1][0] == 't'){
        threadedTest();
    }else{
        unThreadedTest();
    }
    return 0;
}

En tant que tel, je demande ce qui cause ce ralentissement massif alors que je m'attends à une accélération significative (environ d'un facteur 8, car J'ai un processeur à 8 cœurs à 16 threads). Je ne l'implémente pas sur le GPU car l'étape suivante consiste à apporter des modifications à l'algorithme pour le faire passer de O (n²) à O (nlogn), mais qui ne sont pas non plus à l'amiable avec un GPU. L'algorithme modifié aura moins de différence avec mon algorithme O (n²) actuellement implémenté que l'exemple inclus. Enfin, je veux observer que le temps subjectif pour exécuter chaque itération (jugé par le temps entre les lignes d'itération apparaissant) change de manière significative dans les exécutions filetées et non filetées.


10 commentaires

Si vos threads utilisent des données partagées et des variables atomiques ou des verrous, il est fort possible que tous les coûts de synchronisation annulent complètement les gains de la parallélisation. Pour des performances optimales, ne partagez rien, ou le moins possible. Pourriez-vous aborder cela avec une stratégie de style de carte / réduction?


@tadman Comme je peux le voir, la variable atomique n'est accessible qu'une seule fois par thread. S'il y a 8 threads, on n'y accède que 8 fois, ce qui est très peu.


C'est ce à quoi je m'attendais, mais il doit y avoir autre chose qui entrave les performances. Pourquoi utilisez-vous constexpr pour des nombres simples? Pourquoi pas simplement const ? Je ne peux pas reproduire votre problème car lors de l'exécution de ce code, j'obtiens un segfault.


c ++ - Quelle est la surcharge de performances de std :: function? - Dépassement de pile ?


@tadman J'utilise constexpr pour encourager l'inlining. Je me rends compte que ce n'est peut-être pas tout à fait nécessaire, mais mon livre C ++ IIRC prétend que cela le rend plus probable.


@tadman Exécuter avec le premier argument 't' ou '' <-> threadé / non threadé.


Cela vaut la peine de mentionner que lors de la publication, mais oui, maintenant, il fonctionne au moins.


Êtes-vous sûr de bien répartir le travail? Je peux voir l'ajustement du décalage, mais ils semblent tous parcourir les entrées size , donc cela devrait être beaucoup plus lent dans votre version filetée en raison de la duplication du travail.


const devrait être suffisant pour des valeurs simples. constexpr peut entrer en jeu avec des expressions plus complexes. Dans tous les cas, si vous compilez avec -O3 , vous devriez être défini dans les deux cas, juste que const a plus de sens ici. Peut-être effectuer un benchmark pour voir s'il y a une différence, ou vider la sortie d'assembly compilée des fonctions impactées pour voir si quelque chose change.


Au lieu de sauvegarder dans un fichier, faites simplement std :: cout << std :: accumulate (newNumbers.begin (), newNumbers.end (), 0.) << '\ n'; . Puisque les newNumbers sont dérivés de nombres aléatoires, le compilateur doit émettre du code pour effectuer les calculs réels et afficher la somme.


3 Réponses :


5
votes

C'est un peu difficile de suivre ce code, mais je pense que vous dupliquez le travail à grande échelle parce que chaque thread fait presque tout le travail, en sautant juste une petite partie au début.

Je suis en supposant que la boucle interne de runThreaded devrait être:

unsigned startAt = currentIndex.fetch_add(batchSize);

while (startAt < count) {
  if (startAt >= count) {
    return;
  } else {
    for(unsigned i=0;i<batchSize;++i){
      unsigned index = startAt+i;

      if(index >= count){
        return;
      }

      callback(index);
    }
  }

  startAt = currentIndex.fetch_add(batchSize);
}

i est la clé ici. Vous ne devez faire que le travail requis par le lot, et non count fois, qui est la liste entière moins le décalage initial.

Avec cette mise à jour, le code s'exécute de manière significative plus rapide. Je ne sais pas si cela fait tout le travail nécessaire car il est difficile de dire si cela se produit réellement, le résultat est très minime.


3 commentaires

Cela nécessitera une boucle supplémentaire afin que chaque thread obtienne un autre lot ou vous ne traiterez pas chaque index.


@ 1201ProgramAlarm C'est probablement le cas, car une structure de type pool de threads aiderait ici. J'ai ajusté le code en boucle jusqu'à ce qu'il épuise la piscine.


Merci! C'était ça: D J'ai regardé le code pendant des heures et je ne l'ai pas vu.



2
votes

Pour une parallélisation facile sur plusieurs processeurs, je recommande d'utiliser tbb :: parallel_for . Il utilise le nombre correct de processeurs et divise la plage pour vous, éliminant complètement le risque de mal l'implémenter. Alternativement, il existe un parallèle for_each en C ++ 17 . En d'autres termes, ce problème a un certain nombre de bonnes solutions.

La vectorisation du code est un problème difficile et ni clang ++ - 6 ni g ++ - 8 ne vectorisent automatiquement la ligne de base code. Par conséquent, la version SIMD ci-dessous, j'ai utilisé d'excellents Vc: types C ++ portables, sans surcharge pour la programmation explicitement parallèle aux données a > bibliothèque.

Voici un benchmark de travail qui compare:

  • La version de référence.
  • Version SIMD.
  • Version multi-thread SIMD +.


baseline: 2.76582e+257, 6.399848397s
    simd: 2.76582e+257, 1.600373449s, 3.99897x speedup
 simd_mt: 2.76582e+257, 0.168638435s, 37.9501x speedup

Horaires :

#include <Vc/Vc>
#include <tbb/parallel_for.h>

#include <algorithm>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <random>
#include <vector>

constexpr int ITERATION_COUNT = 20;
constexpr int NUMBER_COUNT = 20000;

double baseline() {
    double result = 0;

    std::vector<double> newNumbers(NUMBER_COUNT);
    std::vector<double> numbers(NUMBER_COUNT);
    std::mt19937 rnd(0);
    for(auto& n : numbers)
        n = rnd();

    for(int i = 0; i < ITERATION_COUNT; ++i) {
        for(int x = 0; x < NUMBER_COUNT; ++x) {
            double total = 0;
            for(int y = 0; y < NUMBER_COUNT; ++y) {
                auto d = (y - x);
                total += numbers[y] * (d * d);
            }
            newNumbers[x] = total;
        }
        result += std::accumulate(newNumbers.begin(), newNumbers.end(), 0.);
        swap(numbers, newNumbers);
    }

    return result;
}

double simd() {
    double result = 0;

    constexpr int SIMD_NUMBER_COUNT = NUMBER_COUNT / Vc::double_v::Size;
    using vector_double_v = std::vector<Vc::double_v, Vc::Allocator<Vc::double_v>>;
    vector_double_v newNumbers(SIMD_NUMBER_COUNT);
    vector_double_v numbers(SIMD_NUMBER_COUNT);
    std::mt19937 rnd(0);
    for(auto& n : numbers) {
        alignas(Vc::VectorAlignment) double t[Vc::double_v::Size];
        for(double& v : t)
            v = rnd();
        n.load(t, Vc::Aligned);
    }

    Vc::double_v const incv(Vc::double_v::Size);
    for(int i = 0; i < ITERATION_COUNT; ++i) {
        Vc::double_v x(Vc::IndexesFromZero);
        for(auto& new_n : newNumbers) {
            Vc::double_v totals;
            int y = 0;
            for(auto const& n : numbers) {
                for(unsigned j = 0; j < Vc::double_v::Size; ++j) {
                    auto d = y - x;
                    totals += n[j] * (d * d);
                    ++y;
                }
            }
            new_n = totals;
            x += incv;
        }
        result += std::accumulate(newNumbers.begin(), newNumbers.end(), Vc::double_v{}).sum();
        swap(numbers, newNumbers);
    }

    return result;
}

double simd_mt() {
    double result = 0;

    constexpr int SIMD_NUMBER_COUNT = NUMBER_COUNT / Vc::double_v::Size;
    using vector_double_v = std::vector<Vc::double_v, Vc::Allocator<Vc::double_v>>;
    vector_double_v newNumbers(SIMD_NUMBER_COUNT);
    vector_double_v numbers(SIMD_NUMBER_COUNT);
    std::mt19937 rnd(0);
    for(auto& n : numbers) {
        alignas(Vc::VectorAlignment) double t[Vc::double_v::Size];
        for(double& v : t)
            v = rnd();
        n.load(t, Vc::Aligned);
    }

    Vc::double_v const v0123(Vc::IndexesFromZero);
    for(int i = 0; i < ITERATION_COUNT; ++i) {
        constexpr int SIMD_STEP = 4;
        tbb::parallel_for(0, SIMD_NUMBER_COUNT, SIMD_STEP, [&](int ix) {
            Vc::double_v xs[SIMD_STEP];
            for(int is = 0; is < SIMD_STEP; ++is)
                xs[is] = v0123 + (ix + is) * Vc::double_v::Size;
            Vc::double_v totals[SIMD_STEP];
            int y = 0;
            for(auto const& n : numbers) {
                for(unsigned j = 0; j < Vc::double_v::Size; ++j) {
                    for(int is = 0; is < SIMD_STEP; ++is) {
                        auto d = y - xs[is];
                        totals[is] += n[j] * (d * d);
                    }
                    ++y;
                }
            }
            std::copy_n(totals, SIMD_STEP, &newNumbers[ix]);
        });
        result += std::accumulate(newNumbers.begin(), newNumbers.end(), Vc::double_v{}).sum();
        swap(numbers, newNumbers);
    }

    return result;
}

struct Stopwatch {
    using Clock = std::chrono::high_resolution_clock;
    using Seconds = std::chrono::duration<double>;
    Clock::time_point start_ = Clock::now();

    Seconds elapsed() const {
        return std::chrono::duration_cast<Seconds>(Clock::now() - start_);
    }
};


std::ostream& operator<<(std::ostream& s, Stopwatch::Seconds const& a) {
    auto precision = s.precision(9);
    s << std::fixed << a.count() << std::resetiosflags(std::ios_base::floatfield) << 's';
    s.precision(precision);
    return s;
}

void benchmark() {
    Stopwatch::Seconds baseline_time;
    {
        Stopwatch s;
        double result = baseline();
        baseline_time = s.elapsed();
        std::cout << "baseline: " << result << ", " << baseline_time << '\n';
    }

    {
        Stopwatch s;
        double result = simd();
        auto time = s.elapsed();
        std::cout << "    simd: " << result << ", " << time << ", " << (baseline_time / time) << "x speedup\n";
    }

    {
        Stopwatch s;
        double result = simd_mt();
        auto time = s.elapsed();
        std::cout << " simd_mt: " << result << ", " << time << ", " << (baseline_time / time) << "x speedup\n";
    }
}

int main() {
    benchmark();
    benchmark();
    benchmark();
}

Remarques:

  • Ma machine prend en charge AVX mais pas AVX-512, donc il est environ 4x accéléré lors de l'utilisation de SIMD.
  • La version
  • simd_mt utilise 8 threads sur ma machine et des étapes SIMD plus importantes. L'accélération théorique est de 128x, en pratique - 38x.
  • clang ++ - 6 ne peut pas vectoriser automatiquement le code de base, pas plus que g ++ - 8 .
  • g ++ - 8 génère un code considérablement plus rapide pour les versions SIMD que clang ++ - 6 .


3 commentaires

Ceci, bien qu'utile, est moins utile car clang a une vectorisation automatique dans certains cas. Mon vrai programme est l'un de ces cas. ( source )


@ john01dav Ajout d'une version parallèle + SIMD pour vous.


@ john01dav Ajout de notes de vectorisation pour vous.



1
votes

Votre cœur est certainement au bon endroit moins un bogue ou deux.

par_for est un problème complexe en fonction de la charge utile de votre boucle. Il y a pas de solution universelle à ce problème. La charge utile peut être n'importe quoi de quelques ajouts à des blocs mutex presque infinis - par exemple en faisant de la mémoire allocation.

La variable atomique en tant que modèle d'élément de travail a toujours bien fonctionné pour moi mais rappelez-vous que les variables atomiques ont un coût élevé sur X86 (~ 400 cycles) et même encourent un coût élevé s'ils sont dans une branche non exécutée comme je l'ai trouvé à mes risques et périls.

Une certaine permutation de ce qui suit est généralement bonne. Le choix du bon chunks_per_thread (comme dans votre batchSize) est essentiel. Si vous ne faites pas confiance à votre utilisateurs, vous pouvez tester exécuter quelques itérations de la boucle pour deviner le meilleur niveau de segmentation.

...
k=0 174.925903ms total
...
k=1 27.924738ms total

résultats

#include <atomic>
#include <future>
#include <thread>
#include <vector>
#include <stdio.h>

template<typename Func>
void par_for(int start, int end, int step, int chunks_per_thread, Func func) {
  using namespace std;
  using namespace chrono;

  atomic<int> work_item{start};
  vector<future<void>> futures(std::thread::hardware_concurrency());

  for (auto &fut : futures) {
    fut = async(std::launch::async, [&work_item, end, step, chunks_per_thread, &func]() {
      for(;;) {
        int wi = work_item.fetch_add(step * chunks_per_thread);
        if (wi > end) break;
        int wi_max = std::min(end, wi+step * chunks_per_thread);
        while (wi < wi_max) {
          func(wi);
          wi += step;
        }
      }
    });
  }

  for (auto &fut : futures) {
    fut.wait();
  }
}

int main() {
  using namespace std;
  using namespace chrono;
  for (int k = 0; k != 2; ++k) {
    auto t0 = high_resolution_clock::now();
    constexpr int loops = 100000000;
    if (k == 0) {
      for (int i = 0; i != loops; ++i ) {
        if (i % 10000000 == 0) printf("%d\n", i);
      }
    } else {
      par_for(0, loops, 1, 100000, [](int i) {
        if (i % 10000000 == 0) printf("%d\n", i);
      });
    }
    auto t1 = high_resolution_clock::now();
    duration<double, milli> ns = t1 - t0;
    printf("k=%d %fms total\n", k, ns.count());
  }
}

Environ une accélération 6x.

J'évite le terme "parallèlement embarrassant" car ce n'est presque jamais le cas. Plus vous utilisez de ressources pour passer du cache de niveau 1 (latence ns) au cluster couvrant le globe (latence ms), vous payez des coûts exponentiellement plus élevés. Mais j'espère que cet extrait de code est utile comme réponse.


1 commentaires

Je ne vois pas clairement comment cela améliore les réponses précédentes; vous voudrez peut-être ajouter quelque chose qui met en évidence les différences par rapport à la réponse acceptée et à l'autre réponse avec une accélération de 37x. Sans rapport, mais il y a quelques classes de problèmes qui sont définitivement parallèles de manière embarrassante - ce vieux trope existe pour une raison. YMMV.