From 547601559cc0ff73cbfd2f3cbfc13dabac27f910 Mon Sep 17 00:00:00 2001 From: Stenzek Date: Fri, 3 Jan 2025 18:48:20 +1000 Subject: [PATCH] Common: Add TaskQueue class --- src/common/CMakeLists.txt | 2 + src/common/common.vcxproj | 2 + src/common/common.vcxproj.filters | 2 + src/common/task_queue.cpp | 98 +++++++++++++++++++++++++++++++ src/common/task_queue.h | 59 +++++++++++++++++++ 5 files changed, 163 insertions(+) create mode 100644 src/common/task_queue.cpp create mode 100644 src/common/task_queue.h diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index f67e649e1..f1146aa21 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -60,6 +60,8 @@ add_library(common thirdparty/SmallVector.h thirdparty/aes.cpp thirdparty/aes.h + task_queue.cpp + task_queue.h threading.cpp threading.h timer.cpp diff --git a/src/common/common.vcxproj b/src/common/common.vcxproj index d08734283..c4c36981c 100644 --- a/src/common/common.vcxproj +++ b/src/common/common.vcxproj @@ -45,6 +45,7 @@ + @@ -74,6 +75,7 @@ + diff --git a/src/common/common.vcxproj.filters b/src/common/common.vcxproj.filters index 7e60a05f0..09070d4e1 100644 --- a/src/common/common.vcxproj.filters +++ b/src/common/common.vcxproj.filters @@ -52,6 +52,7 @@ + @@ -82,6 +83,7 @@ + diff --git a/src/common/task_queue.cpp b/src/common/task_queue.cpp new file mode 100644 index 000000000..d240e35ef --- /dev/null +++ b/src/common/task_queue.cpp @@ -0,0 +1,98 @@ +// SPDX-FileCopyrightText: 2019-2025 Connor McLaughlin +// SPDX-License-Identifier: CC-BY-NC-ND-4.0 + +#include "task_queue.h" +#include "assert.h" + +TaskQueue::TaskQueue() = default; + +TaskQueue::~TaskQueue() +{ + SetWorkerCount(0); + Assert(m_tasks.empty()); +} + +void TaskQueue::SetWorkerCount(u32 count) +{ + std::unique_lock lock(m_mutex); + + WaitForAll(lock); + + if (!m_threads.empty()) + { + m_threads_done = true; + m_task_wait_cv.notify_all(); + + auto threads = std::move(m_threads); + m_threads = decltype(threads)(); + + lock.unlock(); + for (std::thread& t : threads) + t.join(); + lock.lock(); + } + + if (count > 0) + { + m_threads_done = false; + for (u32 i = 0; i < count; i++) + m_threads.emplace_back(&TaskQueue::WorkerThreadEntryPoint, this); + } +} + +void TaskQueue::SubmitTask(TaskFunctionType func) +{ + std::unique_lock lock(m_mutex); + m_tasks.push_back(std::move(func)); + m_tasks_outstanding++; + m_task_wait_cv.notify_one(); +} + +void TaskQueue::WaitForAll() +{ + std::unique_lock lock(m_mutex); + WaitForAll(lock); +} + +void TaskQueue::WaitForAll(std::unique_lock& lock) +{ + // while we're waiting, execute work on the calling thread + m_tasks_done_cv.wait(lock, [this, &lock]() { + if (m_tasks_outstanding == 0) + return true; + + while (!m_tasks.empty()) + ExecuteOneTask(lock); + + return (m_tasks_outstanding == 0); + }); +} + +void TaskQueue::ExecuteOneTask(std::unique_lock& lock) +{ + TaskFunctionType func = std::move(m_tasks.front()); + m_tasks.pop_front(); + lock.unlock(); + func(); + lock.lock(); + m_tasks_outstanding--; + if (m_tasks_outstanding == 0) + m_tasks_done_cv.notify_all(); +} + +void TaskQueue::WorkerThreadEntryPoint() +{ + Threading::SetNameOfCurrentThread("TaskQueue Worker"); + + std::unique_lock lock(m_mutex); + while (!m_threads_done) + { + if (m_tasks.empty()) + { + m_task_wait_cv.wait(lock); + continue; + } + + ExecuteOneTask(lock); + } +} diff --git a/src/common/task_queue.h b/src/common/task_queue.h new file mode 100644 index 000000000..6220b4659 --- /dev/null +++ b/src/common/task_queue.h @@ -0,0 +1,59 @@ +// SPDX-FileCopyrightText: 2019-2025 Connor McLaughlin +// SPDX-License-Identifier: CC-BY-NC-ND-4.0 + +#pragma once + +#include "threading.h" +#include "types.h" + +#include +#include +#include +#include +#include +#include +#include + +/// Implements a simple task queue with multiple worker threads. +class TaskQueue +{ +public: + using TaskFunctionType = std::function; + + TaskQueue(); + ~TaskQueue(); + + /// Sets the number of worker threads to be used by the task queue. + /// Setting this to zero threads completes tasks on the calling thread. + /// @param count The desired number of worker threads. + void SetWorkerCount(u32 count); + + /// Submits a task to the queue for execution. + /// @param func The task function to execute. + void SubmitTask(TaskFunctionType func); + + /// Waits for all submitted tasks to complete execution. + void WaitForAll(); + +private: + /// Waits for all submitted tasks to complete execution. + /// This is a helper function that assumes a lock is already held. + /// @param lock A unique_lock object holding the mutex. + void WaitForAll(std::unique_lock& lock); + + /// Executes one task from the queue. + /// This is a helper function that assumes a lock is already held. + /// @param lock A unique_lock object holding the mutex. + void ExecuteOneTask(std::unique_lock& lock); + + /// Entry point for worker threads. Executes tasks from the queue until termination is signaled. + void WorkerThreadEntryPoint(); + + std::mutex m_mutex; + std::deque m_tasks; + size_t m_tasks_outstanding = 0; + std::condition_variable m_task_wait_cv; + std::condition_variable m_tasks_done_cv; + std::vector m_threads; + bool m_threads_done = false; +};