Tweaks to TaskManager code

Reported by: Vladislav
Differential Revision: https://code.wildfiregames.com/D4077
This was SVN commit r25667.
This commit is contained in:
wraitii 2021-06-04 12:55:15 +00:00
parent f9ed769355
commit 69901d9ffb
2 changed files with 27 additions and 32 deletions

View File

@ -51,12 +51,11 @@ std::unique_ptr<TaskManager> g_TaskManager;
class Thread;
using QueueItem = std::function<void()>;
}
/**
* Light wrapper around std::thread. Ensures Join has been called.
*/
class Threading::Thread
class Thread
{
public:
Thread() = default;
@ -66,7 +65,7 @@ public:
template<typename T, void(T::* callable)()>
void Start(T* object)
{
m_Thread = std::thread(Threading::HandleExceptions<DoStart<T, callable>>::Wrapper, object);
m_Thread = std::thread(HandleExceptions<DoStart<T, callable>>::Wrapper, object);
}
template<typename T, void(T::* callable)()>
static void DoStart(T* object);
@ -84,10 +83,10 @@ protected:
/**
* Worker thread: process the taskManager queues until killed.
*/
class Threading::WorkerThread : public Thread
class WorkerThread : public Thread
{
public:
WorkerThread(Threading::TaskManager::Impl& taskManager);
WorkerThread(TaskManager::Impl& taskManager);
~WorkerThread();
/**
@ -101,7 +100,7 @@ protected:
std::mutex m_Mutex;
std::condition_variable m_ConditionVariable;
Threading::TaskManager::Impl& m_TaskManager;
TaskManager::Impl& m_TaskManager;
};
/**
@ -109,7 +108,7 @@ protected:
*
* The normal priority queue is processed first, the low priority only if there are no higher-priority tasks
*/
class Threading::TaskManager::Impl
class TaskManager::Impl
{
friend class TaskManager;
friend class WorkerThread;
@ -156,32 +155,32 @@ protected:
mutable size_t m_RoundRobinIdx = 0;
};
Threading::TaskManager::TaskManager() : TaskManager(std::thread::hardware_concurrency() - 1)
TaskManager::TaskManager() : TaskManager(std::thread::hardware_concurrency() - 1)
{
}
Threading::TaskManager::TaskManager(size_t numberOfWorkers)
TaskManager::TaskManager(size_t numberOfWorkers)
{
m = std::make_unique<Impl>(*this);
numberOfWorkers = Clamp<size_t>(numberOfWorkers, MIN_THREADS, MAX_THREADS);
m->SetupWorkers(numberOfWorkers);
}
Threading::TaskManager::~TaskManager() {}
TaskManager::~TaskManager() {}
Threading::TaskManager::Impl::Impl(TaskManager& backref)
TaskManager::Impl::Impl(TaskManager& backref)
: m_TaskManager(backref)
{
}
void Threading::TaskManager::Impl::SetupWorkers(size_t numberOfWorkers)
void TaskManager::Impl::SetupWorkers(size_t numberOfWorkers)
{
for (size_t i = 0; i < numberOfWorkers; ++i)
m_Workers.emplace_back(*this);
}
void Threading::TaskManager::ClearQueue() { m->ClearQueue(); }
void Threading::TaskManager::Impl::ClearQueue()
void TaskManager::ClearQueue() { m->ClearQueue(); }
void TaskManager::Impl::ClearQueue()
{
{
std::lock_guard<std::mutex> lock(m_GlobalMutex);
@ -193,17 +192,17 @@ void Threading::TaskManager::Impl::ClearQueue()
}
}
size_t Threading::TaskManager::GetNumberOfWorkers() const
size_t TaskManager::GetNumberOfWorkers() const
{
return m->m_Workers.size();
}
void Threading::TaskManager::DoPushTask(std::function<void()>&& task, TaskPriority priority)
void TaskManager::DoPushTask(std::function<void()>&& task, TaskPriority priority)
{
m->PushTask(std::move(task), priority);
}
void Threading::TaskManager::Impl::PushTask(std::function<void()>&& task, TaskPriority priority)
void TaskManager::Impl::PushTask(std::function<void()>&& task, TaskPriority priority)
{
std::mutex& mutex = priority == TaskPriority::NORMAL ? m_GlobalMutex : m_GlobalLowPriorityMutex;
std::deque<QueueItem>& queue = priority == TaskPriority::NORMAL ? m_GlobalQueue : m_GlobalLowPriorityQueue;
@ -218,8 +217,8 @@ void Threading::TaskManager::Impl::PushTask(std::function<void()>&& task, TaskPr
worker.Wake();
}
template<Threading::TaskPriority Priority>
bool Threading::TaskManager::Impl::PopTask(std::function<void()>& taskOut)
template<TaskPriority Priority>
bool TaskManager::Impl::PopTask(std::function<void()>& taskOut)
{
std::mutex& mutex = Priority == TaskPriority::NORMAL ? m_GlobalMutex : m_GlobalLowPriorityMutex;
std::deque<QueueItem>& queue = Priority == TaskPriority::NORMAL ? m_GlobalQueue : m_GlobalLowPriorityQueue;
@ -237,13 +236,13 @@ bool Threading::TaskManager::Impl::PopTask(std::function<void()>& taskOut)
return false;
}
void Threading::TaskManager::Initialise()
void TaskManager::Initialise()
{
if (!g_TaskManager)
g_TaskManager = std::make_unique<TaskManager>();
}
Threading::TaskManager& Threading::TaskManager::Instance()
TaskManager& TaskManager::Instance()
{
ENSURE(g_TaskManager);
return *g_TaskManager;
@ -251,13 +250,13 @@ Threading::TaskManager& Threading::TaskManager::Instance()
// Thread definition
Threading::WorkerThread::WorkerThread(Threading::TaskManager::Impl& taskManager)
WorkerThread::WorkerThread(TaskManager::Impl& taskManager)
: m_TaskManager(taskManager)
{
Start<WorkerThread, &WorkerThread::RunUntilDeath>(this);
}
Threading::WorkerThread::~WorkerThread()
WorkerThread::~WorkerThread()
{
m_Kill = true;
m_ConditionVariable.notify_one();
@ -265,12 +264,12 @@ Threading::WorkerThread::~WorkerThread()
m_Thread.join();
}
void Threading::WorkerThread::Wake()
void WorkerThread::Wake()
{
m_ConditionVariable.notify_one();
}
void Threading::WorkerThread::RunUntilDeath()
void WorkerThread::RunUntilDeath()
{
// The profiler does better if the names are unique.
static std::atomic<int> n = 0;
@ -304,7 +303,9 @@ void Threading::WorkerThread::RunUntilDeath()
// Defined here - needs access to derived types.
template<typename T, void(T::* callable)()>
void Threading::Thread::DoStart(T* object)
void Thread::DoStart(T* object)
{
std::invoke(callable, object);
}
} // namespace Threading

View File

@ -23,14 +23,8 @@
#include <memory>
#include <vector>
class TestTaskManager;
class CConfigDB;
namespace Threading
{
class TaskManager;
class WorkerThread;
enum class TaskPriority
{
NORMAL,