1
0
forked from 0ad/0ad

Fixed the RunWaitLoop, and some thread issues with it. Fixed similar glitches in StreamSocket as well

This was SVN commit r127.
This commit is contained in:
Simon Brenner 2003-11-30 17:00:27 +00:00
parent 323c58f945
commit 0ee356766b
4 changed files with 140 additions and 112 deletions

View File

@ -14,11 +14,9 @@ DEFINE_ERROR(NO_ROUTE_TO_HOST, "No route to host");
DEFINE_ERROR(CONNECTION_BROKEN, "The connection has been closed");
DEFINE_ERROR(CONNECT_IN_PROGRESS, "The connection attempt has started, but is not yet complete");
// The conditions that may cause this errors are at least as obscure as the message
DEFINE_ERROR(WAIT_LOOP_FAIL, "RunWaitLoop Internal Error");
DEFINE_ERROR(PORT_IN_USE, "The port is already in use by another process");
DEFINE_ERROR(INVALID_PORT, "The port specified is either invalid, or forbidden by system or firewall policy");
DEFINE_ERROR(NO_SOCKET_SUPPORT, "The socket type or protocol is not supported by the operating system. Make sure that the TCP/IP protocol is installed and activated");
DEFINE_ERROR(INVALID_PROTOCOL, "An incompatible or unsupported protocol was specified for the operation");
DEFINE_ERROR(INVALID_PROTOCOL, "The socket type or protocol is not supported by the operating system. Make sure that the TCP/IP protocol is installed and activated");
// Map an OS error number to a PS_RESULT
PS_RESULT GetPS_RESULT(int error)
@ -62,6 +60,8 @@ PS_RESULT SocketAddress::Resolve(const char *name, int port, SocketAddress &addr
{
hostent *he;
//FIXME IPv6 compatibilitise
// Construct address
// Try to parse dot-notation IP
addr.m_IPv4.sin_addr.s_addr=inet_addr(name);
@ -132,9 +132,7 @@ void *WaitLoopThreadMain(void *)
PS_RESULT CSocketBase::Initialize(SocketProtocol proto)
{
ONCE(
CSocketBase::InitWaitLoop();
pthread_create(&g_SocketSetInternal.m_Thread, NULL, WaitLoopThreadMain, NULL);
//pthread_detach(&thread);
);
int res=socket(proto, SOCK_STREAM, 0);
@ -305,6 +303,8 @@ PS_RESULT CSocketBase::Bind(const SocketAddress &address)
Initialize(address.GetProtocol());
SetOpMask(READ);
res=bind(m_pInternal->m_fd, (struct sockaddr *)&address, sizeof(address));
if (res == -1)
{
@ -338,8 +338,6 @@ PS_RESULT CSocketBase::Bind(const SocketAddress &address)
return PS_FAIL;
}
SetOpMask(READ);
m_State=SS_CONNECTED;
m_Error=PS_OK;
return PS_OK;
@ -411,10 +409,12 @@ bool ConnectError(CSocketBase *pSocket, CSocketInternal *pInt)
return false;
}
void CSocketBase::InitWaitLoop()
void CSocketBase::RunWaitLoop()
{
int res;
signal(SIGPIPE, SIG_IGN);
pthread_mutex_lock(&g_SocketSetInternal.m_Mutex);
// Create Control Pipe
@ -424,15 +424,6 @@ void CSocketBase::InitWaitLoop()
g_SocketSetInternal.m_Pipe[0] == -1;
return;
}
pthread_mutex_unlock(&g_SocketSetInternal.m_Mutex);
}
void CSocketBase::RunWaitLoop()
{
int res;
pthread_mutex_lock(&g_SocketSetInternal.m_Mutex);
if (g_SocketSetInternal.m_Pipe[0] == -1)
{
@ -602,7 +593,7 @@ void CSocketBase::SendWaitLoopAbort()
void CSocketBase::SendWaitLoopUpdate()
{
//printf("SendWaitLoopUpdate: fd %d, ops %u\n", pSocket->m_pInternal->m_fd, ops);
printf("SendWaitLoopUpdate: fd %d, ops %u\n", m_pInternal->m_fd, m_pInternal->m_Ops);
char msg='r';
write(g_SocketSetInternal.m_Pipe[1], &msg, 1);
}
@ -611,16 +602,76 @@ void CSocketBase::SendWaitLoopUpdate()
// Windows WindowProc for async event notification
#ifdef _WIN32
void CSocketBase::InitWaitLoop()
void WaitLoop_SocketUpdateProc(int fd, int error, uint event)
{
pthread_mutex_lock(&g_SocketSetInternal.m_Mutex);
CSocketBase *pSock=g_SocketSetInternal.m_HandleMap[fd];
pthread_mutex_unlock(&g_SocketSetInternal.m_Mutex);
if (error)
{
PS_RESULT res=GetPS_RESULT(error);
if (res == PS_FAIL)
pSock->OnClose(CONNECTION_BROKEN);
pSock->m_Error=res;
pSock->m_State=SS_UNCONNECTED;
return;
}
if (pSock->m_State==SS_CONNECT_STARTED)
{
pSock->m_Error=PS_OK;
pSock->m_State=SS_CONNECTED;
}
switch (event)
{
case FD_ACCEPT:
case FD_READ:
pSock->OnRead();
break;
case FD_CONNECT:
case FD_WRITE:
pSock->OnWrite();
break;
case FD_CLOSE:
// If FD_CLOSE and error, OnClose has already been called above
// with the appropriate PS_RESULT
pSock->OnClose(PS_OK);
break;
}
}
LRESULT WINAPI WaitLoop_WindowProc(HWND hWnd, UINT msg, WPARAM wParam, LPARAM lParam)
{
//printf("WaitLoop_WindowProc(): Windows message: %d:%d:%d\n", msg, wParam, lParam);
switch (msg)
{
case MSG_SOCKET_READY:
{
int event=LOWORD(lParam);
int error=HIWORD(lParam);
WaitLoop_SocketUpdateProc(wParam, error, event);
return FALSE;
}
default:
return DefWindowProc(hWnd, msg, wParam, lParam);
}
}
void CSocketBase::RunWaitLoop()
{
WNDCLASS wc;
ATOM atom;
int ret;
char errBuf[256];
MSG msg;
WNDCLASS wc;
ATOM atom;
memset(&wc, 0, sizeof(WNDCLASS));
wc.lpszClassName="Network Event WindowClass";
wc.lpfnWndProc=DefWindowProc;
wc.lpfnWndProc=WaitLoop_WindowProc;
atom=RegisterClass(&wc);
if (!atom)
@ -641,68 +692,37 @@ void CSocketBase::InitWaitLoop()
}
//pthread_cond_signal(&g_SocketSetInternal.m_CondVar);
pthread_mutex_unlock(&g_SocketSetInternal.m_Mutex);
}
void CSocketBase::RunWaitLoop()
{
int ret;
char errBuf[256];
MSG msg;
if (!g_SocketSetInternal.m_hWnd) return;
if (!g_SocketSetInternal.m_hWnd)
{
//TODO Some kind of error message, and exit
return;
}
pthread_mutex_lock(&g_SocketSetInternal.m_Mutex);
// If OpMasks where set in another thread before we got this far,
// WSAAsyncSelect will need to be called again
std::map<int, CSocketBase *>::iterator it;
it=g_SocketSetInternal.m_HandleMap.begin();
while (it != g_SocketSetInternal.m_HandleMap.end())
{
it->second->SetOpMask(it->second->GetOpMask());
++it;
}
pthread_mutex_unlock(&g_SocketSetInternal.m_Mutex);
printf("Commencing message loop. hWnd %p\n", g_SocketSetInternal.m_hWnd);
while ((ret=GetMessage(&msg, g_SocketSetInternal.m_hWnd, 0, 0))!=0)
{
printf("RunWaitLoop(): Windows message: %d:%d:%d\n", msg.message, msg.wParam, msg.lParam);
if (ret == -1)
{
ret=GetLastError();
Network_GetErrorString(ret, errBuf, sizeof(errBuf));
printf("GetMessage: %s [%d]\n", errBuf, ret);
}
if (msg.message==MSG_SOCKET_READY)
{
int event=LOWORD(msg.lParam);
int error=HIWORD(msg.lParam);
pthread_mutex_lock(&g_SocketSetInternal.m_Mutex);
CSocketBase *pSock=g_SocketSetInternal.m_HandleMap[msg.wParam];
pthread_mutex_unlock(&g_SocketSetInternal.m_Mutex);
if (error)
{
PS_RESULT res=GetPS_RESULT(error);
if (res == PS_FAIL)
pSock->OnClose(CONNECTION_BROKEN);
pSock->m_Error=res;
pSock->m_State=SS_UNCONNECTED;
break;
}
if (pSock->m_State==SS_CONNECT_STARTED)
{
pSock->m_Error=PS_OK;
pSock->m_State=SS_CONNECTED;
}
switch (event)
{
case FD_ACCEPT:
case FD_READ:
pSock->OnRead();
break;
case FD_CONNECT:
case FD_WRITE:
pSock->OnWrite();
break;
case FD_CLOSE:
// If FD_CLOSE and error, OnClose has already been called above
// with the appropriate PS_RESULT
pSock->OnClose(PS_OK);
break;
}
}
else
{
TranslateMessage(&msg);
DispatchMessage(&msg);
@ -736,6 +756,7 @@ void CSocketBase::SendWaitLoopUpdate()
if (m_pInternal->m_Ops & WRITE)
wsaOps |= FD_WRITE|FD_CONNECT;
pthread_mutex_unlock(&g_SocketSetInternal.m_Mutex);
printf("SendWaitLoopUpdate: %d: %u %x -> %p\n", m_pInternal->m_fd, m_pInternal->m_Ops, wsaOps, g_SocketSetInternal.m_hWnd);
WSAAsyncSelect(m_pInternal->m_fd, g_SocketSetInternal.m_hWnd, MSG_SOCKET_READY, wsaOps);
}
else
@ -765,13 +786,11 @@ void CSocketBase::SetOpMask(uint ops)
g_SocketSetInternal.m_HandleMap[m_pInternal->m_fd]=this;
m_pInternal->m_Ops=ops;
/*printf("SetOpMask(fd %d, ops %u) %u, %u\n",
pSocket->m_pInternal->m_fd,
printf("SetOpMask(fd %d, ops %u) %u\n",
m_pInternal->m_fd,
ops,
g_SocketSetInternal.m_Sockets[pSocket].m_Ops,
g_SocketSetInternal.m_HandleMap[pSocket->m_pInternal->m_fd]->m_Ops);*/
g_SocketSetInternal.m_HandleMap[m_pInternal->m_fd]->m_pInternal->m_Ops);
pthread_mutex_unlock(&g_SocketSetInternal.m_Mutex);
SendWaitLoopUpdate();
pthread_mutex_unlock(&g_SocketSetInternal.m_Mutex);
}

View File

@ -40,7 +40,9 @@ class CSocketInternal;
// struct and a PF_* value
enum SocketProtocol
{
UNSPEC=-1, // This should be an invalid value
// This should be a value that's invalid for most socket functions, so that
// you don't accidentally use an UNSPEC SocketAddress
UNSPEC=((sa_family_t)-1),
IPv4=PF_INET,
#ifdef USE_INET6
IPv6=PF_INET6,
@ -58,9 +60,7 @@ enum SocketProtocol
union SocketAddress
{
sockaddr_in m_IPv4;
#ifdef USE_INET6
sockaddr_in6 m_IPv6;
#endif
inline SocketProtocol GetProtocol() const
{
@ -168,11 +168,20 @@ private:
* The network thread entry point. Simply calls RunWaitLoop()
*/
friend void *WaitLoopThreadMain(void *);
#ifdef _WIN32
/**
* Used by the winsock AsyncSelect windowproc
*/
friend void WaitLoop_SocketUpdateProc(int fd, int error, uint eventmask);
#else
/**
* An internal utility function used by the UNIX select loop
*/
friend bool ConnectError(CSocketBase *, CSocketInternal *);
#endif
/**
* Abort the call to RunWaitLoop(), if one is currently running.
@ -187,18 +196,6 @@ private:
void SendWaitLoopUpdate();
protected:
// These values are bitwise or-ed to produce op masks
enum Ops
{
// Call OnRead() on a stream socket when there is data to read from the
// socket, or OnAccept() on a server socket when there are incoming
// connections pending
READ=1,
// Call OnWrite() when there is space available in the socket's output
// buffer. Has no effect on server sockets.
WRITE=2
};
/**
* Initialize a CSocketBase from a CSocketInternal pointer. Use in OnAccept
* callbacks to create an object of your subclass. This constructor should
@ -228,6 +225,22 @@ protected:
void SetOpMask(uint ops);
public:
/**
* These values are bitwise or-ed to produce op masks
*/
enum Ops
{
/**
* Call OnRead() on a stream socket when there is data to read from the
* socket, or OnAccept() on a server socket when there are incoming
* connections pending
*/
READ=1,
// Call OnWrite() when there is space available in the socket's output
// buffer. Has no effect on server sockets.
WRITE=2
};
/**
* Constructs a CSocketBase. The OS socket object is not created by the
* constructor, but by the protected Initialize method, which is called by
@ -352,14 +365,6 @@ public:
*/
const SocketAddress &GetRemoteAddress();
/**
* Get the address of the internal pointer. Can be used in an OnAccept
* callback to implement address-based protection.
*
* @return A reference to the socket address
*/
static const SocketAddress &GetRemoteAddress(CSocketInternal *pInt);
/**
* Attempt to read data from the socket. Any data available without blocking
* will be returned. Note that a successful return does not mean that the
@ -409,4 +414,4 @@ public:
virtual void OnClose(PS_RESULT errorCode)=0;
};
#endif
#endif

View File

@ -27,6 +27,11 @@ void *CStreamSocket_ConnectThread(void *data)
}
pSock->SetNonBlocking(true);
// This should call the right callbacks, so that you get the expected
// results if you call Read or Write before the connect actually is complete
pSock->SetOpMask((pSock->m_WriteContext.m_Valid?CSocketBase::WRITE:0)|CSocketBase::READ);
pSock->ConnectComplete(res);
free(pSock->m_pConnectHost);
@ -62,7 +67,6 @@ PS_RESULT CStreamSocket::Read(void *buf, uint len)
m_ReadContext.m_Length=len;
m_ReadContext.m_Completed=0;
OnRead();
SetOpMask(GetOpMask()|READ);
return PS_OK;
@ -79,12 +83,11 @@ PS_RESULT CStreamSocket::Write(void *buf, uint len)
return CONFLICTING_OP_IN_PROGRESS;
// Fill in read_cb
m_WriteContext.m_Valid=true;
m_WriteContext.m_pBuffer=buf;
m_WriteContext.m_Length=len;
m_WriteContext.m_Completed=0;
m_WriteContext.m_Valid=true;
OnWrite();
SetOpMask(GetOpMask()|WRITE);
return PS_OK;
@ -131,7 +134,7 @@ void CStreamSocket::OnWrite()
WriteComplete(res);
return;
}
printf("OnWrite(): %u bytes\n", bytes);
printf("CStreamSocket::OnWrite(): %u bytes\n", bytes);
m_WriteContext.m_Completed+=bytes;
if (m_WriteContext.m_Completed == m_WriteContext.m_Length)
{
@ -144,7 +147,8 @@ void CStreamSocket::OnRead()
{
if (!m_ReadContext.m_Valid)
{
SetOpMask(GetOpMask() & (~READ));
printf("CStreamSocket::OnRead(): No Read request in progress\n");
//SetOpMask(GetOpMask() & (~READ));
return;
}
uint bytes=0;
@ -152,12 +156,12 @@ void CStreamSocket::OnRead()
((char *)m_ReadContext.m_pBuffer)+m_ReadContext.m_Completed,
m_ReadContext.m_Length-m_ReadContext.m_Completed,
&bytes);
printf("CStreamSocket::OnRead(): %s, %u bytes read of %u\n", res, bytes, m_ReadContext.m_Length-m_ReadContext.m_Completed);
if (res != PS_OK)
{
ReadComplete(res);
return;
}
printf("OnRead(): %u bytes read of %u\n", bytes, m_ReadContext.m_Length-m_ReadContext.m_Completed);
m_ReadContext.m_Completed+=bytes;
if (m_ReadContext.m_Completed == m_ReadContext.m_Length)
{

View File

@ -142,4 +142,4 @@ public:
virtual void OnClose(PS_RESULT errorCode);
};
#endif
#endif