1
0
forked from 0ad/0ad
0ad/source/network/SocketBase.cpp
Matei 92578ae553 # Some initial work on networking, fixing session setup, game startup, and command queueing.
- Fixed outdated / buggy networking code in the GUI scripts.
- Finished the API to CNetClient so that it's possible to start a CGame
from it.
- Some enhancements for debugging networking: Enabled updates while the
game is minimized/out-of-focus if it's in a network session. Also
reduced the turn length to something slightly more manageable but still
unplayable (1 sec versus 3 sec).
- Fixed a bug where IssueCommand used to access the order it creates
after queueing it, which is bad if the order gets deleted while being
queued (e.g. in CNetClient).

This was SVN commit r5139.
2007-06-04 07:41:05 +00:00

993 lines
22 KiB
C++

#include "precompiled.h"
#if OS_WIN
#include "lib/sysdep/win/win.h"
#include "lib/sysdep/win/wposix/wsock_internal.h"
#endif
#include "Network.h"
#include "NetworkInternal.h"
#include "ps/CStr.h"
// ERROR is defined by some windows header. Undef it
#undef ERROR
#include "ps/CLogger.h"
#include "NetLog.h"
#if !OS_WIN
# include <fcntl.h>
# include <signal.h>
# include <sys/ioctl.h>
#endif
// Record global transfer statistics (sent/recvd bytes). This will put a lock
// /unlock pair in all read and write operations.
#define RECORD_GLOBAL_STATS 1
#define GLOBAL_LOCK() pthread_mutex_lock(&g_SocketSetInternal.m_Mutex)
#define GLOBAL_UNLOCK() pthread_mutex_unlock(&g_SocketSetInternal.m_Mutex)
CSocketSetInternal g_SocketSetInternal;
DEFINE_ERROR(NO_SUCH_HOST, "Host not found");
DEFINE_ERROR(CONNECT_TIMEOUT, "The connection attempt timed out");
DEFINE_ERROR(CONNECT_REFUSED, "The connection attempt was refused");
DEFINE_ERROR(NO_ROUTE_TO_HOST, "No route to host");
DEFINE_ERROR(CONNECTION_BROKEN, "The connection has been closed");
DEFINE_ERROR(CONNECT_IN_PROGRESS, "The connect attempt has started, but is not yet complete");
DEFINE_ERROR(PORT_IN_USE, "The port is already in use by another process");
DEFINE_ERROR(INVALID_PORT, "The port specified is either invalid, or forbidden by system or firewall policy");
DEFINE_ERROR(INVALID_PROTOCOL, "The socket type or protocol is not supported by the operating system. Make sure that the TCP/IP protocol is installed and activated");
// Map an OS error number to a PS_RESULT
PS_RESULT GetPS_RESULT(int error)
{
switch (error)
{
case EWOULDBLOCK:
case EINPROGRESS:
return PS_OK;
case ENETUNREACH:
case ENETDOWN:
case EADDRNOTAVAIL:
return NO_ROUTE_TO_HOST;
case ETIMEDOUT:
return CONNECT_TIMEOUT;
case ECONNREFUSED:
return CONNECT_REFUSED;
default:
char buf[256];
Network_GetErrorString(error, buf, sizeof(buf));
LOG(ERROR, LOG_CAT_NET, "SocketBase.cpp::GetPS_RESULT(): Unrecognized error %s[%d]", buf, error);
return PS_FAIL;
}
}
CSocketAddress::CSocketAddress(int port, ESocketProtocol proto)
{
memset(&m_Union, 0, sizeof(m_Union));
switch (proto)
{
case IPv4:
m_Union.m_IPv4.sin_family=PF_INET;
m_Union.m_IPv4.sin_addr.s_addr=htonl(INADDR_ANY);
m_Union.m_IPv4.sin_port=htons(port);
break;
case IPv6:
m_Union.m_IPv6.sin6_family=PF_INET6;
cpu_memcpy(&m_Union.m_IPv6.sin6_addr, &in6addr_any, sizeof(in6addr_any));
m_Union.m_IPv6.sin6_port=htons(port);
break;
default:
debug_warn("CSocketAddress::CSocketAddress: Bad proto");
}
}
CSocketAddress CSocketAddress::Loopback(int port, ESocketProtocol proto)
{
CSocketAddress ret;
switch (proto)
{
case IPv4:
ret.m_Union.m_IPv4.sin_family=PF_INET;
ret.m_Union.m_IPv4.sin_addr.s_addr=htonl(INADDR_LOOPBACK);
ret.m_Union.m_IPv4.sin_port=htons(port);
break;
case IPv6:
ret.m_Union.m_IPv6.sin6_family=PF_INET6;
cpu_memcpy(&ret.m_Union.m_IPv6.sin6_addr, &in6addr_loopback, sizeof(in6addr_loopback));
ret.m_Union.m_IPv6.sin6_port=htons(port);
break;
default:
debug_warn("CSocketAddress::CSocketAddress: Bad proto");
}
return ret;
}
PS_RESULT CSocketAddress::Resolve(const char *name, int port, CSocketAddress &addr)
{
if ((getaddrinfo) != NULL)
{
addrinfo *ai;
int res=getaddrinfo(name, NULL, NULL, &ai);
if (res == 0)
{
if (ai->ai_addrlen < sizeof(addr.m_Union))
cpu_memcpy(&addr.m_Union, ai->ai_addr, ai->ai_addrlen);
switch (addr.m_Union.m_Family)
{
case IPv4:
addr.m_Union.m_IPv4.sin_port=htons(port);
break;
case IPv6:
addr.m_Union.m_IPv6.sin6_port=htons(port);
break;
}
freeaddrinfo(ai);
return PS_OK;
}
else
return NO_SUCH_HOST;
}
else
{
hostent *he;
addr.m_Union.m_IPv4.sin_family=AF_INET;
addr.m_Union.m_IPv4.sin_port=htons(port);
// Try to parse dot-notation IP
addr.m_Union.m_IPv4.sin_addr.s_addr=inet_addr(name);
if (addr.m_Union.m_IPv4.sin_addr.s_addr==INADDR_NONE) // Not a dotted IP, try name resolution
{
he=gethostbyname(name);
if (!he)
{
return NO_SUCH_HOST;
}
addr.m_Union.m_IPv4.sin_addr=*(struct in_addr *)(he->h_addr_list[0]);
}
return PS_OK;
}
}
CStr CSocketAddress::GetString() const
{
char convBuf[NI_MAXHOST];
if ((getnameinfo) != NULL)
{
int res=getnameinfo((sockaddr *)&m_Union, sizeof(m_Union), convBuf, NI_MAXHOST, NULL, 0, NI_NUMERICHOST);
if (res == 0)
{
return CStr(convBuf);
}
// getnameinfo won't return a string for the IPv6 unspecified address
else if (m_Union.m_Family == IPv6 && res==EAI_NONAME)
return "::";
else
return "";
}
else if (m_Union.m_Family == IPv4)
{
sprintf(convBuf, "%d.%d.%d.%d",
m_Union.m_IPv4.sin_addr.s_addr&0xff,
(m_Union.m_IPv4.sin_addr.s_addr>>8)&0xff,
(m_Union.m_IPv4.sin_addr.s_addr>>16)&0xff,
(m_Union.m_IPv4.sin_addr.s_addr>>24)&0xff);
return CStr(convBuf);
}
else
return CStr();
}
int CSocketAddress::GetPort() const
{
switch (m_Union.m_Family)
{
case IPv4:
case IPv6:
return ntohs(m_Union.m_IPv4.sin_port);
}
return -1;
}
CSocketBase::CSocketBase()
{
m_pInternal=new CSocketInternal;
m_Proto=UNSPEC;
m_NonBlocking=true;
m_State=SS_UNCONNECTED;
m_Error=PS_OK;
}
CSocketBase::CSocketBase(CSocketInternal *pInt)
{
m_pInternal=pInt;
m_Proto=pInt->m_RemoteAddr.GetProtocol();
m_State=SS_CONNECTED;
m_Error=PS_OK;
SetNonBlocking(true);
NET_LOG("CSocketBase::CSocketBase(): Created socket from fd %d", pInt->m_fd);
}
CSocketBase::~CSocketBase()
{
NET_LOG("CSocketBase::~CSocketBase(): fd is %d. "
"Received: %lld bytes. Sent: %lld bytes.", m_pInternal->m_fd,
m_pInternal->m_RecvBytes, m_pInternal->m_SentBytes);
Destroy();
delete m_pInternal;
}
void CSocketBase::Shutdown()
{
GLOBAL_LOCK();
if (g_SocketSetInternal.m_NumSockets)
{
NET_LOG("CSocketBase::Shutdown(): %d sockets still open! (forcing network shutdown)", g_SocketSetInternal.m_NumSockets);
}
#if RECORD_GLOBAL_STATS
NET_LOG("GLOBAL SOCKET STATISTICS: "
"Received: %lld bytes. Sent: %lld bytes.",
g_SocketSetInternal.m_GlobalRecvBytes,
g_SocketSetInternal.m_GlobalSentBytes);
#endif
GLOBAL_UNLOCK();
if (g_SocketSetInternal.m_Thread)
{
AbortWaitLoop();
pthread_join(g_SocketSetInternal.m_Thread, NULL);
}
}
void *WaitLoopThreadMain(void *)
{
debug_set_thread_name("net_wait");
GLOBAL_LOCK();
CSocketBase::RunWaitLoop();
g_SocketSetInternal.m_Thread=0;
GLOBAL_UNLOCK();
return NULL;
}
PS_RESULT CSocketBase::Initialize(ESocketProtocol proto)
{
int res=socket(proto, SOCK_STREAM, 0);
NET_LOG("CSocketBase::Initialize(): socket() res: %d", res);
if (res == -1)
{
return INVALID_PROTOCOL;
}
m_pInternal->m_fd=res;
m_Proto=proto;
GLOBAL_LOCK();
if (g_SocketSetInternal.m_NumSockets == 0)
pthread_create(&g_SocketSetInternal.m_Thread, NULL, WaitLoopThreadMain, NULL);
g_SocketSetInternal.m_NumSockets++;
GLOBAL_UNLOCK();
SetNonBlocking(m_NonBlocking);
return PS_OK;
}
void CSocketBase::Close()
{
shutdown(m_pInternal->m_fd, SHUT_WR);
m_State=SS_CLOSED_LOCALLY;
}
void CSocketBase::Destroy()
{
if (m_pInternal->m_fd == -1)
{
m_State=SS_UNCONNECTED;
return;
}
// Remove any data associated with the file descriptor
GLOBAL_LOCK();
debug_assert(g_SocketSetInternal.m_NumSockets > 0);
g_SocketSetInternal.m_NumSockets--;
g_SocketSetInternal.m_HandleMap.erase(m_pInternal->m_fd);
if (!g_SocketSetInternal.m_NumSockets)
AbortWaitLoop();
GLOBAL_UNLOCK();
// Disconnect the socket, if it is still connected
if (m_State == SS_CONNECTED || m_State == SS_CLOSED_LOCALLY)
{
// This makes the other end receive a RST, but since
// we've had no chance to close cleanly and the socket must
// be destroyed immediately, we've got no choice
shutdown(m_pInternal->m_fd, SHUT_RDWR);
m_State=SS_UNCONNECTED;
}
// Destroy the socket
closesocket(m_pInternal->m_fd);
m_pInternal->m_fd=-1;
}
void CSocketBase::SetNonBlocking(bool nonblocking)
{
#if OS_WIN
unsigned long nb=nonblocking;
int res=ioctlsocket(m_pInternal->m_fd, FIONBIO, &nb);
if (res == -1)
NET_LOG("SetNonBlocking: res %d", res);
#else
int oldflags=fcntl(m_pInternal->m_fd, F_GETFL, 0);
if (oldflags != -1)
{
if (nonblocking)
oldflags |= O_NONBLOCK;
else
oldflags &= ~O_NONBLOCK;
fcntl(m_pInternal->m_fd, F_SETFL, oldflags);
}
#endif
m_NonBlocking=nonblocking;
}
void CSocketBase::SetTcpNoDelay(bool tcpNoDelay)
{
// Disable Nagle's Algorithm
int data=tcpNoDelay;
setsockopt(m_pInternal->m_fd, SOL_SOCKET, TCP_NODELAY, (const char *)&data, sizeof(data));
}
PS_RESULT CSocketBase::Read(void *buf, uint len, uint *bytesRead)
{
int res;
char errbuf[256];
res=recv(m_pInternal->m_fd, (char *)buf, len, 0);
if (res < 0)
{
*bytesRead=0;
int error=Network_LastError;
switch (error)
{
case EWOULDBLOCK:
return PS_OK;
/*case ENETDOWN:
case ENETRESET:
case ENOTCONN:
case ESHUTDOWN:
case ECONNABORTED:
case ECONNRESET:
case ETIMEDOUT:*/
default:
Network_GetErrorString(error, errbuf, sizeof(errbuf));
NET_LOG("Read error %s [%d]", errbuf, error);
m_State=SS_UNCONNECTED;
m_Error=GetPS_RESULT(error);
return m_Error;
}
}
if (res == 0 && len > 0) // EOF - Cleanly closed socket
{
*bytesRead=0;
m_State=SS_UNCONNECTED;
m_Error=PS_OK;
return CONNECTION_BROKEN;
}
*bytesRead=res;
m_pInternal->m_RecvBytes += res;
#if RECORD_GLOBAL_STATS
GLOBAL_LOCK();
g_SocketSetInternal.m_GlobalRecvBytes += res;
GLOBAL_UNLOCK();
#endif
return PS_OK;
}
PS_RESULT CSocketBase::Write(void *buf, uint len, uint *bytesWritten)
{
int res;
char errbuf[256];
res=send(m_pInternal->m_fd, (char *)buf, len, 0);
if (res < 0)
{
*bytesWritten=0;
int err=Network_LastError;
switch (err)
{
case EWOULDBLOCK:
return PS_OK;
/*case ENETDOWN:
case ENETRESET:
case ENOTCONN:
case ESHUTDOWN:
case ECONNABORTED:
case ECONNRESET:
case ETIMEDOUT:
case EHOSTUNREACH:*/
default:
Network_GetErrorString(err, errbuf, sizeof(errbuf));
NET_LOG("Write error %s [%d]", errbuf, err);
m_State=SS_UNCONNECTED;
return CONNECTION_BROKEN;
}
}
*bytesWritten=res;
m_pInternal->m_SentBytes += res;
#if RECORD_GLOBAL_STATS
GLOBAL_LOCK();
g_SocketSetInternal.m_GlobalSentBytes += res;
GLOBAL_UNLOCK();
#endif
return PS_OK;
}
PS_RESULT CSocketBase::Connect(const CSocketAddress &addr)
{
int res=connect(m_pInternal->m_fd, (struct sockaddr *)&addr, sizeof(addr));
if (res != 0)
{
int error=Network_LastError;
if (m_NonBlocking && error == EWOULDBLOCK)
m_State=SS_CONNECT_STARTED;
else
{
m_State=SS_UNCONNECTED;
m_Error=GetPS_RESULT(error);
}
}
else
{
m_State=SS_CONNECTED;
m_Error=PS_OK;
}
return m_Error;
}
PS_RESULT CSocketBase::Bind(const CSocketAddress &address)
{
char errBuf[256];
int res;
Initialize(address.GetProtocol());
SetOpMask(READ);
res=bind(m_pInternal->m_fd, (struct sockaddr *)&address, sizeof(address));
if (res == -1)
{
PS_RESULT ret=PS_FAIL;
int err=Network_LastError;
switch (err)
{
case EADDRINUSE:
ret=PORT_IN_USE;
break;
case EACCES:
case EADDRNOTAVAIL:
ret=INVALID_PORT;
break;
default:
Network_GetErrorString(err, errBuf, sizeof(errBuf));
LOG(ERROR, LOG_CAT_NET, "CServerSocket::Bind(): bind: %s [%d] => PS_FAIL", errBuf, err);
}
m_State=SS_UNCONNECTED;
m_Error=ret;
return ret;
}
res=listen(m_pInternal->m_fd, 5);
if (res == -1)
{
int err=Network_LastError;
Network_GetErrorString(err, errBuf, sizeof(errBuf));
LOG(ERROR, LOG_CAT_NET, "CServerSocket::Bind(): listen: %s [%d] => PS_FAIL", errBuf, err);
m_State=SS_UNCONNECTED;
return PS_FAIL;
}
m_State=SS_CONNECTED;
m_Error=PS_OK;
return PS_OK;
}
PS_RESULT CSocketBase::PreAccept(CSocketAddress &addr)
{
socklen_t addrLen=sizeof(addr.m_Union);
int fd=accept(m_pInternal->m_fd, (struct sockaddr *)&addr.m_Union, &addrLen);
m_pInternal->m_AcceptFd=fd;
m_pInternal->m_AcceptAddr=addr;
if (fd != -1)
return PS_OK;
else
{
PS_RESULT res=GetPS_RESULT(Network_LastError);
// GetPS_RESULT considers some errors non-failures
if (res == PS_OK)
return PS_FAIL;
else
return res;
}
}
CSocketInternal *CSocketBase::Accept()
{
if (m_pInternal->m_AcceptFd != -1)
{
CSocketInternal *pInt=new CSocketInternal();
pInt->m_fd=m_pInternal->m_AcceptFd;
pInt->m_RemoteAddr=m_pInternal->m_AcceptAddr;
GLOBAL_LOCK();
g_SocketSetInternal.m_NumSockets++;
GLOBAL_UNLOCK();
m_pInternal->m_AcceptFd=-1;
return pInt;
}
else
return NULL;
}
void CSocketBase::Reject()
{
shutdown(m_pInternal->m_AcceptFd, SHUT_RDWR);
closesocket(m_pInternal->m_AcceptFd);
}
// UNIX select loop
#if !OS_WIN
// ConnectError is called on a socket the first time it selects as ready
// after the BeginConnect, to check errors on the socket and update the
// connection status information
//
// Returns: true if error callback should be called, false if it should not
bool CSocketBase::ConnectError(CSocketBase *pSocket)
{
CSocketInternal *pInt=pSocket->m_pInternal;
uint buf;
int res;
if (pSocket->m_State==SS_CONNECT_STARTED)
{
res=read(pInt->m_fd, &buf, 0);
// read of zero bytes should be a successful no-op, unless
// there was an error
if (res == -1)
{
pSocket->m_State=SS_UNCONNECTED;
PS_RESULT connErr=GetPS_RESULT(errno);
NET_LOG("Connect error: %s [%d:%s]", connErr, errno, strerror(errno));
pSocket->m_Error=connErr;
return true;
}
else
{
pSocket->m_State=SS_CONNECTED;
pSocket->m_Error=PS_OK;
}
}
return false;
}
// SocketWritable is called whenever a socket selects as writable in the unix
// select loop. This will call the callback after checking for a connect error
// if there's a connect in progress, as well as update the socket's state.
//
// Locking: The global mutex must be held when entering this function, and it
// will be held upon return.
void CSocketBase::SocketWritable(CSocketBase *pSock)
{
//CSocketInternal *pInt=pSock->m_pInternal;
bool isConnectError=false;
if (pSock->m_State != SS_CONNECTED)
isConnectError=ConnectError(pSock);
GLOBAL_UNLOCK();
if (isConnectError)
pSock->OnClose(pSock->m_Error);
else
pSock->OnWrite();
GLOBAL_LOCK();
}
// SocketReadable is called whenever a socket selects as writable in the unix
// select loop. This will call the callback after checking for a connect error
// if there's a connect in progress, as well as update the socket's state.
//
// Locking: The global mutex must be held when entering this function, and it
// will be held upon return.
void CSocketBase::SocketReadable(CSocketBase *pSock)
{
bool isError=false;
if (pSock->m_State == SS_CONNECT_STARTED)
isError=ConnectError(pSock);
else if (pSock->m_State == SS_UNCONNECTED)
{
// UNCONNECTED sockets don't get callbacks
// Note that server sockets that are bound have state==SS_CONNECTED
return;
}
else if (pSock->m_State != SS_UNCONNECTED)
{
uint nRead;
errno=0;
int res=ioctl(pSock->m_pInternal->m_fd, FIONREAD, &nRead);
// failure, errno=EINVAL means server socket
// success, nRead != 0 means alive stream socket
if ((res == -1 && errno != EINVAL) ||
(res == 0 && nRead == 0))
{
NET_LOG("RunWaitLoop:ioctl: Connection broken [%d:%s]", errno, strerror(errno));
// Don't use API function - we both hold a lock and
// it is unnecessary to SendWaitLoopUpdate at this
// stage
pSock->m_pInternal->m_Ops=0;
pSock->m_State=SS_UNCONNECTED;
if (errno)
pSock->m_Error=GetPS_RESULT(errno);
else
pSock->m_Error=PS_OK;
isError=true;
}
}
GLOBAL_UNLOCK();
if (isError)
pSock->OnClose(pSock->m_Error);
else
pSock->OnRead();
GLOBAL_LOCK();
}
void CSocketBase::RunWaitLoop()
{
int res;
signal(SIGPIPE, SIG_IGN);
// Create Control Pipe
res=pipe(g_SocketSetInternal.m_Pipe);
if (res != 0)
return;
// The lock is held upon entry and exit of this loop. There are a few places
// where the lock is released and then re-acquired: when calling callbacks
// and when calling select().
while (true)
{
std::map<int, CSocketBase *>::iterator it;
fd_set rfds;
fd_set wfds;
int fd_max=g_SocketSetInternal.m_Pipe[0];
// Prepare fd_set: Read
FD_ZERO(&rfds);
FD_SET(g_SocketSetInternal.m_Pipe[0], &rfds);
// Prepare fd_set: Write
FD_ZERO(&wfds);
it=g_SocketSetInternal.m_HandleMap.begin();
while (it != g_SocketSetInternal.m_HandleMap.end())
{
uint ops=it->second->m_pInternal->m_Ops;
if (ops && it->first > fd_max)
fd_max=it->first;
if (ops & READ)
FD_SET(it->first, &rfds);
if (ops & WRITE)
FD_SET(it->first, &wfds);
++it;
}
GLOBAL_UNLOCK();
// select, timeout infinite
res=select(fd_max+1, &rfds, &wfds, NULL, NULL);
GLOBAL_LOCK();
// Check select error
if (res == -1)
{
// It is possible for a socket to be deleted between preparing the
// fd_sets and actually performing the select - in which case it
// will fire a Bad file descriptor error. Simply retry.
if (Network_LastError == EBADF)
continue;
perror("CSocketSet::RunWaitLoop(), select");
continue;
}
// Check Control Pipe
if (FD_ISSET(g_SocketSetInternal.m_Pipe[0], &rfds))
{
char bt;
if (read(g_SocketSetInternal.m_Pipe[0], &bt, 1) == 1)
{
if (bt=='q')
break;
else if (bt=='r')
{
// Reload sockets - just skip to the beginning of the loop
continue;
}
}
FD_CLR(g_SocketSetInternal.m_Pipe[0], &rfds);
}
// Go through sockets
int i=-1;
while (++i <= fd_max)
{
if (!FD_ISSET(i, &rfds) && !FD_ISSET(i, &wfds))
continue;
it=g_SocketSetInternal.m_HandleMap.find(i);
if (it == g_SocketSetInternal.m_HandleMap.end())
continue;
CSocketBase *pSock=it->second;
if (FD_ISSET(i, &wfds))
{
SocketWritable(pSock);
}
// After the callback is called, we must check if the socket
// still exists (sockets may delete themselves in the callback)
it=g_SocketSetInternal.m_HandleMap.find(i);
if (it == g_SocketSetInternal.m_HandleMap.end())
continue;
if (FD_ISSET(i, &rfds))
{
SocketReadable(pSock);
}
}
}
// Close control pipe
close(g_SocketSetInternal.m_Pipe[0]);
close(g_SocketSetInternal.m_Pipe[1]);
return;
}
void CSocketBase::SendWaitLoopAbort()
{
char msg='q';
write(g_SocketSetInternal.m_Pipe[1], &msg, 1);
}
void CSocketBase::SendWaitLoopUpdate()
{
// NET_LOG("SendWaitLoopUpdate: fd %d, ops %u\n", m_pInternal->m_fd, m_pInternal->m_Ops);
char msg='r';
write(g_SocketSetInternal.m_Pipe[1], &msg, 1);
}
// Windows WindowProc for async event notification
#else // i.e. #if OS_WIN
void WaitLoop_SocketUpdateProc(int fd, int error, uint event)
{
GLOBAL_LOCK();
CSocketBase *pSock=g_SocketSetInternal.m_HandleMap[fd];
GLOBAL_UNLOCK();
// FIXME What if the fd isn't in the handle map?
if (error)
{
PS_RESULT res=GetPS_RESULT(error);
pSock->m_Error=res;
pSock->m_State=SS_UNCONNECTED;
if (res == PS_FAIL)
pSock->OnClose(CONNECTION_BROKEN);
return;
}
if (pSock->m_State==SS_CONNECT_STARTED)
{
pSock->m_Error=PS_OK;
pSock->m_State=SS_CONNECTED;
}
switch (event)
{
case FD_ACCEPT:
case FD_READ:
pSock->OnRead();
break;
case FD_CONNECT:
case FD_WRITE:
pSock->OnWrite();
break;
case FD_CLOSE:
// If FD_CLOSE and error, OnClose has already been called above
// with the appropriate PS_RESULT
pSock->m_State=SS_UNCONNECTED;
pSock->OnClose(PS_OK);
break;
}
}
LRESULT WINAPI WaitLoop_WindowProc(HWND hWnd, UINT msg, WPARAM wParam, LPARAM lParam)
{
//printf("WaitLoop_WindowProc(): Windows message: %d:%d:%d\n", msg, wParam, lParam);
switch (msg)
{
case MSG_SOCKET_READY:
{
int event=LOWORD(lParam);
int error=HIWORD(lParam);
WaitLoop_SocketUpdateProc((int)wParam, error?error-WSABASEERR:0, event);
return FALSE;
}
default:
return DefWindowProc(hWnd, msg, wParam, lParam);
}
}
// Locked on entry, must be locked on exit
void CSocketBase::RunWaitLoop()
{
int ret;
char errBuf[256];
MSG msg;
WNDCLASS wc;
ATOM atom;
memset(&wc, 0, sizeof(WNDCLASS));
wc.lpszClassName="Network Event WindowClass";
wc.lpfnWndProc=WaitLoop_WindowProc;
atom=RegisterClass(&wc);
if (!atom)
{
ret=GetLastError();
Network_GetErrorString(ret, errBuf, sizeof(errBuf));
NET_LOG("RegisterClass: %s [%d]", errBuf, ret);
return;
}
// Create message window
g_SocketSetInternal.m_hWnd=CreateWindow((LPCTSTR)atom, "Network Event Window", WS_POPUP, 0, 0, 0, 0, NULL, NULL, NULL, NULL);
if (!g_SocketSetInternal.m_hWnd)
{
ret=GetLastError();
Network_GetErrorString(ret, errBuf, sizeof(errBuf));
NET_LOG("CreateWindowEx: %s [%d]", errBuf, ret);
return;
}
// If OpMasks where set in another thread before we got this far,
// WSAAsyncSelect will need to be called again
std::map<int, CSocketBase *>::iterator it;
it=g_SocketSetInternal.m_HandleMap.begin();
while (it != g_SocketSetInternal.m_HandleMap.end())
{
it->second->SetOpMask(it->second->GetOpMask());
++it;
}
NET_LOG("Commencing message loop. hWnd %p", g_SocketSetInternal.m_hWnd);
GLOBAL_UNLOCK();
while ((ret=GetMessage(&msg, g_SocketSetInternal.m_hWnd, 0, 0))!=0)
{
//printf("RunWaitLoop(): Windows message: %d:%d:%d\n", msg.message, msg.wParam, msg.lParam);
if (ret == -1)
{
ret=GetLastError();
Network_GetErrorString(ret, errBuf, sizeof(errBuf));
NET_LOG("GetMessage: %s [%d]", errBuf, ret);
}
{
TranslateMessage(&msg);
DispatchMessage(&msg);
}
}
GLOBAL_LOCK();
g_SocketSetInternal.m_Thread=0;
// FIXME Leak: Destroy window
g_SocketSetInternal.m_hWnd=0;
NET_LOG("RunWaitLoop returning");
return;
}
void CSocketBase::SendWaitLoopAbort()
{
if (g_SocketSetInternal.m_hWnd)
{
PostMessage(g_SocketSetInternal.m_hWnd, WM_QUIT, 0, 0);
}
else
NET_LOG("SendWaitLoopUpdate: No WaitLoop Running.");
}
void CSocketBase::SendWaitLoopUpdate()
{
GLOBAL_LOCK();
if (g_SocketSetInternal.m_hWnd)
{
long wsaOps=FD_CLOSE;
if (m_pInternal->m_Ops & READ)
wsaOps |= FD_READ|FD_ACCEPT;
if (m_pInternal->m_Ops & WRITE)
wsaOps |= FD_WRITE|FD_CONNECT;
GLOBAL_UNLOCK();
//printf("SendWaitLoopUpdate: %d: %u %x -> %p\n", m_pInternal->m_fd, m_pInternal->m_Ops, wsaOps, g_SocketSetInternal.m_hWnd);
WSAAsyncSelect(m_pInternal->m_fd, g_SocketSetInternal.m_hWnd, MSG_SOCKET_READY, wsaOps);
}
else
{
//printf("SendWaitLoopUpdate: No WaitLoop Running.\n");
GLOBAL_UNLOCK();
}
}
#endif // #if OS_WIN
void CSocketBase::AbortWaitLoop()
{
SendWaitLoopAbort();
// pthread_join(g_SocketSetInternal.m_Thread);
}
uint CSocketBase::GetOpMask()
{
return m_pInternal->m_Ops;
}
void CSocketBase::SetOpMask(uint ops)
{
GLOBAL_LOCK();
g_SocketSetInternal.m_HandleMap[m_pInternal->m_fd]=this;
m_pInternal->m_Ops=ops;
/*printf("SetOpMask(fd %d, ops %u) %u\n",
m_pInternal->m_fd,
ops,
g_SocketSetInternal.m_HandleMap[m_pInternal->m_fd]->m_pInternal->m_Ops);*/
GLOBAL_UNLOCK();
SendWaitLoopUpdate();
}