Resource icon

[C++ Quicktip] Multithreaded iterieren dank parallel_for_each

Programmiersprache(n)
C++
Du hast eine Liste mit 30.000 Dateinamen, willst alle Dateien öffnen, konvertieren und wieder speichern? 5 Milliarden Hashes, die berechnet werden wollen? Dann brauchst DU auch HEUTE NOCH Multithreading, denn damit kann man die Rechenzeit immerhin durch die Anzahl der verfügbaren Prozessorkerne teilen.

Aber erst - der RAIIThread:
C++:
enum class OnThreadDestruction
{
    join,
    detach,
    none
};

// inspired by Scott Meyer's "ThreadRAII" class in "Effective Modern C++"
template <OnThreadDestruction ActionOnClose = OnThreadDestruction::join>
class VariableActionOnCloseThread
{
public:
    VariableActionOnCloseThread()
    {
    }

    template <typename Func, typename ...Args>
    explicit VariableActionOnCloseThread(Func&& func, Args&&... args)
                : mThread(std::move(func), std::forward<Args>(args)...)
    {
    }

    ~VariableActionOnCloseThread()
    {
        if(mThread.joinable())
            Destruct(mThread);
    }

    std::thread& get()
    {
        return mThread;
    }
    std::thread& operator*()
    {
        return mThread;
    }

    //enable moving
    VariableActionOnCloseThread(VariableActionOnCloseThread&&) = default;
    VariableActionOnCloseThread& operator=(VariableActionOnCloseThread&&) = default;

private:
    std::thread mThread;

    template <OnThreadDestruction DestructAction = ActionOnClose>
    static typename std::enable_if<DestructAction == OnThreadDestruction::join, void>::type
    Destruct(std::thread& thread)
    {
        thread.join();
    }

    template <OnThreadDestruction DestructAction = ActionOnClose>
    static typename std::enable_if<DestructAction == OnThreadDestruction::detach, void>::type
    Destruct(std::thread& thread)
    {
        thread.detach();
    }

    template <OnThreadDestruction DestructAction = ActionOnClose>
    static typename std::enable_if<DestructAction == OnThreadDestruction::none, void>::type
    Destruct(std::thread&)
    {
    }
};

using RAIIThread = VariableActionOnCloseThread<OnThreadDestruction::join>;
using DetachedThread = VariableActionOnCloseThread<OnThreadDestruction::detach>;
using WrappedThread = VariableActionOnCloseThread<OnThreadDestruction::none>;
Der ist eigentlich nichts besonderes, ich verwende für gewöhnlich auch nur die Variante, die join aufruft. Diese Klasse kapselt eigentlich nur einen std::thread, und sorgt dafür, dass der Thread beim Zerstören des Handles ge-joined wird, sollte der Thread noch laufen. (Das ist eigentlich das Verhalten, was man sich üblicherweise wünscht.)

Kommen wir zum interessanteren Teil - dem Aufteilen von Arbeitspaketen und verteilen an Threads:
C++:
std::size_t inline getSuitableNumberOfThreads()
{
    static const std::size_t suitableNumberOfThreads = std::thread::hardware_concurrency() ? std::thread::hardware_concurrency() : 4;
    return suitableNumberOfThreads;
}


template <typename InputIterator, typename Action>
typename std::enable_if<std::is_same<typename InputIterator::iterator_category, std::random_access_iterator_tag>::value, Action>::type
parallel_for_each(InputIterator first, InputIterator last, Action action, const std::size_t numberOfThreads = getSuitableNumberOfThreads())
{
    ASSERT(numberOfThreads, "numberOfThreads must not be 0.");

    const auto count = std::distance(first, last);
    ASSERT(count >= 0, "iterators in wrong order, first should be <= last");

    const auto per_thread = count / numberOfThreads;
    const auto overhang = count % numberOfThreads;

    //prevents creation of additional, empty threads for empty ranges
    const auto countUntil = std::min(static_cast<std::size_t>(count), numberOfThreads);

    std::vector<RAIIThread> workingThreads;

    for(std::size_t i = 0; i< countUntil; ++i)
    {
        auto endOfSegment = first + per_thread + (i < overhang);
        workingThreads.emplace_back(
                    [first, endOfSegment, &action]()
                    {
                        std::for_each(first, endOfSegment, action);
                    }
                );

        first = endOfSegment;
    }
    return std::move(action);
}
Die erste Funktion versucht nur, heraus zu finden, wie viele Threads sie wohl benutzen sollte. Steht die Information nicht zur Verfügung, nehmen wir einfach mal 4 an.

Unsere parallel_for_each-Funktion nimmt 3 Parameter, so wie auch std::for_each - zwei Iteratoren und eine Aktion, die ausgeführt werden soll. Optional kann man per viertem Parameter noch die Anzahl Threads angeben, die benutzt werden soll - hilfreich, wenn man nicht alle Kerne belegen will.

Als nächstes bestimmen wir die Anzahl der Arbeitspakete, und teilen diese auf die Anzahl verwendeter Threads auf. Hierbei wollen wir sicherstellen, dass die maximale Differenz an Paketen pro Thread 1 ist. Dies ist rein statisches load balancing, es wird also davon ausgegangen, dass jeder Task gleich lange dauert.

Als nächstes legen wir countUntil Threads an (auf einem 40-Kerner wollen wir nicht 38 leere Threads spawnen, bloß weil wir diesmal nur 2 Arbeitspakete haben) und weisem jeden Thread einen Job zu: Mittels std::for_each über das entsprechende Segment zu laufen. Der Funktor action wird hierbei als Referenz übergeben, eventuelle Zugriffe auf geteilten Speicher muss der Aufrufer also selbst verhindern/synchronisieren. (Aber das geht immerhin leichter mit einem geteilten Objekt als mit mehreren Kopien.)

Und das wars auch schon! Unsere Threads laufen nebeneinander her, und werden vor dem return aus der Funktion gejoined. Sind somit alle Threads fertig, beendet die Funktion, und schon hat man mehrere kerne (hoffentlich) effektiv genutzt.

Und ja, ich weiß, dass es quasi auch nur ein omp parallel for ist - noch dazu ohne load balancing - aber nicht immer will oder kann man OpenMP nutzen...

Bei Fragen oder Anmerkungen wie immer nicht zögern, sondern in den entsprechenden Diskussionsthread schreiben!
Autor
-AB-
First release
Last update
Bewertung
5,00 Stern(e) 1 Bewertungen

More resources from -AB-

Latest reviews

Kann mich noch gut dran erinnern genau dieses Stück Software schon mal live gesehen zu haben ;)
-AB-
-AB-
Richtig, ich dachte auch schon, es schon eingestellt zu haben, aber wir hatten es damals wohl nur im Thread diskutiert. Ich schreibe gerade ein follow-up. :)
Oben