62
votes

Comment Numpy est-il si rapide?

J'essaie de comprendre comment Numpy peut être si rapide, en fonction de ma comparaison choquante avec le code C / C ++ optimisé qui est encore loin de reproduire la vitesse de Numpy.

Considérons l'exemple suivant : Étant donné un tableau 2D avec forme = (n, n) et dtype = float32 , qui représente une liste de n vecteurs de n dimensions, je calcule les différences par paires entre chaque paire de vecteurs. Utilisation de la diffusion Numpy , cela écrit simplement comme:

#include <stdio.h>
#include <string.h>
#include <xmmintrin.h>   // compile with -mavx -msse4.1
#include <pmmintrin.h>
#include <immintrin.h>
#include <time.h>


#define X(i, j)     _x[(i)*N + (j)]
#define res(i, j, k)  _res[((i)*N + (j))*N + (k)]

float* pairwise_sub_naive( const float* _x, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));

    for (int i = 0; i < N; i++) {
        for (int j = 0; j < N; j++) {
            for (int k = 0; k < N; k++)
                res(i,j,k) = X(i,k) - X(j,k);
          }
    }
    return _res;
}

float* pairwise_sub_better( const float* _x, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));
    
    for (int i = 0; i < N; i++) {
        const float* xi = & X(i,0);

        for (int j = 0; j < N; j++) {
            const float* xj = & X(j,0);
            
            float* r = &res(i,j,0);
            for (int k = 0; k < N; k++)
                 r[k] = xi[k] - xj[k];
        }
    }
    return _res;
}

float* pairwise_sub_simd( const float* _x, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));

    // create caches for row vectors which are memory-aligned
    float* xi = (float*)aligned_alloc(32, N * sizeof(float));
    float* xj = (float*)aligned_alloc(32, N * sizeof(float));
    
    for (int i = 0; i < N; i++) {
        memcpy(xi, & X(i,0), N*sizeof(float));
        
        for (int j = 0; j < N; j++) {
            memcpy(xj, & X(j,0), N*sizeof(float));
            
            float* r = &res(i,j,0);
            for (int k = 0; k < N; k += 256/sizeof(float)) {
                const __m256 A = _mm256_load_ps(xi+k);
                const __m256 B = _mm256_load_ps(xj+k);
                _mm256_store_ps(r+k, _mm256_sub_ps( A, B ));
            }
        }
    }
    free(xi); 
    free(xj);
    return _res;
}


float* pairwise_sub_blocks( const float* _x, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));

    #define B 8
    float cache1[B*B], cache2[B*B];

    for (int bi = 0; bi < N; bi+=B)
      for (int bj = 0; bj < N; bj+=B)
        for (int bk = 0; bk < N; bk+=B) {
        
            // load first 8x8 block in the cache
            for (int i = 0; i < B; i++)
              for (int k = 0; k < B; k++)
                cache1[B*i + k] = X(bi+i, bk+k);

            // load second 8x8 block in the cache
            for (int j = 0; j < B; j++)
              for (int k = 0; k < B; k++)
                cache2[B*j + k] = X(bj+j, bk+k);

            // compute local operations on the caches
            for (int i = 0; i < B; i++)
             for (int j = 0; j < B; j++)
              for (int k = 0; k < B; k++)
                 res(bi+i,bj+j,bk+k) = cache1[B*i + k] - cache2[B*j + k];
         }
    return _res;
}

int main() 
{
    const int N = 512;
    float* _x = (float*) malloc( N * N * sizeof(float) );
    for( int i = 0; i < N; i++)
      for( int j = 0; j < N; j++)
        X(i,j) = ((i+j*j+17*i+101) % N) / float(N);

    double best = 9e9;
    for( int i = 0; i < 5; i++)
    {
        struct timespec start, stop;
        clock_gettime(CLOCK_THREAD_CPUTIME_ID, &start);
        
        //float* res = pairwise_sub_naive( _x, N );
        //float* res = pairwise_sub_better( _x, N );
        //float* res = pairwise_sub_simd( _x, N );
        float* res = pairwise_sub_blocks( _x, N );

        clock_gettime(CLOCK_THREAD_CPUTIME_ID, &stop);

        double t = (stop.tv_sec - start.tv_sec) * 1e6 + (stop.tv_nsec - start.tv_nsec) / 1e3;    // in microseconds
        if (t < best) best = t;
        free( res );
    }
    printf("Best of 5 = %f ms\n", best / 1000);

    free( _x );
    return 0;
}

en utilisant TimeIt Je peux mesurer les performances de n = 512 : il faut 88 ms par appel sur mon ordinateur portable.

Maintenant, en C / C ++, une implémentation naïve écrit comme:

xxx

Compilation à l'aide de gcc 7.3.0 avec -o3 Indicateur, j'obtiens 195 ms par appel pour pairwise_sub_nauve (x) , ce qui n'est pas trop mal Compte tenu de la simplicité du code, mais environ 2 fois plus lent que Numpy .

Maintenant, je commence à devenir sérieux et à ajouter de petites optimisations, en indexant directement les vecteurs de ligne: p >

import numpy as np

def pairwise_sub_numpy( X ):
    return X - X[:, None, :]

N = 512
X = np.random.rand(N,N).astype(np.float32)

import timeit
times = timeit.repeat('pairwise_sub_numpy( X )', globals=globals(), number=1, repeat=5)
print(f">> best of 5 = {1000*min(times):.3f} ms")

La vitesse reste la même à 195 ms, ce qui signifie que le compilateur a pu comprendre autant. Utilisons maintenant les instructions vectorielles SIMD:

float* pairwise_sub_blocks( const float* _X, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));

    #define B 8
    float cache1[B*B], cache2[B*B];

    for (int bi = 0; bi < N; bi+=B)
      for (int bj = 0; bj < N; bj+=B)
        for (int bk = 0; bk < N; bk+=B) {
        
            // load first 8x8 block in the cache
            for (int i = 0; i < B; i++)
              for (int k = 0; k < B; k++)
                cache1[B*i + k] = X(bi+i, bk+k);

            // load second 8x8 block in the cache
            for (int j = 0; j < B; j++)
              for (int k = 0; k < B; k++)
                cache2[B*j + k] = X(bj+j, bk+k);

            // compute local operations on the caches
            for (int i = 0; i < B; i++)
             for (int j = 0; j < B; j++)
              for (int k = 0; k < B; k++)
                 res(bi+i,bj+j,bk+k) = cache1[B*i + k] - cache2[B*j + k];
         }
    return _res;
}

Cela ne donne qu'un petit coup de pouce (178 ms au lieu de 194 ms par appel de fonction).

alors j'étais Vous vous demandez si une approche "en blocs", comme ce qui est utilisé pour optimiser les produits de points, pourrait être bénéfique:

float* pairwise_sub_simd( const float* _X, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));

    // create caches for row vectors which are memory-aligned
    float* xi = (float*)aligned_alloc(32, N * sizeof(float));
    float* xj = (float*)aligned_alloc(32, N * sizeof(float));
    
    for (int i = 0; i < N; i++) {
        memcpy(xi, & X(i,0), N*sizeof(float));
        
        for (int j = 0; j < N; j++) {
            memcpy(xj, & X(j,0), N*sizeof(float));
            
            float* r = &res(i,j,0);
            for (int k = 0; k < N; k += 256/sizeof(float)) {
                const __m256 A = _mm256_load_ps(xi+k);
                const __m256 B = _mm256_load_ps(xj+k);
                _mm256_store_ps(r+k, _mm256_sub_ps( A, B ));
            }
        }
    }
    free(xi); 
    free(xj);
    return _res;
}

Et surtout, c'est la méthode la plus lente jusqu'à présent (jusqu'à présent ( 258 ms par appel de fonction).

Pour résumer, malgré certains efforts avec un code C ++ optimisé, je ne peux nulle part à la fermeture du 88 ms / appel que Numpy atteint sans effort. Une idée pourquoi?

Remarque: Soit dit en passant, je désactive Numpy multi-threading et de toute façon, ce type d'opération n'est pas multi-thread.

modifier : code exact pour comparer le code Numpy:

float* pairwise_sub_better( const float* _X, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));
    
    for (int i = 0; i < N; i++) {
        const float* xi = & X(i,0);

        for (int j = 0; j < N; j++) {
            const float* xj = & X(j,0);
            
            float* r = &res(i,j,0);
            for (int k = 0; k < N; k++)
                 r[k] = xi[k] - xj[k];
        }
    }
    return _res;
}

Benchmark complet pour le code C:

#define X(i, j)     _X[(i)*N + (j)]
#define res(i, j, k)  _res[((i)*N + (j))*N + (k)]

float* pairwise_sub_naive( const float* _X, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));

    for (int i = 0; i < N; i++) {
        for (int j = 0; j < N; j++) {
            for (int k = 0; k < N; k++)
                res(i,j,k) = X(i,k) - X(j,k);
          }
    }
    return _res;
}


25 commentaires

Votre code C ++ alloue beaucoup de mémoire. pairwise_sub_numpy ne fait rien de tout cela. Mesurer les allocations et les calculs réels séparément


Oui, Numpy alloue également un tableau N x n x n qui est renvoyé. De plus, j'ai mesuré le temps d'allocation et il est plutôt négligeable (environ 0,01 ms par appel).


Les questions sur les performances nécessitent de nombreux détails. Comment avez-vous mesuré? Quel est le code Numpy complet (comment l'appelez-vous? Que mesurez-vous exactement?) Comment compilez-vous le code C ++? Quel compilateur? Quels drapeaux?


Pairwise_sub_numpy ne renvoie-t-il pas une sorte de générateur? Dans un tel cas, en fait, il fait quelque chose comme pairwise_sub_nauve .


Non, il renvoie un tableau Numpy, pas un générateur.


-o3 ne génère pas le code le plus rapide. Son objectif est de créer un exécutable plus petit. Non pas que j'attends une énorme différence entre -o2 et -o3 .


BTW, un petit commentaire: vous ne devez rien nommer à partir de soulignement, suivi directement d'une lettre de cas supérieure, car cela est réservé à la mise en œuvre. Voir ici .


Je ne sais pas si cela provoque le ralentissement, mais ne sous-réticulez pas le résultat? Votre résultat est n * n * n octets plutôt que n * n * n * sizeof (float) octets


@sweenish -o3 ajoute plus d'optimisations, ce qui entraîne souvent un plus grand binaire (bien que vous ayez raison, ce n'est pas nécessairement plus rapide et parfois plus lent!); Le (rarement utilisé) -os est destiné aux binaires plus petits gcc.gnu.org/onlinedocs/gcc/optimize-options.html Stackoverflow.com/questions/19689014/…


Merci pour la référence. Je me souvenais incorrectement.


Publiez le code complet de votre référence C ++.


@Paulhankin Oui, vous avez raison, mais mon code d'origine était correct. J'ai fait l'erreur lors de la copie / édition sur StackOverflow. Je publierai le code de référence C ++ complet. Je l'ai corrigé ici.


Cela semble que GCC ne parvient pas à optimiser la multiplication. Que se passe-t-il si vous réécrivez votre code pour éviter la multiplication dans le calcul d'index?


Êtes-vous sûr de ne pas avoir gâché quelque chose avec vos mesures? Votre code Numpy prend 83 ms ici et votre code C ++ 92 ms pour le naïf et 60 ms pour les blocs ...


Fait intéressant, il fonctionne encore plus vite si je retire le -mavx et je laisse uniquement -MSSE4.1


@florestan en effet, la référence est maintenant plus rapide que Numpy avec la version bloquée !! Mais j'ai réécrit un peu l'indice de référence dans le processus de publication sur StackOverflow, et je suis sûr que mes anciens horaires étaient corrects. Je ne comprends pas vraiment où le ralentissement s'est produit et pourquoi il n'est plus là ...


@florestan donc en fait il y avait un bug dans mon #Define (parenthèses manquantes). Avec le bug corrigé, je ne peux pas améliorer plus de 2x plus lent que Numpy.


Le GCC 7.3 ne s'autoVERVE pas. Les GCC plus récents le font. Votre code SIMD est terrible car il fait des copies inutiles; Utilisez des charges non alignées si vous n'êtes pas sûr que l'entrée est alignée.


Numpy est écrit en C de toute façon, il pourrait donc être possible d'enquêter sur son code source


@Sopel Je sais, j'ai essayé de m'en débarrasser et cela n'a pas d'importance. J'ai également compilé avec GCC 8.3.1 et le perf reste le même avec SIMD.


@ M.M J'ai regardé mais je ne pouvais pas le localiser. Le code source Numpy est vraiment complexe.


aligned_alloc (32, n * n * n * sizeof (float)); est risqué: n * n * n pourrait déborder la plage de int . Vous devez écrire sizeof (float) * n * n * n qui force le calcul complet pour utiliser le type potentiellement plus grand size_t .


Pour référence, le code Numpy pour la boucle intérieure est ici et ici , et une fois que vous avez développé toutes les choses de génération de code, cela se résume à une boucle assez naïve sans utilisation SIMD explicite. (Je crois qu'il y a un morceau de set dans les boucles extérieures, mais je ne peux pas vous orienter vers le code pertinent.)


Attendez, non, il y a simd! github.com/ Numpy / Numpy / Blob / V1.19.5 / Numpy / Core / Src / Umath /… Je donne ce fichier depuis un certain temps maintenant. Je ne sais pas si cela se déclenche ici, cependant.


Assurez-vous d'éviter de mesurer les allocations de mémoire. Peut-être que votre environnement Python réutilise déjà la mémoire allouée que j'obtiens par exemple 230 ms pour votre exemple python et 65 ms pour np.Substract (x, x [:, non ,:], res) où Res est déjà alloué . J'ai également essayé une solution Numba naïve qui, qui prend environ 43 ms, sans allocation de mémoire et 211 avec allocation de mémoire. La même chose devrait être possible dans C.


3 Réponses :


31
votes

Comme le souligne certains commentaires, Numpy utilise SIMD dans son implémentation et il n'alloue pas de la mémoire au point de calcul. Si j'élimine l'allocation de mémoire de votre implémentation, pré-allocation de tous les tampons avant le calcul, je reçois un meilleur temps par rapport à Numpy même avec la version Scaller (c'est-à-dire celle sans optimisations).

également en termes de SIMD et pourquoi votre implémentation ne fonctionne pas beaucoup mieux que l'échelle est parce que vos modèles d'accès à la mémoire ne sont pas idéaux pour l'utilisation SIMD - vous faites de la mémoire et que vous chargez dans des registres SIMD à partir d'emplacements qui sont loin les uns des autres - par exemple Vous remplissez les vecteurs de la ligne 0 et de la ligne 511, qui pourraient ne pas bien jouer avec le cache ou avec le préfetcher SIMD.

Il y a aussi une erreur dans la façon dont vous chargez les registres SIMD (si je comprenais correctement ce que vous 're essaie de calculer): un registre SIMD 256 bits peut charger 8 numéros de point flottante à un seul précis (float) " qui est 256/4 = 64 ; _x et _res sont des pointeurs flottants et les intrinsèques SIMD s'attendent également à des pointeurs flottants comme arguments, donc au lieu de lire tous les éléments de ces lignes tous les 8 flotteurs que vous les lisez tous les 64 flotteurs.

Le calcul peut être optimisé davantage en modifiant les modèles d'accès mais aussi en observant que vous répétez certains calculs: par exemple Lorsque vous en itérant avec line0 en tant que base, vous calculez line0 - line1 mais à un moment futur, lorsque vous en itérant avec line1 comme base, vous devez calcul line1 - line0 qui est essentiellement - (line0 - line1) , c'est-à-dire pour chaque ligne après line0 beaucoup de résultats pourraient être réutilisés à partir de calculs précédents. Souvent, l'utilisation ou la parallélisation SIMD nécessite de modifier la façon dont les données sont accessibles ou motivées afin de fournir des améliorations significatives.

Voici ce que j'ai fait en première étape en fonction de votre implémentation initiale et informatique est plus rapide que le Numpy (ne vous dérange pas les trucs OpenMP car ce n'est pas comme ça que c'est censé être fait, je voulais juste voir comment il se comporte en essayant la manière naïve).

C++
Time scaler version: 55 ms
Time SIMD version: 53 ms
**Time SIMD 2 version: 33 ms**
Time SIMD 3 version: 168 ms
Time OpenMP version: 59 ms

Python numpy
>> best of 5 = 88.794 ms


#include <cstdlib>
#include <xmmintrin.h>   // compile with -mavx -msse4.1
#include <pmmintrin.h>
#include <immintrin.h>

#include <numeric>
#include <algorithm>
#include <chrono>
#include <iostream>
#include <cstring>

using namespace std;

float* pairwise_sub_naive (const float* input, float* output, int n) 
{
    for (int i = 0; i < n; i++) {
        for (int j = 0; j < n; j++) {
            for (int k = 0; k < n; k++)
                output[(i * n + j) * n + k] = input[i * n + k] - input[j * n + k];
          }
    }
    return output;
}

float* pairwise_sub_simd (const float* input, float* output, int n) 
{    
    for (int i = 0; i < n; i++) 
    {
        const int idxi = i * n;
        for (int j = 0; j < n; j++)
        {
            const int idxj = j * n;
            const int outidx = idxi + j;
            for (int k = 0; k < n; k += 8) 
            {
                __m256 A = _mm256_load_ps(input + idxi + k);
                __m256 B = _mm256_load_ps(input + idxj + k);
                _mm256_store_ps(output + outidx * n + k, _mm256_sub_ps( A, B ));
            }
        }
    }
    
    return output;
}

float* pairwise_sub_simd_2 (const float* input, float* output, int n) 
{
    float* line_buffer = (float*) aligned_alloc(32, n * sizeof(float));

    for (int i = 0; i < n; i++) 
    {
        const int idxi = i * n;
        for (int j = 0; j < n; j++)
        {
            const int idxj = j * n;
            const int outidx = idxi + j;
            for (int k = 0; k < n; k += 8) 
            {
                __m256 A = _mm256_load_ps(input + idxi + k);
                __m256 B = _mm256_load_ps(input + idxj + k);
                _mm256_store_ps(line_buffer + k, _mm256_sub_ps( A, B ));
            }
            memcpy(output + outidx * n, line_buffer, n);
        }
    }
    
    return output;
}

float* pairwise_sub_simd_3 (const float* input, float* output, int n) 
{    
    for (int i = 0; i < n; i++) 
    {
        const int idxi = i * n;
        for (int k = 0; k < n; k += 8) 
        {
            __m256 A = _mm256_load_ps(input + idxi + k);
            for (int j = 0; j < n; j++)
            {
                const int idxj = j * n;
                const int outidx = (idxi + j) * n;
                __m256 B = _mm256_load_ps(input + idxj + k);
                _mm256_store_ps(output + outidx + k, _mm256_sub_ps( A, B     ));
             }
        }
    }

    return output;
}

float* pairwise_sub_openmp (const float* input, float* output, int n)
{
    int i, j;
    #pragma omp parallel for private(j)
    for (i = 0; i < n; i++) 
    {
        for (j = 0; j < n; j++)
        {
            const int idxi = i * n; 
            const int idxj = j * n;
            const int outidx = idxi + j;
            for (int k = 0; k < n; k += 8) 
            {
                __m256 A = _mm256_load_ps(input + idxi + k);
                __m256 B = _mm256_load_ps(input + idxj + k);
                _mm256_store_ps(output + outidx * n + k, _mm256_sub_ps( A, B ));
            }
        }
    }
    /*for (i = 0; i < n; i++) 
    {
        for (j = 0; j < n; j++) 
        {
            for (int k = 0; k < n; k++)
            {
                output[(i * n + j) * n + k] = input[i * n + k] - input[j * n + k];
            }
        }
    }*/
    
    return output;
}

int main ()
{
    constexpr size_t n = 512;
    constexpr size_t input_size = n * n;
    constexpr size_t output_size = n * n * n;

    float* input = (float*) aligned_alloc(32, input_size * sizeof(float));
    float* output = (float*) aligned_alloc(32, output_size * sizeof(float));

    float* input_simd = (float*) aligned_alloc(32, input_size * sizeof(float));
    float* output_simd = (float*) aligned_alloc(32, output_size * sizeof(float));

    float* input_par = (float*) aligned_alloc(32, input_size * sizeof(float));
    float* output_par = (float*) aligned_alloc(32, output_size * sizeof(float));

    iota(input, input + input_size, float(0.0));
    fill(output, output + output_size, float(0.0));

    iota(input_simd, input_simd + input_size, float(0.0));
    fill(output_simd, output_simd + output_size, float(0.0));
    
    iota(input_par, input_par + input_size, float(0.0));
    fill(output_par, output_par + output_size, float(0.0));

    std::chrono::milliseconds best_scaler{100000};
    for (int i = 0; i < 5; ++i)
    {
        auto start = chrono::high_resolution_clock::now();
        pairwise_sub_naive(input, output, n);
        auto stop = chrono::high_resolution_clock::now();

        auto duration = chrono::duration_cast<chrono::milliseconds>(stop - start);
        if (duration < best_scaler)
        {
            best_scaler = duration;
        }
    }
    cout << "Time scaler version: " << best_scaler.count() << " ms\n";

    std::chrono::milliseconds best_simd{100000};
for (int i = 0; i < 5; ++i)
{
    auto start = chrono::high_resolution_clock::now();
    pairwise_sub_simd(input_simd, output_simd, n);
    auto stop = chrono::high_resolution_clock::now();

    auto duration = chrono::duration_cast<chrono::milliseconds>(stop - start);
     if (duration < best_simd)
    {
        best_simd = duration;
    }
}
cout << "Time SIMD version: " << best_simd.count() << " ms\n";

std::chrono::milliseconds best_simd_2{100000};
for (int i = 0; i < 5; ++i)
{
    auto start = chrono::high_resolution_clock::now();
    pairwise_sub_simd_2(input_simd, output_simd, n);
    auto stop = chrono::high_resolution_clock::now();

    auto duration = chrono::duration_cast<chrono::milliseconds>(stop - start);
     if (duration < best_simd_2)
    {
        best_simd_2 = duration;
    }
}
cout << "Time SIMD 2 version: " << best_simd_2.count() << " ms\n";

std::chrono::milliseconds best_simd_3{100000};
for (int i = 0; i < 5; ++i)
{
    auto start = chrono::high_resolution_clock::now();
    pairwise_sub_simd_3(input_simd, output_simd, n);
    auto stop = chrono::high_resolution_clock::now();

    auto duration = chrono::duration_cast<chrono::milliseconds>(stop - start);
     if (duration < best_simd_3)
    {
        best_simd_3 = duration;
    }
}
cout << "Time SIMD 3 version: " << best_simd_3.count() << " ms\n";

    std::chrono::milliseconds best_par{100000};
    for (int i = 0; i < 5; ++i)
    {
        auto start = chrono::high_resolution_clock::now();
        pairwise_sub_openmp(input_par, output_par, n);
        auto stop = chrono::high_resolution_clock::now();

        auto duration = chrono::duration_cast<chrono::milliseconds>(stop - start);
         if (duration < best_par)
        {
            best_par = duration;
        }
    }
    cout << "Time OpenMP version: " << best_par.count() << " ms\n";

    cout << "Verification\n";
    if (equal(output, output + output_size, output_simd))
    {
        cout << "PASSED\n";
    }
    else
    {
        cout << "FAILED\n";
    }

    return 0;
}
modifier:

petite correction car il y avait un mauvais appel lié à la deuxième version de l'implémentation SIMD.

Comme vous pouvez le voir maintenant, la deuxième implémentation est la plus rapide telle qu'elle se comporte Le meilleur du point de vue de la localité de référence du cache. Les exemples 2 et 3 des implémentations SIMD sont là pour illustrer pour vous comment changer les modèles d'accès à la mémoire pour influencer les performances de vos optimisations SIMD. Pour résumer (sachant que je suis loin d'être complet à mon conseil), soyez conscient de vos modèles d'accès à la mémoire et des charges et des magasins à \ de l'unité SIMD; Le SIMD est une unité matérielle différente à l'intérieur du noyau du processeur, il y a donc une pénalité dans la mélange de données dans les deux sens, donc lorsque vous chargez un registre à partir de la mémoire, essayez de faire autant d'opérations que possible avec ces données et ne soyez pas trop désireux de stocker Il est de retour (bien sûr, dans votre exemple, cela pourrait être tout ce que vous devez faire avec les données). Soyez conscient également qu'il y a un nombre limité de registres SIMD disponibles et si vous en chargez trop, ils "déversent", c'est-à-dire qu'ils seront retrouvés dans des emplacements temporaires en mémoire principale dans les coulisses en tuant tous vos gains. Optimisation SIMD, c'est un véritable acte d'équilibre!

Il y a des efforts pour mettre un emballage intrinsèque multiplateforme dans la norme (je me suis développé une source fermée dans mon passé glorieux) et même c'est loin d'être Complet, cela vaut la peine de jeter un œil (lisez les articles d'accompagnement si vous êtes vraiment intéressé à savoir comment fonctionne SIMD). https://github.com/vcdevel/std-simd

p >


4 commentaires

Merci beaucoup pour votre réponse. En effet, en supprimant l'allocation de mon code naïf, je reçois le même perf que vous (40 ms par fonction appelle mon ordinateur portable). Mais je suis toujours perplexe: si j'exécute une fonction vide qui n'alloue que le tableau et le renvoie, il est super rapide (0,003 ms par appel). Comment cela peut-il être?


Tout est dans les optimisations que le compilateur est capable de faire. Lorsque vous avez plus de code autour de vos allocations, le compilateur peut ne pas être en mesure de supposer beaucoup sur le contexte comme lorsque vous n'avez qu'une allocation. Si vous voulez aller plus loin dans le terrier du lapin, alors prenez votre code dans les deux situations, inscrivez-vous dans le [compilateur Explorer | D'accord. En outre, vous avez dit que Numpy "n'alloue pas la mémoire au point de calcul". Qu'est-ce que cela signifie? Cela signifie-t-il que Numpy pré-alloue la mémoire? Mais il ne peut pas savoir à l'avance la taille résultante avant l'opération que j'ai effectuée, n'est-ce pas? Il devrait donc toujours allouer la mémoire au moment où l'opération est effectuée, qui aurait dû apparaître dans le temps mesuré final ... (?)


"N = 512 x = np.random.rand (n, n) .astype (np.float32)" Faites en fait votre soustraction par paire dans la fonction "pairwise_sub_numpy".



9
votes

Ceci est un complément à la réponse publiée par @celakev. Je pense que j'ai finalement pu comprendre quel était exactement le problème. Le problème était pas quant à l'allocation de la mémoire dans la fonction principale qui fait le calcul.

Ce qui prenait réellement le temps, c'est de accéder à une nouvelle (nouvelle) mémoire . Je crois que l'appel malloc renvoie des pages de mémoire virtuelles, c'est-à-dire que cela ne correspond pas à la mémoire physique réelle - jusqu'à ce qu'il soit explicitement accessible. Ce qui prend du temps, c'est le processus d'allocation de la mémoire physique à la volée (ce qui, je pense, est au niveau OS) lorsqu'il est accessible dans le code de fonction.

Voici une preuve. Considérez les deux fonctions triviales suivantes:

Time[0] = 0.000931 ms
Time[1] = 0.000540 ms
Time[2] = 0.000523 ms
Time[3] = 0.000524 ms
Time[4] = 0.000521 ms
Best of 5 for 'float* arr = just_alloc(N*N*N)' = 0.000521 ms

Time[0] = 189.822237 ms
Time[1] = 45.041083 ms
Time[2] = 46.331428 ms
Time[3] = 44.729433 ms
Time[4] = 42.241279 ms
Best of 5 for 'just_fill(arr, N*N*N)' = 42.241279 ms

J'obtiens les horaires suivants, que je détaille maintenant pour chacun des appels:

#include <stdio.h>
#include <stdlib.h>
#include <time.h>

float* just_alloc( size_t N ) 
{
    return (float*) aligned_alloc( 32, sizeof(float)*N );
}

void just_fill( float* _arr, size_t N ) 
{
    for (size_t i = 0; i < N; i++)
        _arr[i] = 1;
}

#define Time( code_to_benchmark, cleanup_code ) \
    do { \
        double best = 9e9; \
        for( int i = 0; i < 5; i++) { \
            struct timespec start, stop; \
            clock_gettime(CLOCK_THREAD_CPUTIME_ID, &start); \
            code_to_benchmark; \
            clock_gettime(CLOCK_THREAD_CPUTIME_ID, &stop); \
            double t = (stop.tv_sec - start.tv_sec) * 1e3 + (stop.tv_nsec - start.tv_nsec) / 1e6; \
            printf("Time[%d] = %f ms\n", i, t); \
            if (t < best) best = t; \
            cleanup_code; \
        } \
        printf("Best of 5 for '" #code_to_benchmark "' = %f ms\n\n", best); \
    } while(0)

int main() 
{
    const size_t N = 512;

    Time( float* arr = just_alloc(N*N*N), free(arr) );
    
    float* arr = just_alloc(N*N*N);
    Time( just_fill(arr, N*N*N), ; );
    free(arr);

    return 0;
}


2 commentaires

"Donc, fondamentalement, la raison pour laquelle mon code était lent était parce que je réalisais à chaque fois une nouvelle mémoire qui n'avait pas encore d'adresse physique. (Corrigez-moi si je me trompe mais je pense que c'est l'essentiel!)" Donc il s'agissait donc de Attribution de la mémoire à la fin :)


Le processus que vous venez de décrire fait partie de l'allocation de mémoire - Malloc & Friends appellent un système d'exploitation primitif (spécifique au système d'exploitation) afin d'allouer la mémoire car c'est le système d'exploitation qui gère cela. Cela a également un prix, car vous «traversez» de l'espace utilisateur à l'espace du noyau. Certaines personnes utilisent des allocateurs personnalisés pour "éviter" la partie o que



1
votes

Un peu en retard à la partie, mais je voulais ajouter une méthode paire avec eigen , qui est censé donner à C ++ une capacité de manipulation d'algèbre de haut niveau et utiliser SIMD sous le capot. Tout comme Numpy.

Voici l'implémentation

// inside the pairwise method
        for (int k = 0; k < input.cols(); ++k)
                output[k] -= input.col(k) * Eigen::RowVectorXf::Ones(input.cols());


// at allocation time
        std::vector<Eigen::MatrixXf> output(n, input);

Les tests de référence complets suggérés par @celavek sur mon système sont

Time scaler version: 57 ms
Time SIMD version: 58 ms
Time SIMD 2 version: 40 ms
Time SIMD 3 version: 58 ms
Time OpenMP version: 58 ms

Time Eigen version: 76 ms

Numpy >> best of 5 = 118.489 ms


0 commentaires