1
votes

Optimiser la sommation de 2 tableaux d'octets

Je parcours un tableau d'octets et j'ajoute les valeurs d'un autre tableau d'octets dans une boucle for.

        var random = new Random();
        byte[] bytes = new byte[20_000_000]; 
        byte[] bytes2 = new byte[20_000_000];

        for (int i = 0; i < bytes.Length; i++)
        {
            bytes[i] = (byte)random.Next(255);
        }

        for (int i = 0; i < bytes.Length; i++)
        {
            bytes2[i] = (byte)random.Next(255);
        }

        //how to optimize the part below
        for (int i = 0; i < bytes.Length; i++)
        {
            bytes[i] += bytes2[i];
        }

Existe-t-il un moyen d'accélérer le processus, afin qu'il puisse être plus rapide que linéaire.


3 commentaires

Demandez-vous de concaténer deux tableaux d'octets ou d'ajouter chaque élément d'un tableau d'octets?


@juharr Ajout de code.


@CoryNelson Ajoutez chaque élément.


3 Réponses :


2
votes

Vous pouvez utiliser Vector:

static void Add(Span<byte> dst, ReadOnlySpan<byte> src)
{
    Span<Vector<byte>> dstVec = MemoryMarshal.Cast<byte, Vector<byte>>(dst);
    ReadOnlySpan<Vector<byte>> srcVec = MemoryMarshal.Cast<byte, Vector<byte>>(src);

    for (int i = 0; i < dstVec.Length; ++i)
    {
        dstVec[i] += srcVec[i];
    }

    for (int i = dstVec.Length * Vector<byte>.Count; i < dst.Length; ++i)
    {
        dst[i] += src[i];
    }
}

Cela ira encore plus vite si vous utilisez un pointeur ici pour aligner l'un de vos tableaux.

p >


2 commentaires

Je voulais souligner que cela nécessite .NET Core> = 2.1 ou .NET Standard> = 2.1. MemoryMarshal n'est pas disponible sur .NET Framework.


Cela donne un coup de pouce 5X!



2
votes

Vous pouvez utiliser tous vos processeurs / cœurs, en supposant que votre machine en possède plusieurs.

using System.Numerics;

/// <summary>Adds each pair of elements in two arrays, and replaces the
/// left array element with the result.</summary>
public static void Add_UsingVector(byte[] left, byte[] right, int start, int length)
{
    int i = start;
    int step = Vector<byte>.Count; // the step is 16
    int end = start + length - step + 1;
    for (; i < end; i += step)
    {
        // Vectorize 16 bytes from each array
        var vector1 = new Vector<byte>(left, i);
        var vector2 = new Vector<byte>(right, i);
        vector1 += vector2; // Vector arithmetic is unchecked only
        vector1.CopyTo(left, i);
    }
    for (; i < start + length; i++) // Process the last few elements
    {
        unchecked { left[i] += right[i]; }
    }
}

Mise à jour: Le Vector classe peut également être utilisé dans .NET Framework. Il nécessite le package System.Numerics.Vectors . Il offre l'avantage de la parallélisation dans un seul cœur, en émettant une seule instruction à plusieurs données ( SIMD ). La plupart des processeurs actuels sont compatibles SIMD. Il n'est activé que pour les processus 64 bits, donc l'indicateur [Préférer 32 bits] doit être décoché. Sur les processus 32 bits, la propriété Vector. IsHardwareAccelerated renvoie false , et les performances sont mauvaises.

Parallel.ForEach(Partitioner.Create(0, bytes.Length), range =>
{
    for (int i = range.Item1; i < range.Item2; i++)
    {
        bytes[i] += bytes2[i];
    }
});

Cela fonctionne 4 à 5 fois plus vite qu'un simple boucle, sans utiliser plus d'un thread (25% de consommation CPU dans un PC à 4 cœurs).


1 commentaires

J'ai mis à jour ma réponse avec des informations sur l'utilisation du Classe Vector dans .NET Framework.



2
votes

Remplissez la longueur du tableau au prochain multiple le plus élevé de 8. (C'est déjà le cas dans votre exemple.)

Utilisez un contexte non sécurisé pour créer deux tableaux ulong pointant vers le début de l'existant. tableaux d'octets. Utilisez une boucle for pour itérer octets.Longueur / 8 fois en ajoutant 8 octets à la fois.

Sur mon système, cela dure moins de 13 millisecondes . Comparé à 105 millisecondes pour le code d'origine.

Vous devez ajouter l'option / unsafe pour utiliser ce code. Ouvrez les propriétés du projet et sélectionnez "Autoriser le code non sécurisé".

var random = new Random();
byte[] bytes = new byte[20_000_000]; 
byte[] bytes2 = new byte[20_000_000];




int Len = bytes.Length >> 3; // >>3 is the same as / 8

ulong MASK =    0x8080808080808080;
ulong MASKINV = 0x7f7f7f7f7f7f7f7f;

//Sanity check
if((bytes.Length & 7) != 0) throw new Exception("bytes.Length is not a multiple of 8");
if((bytes2.Length & 7) != 0) throw new Exception("bytes2.Length is not a multiple of 8");

unsafe
{
    //Add 8 bytes at a time, taking into account overflow between bytes
   fixed (byte* pbBytes = &bytes[0])
   fixed (byte* pbBytes2 = &bytes2[0])
   {
      ulong* pBytes = (ulong*)pbBytes;
      ulong* pBytes2 = (ulong*)pbBytes2;
      for (int i = 0; i < Len; i++)
      {
        pBytes[i] = ((pBytes2[i] & MASKINV) + (pBytes[i] & MASKINV)) ^ ((pBytes[i] ^ pBytes2[i]) & MASK);
      } 
   }
}


5 commentaires

La modification du contenu de octets2 est-elle vraiment nécessaire pour votre algorithme?


@PetSerAl, si le 8ème bit de byte2 n'était pas mis à 0, il y aurait la possibilité d'un débordement dans le bit bas de l'octet suivant en raison du report. Merci, la logique est nécessaire, les affectations ne l'étaient pas. 25 ms -> 12 ms. Le nouveau code est un peu difficile à lire.


@Strom Existe-t-il un moyen de conserver la valeur de survol, afin que je puisse calculer la moyenne si nécessaire.


@Strom que se passe-t-il si nous convertissons l'octet [] en court [] et après la somme le convertissons en octet []. Comment le MASK / MASKINV changerait-il?


@ Pavel, Les masques pour les valeurs ushort sont 0x8000800080008000, et l'inverse 0x7fff7fff7fff7fff, 1. cela perdrait la moitié de l'efficacité. 2. le masque serait inutile (si le 1 le plus élevé est inférieur au 15e bit pour chaque opérande).