Resource icon

[C++] Threading & Producer/Consumer

Programmiersprache(n)
C++
Threads sind toll. Mit einem zweiten Thread kann man in derselben Zeit die doppelte Arbeit schaffen. (Wer hat heutzutage noch keine mehrkernige CPU?) - Auf einem 4, 8, 16-Kerner schafft man natürlich dementsprechend mehr.

Manchmal ist die Arbeit, die wir erledigen wollen, schön aufgeteilt, z.B. wollen wir manchmal einfach nur eine Menge Tasks abarbeiten, viele Dinge gleichzeitig tun, o.ä - dann hilft parallel_for_each.

Aber was, wenn die Arbeit noch gar nicht bereit steht, sie vielleicht nur über User-Eingabe oder das Internet quasi hereintröpfelt? Wir die Tasks einfach nur im Hintergrund abarbeiten möchten, ohne gleich sämtliche CPU-Kerne zu belegen?

In dem Fall hilft uns das Producer/Consumer-Beispiel. Im allgemeinen Fall produzieren mehrere Threads gleichzeitig Arbeit, die von mehreren Threads wiederum abgearbeitet werden sollen (weshalb dieses Problem in so ziemlich jedem Multithreading-Kurs zur Behandlung von Semaphoren herangezogen wird...).

Wir beschränken uns darauf, mit einem Thread die Arbeitspakete zu "konsumieren", von wie vielen Threads aus Pakete erzeugt werden, ist eigentlich nebensächlich; bloß wird die Synchronisierung per Mutex bei Verwendung von mehr und mehr Threads irgendwann suboptimal. Bei 2, 3 Threads auf beiden Seiten sollte es aber noch keine Probleme geben.
So oder so, dies ist eher ein Beispiel für verzögerte, nebenläufige Verarbeitung vergleichsweise sporadisch anfallender Tasks, keine hochperfomante Lösung für alles.

Erstmal brauchen wir einen Thread, der pausenlos Tasks abarbeitet, wenn er welche hat.
Dafür schreiben wir erstmal eine Klasse, nennen wir sie TaskQueue, mit einer (fast) Endlosschleife:

C++:
while(!mShallStop) //atomic bool, welches von draußen gesetzt werden kann
{
    try
    {
        //Tasks abarbeiten
    }
    catch(const std::exception& e)
    {
        std::cout << "exception while working on task, " << e.what() << ", dropping whole chunk" << std::endl;
    }
}
mShallStop ist ein std::atomic_bool und Membervariable der TaskQueue, wird mit false initialisiert und kann gegebenenfalls von draußen auf true gesetzt werden, um die Endlosschleife zu beenden und den Thread sauber zu joinen.

Für den Thread selber nehmen wir am besten den RAIIThread, dann müssen wir uns ums join() nicht kümmern. Alles in allem sieht unsere Klasse jetzt etwa so aus:

C++:
class TaskQueue
{
public:
    TaskQueue();
    ~TaskQueue()
    {
        scheduleShutdown();
    }

    template<typename T>
    void addTask(T&& task)
    {
        //TODO
    }

    void scheduleShutdown()
    {
        mShallStop = true;
    }

private:
    void operator()(); //nicht notwendig - sollte vllt sogar in einer anderen Klasse definiert sein.

    RAIIThread mWorkerThread;
    std::atomic_bool mShallStop;
};

TaskQueue::TaskQueue() : mShallStop(false)
{
    //delayed start because *this might not be fully initialized
    std::thread tempThread(std::ref(*this));
    mWorkerThread.get().swap(tempThread);
}

void TaskQueue::operator()()
{
    while(!mShallStop)
    {
        //TODO: retrieve pending tasks

        try
        {
            //TODO: execute all pending tasks
        }
        catch(const std::exception& e)
        {
            std::cout << "exception while working on task, " << e.what() << ", dropping whole chunk" << std::endl;
        }
    }
}
Ja!! Ich habe tatsächlich einen Destruktor geschrieben! Dies ist einer der seltenen Fälle, in denen man tatsächlich einen braucht, denn man kann sich nicht darauf verlassen, dass der Nutzer auch brav scheduleShutdown() aufruft, bevor das TaskQueue Objekt zerstört wird, und das würde dazu führen, dass wir ewig versuchen, die Endlosschleife zu join()en.

Das swap() im Konstruktor braucht man nur, weil ich unbedingt die Endlosschleife in dieselbe Klasse legen wollte, und natürlich musste der Klammer-Operator herhalten. Muss man nicht so machen, macht Dinge eher nur komplizierter....

Gut, bisher macht unsere Endlosschleife nur eins: Eine CPU voll auslasten, und das, ohne dabei etwas Produktives zu erreichen.
Implementieren wir erst das produktive: Wir wollen eine Aktion auf einer Reihe von Task-Paketen aufrufen.

Wer das ganze nicht sonderlich wiederverwendbar coden will, schreibt sich einfach eine Klasse Task, die exakt die zu behandelnden Daten enthält - sagen wir, ich will Bilder, die ich von einer Kamera bekomme, verarbeiten und abspeichern. In dem Fall könnte Task das aufgenommene Bild und den Dateinamen, unter dem es gespeichert werden soll enthalten.

Aus historischen Gründen (damals wurde noch Task für Task aus der Liste entnommen und abgearbeitet) verwenden wir eine std::deque<Task>, um unsere Tasks in der TaskQueue zu halten - wie wir später aber sehen werden, könnte das genausogut ein Vector sein.

Unsere Schleife soll also über unsere Liste von Tasks mTasks laufen, und diese abarbeiten. Aber natürlich muss diese Liste auch irgendwo gefüllt werden - gleichzeitig - wir müssen also sowohl addTask() implementieren als auch eine Form von Synchronisation einbauen.

Ich habe da mal etwas vorbereitet....
C++:
class TaskQueue
{
public:
    TaskQueue();
    ~TaskQueue()
    {
        scheduleShutdown();
    }

    void addTask(Task task)
    {
        std::lock_guard<std::mutex> lock(mAccessMutex);
        mTasks.push_back(task);
    }

    void scheduleShutdown()
    {
        mShallStop = true;
    }

private:
    void operator()(); //nicht notwendig - sollte vllt sogar in einer anderen Klasse definiert sein.

    std::mutex mAccessMutex;
    std::deque<Task> mTasks;

    RAIIThread mWorkerThread;
    std::atomic_bool mShallStop;
};

TaskQueue::TaskQueue() : mShallStop(false)
{
    //delayed start because *this might not be fully initialized
    std::thread tempThread(std::ref(*this));
    mWorkerThread.get().swap(tempThread);
}

void TaskQueue::operator()()
{
    while(!mShallStop)
    {
        //work off a bunch of tasks:
        std::deque<Task> chosenTasks;

        {
            std::lock_guard<std::mutex> lock(mAccessMutex);
            chosenTasks.swap(mTasks);
        } //lock is unlocked on destruction

        try
        {
            std::for_each(chosenTasks.begin(), chosenTasks.end(), myAwesomeAction);
        }
        catch(const std::exception& e)
        {
            std::cout << "exception while working on task, " << e.what() << ", dropping whole chunk" << std::endl;
        }
    }
}
Das std::mutex mAccessMutex sicher jetzt den Zugriff auf die (vom Haupt- und Arbeitsthread) geteilte Variable mTasks. addTask (und unsere Endlosschleife) benutzen einen lock_guard um im gegebenen Scope das Mutex zu locken - addTask greift währenddessen auf mTasks zu, und hängt einen Task an.

In unserer Endlosschleife wollen wir verhindern, für jeden einzelnen Task synchronisieren zu müssen, daher legen wir unsere eigene, interne Liste an und tauschen beide kurzerhand aus - das ist bloß ein swap von etwa 12 Bytes, die Objekte selbst werden weder kopiert noch verschoben. Dies ist natürlich viel kürzer, als sämtliche Objekte abzuarbeiten, und erlaubt dem Hauptthread, neue Elemente in die Liste zu schieben, während wir die Kopie zuende abarbeiten.
Wie immer gilt: So selten wie möglich synchronisieren, und so kurz wie möglich blocken.

An sich läuft jetzt alles. Haben allerdings noch ein Problem: Wenn man eine Weile lang keine Tasks produziert werden, rennt unsere Endlosschleife davon und belegt wieder eine CPU völlig - ist immerhin eine Endlosschleife.

Was man an dieser Stelle in anderem *hust*, minderwertigem Code oft findet, ist ein sleep()-Auruf.
Das ist schlecht. Warum?
Einmal führt es dazu, dass unsere Schleife immer noch durchrennt, wenn auch seltener.
Zweitens wird jetzt *jede* Abarbeitung nach einer "Nullrunde" verzögert.
Beide Probleme kann man gegeneinander abwiegen - eine Sekunde auf Tasks warten führt zu ignorierbarer CPU-Last, aber auch gewaltiger Latenz. Eine Millisekunde erzeugt messbare CPU-Last, aber die Latenz sollte vertretbar sein.
Im schlimmsten Fall - man hat sich auf eine Wartedauer von 33ms geinigt, da die Bilder mit etwa 30fps eintrudeln - produziert jetzt jeder zweite Aufruf einen Miss - noch kein Bild im Speicher - und jeder andere hat zwei Bilder, die dann sequentiell verarbeitet werden.
Steckt hinter dieser TaskQueue noch eine weitere, die z.B. die Bilder anzeigt oder übers Internet verschickt, verstärkt sich das Problem. Ohnehin kommt jetzt jedes Bild(paar) mit 33ms Verzögerung an, aber noch dazu ist es die doppelte Verarbeitungszeit (sagen wir, 2x2 = 4ms) verspätet, weil immer zwei Bilder verarbeitet werden. Je weiter man dieses Spiel treibt, desto mehr Threads hat man, die sinnlos warten, und dann plötzlich drauflos arbeiten müssen.

Sauber löst man dieses Problem mit einem Monitor. (Auf englisch eine condition variable.)
Auf eine condition variable kann man warten. Ein anderer Thread kann wartende Threads über die Variable aufwecken/triggern, und das alles ohne sleep.
In C++ ist eine condition variable immer abhängig von einem Mutex - damit wird sicher gestellt, dass die Zugriffe auch schön synchron erfolgen. Man holt sich also erst Zugriff auf das Mutex, wartet dann auf die variable - hierbei wird das Mutex temporär freigegeben, und zwar so lange, bis ein anderer Thread uns aufweckt, schön haben wir das Mutex wieder (und wissen auch, dass es wieder Arbeit gibt).

Bauen wir das ein!
C++:
class TaskQueue
{
public:
    TaskQueue(Action& action, std::size_t maxQueueSize = 0);
    ~TaskQueue()
    {
        scheduleShutdown();
    }

    void addTask(Task task)
    {
        std::lock_guard<std::mutex> lock(mAccessMutex); //Zugriff synchronisieren
        mTasks.push_back(task);
        mWait.notify_all(); //dem schlafenden Thread Bescheid geben
    }

    void operator()();

    void scheduleShutdown()
    {
        mShallStop = true;
        mWait.notify_all(); //eventuell schlafenden Thread aufwecken
    }

private:
    std::mutex mAccessMutex;
    std::condition_variable mWait;

    std::deque<Task> mTasks;

    RAIIThread mWorkerThread;
    std::atomic_bool mShallStop;
};

TaskQueue::TaskQueue() : mShallStop(false)
{
    //delayed start because *this might not be fully initialized
    std::thread tempThread(std::ref(*this));
    mWorkerThread.get().swap(tempThread);
}

void TaskQueue::operator()()
{
    while(!mShallStop)
    {
        //work off a bunch of tasks:
        std::deque<Parameter> chosenTasks;

        {
            std::unique_lock<std::mutex> lock(mAccessMutex);//Zugriff absichern
            mWait.wait(lock); //warten, bis wir aufgeweckt werden. Gibt das Lock temporär frei.
           
            //hier haben wir das Lock wieder, und wurden aufgeweckt.
            chosenTasks.swap(mTasks);
            mWait.notify_all(); //Zukunftssicher: Falls der Erzeuger eines Tages darauf warten soll, dass die Liste leer ist.
        }

        try
        {
            std::for_each(chosenTasks.begin(), chosenTasks.end(), myAwesomeAction);
        }
        catch(const std::exception& e)
        {
            std::cout << "exception while working on task, " << e.what() << ", dropping whole chunk" << std::endl;
        }
    }
}
Das zweite notify_all ist dafür gedacht, wenn man eines Tages den Producer (in diesem Fall: den Hauptthread) so lange anhalten will, bis sämtliche Objekte abgearbeitet sind. Wer nicht sämtliche Tasks gleich aus der Liste entfernen möchte, kann so auch darauf warten, dass wieder ausreichend Platz in der Warteschlange verfügbar ist. Kann man in diesem Beispiel auch weglassen, schadet aber auch nicht.

Es könnte so einfach sein. Leider gibt es da etwas, was sich spurious wakeup nennt - hin und wieder löst das Betriebssystem ein notify_all aus, ohne dass es im Code tatsächlich aufgerufen wurde. (Unter Linux passiert das z.B. wenn ein Signal an einen Prozess geschickt wird - wartende Threads können dann auf das Signal reagieren (z.B., beenden) auch wenn das, worauf man eigentlich gewartet hat, nicht eingetreten ist.)

Wir müssten also nach unserem wait()-Call noch testen, ob überhaupt Tasks verfügbar sind - oder ob mShallStop gesetzt wurde. C++ gibt uns die Möglichkeit, das in einem Callback automatisch testen zu lassen, so dass unser Code etwas sauberer ist: Aus mWait.wait(lock); wird mWait.wait(lock, [this]{return !mTasks.empty() || mShallStop;}); - wir übergeben wait zusätzlich noch eine Lambda-Funktion, die für uns testet, ob wir auch wirklich aufwachen wollen - in diesem Fall wollen wir nur aufwachen, wenn entweder Tasks verfügbar sind, oder mShallStop gesetzt wurde - in jedem anderen Fall schlafen wir einfach weiter.

Und hier noch eine Variante mit beliebigen Tasks, einer beliebigen Aktion, die ausgeführt werden kann, einer Möglichkeit, mehrere Tasks auf einmal einzufügen, einer Warteliste mit begrenzter Länge (aber keinem Warten, stattdessen einem bool, was darüber informiert, ob die Schlange voll war):

C++:
#include <thread>
#include <mutex>
#include <deque>
#include <future>
#include <iostream>
#include <algorithm>

template<typename Action, typename Parameter>
class TaskQueue
{
public:
    TaskQueue(Action& action, std::size_t maxQueueSize = 0);
    ~TaskQueue()
    {
        scheduleShutdown();
    }

    template<typename T>
    bool addTask(T&& task)
    {
        bool success = false;

        {
            std::lock_guard<std::mutex> lock(mAccessMutex);
            if(!mMaxQueueSize || mTasks.size() <= mMaxQueueSize)
            {
                mTasks.emplace_back(std::move(task));
                success = true;
            }
        }
        mWait.notify_all();

        return success;
    }

    template <typename Iterator>
    Iterator addTasks(Iterator begin, Iterator end)
    {
        Iterator result = begin;
        {
            std::lock_guard<std::mutex> lock(mAccessMutex);

            while((!mMaxQueueSize || mTasks.size() <= mMaxQueueSize)
                  &&
                  result != end)
            {
                mTasks.emplace_back(result++);
            }
        }
        mWait.notify_all();

        return result;
    }

    void operator()();

    void scheduleShutdown()
    {
        mShallStop = true;
        mWait.notify_all();
    }

private:
    std::mutex mAccessMutex;
    std::condition_variable mWait;

    std::deque<Parameter> mTasks;
    const std::size_t mMaxQueueSize;

    RAIIThread mWorkerThread;
    std::atomic_bool mShallStop;
    Action& mAction;
};

template<typename Action, typename Parameter>
TaskQueue<Action, Parameter>::TaskQueue(Action& action, std::size_t maxQueueSize) : mAction(action), mMaxQueueSize(maxQueueSize), mShallStop(false)
{
    //delayed start because *this might not be fully initialized
    std::thread tempThread(std::ref(*this));
    mWorkerThread.get().swap(tempThread);
}

template<typename Action, typename Parameter>
void TaskQueue<Action, Parameter>::operator()()
{
    while(!mShallStop)
    {
        //work off a bunch of tasks:
        std::deque<Parameter> chosenTasks;

        {
            std::unique_lock<std::mutex> lock(mAccessMutex);
            mWait.wait(lock, [this]{return !mTasks.empty() || mShallStop;}); //release mutex to the outside, when notified, check if tasks not empty
            chosenTasks.swap(mTasks);
        } //lock is unlocked on destruction

        mWait.notify_all(); //in case the producer is waiting to fill the queue. in the meantime, we are working...

        try
        {
            std::for_each(chosenTasks.begin(), chosenTasks.end(), std::ref(mAction));
        }
        catch(const std::exception& e)
        {
            std::cout << "exception while working on task, " << e.what() << ", dropping whole chunk" << std::endl;
        }
    }
}
In der Zukunft könnte man noch als Parameter die Abarbeite-Strategie übergeben - ob jeder Task einzeln abgeholt werden soll, oder wie oben im Paket - und ob man beim Erreichen der maximalen Warteschlangenlänge einen Fehler returnen, oder stattdessen warten will.

Reviews sind willkommen!
Autor
-AB-
First release
Last update
Bewertung
0,00 Stern(e) 0 Bewertungen

More resources from -AB-

Oben