Skip to main content

Threads

  • CMake
# Locate the platform's thread library; REQUIRED makes configuration fail if it is missing.
find_package(Threads REQUIRED)

add_executable(main main.cpp)
# Link the executable against the imported Threads::Threads target.
target_link_libraries(main PRIVATE Threads::Threads)
  • Include
#include <thread>
  • Current Thread
// Returns the std::thread::id of the thread that executes this call.
std::this_thread::get_id();
  • Get Number of cores:
// Only a hint for the number of concurrently supported threads; may be 0 if it cannot be determined.
unsigned int nCores = std::thread::hardware_concurrency();
  • Create a thread
// Thread entry point; takes the message by value (the body is left empty in this example).
void printMessage(std::string message) {
}

std::string message = "My Message";

// Starts a new thread running printMessage; the argument is copied for the new thread.
std::thread t1(printMessage, message);
  • Create a thread with lambda
// The lambda captures `message` by value, so the thread works on its own copy.
std::thread t2([message] {
std::this_thread::sleep_for(std::chrono::milliseconds(10)); // simulate work
std::cout << "Thread 2: " << message << std::endl;
});
  • Create thread class

// Callable class: an instance can be handed to std::thread, which then
// invokes operator() on the newly started thread.
class Vehicle {
public:
    // Entry point executed by the thread.
    void operator()() {
        // do_stuff
    }
}; // a class definition must be terminated with a semicolon

// Brace initialization avoids the "most vexing parse" that
// `std::thread t(Vehicle());` would run into (it declares a function).
std::thread t{ Vehicle() };
  • Wait on thread to finish:
// Blocks the calling thread until t2 has finished executing.
t2.join();

Promise / Future

  • Simpler than using threads

  • But don't use it for I/O, when you need mutexes, or when the task has to interact with other threads

  • Include

#include <future>
  • Create Future / Promise
// Runs on the worker thread: fulfils the promise with a modified copy of the message.
// The promise arrives as an rvalue reference because std::promise is move-only.
void doStuff(std::promise<std::string> &&promise, std::string message) {
    promise.set_value(message + " has been modified");
}

// The promise is the sending end; the future obtained from it is the receiving end.
std::promise<std::string> promise;
std::future<std::string> future = promise.get_future();

std::string message = "My Message";

// Transfer ownership of the promise to the thread with std::move (it cannot be copied).
std::thread t{doStuff, std::move(promise), message};

// get() blocks until the worker has called set_value(); it can only be called once per future.
std::string messageFromThread = future.get();
// Note: t still has to be joined (t.join()) before it is destroyed.
  • Wait for
// Wait at most 1000 ms for the result instead of blocking indefinitely.
auto status = future.wait_for(std::chrono::milliseconds(1000));
if (status == std::future_status::ready) {
// the result is available: future.get() can be called here
} else if (status == std::future_status::timeout || status == std::future_status::deferred) {
// timeout means the result was not ready within the waiting time
// deferred means not yet started
// result unavailable yet
}
  • Exception Handling
// Worker-side pattern: forward any exception thrown by the work to the thread waiting on the future.
// NOTE(review): `promise` is not declared in this snippet; presumably it is passed in via the elided parameters.
void doStuff(/**/) {
try {

} catch(...) {
// Store the currently handled exception in the promise instead of a value.
promise.set_exception(std::current_exception());
}
}

// Consumer side: future.get() rethrows an exception that the worker stored
// with promise.set_exception(). `future` is assumed to be the future that was
// obtained from the worker's promise.
int main() {
    try {
        future.get();
    } catch (const std::runtime_error &e) { // catch by const reference: no copy, no slicing
        std::cout << e.what() << std::endl;
    }
}
// Task for std::async: the result is simply returned, no explicit promise is needed.
std::string doStuff() {
std::this_thread::sleep_for(std::chrono::milliseconds (4000));
return "bla blubb";
}

// std::launch::async requests asynchronous execution; the returned future delivers doStuff's return value.
std::future<std::string> future = std::async(std::launch::async, doStuff);

Message Queue

#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

/// Thread-safe message queue (monitor object).
///
/// Producers call send(); consumers call receive(), which blocks until a
/// message is available. Messages are handed out in FIFO order, i.e. the
/// oldest message is delivered first so that no message can starve.
template <class T>
class MessageQueue {
public:
    /// Blocks until the queue is non-empty, then removes and returns the
    /// oldest message.
    T receive() {
        // wait() needs a unique_lock: it releases the mutex while sleeping and
        // re-acquires it before the predicate is evaluated / before returning.
        std::unique_lock<std::mutex> uLock(_mutex);
        _cond.wait(uLock, [this] { return !_messages.empty(); }); // predicate guards against spurious wake-ups

        // take the oldest element from the front of the deque (FIFO)
        T msg = std::move(_messages.front());
        _messages.pop_front();
        return msg; // moved or elided, never copied
    }

    /// Appends a message to the queue and wakes up one waiting receiver.
    /// @param msg message to enqueue; it is moved into the queue.
    void send(T &&msg) {
        // simulate some work (done before the lock is taken)
        std::this_thread::sleep_for(std::chrono::milliseconds(100));

        // perform the queue modification under the lock
        std::lock_guard<std::mutex> lock(_mutex);

        // log before moving, because msg is in a moved-from state afterwards
        std::cout << " Message " << msg << " has been sent to the queue" << std::endl;
        _messages.push_back(std::move(msg));
        _cond.notify_one(); // wake up one receiver blocked in receive()
    }
private:
    std::mutex _mutex;             // protects _messages
    std::condition_variable _cond; // signalled whenever a message is added
    std::deque<T> _messages;       // pending messages, oldest at the front
};

int main() {
// create monitor object as a shared pointer to enable access by multiple threads
std::shared_ptr<MessageQueue<int>> queue(new MessageQueue<int>);
std::cout << "Spawning threads..." << std::endl;
std::vector<std::future<void>> futures;
for (int i = 0; i < 10; ++i) {
int message = i;
futures.emplace_back(std::async(std::launch::async, &MessageQueue<int>::send, queue, std::move(message)));
}

std::cout << "Collecting results..." << std::endl;
while (true) {
int message = queue->receive();
std::cout << " Message #" << message << " has been removed from the queue" << std::endl;
}

std::for_each(futures.begin(), futures.end(), [](std::future<void> &ftr) {
ftr.wait();
});

std::cout << "Finished!" << std::endl;

return 0;
}