1
0
forked from 0ad/0ad

Thread the NetClient session.

This runs the NetClient session on its own thread, which avoids timeouts when
the main thread is not polling (e.g. map creation, very long simulation frames).

Unlike the NetServer, which should be as independent as possible from
the main thread, the NetClient is fundamentally tied to the game thread.
Therefore, this only threads the session object.
To ensure good performance and ease of use, lock-free queues are used for
incoming and outgoing messages.

This fixes artificial timeouts, while also improving actual ping reports
(since frame-time is no longer a factor).
It effectively reverts D1513/eda236522c and 2e7e1c0b2b, all of which were
workarounds for lag timeouts (as well as parts of 1a3fb29ff3).

Based on a patch by: Stan
Comments by: Vladislavbelov
Fixes #3700, refs #3373

Differential Revision: https://code.wildfiregames.com/D2848
This was SVN commit r24518.
This commit is contained in:
wraitii 2021-01-06 15:26:11 +00:00
parent 8ae3c09dc0
commit 2d40068cd1
8 changed files with 166 additions and 189 deletions

View File

@ -474,7 +474,6 @@ name_id = "0ad"
duplicateplayernames = false ; Rename joining player to "User (2)" if "User" is already connected, otherwise prohibit join.
lateobservers = everyone ; Allow observers to join the game after it started. Possible values: everyone, buddies, disabled.
observerlimit = 8 ; Prevent further observer joins in running games if this limit is reached
gamestarttimeout = 60000 ; Don't disconnect clients timing out in the loading screen and rejoin process before exceeding this timeout.
[overlay]
fps = "false" ; Show frames per second in top right corner

View File

@ -592,7 +592,7 @@ function setup_all_libs ()
extern_libs = {
"spidermonkey",
"enet",
"boost", -- dragged in via server->simulation.h->random
"boost", -- dragged in via server->simulation.h->random and NetSession.h->lockfree
"fmt",
}
if not _OPTIONS["without-miniupnpc"] then

View File

@ -421,10 +421,6 @@ static void Frame()
g_Game->GetView()->Update(float(realTimeSinceLastFrame));
}
// Immediately flush any messages produced by simulation code
if (g_NetClient)
g_NetClient->Flush();
// Keep us connected to any XMPP servers
if (g_XmppClient)
g_XmppClient->recv();

View File

@ -33,6 +33,7 @@
#include "ps/CStr.h"
#include "ps/Game.h"
#include "ps/Loader.h"
#include "ps/Profile.h"
#include "scriptinterface/ScriptInterface.h"
#include "simulation2/Simulation2.h"
@ -143,6 +144,10 @@ CNetClient::CNetClient(CGame* game, bool isLocalClient) :
// Destructor: notifies the turn manager (when one exists), tears down the
// connection, and finally removes the CNetClient::Trace GC-roots tracer for
// this instance from the script interface's general JS context.
CNetClient::~CNetClient()
{
// Try to flush messages before dying (probably fails).
if (m_ClientTurnManager)
m_ClientTurnManager->OnDestroyConnection();
// Tear the connection down before the tracer for `this` is removed.
DestroyConnection();
JS_RemoveExtraGCRootsTracer(GetScriptInterface().GetGeneralJSContext(), CNetClient::Trace, this);
}
@ -170,6 +175,7 @@ bool CNetClient::SetupConnection(const CStr& server, const u16 port, ENetHost* e
CNetClientSession* session = new CNetClientSession(*this);
bool ok = session->Connect(server, port, m_IsLocalClient, enetClient);
SetAndOwnSession(session);
m_PollingThread = std::thread(CNetClientSession::RunNetLoop, m_Session);
return ok;
}
@ -181,13 +187,17 @@ void CNetClient::SetAndOwnSession(CNetClientSession* session)
// Tears down the current session (if any) and clears the session pointer.
//
// NOTE(review): this body reads like two versions of the function placed back
// to back: a synchronous one (OnDestroyConnection / Flush / SAFE_DELETE) and a
// threaded one (Shutdown / detach / nullptr). Read literally, SAFE_DELETE runs
// first, so - assuming SAFE_DELETE also nulls the pointer - the later
// `if (m_Session)` can never pass and Shutdown() is never requested, while the
// comments below state that the polling thread is the one that cleans up the
// session. Confirm which lines are meant to be live.
void CNetClient::DestroyConnection()
{
// Attempt to send network messages from the current frame before connection is destroyed.
if (m_ClientTurnManager)
{
m_ClientTurnManager->OnDestroyConnection();
Flush();
}
// Deletes the session object from the calling thread.
SAFE_DELETE(m_Session);
// Ask the session to stop; Shutdown() only raises the session's shutdown flag.
if (m_Session)
m_Session->Shutdown();
if (m_PollingThread.joinable())
// Use detach() over join() because we don't want to wait for the session
// (which may be polling or trying to send messages).
m_PollingThread.detach();
// The polling thread will cleanup the session on its own,
// mark it as nullptr here so we know we're done using it.
m_Session = nullptr;
}
void CNetClient::Poll()
@ -195,8 +205,10 @@ void CNetClient::Poll()
if (!m_Session)
return;
PROFILE3("NetClient::poll");
CheckServerConnection();
m_Session->Poll();
m_Session->ProcessPolledMessages();
}
void CNetClient::CheckServerConnection()
@ -231,12 +243,6 @@ void CNetClient::CheckServerConnection()
}
}
// Flush the active session's queued outgoing messages.
// Does nothing when there is no session.
void CNetClient::Flush()
{
	if (!m_Session)
		return;

	m_Session->Flush();
}
void CNetClient::GuiPoll(JS::MutableHandleValue ret)
{
if (m_GuiMessageQueue.empty())
@ -316,7 +322,7 @@ void CNetClient::HandleDisconnect(u32 reason)
"status", "disconnected",
"reason", reason);
SAFE_DELETE(m_Session);
DestroyConnection();
// Update the state immediately to UNCONNECTED (don't bother with FSM transitions since
// we'd need one for every single state, and we don't need to use per-state actions)
@ -643,8 +649,6 @@ bool CNetClient::OnGameStart(void* context, CFsmEvent* event)
CNetClient* client = static_cast<CNetClient*>(context);
client->m_Session->SetLongTimeout(true);
// Find the player assigned to our GUID
int player = -1;
if (client->m_PlayerAssignments.find(client->m_GUID) != client->m_PlayerAssignments.end())
@ -770,21 +774,10 @@ bool CNetClient::OnClientsLoading(void *context, CFsmEvent *event)
CNetClient* client = static_cast<CNetClient*>(context);
CClientsLoadingMessage* message = static_cast<CClientsLoadingMessage*>(event->GetParamRef());
bool finished = true;
std::vector<CStr> guids;
guids.reserve(message->m_Clients.size());
for (const CClientsLoadingMessage::S_m_Clients& mClient : message->m_Clients)
{
if (client->m_GUID == mClient.m_GUID)
finished = false;
guids.push_back(mClient.m_GUID);
}
// Disable the timeout here after processing the enet message, so as to ensure that the connection isn't currently
// timing out (as it is when just leaving the loading screen in LoadFinished).
if (finished)
client->m_Session->SetLongTimeout(false);
client->PushGuiMessage(
"type", "clients-loading",
@ -825,9 +818,6 @@ bool CNetClient::OnLoadedGame(void* context, CFsmEvent* event)
if (client->m_Rejoin)
client->SendRejoinedMessage();
// The last client to leave the loading screen didn't receive the CClientsLoadingMessage, so disable here.
client->m_Session->SetLongTimeout(false);
return true;
}

View File

@ -25,8 +25,10 @@
#include "ps/CStr.h"
#include <atomic>
#include <ctime>
#include <deque>
#include <thread>
class CGame;
class CNetClientSession;
@ -128,12 +130,6 @@ public:
*/
void CheckServerConnection();
/**
* Flush any queued outgoing network messages.
* This should be called soon after sending a group of messages that may be batched together.
*/
void Flush();
/**
* Retrieves the next queued GUI message, and removes it from the queue.
* The returned value is in the GetScriptInterface() JS context.
@ -232,6 +228,10 @@ public:
*/
void SendPausedMessage(bool pause);
/**
* @return Whether the NetClient is shutting down.
*/
bool ShouldShutdown() const;
private:
void SendAuthenticateMessage();
@ -275,6 +275,8 @@ private:
/// Current network session (or NULL if not connected)
CNetClientSession* m_Session;
std::thread m_PollingThread;
/// Turn manager associated with the current game (or NULL if we haven't started the game yet)
CNetClientTurnManager* m_ClientTurnManager;

View File

@ -56,7 +56,7 @@
#define DEFAULT_SERVER_NAME L"Unnamed Server"
static const int CHANNEL_COUNT = 1;
constexpr int CHANNEL_COUNT = 1;
/**
* enet_host_service timeout (msecs).
@ -1112,8 +1112,6 @@ bool CNetServerWorker::OnAuthenticate(void* context, CFsmEvent* event)
// the most efficient client to request a copy from
CNetServerSession* sourceSession = server.m_Sessions.at(0);
session->SetLongTimeout(true);
sourceSession->GetFileTransferer().StartTask(
shared_ptr<CNetFileReceiveTask>(new CNetFileReceiveTask_ServerRejoin(server, newHostID))
);
@ -1291,8 +1289,6 @@ bool CNetServerWorker::OnLoadedGame(void* context, CFsmEvent* event)
CNetServerSession* loadedSession = (CNetServerSession*)context;
CNetServerWorker& server = loadedSession->GetServer();
loadedSession->SetLongTimeout(false);
// We're in the loading state, so wait until every client has loaded
// before starting the game
ENSURE(server.m_State == SERVER_STATE_LOADING);
@ -1390,8 +1386,6 @@ bool CNetServerWorker::OnRejoined(void* context, CFsmEvent* event)
session->SendMessage(&pausedMessage);
}
session->SetLongTimeout(false);
return true;
}
@ -1488,10 +1482,7 @@ void CNetServerWorker::StartGame()
m_ServerTurnManager = new CNetServerTurnManager(*this);
for (CNetServerSession* session : m_Sessions)
{
m_ServerTurnManager->InitialiseClient(session->GetHostID(), 0); // TODO: only for non-observers
session->SetLongTimeout(true);
}
m_State = SERVER_STATE_LOADING;

View File

@ -16,49 +16,32 @@
*/
#include "precompiled.h"
#include "NetSession.h"
#include "NetClient.h"
#include "NetServer.h"
#include "NetMessage.h"
#include "NetServer.h"
#include "NetStats.h"
#include "lib/external_libraries/enet.h"
#include "ps/CLogger.h"
#include "ps/ConfigDB.h"
#include "ps/Profile.h"
#include "scriptinterface/ScriptInterface.h"
const u32 NETWORK_WARNING_TIMEOUT = 2000;
constexpr int NETCLIENT_POLL_TIMEOUT = 50;
const u32 MAXIMUM_HOST_TIMEOUT = std::numeric_limits<u32>::max();
static const int CHANNEL_COUNT = 1;
// Only disable long timeouts after a packet from the remote enet peer has been processed.
// Otherwise a long timeout can still be in progress when disabling it here.
//
// Switches the timeout settings of an ENet peer.
// @param peer           Peer to adjust; a null peer is ignored.
// @param isLocalClient  When true nothing is changed for this peer.
// @param enabled        true: use the "network.gamestarttimeout" config value as
//                       both minimum and maximum timeout; false: pass zeros to
//                       enet_peer_timeout (presumably ENet's defaults - confirm
//                       against the ENet documentation).
// Compiles to a no-op for ENet versions older than 1.3.4.
void SetEnetLongTimeout(ENetPeer* peer, bool isLocalClient, bool enabled)
{
#if (ENET_VERSION >= ENET_VERSION_CREATE(1, 3, 4))
	if (!peer || isLocalClient)
		return;

	if (enabled)
	{
		// Start from 0 so that an unset or unreadable config entry can never
		// hand an uninitialised value to ENet. With 0 this call becomes
		// identical to the "disabled" branch below.
		u32 timeout = 0;
		CFG_GET_VAL("network.gamestarttimeout", timeout);
		enet_peer_timeout(peer, 0, timeout, timeout);
	}
	else
		enet_peer_timeout(peer, 0, 0, 0);
#endif
}
constexpr int CHANNEL_COUNT = 1;
CNetClientSession::CNetClientSession(CNetClient& client) :
m_Client(client), m_FileTransferer(this), m_Host(nullptr), m_Server(nullptr), m_Stats(nullptr), m_IsLocalClient(false)
m_Client(client), m_FileTransferer(this), m_Host(nullptr), m_Server(nullptr),
m_Stats(nullptr), m_IsLocalClient(false), m_IncomingMessages(16), m_OutgoingMessages(16),
m_LoopRunning(false), m_ShouldShutdown(false), m_MeanRTT(0), m_LastReceivedTime(0)
{
}
CNetClientSession::~CNetClientSession()
{
ENSURE(!m_LoopRunning);
delete m_Stats;
if (m_Host && m_Server)
@ -74,6 +57,7 @@ CNetClientSession::~CNetClientSession()
bool CNetClientSession::Connect(const CStr& server, const u16 port, const bool isLocalClient, ENetHost* enetClient)
{
ENSURE(!m_LoopRunning);
ENSURE(!m_Host);
ENSURE(!m_Server);
@ -102,12 +86,6 @@ bool CNetClientSession::Connect(const CStr& server, const u16 port, const bool i
m_Server = peer;
m_IsLocalClient = isLocalClient;
// Prevent the local client of the host from timing out too quickly.
#if (ENET_VERSION >= ENET_VERSION_CREATE(1, 3, 4))
if (isLocalClient)
enet_peer_timeout(peer, 1, MAXIMUM_HOST_TIMEOUT, MAXIMUM_HOST_TIMEOUT);
#endif
m_Stats = new CNetStatsTable(m_Server);
if (CProfileViewer::IsInitialised())
g_ProfileViewer.AddRootTable(m_Stats);
@ -115,60 +93,92 @@ bool CNetClientSession::Connect(const CStr& server, const u16 port, const bool i
return true;
}
void CNetClientSession::Disconnect(NetDisconnectReason reason)
void CNetClientSession::RunNetLoop(CNetClientSession* session)
{
if (reason == NDR_UNKNOWN)
LOGWARNING("Disconnecting from the server without communicating the disconnect reason!");
ENSURE(!session->m_LoopRunning);
session->m_LoopRunning = true;
ENSURE(m_Host && m_Server);
debug_SetThreadName("NetClientSession loop");
// TODO: ought to do reliable async disconnects, probably
enet_peer_disconnect_now(m_Server, static_cast<enet_uint32>(reason));
enet_host_destroy(m_Host);
while (!session->m_ShouldShutdown)
{
ENSURE(session->m_Host && session->m_Server);
m_Host = NULL;
m_Server = NULL;
session->m_FileTransferer.Poll();
session->Poll();
session->Flush();
SAFE_DELETE(m_Stats);
session->m_LastReceivedTime = enet_time_get() - session->m_Server->lastReceiveTime;
session->m_MeanRTT = session->m_Server->roundTripTime;
}
session->m_LoopRunning = false;
// Deleting the session is handled in this thread as it might outlive the CNetClient.
SAFE_DELETE(session);
}
// Request that the session stop by raising its shutdown flag.
// This only sets the flag; it does not wait for anything.
void CNetClientSession::Shutdown()
{
	m_ShouldShutdown.store(true);
}
void CNetClientSession::Poll()
{
PROFILE3("net client poll");
ENSURE(m_Host && m_Server);
m_FileTransferer.Poll();
ENetEvent event;
while (enet_host_service(m_Host, &event, 0) > 0)
{
switch (event.type)
{
case ENET_EVENT_TYPE_CONNECT:
// Use the timeout to make the thread wait and save CPU time.
if (enet_host_service(m_Host, &event, NETCLIENT_POLL_TIMEOUT) <= 0)
return;
if (event.type == ENET_EVENT_TYPE_CONNECT)
{
ENSURE(event.peer == m_Server);
// Report the server address
// Report the server address immediately.
char hostname[256] = "(error)";
enet_address_get_host_ip(&event.peer->address, hostname, ARRAY_SIZE(hostname));
LOGMESSAGE("Net client: Connected to %s:%u", hostname, (unsigned int)event.peer->address.port);
m_Client.HandleConnect();
break;
m_IncomingMessages.push(event);
}
case ENET_EVENT_TYPE_DISCONNECT:
else if (event.type == ENET_EVENT_TYPE_DISCONNECT)
{
ENSURE(event.peer == m_Server);
// Report immediately.
LOGMESSAGE("Net client: Disconnected");
m_Client.HandleDisconnect(event.data);
return;
}
case ENET_EVENT_TYPE_RECEIVE:
m_IncomingMessages.push(event);
}
else if (event.type == ENET_EVENT_TYPE_RECEIVE)
m_IncomingMessages.push(event);
}
void CNetClientSession::Flush()
{
ENetPacket* packet;
while (m_OutgoingMessages.pop(packet))
if (enet_peer_send(m_Server, CNetHost::DEFAULT_CHANNEL, packet) < 0)
LOGERROR("NetClient: Failed to send packet to server");
enet_host_flush(m_Host);
}
void CNetClientSession::ProcessPolledMessages()
{
ENetEvent event;
while(m_IncomingMessages.pop(event))
{
if (event.type == ENET_EVENT_TYPE_CONNECT)
m_Client.HandleConnect();
else if (event.type == ENET_EVENT_TYPE_DISCONNECT)
{
// This deletes the session, so we must break;
m_Client.HandleDisconnect(event.data);
break;
}
else if (event.type == ENET_EVENT_TYPE_RECEIVE)
{
CNetMessage* msg = CNetMessageFactory::CreateMessage(event.packet->data, event.packet->dataLength, m_Client.GetScriptInterface());
if (msg)
@ -176,36 +186,29 @@ void CNetClientSession::Poll()
LOGMESSAGE("Net client: Received message %s of size %lu from server", msg->ToString().c_str(), (unsigned long)msg->GetSerializedLength());
m_Client.HandleMessage(msg);
delete msg;
}
// Thread-safe
enet_packet_destroy(event.packet);
break;
}
case ENET_EVENT_TYPE_NONE:
break;
}
}
}
// Asks ENet to flush the host's queued outgoing packets.
void CNetClientSession::Flush()
{
PROFILE3("net client flush");
// Both the host and the server peer must be set before flushing.
ENSURE(m_Host && m_Server);
enet_host_flush(m_Host);
}
bool CNetClientSession::SendMessage(const CNetMessage* message)
{
ENSURE(m_Host && m_Server);
return CNetHost::SendMessage(message, m_Server, "server");
// Thread-safe.
ENetPacket* packet = CNetHost::CreatePacket(message);
if (!packet)
return false;
if (!m_OutgoingMessages.push(packet))
{
LOGERROR("NetClient: Failed to push message on the outgoing queue.");
return false;
}
return true;
}
u32 CNetClientSession::GetLastReceivedTime() const
@ -213,7 +216,7 @@ u32 CNetClientSession::GetLastReceivedTime() const
if (!m_Server)
return 0;
return enet_time_get() - m_Server->lastReceiveTime;
return m_LastReceivedTime;
}
u32 CNetClientSession::GetMeanRTT() const
@ -221,12 +224,7 @@ u32 CNetClientSession::GetMeanRTT() const
if (!m_Server)
return 0;
return m_Server->roundTripTime;
}
void CNetClientSession::SetLongTimeout(bool enabled)
{
SetEnetLongTimeout(m_Server, m_IsLocalClient, enabled);
return m_MeanRTT;
}
CNetServerSession::CNetServerSession(CNetServerWorker& server, ENetPeer* peer) :
@ -289,14 +287,4 @@ void CNetServerSession::SetLocalClient(bool isLocalClient)
if (!isLocalClient)
return;
// Prevent the local client of the host from timing out too quickly
#if (ENET_VERSION >= ENET_VERSION_CREATE(1, 3, 4))
enet_peer_timeout(m_Peer, 0, MAXIMUM_HOST_TIMEOUT, MAXIMUM_HOST_TIMEOUT);
#endif
}
// Enables or disables the long timeout for this session's peer by forwarding
// to SetEnetLongTimeout() with the session's peer and local-client flag.
void CNetServerSession::SetLongTimeout(bool enabled)
{
SetEnetLongTimeout(m_Peer, m_IsLocalClient, enabled);
}

View File

@ -18,20 +18,20 @@
#ifndef NETSESSION_H
#define NETSESSION_H
#include "lib/external_libraries/enet.h"
#include "network/fsm.h"
#include "network/NetFileTransfer.h"
#include "network/NetHost.h"
#include "ps/CStr.h"
#include <boost/lockfree/queue.hpp>
#include <atomic>
/**
* Report the peer if we didn't receive a packet after this time (milliseconds).
*/
extern const u32 NETWORK_WARNING_TIMEOUT;
/**
* Maximum timeout of the local client of the host (milliseconds).
*/
extern const u32 MAXIMUM_HOST_TIMEOUT;
inline constexpr u32 NETWORK_WARNING_TIMEOUT = 2000;
class CNetClient;
class CNetServerWorker;
@ -62,6 +62,7 @@ public:
/**
* The client end of a network session.
* Provides an abstraction of the network interface, allowing communication with the server.
* The NetClientSession is threaded, so all calls to the public interface must be thread-safe.
*/
class CNetClientSession : public INetSession
{
@ -74,25 +75,25 @@ public:
bool Connect(const CStr& server, const u16 port, const bool isLocalClient, ENetHost* enetClient);
/**
* Process queued incoming messages.
* The client NetSession is threaded to avoid getting timeouts if the main thread hangs.
* Call Connect() before starting this loop.
*/
void Poll();
static void RunNetLoop(CNetClientSession* session);
/**
* Flush queued outgoing network messages.
* Shut down the net session.
*/
void Flush();
void Shutdown();
/**
* Disconnect from the server.
* Sends a disconnection notification to the server.
* Processes pending messages.
*/
void Disconnect(NetDisconnectReason reason);
void ProcessPolledMessages();
/**
* Send a message to the server.
* Queue up a message to send to the server on the next iteration of RunNetLoop().
*/
virtual bool SendMessage(const CNetMessage* message);
virtual bool SendMessage(const CNetMessage* message) override;
/**
* Number of milliseconds since the most recent packet of the server was received.
@ -104,19 +105,35 @@ public:
*/
u32 GetMeanRTT() const;
/**
* Allows increasing the timeout to prevent drops during an expensive operation,
* and decreasing it back to normal afterwards.
*/
void SetLongTimeout(bool longTimeout);
CNetFileTransferer& GetFileTransferer() { return m_FileTransferer; }
private:
/**
* Process queued incoming messages.
*/
void Poll();
/**
* Flush queued outgoing network messages.
*/
void Flush();
CNetClient& m_Client;
CNetFileTransferer m_FileTransferer;
// Net messages received and waiting for fetching.
boost::lockfree::queue<ENetEvent> m_IncomingMessages;
// Net messages to send on the next flush() call.
boost::lockfree::queue<ENetPacket*> m_OutgoingMessages;
// Wrapper around enet stats - those are atomic as the code is lock-free.
std::atomic<u32> m_LastReceivedTime;
std::atomic<u32> m_MeanRTT;
// If this is true, calling Connect() or deleting the session is an error.
std::atomic<bool> m_LoopRunning;
std::atomic<bool> m_ShouldShutdown;
ENetHost* m_Host;
ENetPeer* m_Server;
CNetStatsTable* m_Stats;
@ -188,12 +205,6 @@ public:
*/
void SetLocalClient(bool isLocalClient);
/**
* Allows increasing the timeout to prevent drops during an expensive operation,
* and decreasing it back to normal afterwards.
*/
void SetLongTimeout(bool longTimeout);
/**
* Send a message to the client.
*/