1
0
forked from 0ad/0ad

Rework the pathfinder path computation setup for threading.

Essentially reverts D1918 / d592bf9cb6.
Instead of copying path requests to workers, setup the result vector,
then setup an index, and compute 'in-place'.
To send messages, the result vectors are read in order. This makes the
order trivially constant no matter how many workers there are, and the
architecture overall makes it much easier to efficiently paralellise.

Tested by: Langbart, Stan
Differential Revision: https://code.wildfiregames.com/D3849
This was SVN commit r25256.
This commit is contained in:
wraitii 2021-04-14 07:23:47 +00:00
parent 990265d974
commit 0308c2390a
4 changed files with 105 additions and 164 deletions

View File

@ -520,7 +520,7 @@ void CSimulation2Impl::UpdateComponents(CSimContext& simContext, fixed turnLengt
CmpPtr<ICmpPathfinder> cmpPathfinder(simContext, SYSTEM_ENTITY);
if (cmpPathfinder)
cmpPathfinder->FetchAsyncResultsAndSendMessages();
cmpPathfinder->SendRequestedPaths();
{
PROFILE2("Sim - Update Start");
@ -541,7 +541,7 @@ void CSimulation2Impl::UpdateComponents(CSimContext& simContext, fixed turnLengt
if (cmpPathfinder)
{
cmpPathfinder->StartProcessingMoves(true);
cmpPathfinder->FetchAsyncResultsAndSendMessages();
cmpPathfinder->SendRequestedPaths();
}
// Send all the update phases
{
@ -559,7 +559,7 @@ void CSimulation2Impl::UpdateComponents(CSimContext& simContext, fixed turnLengt
if (cmpPathfinder)
{
cmpPathfinder->StartProcessingMoves(true);
cmpPathfinder->FetchAsyncResultsAndSendMessages();
cmpPathfinder->SendRequestedPaths();
}
{

View File

@ -43,6 +43,8 @@
#include "ps/XML/Xeromyces.h"
#include "renderer/Scene.h"
#include <type_traits>
REGISTER_COMPONENT_TYPE(Pathfinder)
void CCmpPathfinder::Init(const CParamNode& UNUSED(paramNode))
@ -70,21 +72,13 @@ void CCmpPathfinder::Init(const CParamNode& UNUSED(paramNode))
CParamNode externalParamNode;
CParamNode::LoadXML(externalParamNode, L"simulation/data/pathfinder.xml", "pathfinder");
// Previously all move commands during a turn were
// queued up and processed asynchronously at the start
// of the next turn. Now we are processing queued up
// events several times duing the turn. This improves
// responsiveness and units move more smoothly especially.
// when in formation. There is still a call at the
// beginning of a turn to process all outstanding moves -
// this will handle any moves above the MaxSameTurnMoves
// threshold.
//
// TODO - The moves processed at the beginning of the
// turn do not count against the maximum moves per turn
// currently. The thinking is that this will eventually
// happen in another thread. Either way this probably
// will require some adjustment and rethinking.
// Paths are computed:
// - Before MT_Update
// - Before MT_MotionUnitFormation
// - 'in-between' turns (effectively at the start until threading is implemented).
// The latter of these must compute all outstanding requests, but the former two are capped
// to avoid spending too much time there (since the latter are designed to be threaded and thus not block the GUI).
// This loads that maximum number (note that it's per computation call, not per turn for now).
const CParamNode pathingSettings = externalParamNode.GetChild("Pathfinder");
m_MaxSameTurnMoves = (u16)pathingSettings.GetChild("MaxSameTurnMoves").ToInt();
@ -98,16 +92,12 @@ void CCmpPathfinder::Init(const CParamNode& UNUSED(paramNode))
m_PassClasses.push_back(PathfinderPassability(mask, it->second));
m_PassClassMasks[name] = mask;
}
m_Workers.emplace_back(PathfinderWorker{});
}
CCmpPathfinder::~CCmpPathfinder() {};
void CCmpPathfinder::Deinit()
{
m_Workers.clear();
SetDebugOverlay(false); // cleans up memory
SAFE_DELETE(m_AtlasOverlay);
@ -152,8 +142,8 @@ struct SerializeHelper<ShortPathRequest>
template<typename S>
void CCmpPathfinder::SerializeCommon(S& serialize)
{
Serializer(serialize, "long requests", m_LongPathRequests);
Serializer(serialize, "short requests", m_ShortPathRequests);
Serializer(serialize, "long requests", m_LongPathRequests.m_Requests);
Serializer(serialize, "short requests", m_ShortPathRequests.m_Requests);
serialize.NumberU32_Unbounded("next ticket", m_NextAsyncTicket);
serialize.NumberU16_Unbounded("map size", m_MapSize);
}
@ -195,7 +185,7 @@ void CCmpPathfinder::HandleMessage(const CMessage& msg, bool UNUSED(global))
case MT_Deserialized:
UpdateGrid();
// In case we were serialised with requests pending, we need to process them.
if (!m_ShortPathRequests.empty() || !m_LongPathRequests.empty())
if (!m_ShortPathRequests.m_Requests.empty() || !m_LongPathRequests.m_Requests.empty())
{
ENSURE(CmpPtr<ICmpObstructionManager>(GetSystemEntity()));
StartProcessingMoves(false);
@ -734,52 +724,10 @@ void CCmpPathfinder::TerrainUpdateHelper(bool expandPassability, int itile0, int
//////////////////////////////////////////////////////////
// Async pathfinder workers
CCmpPathfinder::PathfinderWorker::PathfinderWorker() {}
template<typename T>
void CCmpPathfinder::PathfinderWorker::PushRequests(std::vector<T>&, ssize_t)
{
static_assert(sizeof(T) == 0, "Only specializations can be used");
}
template<> void CCmpPathfinder::PathfinderWorker::PushRequests(std::vector<LongPathRequest>& from, ssize_t amount)
{
m_LongRequests.insert(m_LongRequests.end(), std::make_move_iterator(from.end() - amount), std::make_move_iterator(from.end()));
}
template<> void CCmpPathfinder::PathfinderWorker::PushRequests(std::vector<ShortPathRequest>& from, ssize_t amount)
{
m_ShortRequests.insert(m_ShortRequests.end(), std::make_move_iterator(from.end() - amount), std::make_move_iterator(from.end()));
}
void CCmpPathfinder::PathfinderWorker::Work(const CCmpPathfinder& pathfinder)
{
while (!m_LongRequests.empty())
{
const LongPathRequest& req = m_LongRequests.back();
WaypointPath path;
pathfinder.m_LongPathfinder->ComputePath(*pathfinder.m_PathfinderHier, req.x0, req.z0, req.goal, req.passClass, path);
m_Results.emplace_back(req.ticket, req.notify, path);
m_LongRequests.pop_back();
}
while (!m_ShortRequests.empty())
{
const ShortPathRequest& req = m_ShortRequests.back();
WaypointPath path = pathfinder.m_VertexPathfinder->ComputeShortPath(req, CmpPtr<ICmpObstructionManager>(pathfinder.GetSystemEntity()));
m_Results.emplace_back(req.ticket, req.notify, path);
m_ShortRequests.pop_back();
}
}
u32 CCmpPathfinder::ComputePathAsync(entity_pos_t x0, entity_pos_t z0, const PathGoal& goal, pass_class_t passClass, entity_id_t notify)
{
LongPathRequest req = { m_NextAsyncTicket++, x0, z0, goal, passClass, notify };
m_LongPathRequests.push_back(req);
m_LongPathRequests.m_Requests.push_back(req);
return req.ticket;
}
@ -788,7 +736,7 @@ u32 CCmpPathfinder::ComputeShortPathAsync(entity_pos_t x0, entity_pos_t z0, enti
entity_id_t group, entity_id_t notify)
{
ShortPathRequest req = { m_NextAsyncTicket++, x0, z0, clearance, range, goal, passClass, avoidMovingUnits, group, notify };
m_ShortPathRequests.push_back(req);
m_ShortPathRequests.m_Requests.push_back(req);
return req.ticket;
}
@ -802,79 +750,65 @@ WaypointPath CCmpPathfinder::ComputeShortPathImmediate(const ShortPathRequest& r
return m_VertexPathfinder->ComputeShortPath(request, CmpPtr<ICmpObstructionManager>(GetSystemEntity()));
}
void CCmpPathfinder::FetchAsyncResultsAndSendMessages()
template<typename T>
template<typename U>
void CCmpPathfinder::PathRequests<T>::Compute(const CCmpPathfinder& cmpPathfinder, const U& pathfinder)
{
PROFILE2("FetchAsyncResults");
// We may now clear existing requests.
m_ShortPathRequests.clear();
m_LongPathRequests.clear();
// WARNING: the order in which moves are pulled must be consistent when using 1 or n workers.
// We fetch in the same order we inserted in, but we push moves backwards, so this works.
std::vector<PathResult> results;
for (PathfinderWorker& worker : m_Workers)
static_assert((std::is_same_v<T, LongPathRequest> && std::is_same_v<U, LongPathfinder>) ||
(std::is_same_v<T, ShortPathRequest> && std::is_same_v<U, VertexPathfinder>));
size_t maxN = m_Results.size();
size_t startIndex = m_Requests.size() - m_Results.size();
do
{
results.insert(results.end(), std::make_move_iterator(worker.m_Results.begin()), std::make_move_iterator(worker.m_Results.end()));
worker.m_Results.clear();
size_t workIndex = m_NextPathToCompute++;
if (workIndex >= maxN)
break;
const T& req = m_Requests[startIndex + workIndex];
PathResult& result = m_Results[workIndex];
result.ticket = req.ticket;
result.notify = req.notify;
if constexpr (std::is_same_v<T, LongPathRequest>)
pathfinder.ComputePath(*cmpPathfinder.m_PathfinderHier, req.x0, req.z0, req.goal, req.passClass, result.path);
else
result.path = pathfinder.ComputeShortPath(req, CmpPtr<ICmpObstructionManager>(cmpPathfinder.GetSystemEntity()));
if (workIndex == maxN - 1)
m_ComputeDone = true;
}
while (true);
}
void CCmpPathfinder::SendRequestedPaths()
{
PROFILE2("SendRequestedPaths");
if (!m_LongPathRequests.m_ComputeDone || !m_ShortPathRequests.m_ComputeDone)
{
m_ShortPathRequests.Compute(*this, *m_VertexPathfinder);
m_LongPathRequests.Compute(*this, *m_LongPathfinder);
}
{
PROFILE2("PostMessages");
for (PathResult& path : results)
for (PathResult& path : m_ShortPathRequests.m_Results)
{
CMessagePathResult msg(path.ticket, path.path);
GetSimContext().GetComponentManager().PostMessage(path.notify, msg);
}
for (PathResult& path : m_LongPathRequests.m_Results)
{
CMessagePathResult msg(path.ticket, path.path);
GetSimContext().GetComponentManager().PostMessage(path.notify, msg);
}
}
m_ShortPathRequests.ClearComputed();
m_LongPathRequests.ClearComputed();
}
void CCmpPathfinder::StartProcessingMoves(bool useMax)
{
std::vector<LongPathRequest> longRequests = GetMovesToProcess(m_LongPathRequests, useMax, m_MaxSameTurnMoves);
std::vector<ShortPathRequest> shortRequests = GetMovesToProcess(m_ShortPathRequests, useMax, m_MaxSameTurnMoves - longRequests.size());
PushRequestsToWorkers(longRequests);
PushRequestsToWorkers(shortRequests);
for (PathfinderWorker& worker : m_Workers)
worker.Work(*this);
}
template <typename T>
std::vector<T> CCmpPathfinder::GetMovesToProcess(std::vector<T>& requests, bool useMax, size_t maxMoves)
{
// Keep the original requests in which we need to serialize.
std::vector<T> copiedRequests;
if (useMax)
{
size_t amount = std::min(requests.size(), maxMoves);
if (amount > 0)
copiedRequests.insert(copiedRequests.begin(), requests.end() - amount, requests.end());
}
else
copiedRequests = requests;
return copiedRequests;
}
template <typename T>
void CCmpPathfinder::PushRequestsToWorkers(std::vector<T>& from)
{
if (from.empty())
return;
// Trivial load-balancing, / rounds towards zero so add 1 to ensure we do push all requests.
size_t amount = from.size() / m_Workers.size() + 1;
// WARNING: the order in which moves are pushed must be consistent when using 1 or n workers.
// In this instance, work is distributed in a strict LIFO order, effectively reversing tickets.
for (PathfinderWorker& worker : m_Workers)
{
amount = std::min(amount, from.size()); // Since we are rounding up before, ensure we aren't pushing beyond the end.
worker.PushRequests(from, amount);
from.erase(from.end() - amount, from.end());
}
m_ShortPathRequests.PrepareForComputation(useMax ? m_MaxSameTurnMoves : 0);
m_LongPathRequests.PrepareForComputation(useMax ? m_MaxSameTurnMoves : 0);
}
//////////////////////////////////////////////////////////

View File

@ -1,4 +1,4 @@
/* Copyright (C) 2020 Wildfire Games.
/* Copyright (C) 2021 Wildfire Games.
* This file is part of 0 A.D.
*
* 0 A.D. is free software: you can redistribute it and/or modify
@ -39,6 +39,7 @@
#include "simulation2/components/ICmpObstructionManager.h"
#include "simulation2/helpers/Grid.h"
#include <vector>
class HierarchicalPathfinder;
class LongPathfinder;
@ -58,33 +59,6 @@ class AtlasOverlay;
*/
class CCmpPathfinder final : public ICmpPathfinder
{
protected:
class PathfinderWorker
{
friend CCmpPathfinder;
public:
PathfinderWorker();
// Process path requests, checking if we should stop before each new one.
void Work(const CCmpPathfinder& pathfinder);
private:
// Insert requests in m_[Long/Short]Requests depending on from.
// This could be removed when we may use if-constexpr in CCmpPathfinder::PushRequestsToWorkers
template<typename T>
void PushRequests(std::vector<T>& from, ssize_t amount);
// Stores our results, the main thread will fetch this.
std::vector<PathResult> m_Results;
std::vector<LongPathRequest> m_LongRequests;
std::vector<ShortPathRequest> m_ShortRequests;
};
// Allow the workers to access our private variables
friend class PathfinderWorker;
public:
static void ClassInit(CComponentManager& componentManager)
{
@ -103,14 +77,10 @@ public:
std::map<std::string, pass_class_t> m_PassClassMasks;
std::vector<PathfinderPassability> m_PassClasses;
u16 m_MaxSameTurnMoves; // Compute only this many paths when useMax is true in StartProcessingMoves.
// Dynamic state:
std::vector<LongPathRequest> m_LongPathRequests;
std::vector<ShortPathRequest> m_ShortPathRequests;
u32 m_NextAsyncTicket; // Unique IDs for asynchronous path requests.
u16 m_MaxSameTurnMoves; // Compute only this many paths when useMax is true in StartProcessingMoves.
// Lazily-constructed dynamic state (not serialized):
u16 m_MapSize; // tiles per side
@ -128,8 +98,45 @@ public:
std::unique_ptr<HierarchicalPathfinder> m_PathfinderHier;
std::unique_ptr<LongPathfinder> m_LongPathfinder;
// Workers process pathing requests.
std::vector<PathfinderWorker> m_Workers;
template<typename T>
class PathRequests {
public:
std::vector<T> m_Requests;
std::vector<PathResult> m_Results;
// This is the array index of the next path to compute.
size_t m_NextPathToCompute = 0;
// This is false until all scheduled paths have been computed.
bool m_ComputeDone = true;
void ClearComputed()
{
if (m_Results.size() == m_Requests.size())
m_Requests.clear();
else
m_Requests.erase(m_Requests.end() - m_Results.size(), m_Requests.end());
m_Results.clear();
}
/**
* @param max - if non-zero, how many paths to process.
*/
void PrepareForComputation(u16 max)
{
size_t n = m_Requests.size();
if (max && n > max)
n = max;
m_NextPathToCompute = 0;
m_Results.resize(n);
m_ComputeDone = n == 0;
}
template<typename U>
void Compute(const CCmpPathfinder& cmpPathfinder, const U& pathfinder);
};
PathRequests<LongPathRequest> m_LongPathRequests;
PathRequests<ShortPathRequest> m_ShortPathRequests;
u32 m_NextAsyncTicket; // Unique IDs for asynchronous path requests.
AtlasOverlay* m_AtlasOverlay;
@ -222,7 +229,7 @@ public:
virtual ICmpObstruction::EFoundationCheck CheckBuildingPlacement(const IObstructionTestFilter& filter, entity_pos_t x, entity_pos_t z, entity_pos_t a, entity_pos_t w, entity_pos_t h, entity_id_t id, pass_class_t passClass, bool onlyCenterPoint) const;
virtual void FetchAsyncResultsAndSendMessages();
virtual void SendRequestedPaths();
virtual void StartProcessingMoves(bool useMax);

View File

@ -185,7 +185,7 @@ public:
/**
* Finish computing asynchronous path requests and send the CMessagePathResult messages.
*/
virtual void FetchAsyncResultsAndSendMessages() = 0;
virtual void SendRequestedPaths() = 0;
/**
* Tell asynchronous pathfinder threads that they can begin computing paths.