Implement a global task manager using a pool of worker threads
Tasks are simple callables (e.g. lambdas), and can be pushed with 2 priority levels. Pushing a task returns a future. Futures can be waited on, can return results, and can be cancelled deterministically. Futures can also not be waited on. This gives 'hardware concurrency - 1' threads to maximize CPU usage in a work-stealing workflow. Reviewed by: vladislavbelov Refs #5874 Differential Revision: https://code.wildfiregames.com/D3848 This was SVN commit r25656.
This commit is contained in:
parent
c78ead79e6
commit
1b35d36daa
@ -30,7 +30,7 @@
|
|||||||
#include "ps/CLogger.h"
|
#include "ps/CLogger.h"
|
||||||
#include "ps/FileIo.h"
|
#include "ps/FileIo.h"
|
||||||
#include "ps/Profile.h"
|
#include "ps/Profile.h"
|
||||||
#include "ps/Threading.h"
|
#include "ps/TaskManager.h"
|
||||||
#include "ps/scripting/JSInterface_VFS.h"
|
#include "ps/scripting/JSInterface_VFS.h"
|
||||||
#include "scriptinterface/FunctionWrapper.h"
|
#include "scriptinterface/FunctionWrapper.h"
|
||||||
#include "scriptinterface/ScriptContext.h"
|
#include "scriptinterface/ScriptContext.h"
|
||||||
@ -69,9 +69,8 @@ CMapGeneratorWorker::CMapGeneratorWorker(ScriptInterface* scriptInterface) :
|
|||||||
|
|
||||||
CMapGeneratorWorker::~CMapGeneratorWorker()
|
CMapGeneratorWorker::~CMapGeneratorWorker()
|
||||||
{
|
{
|
||||||
// Wait for thread to end
|
// Cancel or wait for the task to end.
|
||||||
if (m_WorkerThread.joinable())
|
m_WorkerThread.CancelOrWait();
|
||||||
m_WorkerThread.join();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void CMapGeneratorWorker::Initialize(const VfsPath& scriptFile, const std::string& settings)
|
void CMapGeneratorWorker::Initialize(const VfsPath& scriptFile, const std::string& settings)
|
||||||
@ -83,35 +82,31 @@ void CMapGeneratorWorker::Initialize(const VfsPath& scriptFile, const std::strin
|
|||||||
m_ScriptPath = scriptFile;
|
m_ScriptPath = scriptFile;
|
||||||
m_Settings = settings;
|
m_Settings = settings;
|
||||||
|
|
||||||
// Launch the worker thread
|
// Start generating the map asynchronously.
|
||||||
m_WorkerThread = std::thread(Threading::HandleExceptions<RunThread>::Wrapper, this);
|
m_WorkerThread = Threading::TaskManager::Instance().PushTask([this]() {
|
||||||
}
|
PROFILE2("Map Generation");
|
||||||
|
|
||||||
void CMapGeneratorWorker::RunThread(CMapGeneratorWorker* self)
|
std::shared_ptr<ScriptContext> mapgenContext = ScriptContext::CreateContext(RMS_CONTEXT_SIZE);
|
||||||
{
|
|
||||||
debug_SetThreadName("MapGenerator");
|
|
||||||
g_Profiler2.RegisterCurrentThread("MapGenerator");
|
|
||||||
|
|
||||||
std::shared_ptr<ScriptContext> mapgenContext = ScriptContext::CreateContext(RMS_CONTEXT_SIZE);
|
// Enable the script to be aborted
|
||||||
|
JS_AddInterruptCallback(mapgenContext->GetGeneralJSContext(), MapGeneratorInterruptCallback);
|
||||||
|
|
||||||
// Enable the script to be aborted
|
m_ScriptInterface = new ScriptInterface("Engine", "MapGenerator", mapgenContext);
|
||||||
JS_AddInterruptCallback(mapgenContext->GetGeneralJSContext(), MapGeneratorInterruptCallback);
|
|
||||||
|
|
||||||
self->m_ScriptInterface = new ScriptInterface("Engine", "MapGenerator", mapgenContext);
|
// Run map generation scripts
|
||||||
|
if (!Run() || m_Progress > 0)
|
||||||
|
{
|
||||||
|
// Don't leave progress in an unknown state, if generator failed, set it to -1
|
||||||
|
std::lock_guard<std::mutex> lock(m_WorkerMutex);
|
||||||
|
m_Progress = -1;
|
||||||
|
}
|
||||||
|
|
||||||
// Run map generation scripts
|
SAFE_DELETE(m_ScriptInterface);
|
||||||
if (!self->Run() || self->m_Progress > 0)
|
|
||||||
{
|
|
||||||
// Don't leave progress in an unknown state, if generator failed, set it to -1
|
|
||||||
std::lock_guard<std::mutex> lock(self->m_WorkerMutex);
|
|
||||||
self->m_Progress = -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
SAFE_DELETE(self->m_ScriptInterface);
|
// At this point the random map scripts are done running, so the thread has no further purpose
|
||||||
|
// and can die. The data will be stored in m_MapData already if successful, or m_Progress
|
||||||
// At this point the random map scripts are done running, so the thread has no further purpose
|
// will contain an error value on failure.
|
||||||
// and can die. The data will be stored in m_MapData already if successful, or m_Progress
|
});
|
||||||
// will contain an error value on failure.
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool CMapGeneratorWorker::Run()
|
bool CMapGeneratorWorker::Run()
|
||||||
|
@ -19,6 +19,7 @@
|
|||||||
#define INCLUDED_MAPGENERATOR
|
#define INCLUDED_MAPGENERATOR
|
||||||
|
|
||||||
#include "ps/FileIo.h"
|
#include "ps/FileIo.h"
|
||||||
|
#include "ps/Future.h"
|
||||||
#include "ps/TemplateLoader.h"
|
#include "ps/TemplateLoader.h"
|
||||||
#include "scriptinterface/StructuredClone.h"
|
#include "scriptinterface/StructuredClone.h"
|
||||||
|
|
||||||
@ -26,7 +27,6 @@
|
|||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <set>
|
#include <set>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <thread>
|
|
||||||
|
|
||||||
class CMapGeneratorWorker;
|
class CMapGeneratorWorker;
|
||||||
|
|
||||||
@ -176,11 +176,6 @@ private:
|
|||||||
*/
|
*/
|
||||||
std::vector<std::string> FindActorTemplates(const std::string& path, bool includeSubdirectories);
|
std::vector<std::string> FindActorTemplates(const std::string& path, bool includeSubdirectories);
|
||||||
|
|
||||||
/**
|
|
||||||
* Perform map generation in an independent thread.
|
|
||||||
*/
|
|
||||||
static void RunThread(CMapGeneratorWorker* self);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Perform the map generation.
|
* Perform the map generation.
|
||||||
*/
|
*/
|
||||||
@ -227,9 +222,10 @@ private:
|
|||||||
CTemplateLoader m_TemplateLoader;
|
CTemplateLoader m_TemplateLoader;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Holds the mapgeneration thread identifier.
|
* Holds the completion result of the asynchronous map generation.
|
||||||
|
* TODO: this whole class could really be a future on its own.
|
||||||
*/
|
*/
|
||||||
std::thread m_WorkerThread;
|
Future<void> m_WorkerThread;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Avoids thread synchronization issues.
|
* Avoids thread synchronization issues.
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
/* Copyright (C) 2010 Wildfire Games.
|
/* Copyright (C) 2021 Wildfire Games.
|
||||||
*
|
*
|
||||||
* Permission is hereby granted, free of charge, to any person obtaining
|
* Permission is hereby granted, free of charge, to any person obtaining
|
||||||
* a copy of this software and associated documentation files (the
|
* a copy of this software and associated documentation files (the
|
||||||
@ -28,6 +28,8 @@
|
|||||||
#include "lib/sysdep/sysdep.h"
|
#include "lib/sysdep/sysdep.h"
|
||||||
#include "lib/debug.h"
|
#include "lib/debug.h"
|
||||||
|
|
||||||
|
#include <pthread.h>
|
||||||
|
|
||||||
void* debug_GetCaller(void* UNUSED(context), const wchar_t* UNUSED(lastFuncToSkip))
|
void* debug_GetCaller(void* UNUSED(context), const wchar_t* UNUSED(lastFuncToSkip))
|
||||||
{
|
{
|
||||||
return NULL;
|
return NULL;
|
||||||
@ -43,7 +45,7 @@ Status debug_ResolveSymbol(void* UNUSED(ptr_of_interest), wchar_t* UNUSED(sym_na
|
|||||||
return ERR::NOT_SUPPORTED;
|
return ERR::NOT_SUPPORTED;
|
||||||
}
|
}
|
||||||
|
|
||||||
void debug_SetThreadName(char const* UNUSED(name))
|
void debug_SetThreadName(char const* name)
|
||||||
{
|
{
|
||||||
// Currently unimplemented
|
pthread_setname_np(name);
|
||||||
}
|
}
|
||||||
|
@ -59,6 +59,7 @@ that of Atlas depending on commandline parameters.
|
|||||||
#include "ps/UserReport.h"
|
#include "ps/UserReport.h"
|
||||||
#include "ps/Util.h"
|
#include "ps/Util.h"
|
||||||
#include "ps/VideoMode.h"
|
#include "ps/VideoMode.h"
|
||||||
|
#include "ps/TaskManager.h"
|
||||||
#include "ps/World.h"
|
#include "ps/World.h"
|
||||||
#include "ps/GameSetup/GameSetup.h"
|
#include "ps/GameSetup/GameSetup.h"
|
||||||
#include "ps/GameSetup/Atlas.h"
|
#include "ps/GameSetup/Atlas.h"
|
||||||
@ -578,6 +579,9 @@ static void RunGameOrAtlas(int argc, const char* argv[])
|
|||||||
ScriptEngine scriptEngine;
|
ScriptEngine scriptEngine;
|
||||||
CXeromyces::Startup();
|
CXeromyces::Startup();
|
||||||
|
|
||||||
|
// Initialise the global task manager at this point (JS & Profiler2 are set up).
|
||||||
|
Threading::TaskManager::Initialise();
|
||||||
|
|
||||||
if (ATLAS_RunIfOnCmdLine(args, false))
|
if (ATLAS_RunIfOnCmdLine(args, false))
|
||||||
{
|
{
|
||||||
CXeromyces::Terminate();
|
CXeromyces::Terminate();
|
||||||
@ -704,6 +708,7 @@ static void RunGameOrAtlas(int argc, const char* argv[])
|
|||||||
ATLAS_RunIfOnCmdLine(args, true);
|
ATLAS_RunIfOnCmdLine(args, true);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
Threading::TaskManager::Instance().ClearQueue();
|
||||||
CXeromyces::Terminate();
|
CXeromyces::Terminate();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
326
source/ps/Future.h
Normal file
326
source/ps/Future.h
Normal file
@ -0,0 +1,326 @@
|
|||||||
|
/* Copyright (C) 2021 Wildfire Games.
|
||||||
|
* This file is part of 0 A.D.
|
||||||
|
*
|
||||||
|
* 0 A.D. is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 2 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* 0 A.D. is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with 0 A.D. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef INCLUDED_FUTURE
|
||||||
|
#define INCLUDED_FUTURE
|
||||||
|
|
||||||
|
#include "ps/FutureForward.h"
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
#include <condition_variable>
|
||||||
|
#include <functional>
|
||||||
|
#include <mutex>
|
||||||
|
#include <type_traits>
|
||||||
|
|
||||||
|
template<typename ResultType>
|
||||||
|
class PackagedTask;
|
||||||
|
|
||||||
|
namespace FutureSharedStateDetail
|
||||||
|
{
|
||||||
|
enum class Status
|
||||||
|
{
|
||||||
|
PENDING,
|
||||||
|
STARTED,
|
||||||
|
DONE,
|
||||||
|
CANCELED
|
||||||
|
};
|
||||||
|
|
||||||
|
template<typename ResultType>
|
||||||
|
class SharedStateResult
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
void ResetResult()
|
||||||
|
{
|
||||||
|
if (m_HasResult)
|
||||||
|
m_Result.m_Result.~ResultType();
|
||||||
|
m_HasResult = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
union Result
|
||||||
|
{
|
||||||
|
std::aligned_storage_t<sizeof(ResultType), alignof(ResultType)> m_Bytes;
|
||||||
|
ResultType m_Result;
|
||||||
|
Result() : m_Bytes() {};
|
||||||
|
~Result() {};
|
||||||
|
};
|
||||||
|
// We don't use Result directly so the result doesn't have to be default constructible.
|
||||||
|
Result m_Result;
|
||||||
|
bool m_HasResult = false;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Don't have m_Result for void ReturnType
|
||||||
|
template<>
|
||||||
|
class SharedStateResult<void>
|
||||||
|
{
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The shared state between futures and packaged state.
|
||||||
|
* Holds all relevant data.
|
||||||
|
*/
|
||||||
|
template<typename ResultType>
|
||||||
|
class SharedState : public SharedStateResult<ResultType>
|
||||||
|
{
|
||||||
|
static constexpr bool VoidResult = std::is_same_v<ResultType, void>;
|
||||||
|
public:
|
||||||
|
SharedState(std::function<ResultType()>&& func) : m_Func(func) {}
|
||||||
|
~SharedState()
|
||||||
|
{
|
||||||
|
// For safety, wait on started task completion, but not on pending ones (auto-cancelled).
|
||||||
|
if (!Cancel())
|
||||||
|
{
|
||||||
|
Wait();
|
||||||
|
Cancel();
|
||||||
|
}
|
||||||
|
if constexpr (!VoidResult)
|
||||||
|
SharedStateResult<ResultType>::ResetResult();
|
||||||
|
}
|
||||||
|
|
||||||
|
SharedState(const SharedState&) = delete;
|
||||||
|
SharedState(SharedState&&) = delete;
|
||||||
|
|
||||||
|
bool IsDoneOrCanceled() const
|
||||||
|
{
|
||||||
|
return m_Status == Status::DONE || m_Status == Status::CANCELED;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Wait()
|
||||||
|
{
|
||||||
|
// Fast path: we're already done.
|
||||||
|
if (IsDoneOrCanceled())
|
||||||
|
return;
|
||||||
|
// Slow path: we aren't done when we run the above check. Lock and wait until we are.
|
||||||
|
std::unique_lock<std::mutex> lock(m_Mutex);
|
||||||
|
m_ConditionVariable.wait(lock, [this]() -> bool { return IsDoneOrCanceled(); });
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If the task is pending, cancel it: the status becomes CANCELED and if the task was completed, the result is destroyed.
|
||||||
|
* @return true if the task was indeed cancelled, false otherwise (the task is running or already done).
|
||||||
|
*/
|
||||||
|
bool Cancel()
|
||||||
|
{
|
||||||
|
Status expected = Status::PENDING;
|
||||||
|
bool cancelled = m_Status.compare_exchange_strong(expected, Status::CANCELED);
|
||||||
|
// If we're done, invalidate, if we're pending, atomically cancel, otherwise fail.
|
||||||
|
if (cancelled || m_Status == Status::DONE)
|
||||||
|
{
|
||||||
|
if (m_Status == Status::DONE)
|
||||||
|
m_Status = Status::CANCELED;
|
||||||
|
if constexpr (!VoidResult)
|
||||||
|
SharedStateResult<ResultType>::ResetResult();
|
||||||
|
m_ConditionVariable.notify_all();
|
||||||
|
return cancelled;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Move the result away from the shared state, mark the future invalid.
|
||||||
|
*/
|
||||||
|
template<typename _ResultType = ResultType>
|
||||||
|
std::enable_if_t<!std::is_same_v<_ResultType, void>, ResultType> GetResult()
|
||||||
|
{
|
||||||
|
// The caller must ensure that this is only called if we have a result.
|
||||||
|
ENSURE(SharedStateResult<ResultType>::m_HasResult);
|
||||||
|
m_Status = Status::CANCELED;
|
||||||
|
SharedStateResult<ResultType>::m_HasResult = false;
|
||||||
|
return std::move(SharedStateResult<ResultType>::m_Result.m_Result);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::atomic<Status> m_Status = Status::PENDING;
|
||||||
|
std::mutex m_Mutex;
|
||||||
|
std::condition_variable m_ConditionVariable;
|
||||||
|
|
||||||
|
std::function<ResultType()> m_Func;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace FutureSharedStateDetail
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Corresponds to std::future.
|
||||||
|
* Unlike std::future, Future can request the cancellation of the task that would produce the result.
|
||||||
|
* This makes it more similar to Java's CancellableTask or C#'s Task.
|
||||||
|
* The name Future was kept over Task so it would be more familiar to C++ users,
|
||||||
|
* but this all should be revised once Concurrency TS wraps up.
|
||||||
|
*
|
||||||
|
* Future is _not_ thread-safe. Call it from a single thread or ensure synchronization externally.
|
||||||
|
*
|
||||||
|
* The destructor is never blocking. The promise may still be running on destruction.
|
||||||
|
* TODO:
|
||||||
|
* - Handle exceptions.
|
||||||
|
*/
|
||||||
|
template<typename ResultType>
|
||||||
|
class Future
|
||||||
|
{
|
||||||
|
template<typename T>
|
||||||
|
friend class PackagedTask;
|
||||||
|
|
||||||
|
static constexpr bool VoidResult = std::is_same_v<ResultType, void>;
|
||||||
|
|
||||||
|
using Status = FutureSharedStateDetail::Status;
|
||||||
|
using SharedState = FutureSharedStateDetail::SharedState<ResultType>;
|
||||||
|
public:
|
||||||
|
Future() {};
|
||||||
|
Future(const Future& o) = delete;
|
||||||
|
Future(Future&&) = default;
|
||||||
|
Future& operator=(Future&&) = default;
|
||||||
|
~Future() {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Make the future wait for the result of @a func.
|
||||||
|
*/
|
||||||
|
template<typename T>
|
||||||
|
PackagedTask<ResultType> Wrap(T&& func);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Move the result out of the future, and invalidate the future.
|
||||||
|
* If the future is not complete, calls Wait().
|
||||||
|
* If the future is canceled, asserts.
|
||||||
|
*/
|
||||||
|
template<typename SfinaeType = ResultType>
|
||||||
|
std::enable_if_t<!std::is_same_v<SfinaeType, void>, ResultType> Get()
|
||||||
|
{
|
||||||
|
ENSURE(!!m_SharedState);
|
||||||
|
|
||||||
|
Wait();
|
||||||
|
if constexpr (VoidResult)
|
||||||
|
return;
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ENSURE(m_SharedState->m_Status != Status::CANCELED);
|
||||||
|
|
||||||
|
// This mark the state invalid - can't call Get again.
|
||||||
|
return m_SharedState->GetResult();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true if the shared state is valid and has a result (i.e. Get can be called).
|
||||||
|
*/
|
||||||
|
bool IsReady() const
|
||||||
|
{
|
||||||
|
return !!m_SharedState && m_SharedState->m_Status == Status::DONE;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return true if the future has a shared state and it's not been invalidated, ie. pending, started or done.
|
||||||
|
*/
|
||||||
|
bool Valid() const
|
||||||
|
{
|
||||||
|
return !!m_SharedState && m_SharedState->m_Status != Status::CANCELED;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Wait()
|
||||||
|
{
|
||||||
|
if (Valid())
|
||||||
|
m_SharedState->Wait();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cancels the task, waiting if the task is currently started.
|
||||||
|
* Use this function over Cancel() if you need to ensure determinism (i.e. in the simulation).
|
||||||
|
* @see Cancel.
|
||||||
|
*/
|
||||||
|
void CancelOrWait()
|
||||||
|
{
|
||||||
|
if (!Valid())
|
||||||
|
return;
|
||||||
|
if (!m_SharedState->Cancel())
|
||||||
|
m_SharedState->Wait();
|
||||||
|
m_SharedState.reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cancels the task (without waiting).
|
||||||
|
* The result is always invalid, even if the task had completed before.
|
||||||
|
* Note that this cannot stop started tasks.
|
||||||
|
*/
|
||||||
|
void Cancel()
|
||||||
|
{
|
||||||
|
if (m_SharedState)
|
||||||
|
m_SharedState->Cancel();
|
||||||
|
m_SharedState.reset();
|
||||||
|
}
|
||||||
|
protected:
|
||||||
|
std::shared_ptr<SharedState> m_SharedState;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Corresponds somewhat to std::packaged_task.
|
||||||
|
* Like packaged_task, this holds a function acting as a promise.
|
||||||
|
* This type is mostly just the shared state and the call operator,
|
||||||
|
* handling the promise & continuation logic.
|
||||||
|
*/
|
||||||
|
template<typename ResultType>
|
||||||
|
class PackagedTask
|
||||||
|
{
|
||||||
|
static constexpr bool VoidResult = std::is_same_v<ResultType, void>;
|
||||||
|
public:
|
||||||
|
PackagedTask() = delete;
|
||||||
|
PackagedTask(const std::shared_ptr<typename Future<ResultType>::SharedState>& ss) : m_SharedState(ss) {}
|
||||||
|
|
||||||
|
void operator()()
|
||||||
|
{
|
||||||
|
typename Future<ResultType>::Status expected = Future<ResultType>::Status::PENDING;
|
||||||
|
if (!m_SharedState->m_Status.compare_exchange_strong(expected, Future<ResultType>::Status::STARTED))
|
||||||
|
return;
|
||||||
|
|
||||||
|
if constexpr (VoidResult)
|
||||||
|
m_SharedState->m_Func();
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// To avoid UB, explicitly placement-new the value.
|
||||||
|
new (&m_SharedState->m_Result) ResultType{std::move(m_SharedState->m_Func())};
|
||||||
|
m_SharedState->m_HasResult = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Because we might have threads waiting on us, we need to make sure that they either:
|
||||||
|
// - don't wait on our condition variable
|
||||||
|
// - receive the notification when we're done.
|
||||||
|
// This requires locking the mutex (@see Wait).
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(m_SharedState->m_Mutex);
|
||||||
|
m_SharedState->m_Status = Future<ResultType>::Status::DONE;
|
||||||
|
}
|
||||||
|
|
||||||
|
m_SharedState->m_ConditionVariable.notify_all();
|
||||||
|
|
||||||
|
// We no longer need the shared state, drop it immediately.
|
||||||
|
m_SharedState.reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
void Cancel()
|
||||||
|
{
|
||||||
|
m_SharedState->Cancel();
|
||||||
|
m_SharedState.reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected:
|
||||||
|
std::shared_ptr<typename Future<ResultType>::SharedState> m_SharedState;
|
||||||
|
};
|
||||||
|
|
||||||
|
template<typename ResultType>
|
||||||
|
template<typename T>
|
||||||
|
PackagedTask<ResultType> Future<ResultType>::Wrap(T&& func)
|
||||||
|
{
|
||||||
|
static_assert(std::is_convertible_v<std::invoke_result_t<T>, ResultType>, "The return type of the wrapped function cannot be converted to the type of the Future.");
|
||||||
|
m_SharedState = std::make_shared<SharedState>(std::move(func));
|
||||||
|
return PackagedTask<ResultType>(m_SharedState);
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif // INCLUDED_FUTURE
|
29
source/ps/FutureForward.h
Normal file
29
source/ps/FutureForward.h
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
/* Copyright (C) 2021 Wildfire Games.
|
||||||
|
* This file is part of 0 A.D.
|
||||||
|
*
|
||||||
|
* 0 A.D. is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 2 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* 0 A.D. is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with 0 A.D. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef INCLUDED_FUTURE_FORWARD
|
||||||
|
#define INCLUDED_FUTURE_FORWARD
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lightweight header / forward declarations for Future.
|
||||||
|
* Include this in your header files where possible.
|
||||||
|
*/
|
||||||
|
|
||||||
|
template<typename Ret>
|
||||||
|
class Future;
|
||||||
|
|
||||||
|
#endif // INCLUDED_FUTURE_FORWARD
|
310
source/ps/TaskManager.cpp
Normal file
310
source/ps/TaskManager.cpp
Normal file
@ -0,0 +1,310 @@
|
|||||||
|
/* Copyright (C) 2021 Wildfire Games.
|
||||||
|
* This file is part of 0 A.D.
|
||||||
|
*
|
||||||
|
* 0 A.D. is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 2 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* 0 A.D. is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with 0 A.D. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "precompiled.h"
|
||||||
|
|
||||||
|
#include "TaskManager.h"
|
||||||
|
|
||||||
|
#include "lib/debug.h"
|
||||||
|
#include "maths/MathUtil.h"
|
||||||
|
#include "ps/CLogger.h"
|
||||||
|
#include "ps/ConfigDB.h"
|
||||||
|
#include "ps/Threading.h"
|
||||||
|
#include "ps/ThreadUtil.h"
|
||||||
|
#include "ps/Profiler2.h"
|
||||||
|
|
||||||
|
#include <condition_variable>
|
||||||
|
#include <deque>
|
||||||
|
#include <functional>
|
||||||
|
#include <memory>
|
||||||
|
#include <mutex>
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
|
namespace Threading
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Minimum number of TaskManager workers.
|
||||||
|
*/
|
||||||
|
static constexpr size_t MIN_THREADS = 3;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Maximum number of TaskManager workers.
|
||||||
|
*/
|
||||||
|
static constexpr size_t MAX_THREADS = 32;
|
||||||
|
|
||||||
|
std::unique_ptr<TaskManager> g_TaskManager;
|
||||||
|
|
||||||
|
class Thread;
|
||||||
|
|
||||||
|
using QueueItem = std::function<void()>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Light wrapper around std::thread. Ensures Join has been called.
|
||||||
|
*/
|
||||||
|
class Threading::Thread
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
Thread() = default;
|
||||||
|
Thread(const Thread&) = delete;
|
||||||
|
Thread(Thread&&) = delete;
|
||||||
|
|
||||||
|
template<typename T, void(T::* callable)()>
|
||||||
|
void Start(T* object)
|
||||||
|
{
|
||||||
|
m_Thread = std::thread(Threading::HandleExceptions<DoStart<T, callable>>::Wrapper, object);
|
||||||
|
}
|
||||||
|
template<typename T, void(T::* callable)()>
|
||||||
|
static void DoStart(T* object);
|
||||||
|
|
||||||
|
protected:
|
||||||
|
~Thread()
|
||||||
|
{
|
||||||
|
ENSURE(!m_Thread.joinable());
|
||||||
|
}
|
||||||
|
|
||||||
|
std::thread m_Thread;
|
||||||
|
std::atomic<bool> m_Kill = false;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Worker thread: process the taskManager queues until killed.
|
||||||
|
*/
|
||||||
|
class Threading::WorkerThread : public Thread
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
WorkerThread(Threading::TaskManager::Impl& taskManager);
|
||||||
|
~WorkerThread();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wake the worker.
|
||||||
|
*/
|
||||||
|
void Wake();
|
||||||
|
|
||||||
|
protected:
|
||||||
|
void RunUntilDeath();
|
||||||
|
|
||||||
|
std::mutex m_Mutex;
|
||||||
|
std::condition_variable m_ConditionVariable;
|
||||||
|
|
||||||
|
Threading::TaskManager::Impl& m_TaskManager;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* PImpl-ed implementation of the Task manager.
|
||||||
|
*
|
||||||
|
* The normal priority queue is processed first, the low priority only if there are no higher-priority tasks
|
||||||
|
*/
|
||||||
|
class Threading::TaskManager::Impl
|
||||||
|
{
|
||||||
|
friend class TaskManager;
|
||||||
|
friend class WorkerThread;
|
||||||
|
public:
|
||||||
|
Impl(TaskManager& backref);
|
||||||
|
~Impl()
|
||||||
|
{
|
||||||
|
ClearQueue();
|
||||||
|
m_Workers.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 2-phase init to avoid having to think too hard about the order of class members.
|
||||||
|
*/
|
||||||
|
void SetupWorkers(size_t numberOfWorkers);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Push a task on the global queue.
|
||||||
|
* Takes ownership of @a task.
|
||||||
|
* May be called from any thread.
|
||||||
|
*/
|
||||||
|
void PushTask(std::function<void()>&& task, TaskPriority priority);
|
||||||
|
|
||||||
|
protected:
|
||||||
|
void ClearQueue();
|
||||||
|
|
||||||
|
template<TaskPriority Priority>
|
||||||
|
bool PopTask(std::function<void()>& taskOut);
|
||||||
|
|
||||||
|
// Back reference (keep this first).
|
||||||
|
TaskManager& m_TaskManager;
|
||||||
|
|
||||||
|
std::atomic<bool> m_HasWork;
|
||||||
|
std::atomic<bool> m_HasLowPriorityWork;
|
||||||
|
std::mutex m_GlobalMutex;
|
||||||
|
std::mutex m_GlobalLowPriorityMutex;
|
||||||
|
std::deque<QueueItem> m_GlobalQueue;
|
||||||
|
std::deque<QueueItem> m_GlobalLowPriorityQueue;
|
||||||
|
|
||||||
|
// Ideally this would be a vector, since it does get iterated, but that requires movable types.
|
||||||
|
std::deque<WorkerThread> m_Workers;
|
||||||
|
|
||||||
|
// Round-robin counter for GetWorker.
|
||||||
|
mutable size_t m_RoundRobinIdx = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
Threading::TaskManager::TaskManager() : TaskManager(std::thread::hardware_concurrency() - 1)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
Threading::TaskManager::TaskManager(size_t numberOfWorkers)
|
||||||
|
{
|
||||||
|
m = std::make_unique<Impl>(*this);
|
||||||
|
numberOfWorkers = Clamp<size_t>(numberOfWorkers, MIN_THREADS, MAX_THREADS);
|
||||||
|
m->SetupWorkers(numberOfWorkers);
|
||||||
|
}
|
||||||
|
|
||||||
|
Threading::TaskManager::~TaskManager() {}
|
||||||
|
|
||||||
|
Threading::TaskManager::Impl::Impl(TaskManager& backref)
|
||||||
|
: m_TaskManager(backref)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
void Threading::TaskManager::Impl::SetupWorkers(size_t numberOfWorkers)
|
||||||
|
{
|
||||||
|
for (size_t i = 0; i < numberOfWorkers; ++i)
|
||||||
|
m_Workers.emplace_back(*this);
|
||||||
|
}
|
||||||
|
|
||||||
|
void Threading::TaskManager::ClearQueue() { m->ClearQueue(); }
|
||||||
|
void Threading::TaskManager::Impl::ClearQueue()
|
||||||
|
{
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(m_GlobalMutex);
|
||||||
|
m_GlobalQueue.clear();
|
||||||
|
}
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(m_GlobalLowPriorityMutex);
|
||||||
|
m_GlobalLowPriorityQueue.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t Threading::TaskManager::GetNumberOfWorkers() const
|
||||||
|
{
|
||||||
|
return m->m_Workers.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
void Threading::TaskManager::DoPushTask(std::function<void()>&& task, TaskPriority priority)
|
||||||
|
{
|
||||||
|
m->PushTask(std::move(task), priority);
|
||||||
|
}
|
||||||
|
|
||||||
|
void Threading::TaskManager::Impl::PushTask(std::function<void()>&& task, TaskPriority priority)
|
||||||
|
{
|
||||||
|
std::mutex& mutex = priority == TaskPriority::NORMAL ? m_GlobalMutex : m_GlobalLowPriorityMutex;
|
||||||
|
std::deque<QueueItem>& queue = priority == TaskPriority::NORMAL ? m_GlobalQueue : m_GlobalLowPriorityQueue;
|
||||||
|
std::atomic<bool>& hasWork = priority == TaskPriority::NORMAL ? m_HasWork : m_HasLowPriorityWork;
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(mutex);
|
||||||
|
queue.emplace_back(std::move(task));
|
||||||
|
hasWork = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (WorkerThread& worker : m_Workers)
|
||||||
|
worker.Wake();
|
||||||
|
}
|
||||||
|
|
||||||
|
template<Threading::TaskPriority Priority>
|
||||||
|
bool Threading::TaskManager::Impl::PopTask(std::function<void()>& taskOut)
|
||||||
|
{
|
||||||
|
std::mutex& mutex = Priority == TaskPriority::NORMAL ? m_GlobalMutex : m_GlobalLowPriorityMutex;
|
||||||
|
std::deque<QueueItem>& queue = Priority == TaskPriority::NORMAL ? m_GlobalQueue : m_GlobalLowPriorityQueue;
|
||||||
|
std::atomic<bool>& hasWork = Priority == TaskPriority::NORMAL ? m_HasWork : m_HasLowPriorityWork;
|
||||||
|
|
||||||
|
// Particularly critical section since we're locking the global queue.
|
||||||
|
std::lock_guard<std::mutex> globalLock(mutex);
|
||||||
|
if (!queue.empty())
|
||||||
|
{
|
||||||
|
taskOut = std::move(queue.front());
|
||||||
|
queue.pop_front();
|
||||||
|
hasWork = !queue.empty();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Threading::TaskManager::Initialise()
|
||||||
|
{
|
||||||
|
if (!g_TaskManager)
|
||||||
|
g_TaskManager = std::make_unique<TaskManager>();
|
||||||
|
}
|
||||||
|
|
||||||
|
Threading::TaskManager& Threading::TaskManager::Instance()
|
||||||
|
{
|
||||||
|
ENSURE(g_TaskManager);
|
||||||
|
return *g_TaskManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Thread definition
|
||||||
|
|
||||||
|
Threading::WorkerThread::WorkerThread(Threading::TaskManager::Impl& taskManager)
|
||||||
|
: m_TaskManager(taskManager)
|
||||||
|
{
|
||||||
|
Start<WorkerThread, &WorkerThread::RunUntilDeath>(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
Threading::WorkerThread::~WorkerThread()
|
||||||
|
{
|
||||||
|
m_Kill = true;
|
||||||
|
m_ConditionVariable.notify_one();
|
||||||
|
if (m_Thread.joinable())
|
||||||
|
m_Thread.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
void Threading::WorkerThread::Wake()
|
||||||
|
{
|
||||||
|
m_ConditionVariable.notify_one();
|
||||||
|
}
|
||||||
|
|
||||||
|
void Threading::WorkerThread::RunUntilDeath()
|
||||||
|
{
|
||||||
|
// The profiler does better if the names are unique.
|
||||||
|
static std::atomic<int> n = 0;
|
||||||
|
std::string name = "Task Mgr #" + std::to_string(n++);
|
||||||
|
debug_SetThreadName(name.c_str());
|
||||||
|
g_Profiler2.RegisterCurrentThread(name);
|
||||||
|
|
||||||
|
|
||||||
|
std::function<void()> task;
|
||||||
|
bool hasTask = false;
|
||||||
|
std::unique_lock<std::mutex> lock(m_Mutex, std::defer_lock);
|
||||||
|
while (!m_Kill)
|
||||||
|
{
|
||||||
|
lock.lock();
|
||||||
|
m_ConditionVariable.wait(lock, [this](){
|
||||||
|
return m_Kill || m_TaskManager.m_HasWork || m_TaskManager.m_HasLowPriorityWork;
|
||||||
|
});
|
||||||
|
lock.unlock();
|
||||||
|
|
||||||
|
if (m_Kill)
|
||||||
|
break;
|
||||||
|
|
||||||
|
// Fetch work from the global queues.
|
||||||
|
hasTask = m_TaskManager.PopTask<TaskPriority::NORMAL>(task);
|
||||||
|
if (!hasTask)
|
||||||
|
hasTask = m_TaskManager.PopTask<TaskPriority::LOW>(task);
|
||||||
|
if (hasTask)
|
||||||
|
task();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Defined here - needs access to derived types.
|
||||||
|
template<typename T, void(T::* callable)()>
|
||||||
|
void Threading::Thread::DoStart(T* object)
|
||||||
|
{
|
||||||
|
std::invoke(callable, object);
|
||||||
|
}
|
90
source/ps/TaskManager.h
Normal file
90
source/ps/TaskManager.h
Normal file
@ -0,0 +1,90 @@
|
|||||||
|
/* Copyright (C) 2021 Wildfire Games.
|
||||||
|
* This file is part of 0 A.D.
|
||||||
|
*
|
||||||
|
* 0 A.D. is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 2 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* 0 A.D. is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with 0 A.D. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef INCLUDED_THREADING_TASKMANAGER
|
||||||
|
#define INCLUDED_THREADING_TASKMANAGER
|
||||||
|
|
||||||
|
#include "ps/Future.h"
|
||||||
|
|
||||||
|
#include <memory>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
class TestTaskManager;
|
||||||
|
class CConfigDB;
|
||||||
|
|
||||||
|
namespace Threading
|
||||||
|
{
|
||||||
|
class TaskManager;
|
||||||
|
class WorkerThread;
|
||||||
|
|
||||||
|
enum class TaskPriority
|
||||||
|
{
|
||||||
|
NORMAL,
|
||||||
|
LOW
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The task manager creates all worker threads on initialisation,
|
||||||
|
* and manages the task queues.
|
||||||
|
* See implementation for additional comments.
|
||||||
|
*/
|
||||||
|
class TaskManager
|
||||||
|
{
|
||||||
|
friend class WorkerThread;
|
||||||
|
public:
|
||||||
|
TaskManager();
|
||||||
|
~TaskManager();
|
||||||
|
TaskManager(const TaskManager&) = delete;
|
||||||
|
TaskManager(TaskManager&&) = delete;
|
||||||
|
TaskManager& operator=(const TaskManager&) = delete;
|
||||||
|
TaskManager& operator=(TaskManager&&) = delete;
|
||||||
|
|
||||||
|
static void Initialise();
|
||||||
|
static TaskManager& Instance();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clears all tasks from the queue. This blocks on started tasks.
|
||||||
|
*/
|
||||||
|
void ClearQueue();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the number of threaded workers.
|
||||||
|
*/
|
||||||
|
size_t GetNumberOfWorkers() const;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Push a task to be executed.
|
||||||
|
*/
|
||||||
|
template<typename T>
|
||||||
|
Future<std::invoke_result_t<T>> PushTask(T&& func, TaskPriority priority = TaskPriority::NORMAL)
|
||||||
|
{
|
||||||
|
Future<std::invoke_result_t<T>> ret;
|
||||||
|
DoPushTask(std::move(ret.Wrap(std::move(func))), priority);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
TaskManager(size_t numberOfWorkers);
|
||||||
|
|
||||||
|
void DoPushTask(std::function<void()>&& task, TaskPriority priority);
|
||||||
|
|
||||||
|
class Impl;
|
||||||
|
std::unique_ptr<Impl> m;
|
||||||
|
};
|
||||||
|
} // namespace Threading
|
||||||
|
|
||||||
|
#endif // INCLUDED_THREADING_TASKMANAGER
|
136
source/ps/tests/test_Future.h
Normal file
136
source/ps/tests/test_Future.h
Normal file
@ -0,0 +1,136 @@
|
|||||||
|
/* Copyright (C) 2021 Wildfire Games.
|
||||||
|
* This file is part of 0 A.D.
|
||||||
|
*
|
||||||
|
* 0 A.D. is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 2 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* 0 A.D. is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with 0 A.D. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "lib/self_test.h"
|
||||||
|
|
||||||
|
#include "ps/Future.h"
|
||||||
|
|
||||||
|
#include <functional>
|
||||||
|
#include <type_traits>
|
||||||
|
|
||||||
|
class TestFuture : public CxxTest::TestSuite
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
void test_future_basic()
|
||||||
|
{
|
||||||
|
int counter = 0;
|
||||||
|
{
|
||||||
|
Future<void> noret;
|
||||||
|
std::function<void()> task = noret.Wrap([&counter]() mutable { counter++; });
|
||||||
|
task();
|
||||||
|
TS_ASSERT_EQUALS(counter, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
Future<void> noret;
|
||||||
|
{
|
||||||
|
std::function<void()> task = noret.Wrap([&counter]() mutable { counter++; });
|
||||||
|
// Auto-cancels the task.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
TS_ASSERT_EQUALS(counter, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
void test_future_return()
|
||||||
|
{
|
||||||
|
{
|
||||||
|
Future<int> future;
|
||||||
|
std::function<void()> task = future.Wrap([]() { return 1; });
|
||||||
|
task();
|
||||||
|
TS_ASSERT_EQUALS(future.Get(), 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convertible type.
|
||||||
|
{
|
||||||
|
Future<int> future;
|
||||||
|
std::function<void()> task = future.Wrap([]() -> u8 { return 1; });
|
||||||
|
task();
|
||||||
|
TS_ASSERT_EQUALS(future.Get(), 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int destroyed = 0;
|
||||||
|
// No trivial constructor or destructor. Also not copiable.
|
||||||
|
struct NonDef
|
||||||
|
{
|
||||||
|
NonDef() = delete;
|
||||||
|
NonDef(int input) : value(input) {};
|
||||||
|
NonDef(const NonDef&) = delete;
|
||||||
|
NonDef(NonDef&& o)
|
||||||
|
{
|
||||||
|
value = o.value;
|
||||||
|
o.value = 0;
|
||||||
|
}
|
||||||
|
~NonDef() { if (value != 0) destroyed++; }
|
||||||
|
int value = 0;
|
||||||
|
};
|
||||||
|
TS_ASSERT_EQUALS(destroyed, 0);
|
||||||
|
{
|
||||||
|
Future<NonDef> future;
|
||||||
|
std::function<void()> task = future.Wrap([]() { return 1; });
|
||||||
|
task();
|
||||||
|
TS_ASSERT_EQUALS(future.Get().value, 1);
|
||||||
|
}
|
||||||
|
TS_ASSERT_EQUALS(destroyed, 1);
|
||||||
|
{
|
||||||
|
Future<NonDef> future;
|
||||||
|
std::function<void()> task = future.Wrap([]() { return 1; });
|
||||||
|
}
|
||||||
|
TS_ASSERT_EQUALS(destroyed, 1);
|
||||||
|
/**
|
||||||
|
* TODO: find a way to test this
|
||||||
|
{
|
||||||
|
Future<NonDef> future;
|
||||||
|
std::function<void()> task = future.Wrap([]() { return 1; });
|
||||||
|
future.Cancel();
|
||||||
|
future.Wait();
|
||||||
|
TS_ASSERT_THROWS(future.Get(), const Future<NonDef>::BadFutureAccess&);
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
TS_ASSERT_EQUALS(destroyed, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
void test_future_moving()
|
||||||
|
{
|
||||||
|
Future<int> future;
|
||||||
|
std::function<int()> function;
|
||||||
|
|
||||||
|
// Set things up so all temporaries passed into the futures will be reset to obviously invalid memory.
|
||||||
|
std::aligned_storage_t<sizeof(Future<int>), alignof(Future<int>)> futureStorage;
|
||||||
|
std::aligned_storage_t<sizeof(std::function<int()>), alignof(std::function<int()>)> functionStorage;
|
||||||
|
Future<int>* f = &future; // CppCheck doesn't read placement new correctly, do this to silence errors.
|
||||||
|
std::function<int()>* c = &function;
|
||||||
|
f = new (&futureStorage) Future<int>{};
|
||||||
|
c = new (&functionStorage) std::function<int()>{};
|
||||||
|
|
||||||
|
*c = []() { return 7; };
|
||||||
|
std::function<void()> task = f->Wrap(std::move(*c));
|
||||||
|
|
||||||
|
future = std::move(*f);
|
||||||
|
function = std::move(*c);
|
||||||
|
|
||||||
|
// Destroy and clear the memory
|
||||||
|
f->~Future();
|
||||||
|
c->~function();
|
||||||
|
memset(&futureStorage, 0xFF, sizeof(decltype(futureStorage)));
|
||||||
|
memset(&functionStorage, 0xFF, sizeof(decltype(functionStorage)));
|
||||||
|
|
||||||
|
// Let's move the packaged task while at it.
|
||||||
|
std::function<void()> task2 = std::move(task);
|
||||||
|
task2();
|
||||||
|
TS_ASSERT_EQUALS(future.Get(), 7);
|
||||||
|
}
|
||||||
|
};
|
118
source/ps/tests/test_TaskManager.h
Normal file
118
source/ps/tests/test_TaskManager.h
Normal file
@ -0,0 +1,118 @@
|
|||||||
|
/* Copyright (C) 2021 Wildfire Games.
|
||||||
|
* This file is part of 0 A.D.
|
||||||
|
*
|
||||||
|
* 0 A.D. is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 2 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* 0 A.D. is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with 0 A.D. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "lib/self_test.h"
|
||||||
|
|
||||||
|
#include "ps/Future.h"
|
||||||
|
#include "ps/TaskManager.h"
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
#include <condition_variable>
|
||||||
|
#include <mutex>
|
||||||
|
|
||||||
|
class TestTaskManager : public CxxTest::TestSuite
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
void test_basic()
|
||||||
|
{
|
||||||
|
Threading::TaskManager& taskManager = Threading::TaskManager::Instance();
|
||||||
|
// There is a minimum of 3.
|
||||||
|
TS_ASSERT(taskManager.GetNumberOfWorkers() >= 3);
|
||||||
|
|
||||||
|
std::atomic<int> tasks_run = 0;
|
||||||
|
auto increment_run = [&tasks_run]() { tasks_run++; };
|
||||||
|
Future future = taskManager.PushTask(increment_run);
|
||||||
|
future.Wait();
|
||||||
|
TS_ASSERT_EQUALS(tasks_run.load(), 1);
|
||||||
|
|
||||||
|
// Test Execute.
|
||||||
|
std::condition_variable cv;
|
||||||
|
std::mutex mutex;
|
||||||
|
std::atomic<bool> go = false;
|
||||||
|
future = taskManager.PushTask([&]() {
|
||||||
|
std::unique_lock<std::mutex> lock(mutex);
|
||||||
|
cv.wait(lock, [&go]() -> bool { return go; });
|
||||||
|
lock.unlock();
|
||||||
|
increment_run();
|
||||||
|
lock.lock();
|
||||||
|
go = false;
|
||||||
|
lock.unlock();
|
||||||
|
cv.notify_all();
|
||||||
|
});
|
||||||
|
TS_ASSERT_EQUALS(tasks_run.load(), 1);
|
||||||
|
std::unique_lock<std::mutex> lock(mutex);
|
||||||
|
go = true;
|
||||||
|
lock.unlock();
|
||||||
|
cv.notify_all();
|
||||||
|
lock.lock();
|
||||||
|
cv.wait(lock, [&go]() -> bool { return !go; });
|
||||||
|
TS_ASSERT_EQUALS(tasks_run.load(), 2);
|
||||||
|
// Wait on the future before the mutex/cv go out of scope.
|
||||||
|
future.Wait();
|
||||||
|
}
|
||||||
|
|
||||||
|
void test_Priority()
|
||||||
|
{
|
||||||
|
Threading::TaskManager& taskManager = Threading::TaskManager::Instance();
|
||||||
|
std::atomic<int> tasks_run = 0;
|
||||||
|
// Push general tasks
|
||||||
|
auto increment_run = [&tasks_run]() { tasks_run++; };
|
||||||
|
Future future = taskManager.PushTask(increment_run);
|
||||||
|
Future futureLow = taskManager.PushTask(increment_run, Threading::TaskPriority::LOW);
|
||||||
|
future.Wait();
|
||||||
|
futureLow.Wait();
|
||||||
|
TS_ASSERT_EQUALS(tasks_run.load(), 2);
|
||||||
|
// Also check with no waiting expected.
|
||||||
|
taskManager.PushTask(increment_run).Wait();
|
||||||
|
TS_ASSERT_EQUALS(tasks_run.load(), 3);
|
||||||
|
taskManager.PushTask(increment_run, Threading::TaskPriority::LOW).Wait();
|
||||||
|
TS_ASSERT_EQUALS(tasks_run.load(), 4);
|
||||||
|
}
|
||||||
|
|
||||||
|
void test_Load()
|
||||||
|
{
|
||||||
|
Threading::TaskManager& taskManager = Threading::TaskManager::Instance();
|
||||||
|
|
||||||
|
#define ITERATIONS 100000
|
||||||
|
std::vector<Future<int>> futures;
|
||||||
|
futures.resize(ITERATIONS);
|
||||||
|
std::vector<u32> values(ITERATIONS);
|
||||||
|
|
||||||
|
auto f1 = taskManager.PushTask([&taskManager, &futures]() {
|
||||||
|
for (u32 i = 0; i < ITERATIONS; i+=3)
|
||||||
|
futures[i] = taskManager.PushTask([]() { return 5; });
|
||||||
|
});
|
||||||
|
|
||||||
|
auto f2 = taskManager.PushTask([&taskManager, &futures]() {
|
||||||
|
for (u32 i = 1; i < ITERATIONS; i+=3)
|
||||||
|
futures[i] = taskManager.PushTask([]() { return 5; }, Threading::TaskPriority::LOW);
|
||||||
|
});
|
||||||
|
|
||||||
|
auto f3 = taskManager.PushTask([&taskManager, &futures]() {
|
||||||
|
for (u32 i = 2; i < ITERATIONS; i+=3)
|
||||||
|
futures[i] = taskManager.PushTask([]() { return 5; });
|
||||||
|
});
|
||||||
|
|
||||||
|
f1.Wait();
|
||||||
|
f2.Wait();
|
||||||
|
f3.Wait();
|
||||||
|
|
||||||
|
for (size_t i = 0; i < ITERATIONS; ++i)
|
||||||
|
TS_ASSERT_EQUALS(futures[i].Get(), 5);
|
||||||
|
#undef ITERATIONS
|
||||||
|
}
|
||||||
|
};
|
@ -36,6 +36,7 @@
|
|||||||
#include "lib/timer.h"
|
#include "lib/timer.h"
|
||||||
#include "lib/sysdep/sysdep.h"
|
#include "lib/sysdep/sysdep.h"
|
||||||
#include "ps/Profiler2.h"
|
#include "ps/Profiler2.h"
|
||||||
|
#include "ps/TaskManager.h"
|
||||||
#include "scriptinterface/FunctionWrapper.h"
|
#include "scriptinterface/FunctionWrapper.h"
|
||||||
#include "scriptinterface/ScriptEngine.h"
|
#include "scriptinterface/ScriptEngine.h"
|
||||||
#include "scriptinterface/ScriptContext.h"
|
#include "scriptinterface/ScriptContext.h"
|
||||||
@ -84,11 +85,14 @@ class MiscSetup : public CxxTest::GlobalFixture
|
|||||||
m_ScriptEngine = new ScriptEngine;
|
m_ScriptEngine = new ScriptEngine;
|
||||||
g_ScriptContext = ScriptContext::CreateContext();
|
g_ScriptContext = ScriptContext::CreateContext();
|
||||||
|
|
||||||
|
Threading::TaskManager::Initialise();
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual bool tearDownWorld()
|
virtual bool tearDownWorld()
|
||||||
{
|
{
|
||||||
|
Threading::TaskManager::Instance().ClearQueue();
|
||||||
g_ScriptContext.reset();
|
g_ScriptContext.reset();
|
||||||
SAFE_DELETE(m_ScriptEngine);
|
SAFE_DELETE(m_ScriptEngine);
|
||||||
g_Profiler2.Shutdown();
|
g_Profiler2.Shutdown();
|
||||||
|
Loading…
Reference in New Issue
Block a user