- Naming Convention changes

- Proper message-based operation of CMessageSocket (error message,
connect complete message, close request message)

This was SVN commit r182.
This commit is contained in:
Simon Brenner 2004-03-08 02:06:06 +00:00
parent 9cb0a192d6
commit c5a3087464
2 changed files with 158 additions and 29 deletions

View File

@ -4,6 +4,10 @@
DEFINE_ERROR(CONFLICTING_OP_IN_PROGRESS, "A conflicting operation is already in progress");
#define ALIGN_UP(_n, _block) (_n+_block-(_n%_block))
#define BUFFER_BLOCK 4096
#define BUFFER_SIZE(_n) ALIGN_UP(_n, BUFFER_BLOCK)
/**
* The SNetHeader will always be stored in host-order
*/
@ -12,14 +16,14 @@ struct SNetHeader
u8 m_MsgType;
u16 m_MsgLength;
inline u8 *Deserialize(u8 *pos)
inline const u8 *Deserialize(const u8 *pos)
{
Deserialize_int_1(pos, m_MsgType);
Deserialize_int_2(pos, m_MsgLength);
return pos;
}
inline u8 *Serialize(u8 *pos)
inline u8 *Serialize(u8 *pos) const
{
Serialize_int_1(pos, m_MsgType);
Serialize_int_2(pos, m_MsgLength);
@ -67,6 +71,29 @@ CNetMessage *CMessagePipe::End::TryPop()
}
}*/
CStr CNetErrorMessage::GetString() const
{
static const char* const states[]={
"SS_UNCONNECTED",
"SS_CONNECT_STARTED",
"SS_CONNECTED",
"SS_CLOSED_LOCALLY"
};
return CStr("NetErrorMessage: ")+
m_Error+", Socket State "+states[m_State];
}
CStr CConnectCompleteMessage::GetString() const
{
return CStr("ConnectCompleteMessage");
}
CStr CCloseRequestMessage::GetString() const
{
return CStr("CloseRequestMessage");
}
void CMessageSocket::Push(CNetMessage *msg)
{
m_OutQ.Lock();
@ -93,11 +120,33 @@ void CMessageSocket::StartWriteNextMessage()
m_OutQ.Lock();
if (!m_IsWriting && m_OutQ.size())
{
// Pop next output message
CNetMessage *pMsg=m_OutQ.front();
m_OutQ.pop_front();
m_IsWriting=true;
m_OutQ.Unlock();
CNetMessage *pMsg=NULL;
while (pMsg == NULL)
{
// This may happen when the last message of the queue is an invalid
// message type (non-network or socket command)
if (m_OutQ.size() == 0)
return;
// Pop next output message
pMsg=m_OutQ.front();
m_OutQ.pop_front();
m_IsWriting=true;
m_OutQ.Unlock();
if (pMsg->GetType() == NMT_CLOSE_REQUEST)
{
Close();
delete pMsg;
pMsg=NULL;
}
else if (pMsg->GetType() < 0)
{
printf("CMessageSocket::StartWriteNextMessage(): Warning: non-network message\n");
delete pMsg;
pMsg=NULL;
}
}
// Prepare the header
SNetHeader hdr;
@ -107,8 +156,7 @@ void CMessageSocket::StartWriteNextMessage()
// Allocate buffer space
if ((uint)(hdr.m_MsgLength+HEADER_LENGTH) > m_WrBufferSize)
{
m_WrBufferSize = (hdr.m_MsgLength+HEADER_LENGTH);
m_WrBufferSize += m_WrBufferSize % 256;
m_WrBufferSize = BUFFER_SIZE(hdr.m_MsgLength+HEADER_LENGTH);
if (m_pWrBuffer)
m_pWrBuffer=(u8 *)realloc(m_pWrBuffer, m_WrBufferSize);
else
@ -127,7 +175,13 @@ void CMessageSocket::StartWriteNextMessage()
printf("CMessageSocket::StartWriteNextMessage(): Writing an MT %d, length %u (%u)\n", hdr.m_MsgType, hdr.m_MsgLength+HEADER_LENGTH, hdr.m_MsgLength);
PS_RESULT res=Write(m_pWrBuffer, hdr.m_MsgLength+HEADER_LENGTH);
if (res != PS_OK)
printf("CMessageSocket::StartWriteNextMessage(): %s\n", res); // Queue Error Message
{
printf("CMessageSocket::StartWriteNextMessage(): %s\n", res);
// Queue Error Message
m_InQ.Lock();
m_InQ.push_back(new CNetErrorMessage(res, GetState()));
m_InQ.Unlock();
}
}
else
{
@ -156,8 +210,8 @@ void CMessageSocket::WriteComplete(PS_RESULT ec)
}
else
{
CScopeLock scopeLock(m_InQ.m_Mutex);
// Push an error message
m_InQ.push_back(new CNetErrorMessage(ec, GetState()));
}
}
@ -165,7 +219,7 @@ void CMessageSocket::StartReadHeader()
{
if (m_RdBufferSize < HEADER_LENGTH)
{
m_RdBufferSize=256;
m_RdBufferSize=BUFFER_SIZE(HEADER_LENGTH);
if (m_pRdBuffer)
m_pRdBuffer=(u8 *)realloc(m_pRdBuffer, m_RdBufferSize);
else
@ -175,7 +229,12 @@ void CMessageSocket::StartReadHeader()
printf("CMessageSocket::StartReadHeader(): Trying to read %u\n", HEADER_LENGTH);
PS_RESULT res=Read(m_pRdBuffer, HEADER_LENGTH);
if (res != PS_OK)
printf("CMessageSocket::StartReadHeader(): %s\n", res); // Push an error message
{
printf("CMessageSocket::StartReadHeader(): %s\n", res);
// Push an error message
CScopeLock scopeLock(m_InQ.m_Mutex);
m_InQ.push_back(new CNetErrorMessage(res, GetState()));
}
}
void CMessageSocket::StartReadMessage()
@ -185,7 +244,7 @@ void CMessageSocket::StartReadMessage()
uint reqBufSize=HEADER_LENGTH+hdr.m_MsgLength;
if (m_RdBufferSize < reqBufSize)
{
m_RdBufferSize=reqBufSize+(reqBufSize%256);
m_RdBufferSize=BUFFER_SIZE(reqBufSize);
if (m_pRdBuffer)
m_pRdBuffer=(u8 *)realloc(m_pRdBuffer, m_RdBufferSize);
else
@ -195,12 +254,17 @@ void CMessageSocket::StartReadMessage()
printf("CMessageSocket::StartReadMessage(): Got type %d, trying to read %u\n", hdr.m_MsgType, hdr.m_MsgLength);
PS_RESULT res=Read(m_pRdBuffer+HEADER_LENGTH, hdr.m_MsgLength);
if (res != PS_OK)
printf("CMessageSocket::StartReadMessage(): %s\n", res); // Queue an error message
{
printf("CMessageSocket::StartReadMessage(): %s\n", res);
// Queue an error message
CScopeLock scopeLock(m_InQ);
m_InQ.push_back(new CNetErrorMessage(res, GetState()));
}
}
void CMessageSocket::ReadComplete(PS_RESULT ec)
{
printf("CMessageSocket::ReadComplete(%s): %s\n", m_ReadingData?"true":"false", ec);
printf("CMessageSocket::ReadComplete(%s): %s\n", m_ReadingData?"data":"header", ec);
// Check if we were reading header or message
// If header:
if (!m_ReadingData)
@ -212,7 +276,7 @@ void CMessageSocket::ReadComplete(PS_RESULT ec)
{
SNetHeader hdr;
hdr.Deserialize(m_pRdBuffer);
CNetMessage *pMsg=CNetMessage::DeserializeMessage((NetMessageType)hdr.m_MsgType, m_pRdBuffer+HEADER_LENGTH, hdr.m_MsgLength);
CNetMessage *pMsg=CNetMessage::DeserializeMessage((ENetMessageType)hdr.m_MsgType, m_pRdBuffer+HEADER_LENGTH, hdr.m_MsgLength);
if (pMsg)
{
m_InQ.Lock();
@ -226,7 +290,23 @@ void CMessageSocket::ReadComplete(PS_RESULT ec)
void CMessageSocket::ConnectComplete(PS_RESULT ec)
{
StartReadHeader();
if (ec == PS_OK)
{
StartReadHeader();
CScopeLock scopeLock(m_InQ);
m_InQ.push_back(new CConnectCompleteMessage());
}
else
{
CScopeLock scopeLock(m_InQ);
m_InQ.push_back(new CNetErrorMessage(ec, GetState()));
}
}
void CMessageSocket::OnClose(PS_RESULT errorCode)
{
CScopeLock scopeLock(m_InQ.m_Mutex);
m_InQ.push_back(new CNetErrorMessage(errorCode, GetState()));
}
CMessageSocket::CMessageSocket(CSocketInternal *pInt):
@ -248,10 +328,20 @@ CMessageSocket::CMessageSocket():
m_ReadingData(false),
m_pRdBuffer(NULL),
m_RdBufferSize(0)
{}
CMessageSocket::~CMessageSocket()
{
}
CMessageSocket::~CMessageSocket()
{
if (m_pRdBuffer)
free(m_pRdBuffer);
if (m_pWrBuffer)
free(m_pWrBuffer);
}
PS_RESULT CMessageSocket::BeginConnect(const char *address, int port)
{
return CStreamSocket::BeginConnect(address, port);
}
// End of Network.cpp

View File

@ -188,13 +188,51 @@ public:
* @see CSocketBase::Accept()
* @see CSocketBase::Reject()
*/
virtual void OnAccept(const SocketAddress &)=0;
virtual void OnAccept(const CSocketAddress &)=0;
};
class CNetErrorMessage: public CNetMessage
{
public:
PS_RESULT m_Error;
ESocketState m_State;
inline CNetErrorMessage():
CNetMessage(NMT_ERROR)
{}
inline CNetErrorMessage(PS_RESULT error, ESocketState state):
CNetMessage(NMT_ERROR),
m_Error(error),
m_State(state)
{}
virtual CStr GetString() const;
};
struct CCloseRequestMessage: public CNetMessage
{
inline CCloseRequestMessage(): CNetMessage(NMT_CLOSE_REQUEST)
{}
virtual CStr GetString() const;
};
struct CConnectCompleteMessage: public CNetMessage
{
inline CConnectCompleteMessage(): CNetMessage(NMT_CONNECT_COMPLETE)
{}
virtual CStr GetString() const;
};
/**
* Implements a Message Pipe over an Async IO stream socket.
*
* All methods that this class exposes are thread safe and may be called from
* any thread.
*/
class CMessageSocket: public CStreamSocket, public IMessagePipeEnd
class CMessageSocket: protected CStreamSocket, public IMessagePipeEnd
{
bool m_IsWriting;
u8 *m_pWrBuffer;
@ -214,6 +252,8 @@ class CMessageSocket: public CStreamSocket, public IMessagePipeEnd
protected:
virtual void ReadComplete(PS_RESULT);
virtual void WriteComplete(PS_RESULT);
virtual void OnClose(PS_RESULT);
virtual void ConnectComplete(PS_RESULT);
public:
CMessageSocket(CSocketInternal *pInt);
@ -221,14 +261,13 @@ public:
virtual ~CMessageSocket();
/**
* Beware! If you subclass and override this method, you must call this
* implementation from the subclass
*/
virtual void ConnectComplete(PS_RESULT errorCode);
virtual void Push(CNetMessage *);
virtual CNetMessage *TryPop();
/**
* See CStreamSocket::BeginConnect.
*/
PS_RESULT BeginConnect(const char *address, int port);
};
#endif