Use one semaphore for the process's lifetime, for efficiency.
Use a name that works on Linux. Try several random names to avoid conflicts with other processes. This was SVN commit r5315.
This commit is contained in:
parent
a22e73406a
commit
027c3c7cc0
@ -4,13 +4,54 @@
|
||||
#include "Messages.h"
|
||||
|
||||
#include "lib/timer.h"
|
||||
#include "lib/rand.h"
|
||||
|
||||
using namespace AtlasMessage;
|
||||
|
||||
|
||||
MessagePasserImpl::MessagePasserImpl()
|
||||
: m_Trace(false)
|
||||
: m_Trace(false), m_Semaphore(NULL)
|
||||
{
|
||||
int tries = 0;
|
||||
while (tries++ < 16) // some arbitrary cut-off point to avoid infinite loops
|
||||
{
|
||||
CStr name = "/wfg-atlas-msgpass-" + CStr(rand(100000, 1000000));
|
||||
sem_t* sem = sem_open(name, O_CREAT | O_EXCL, 0700, 0);
|
||||
if (sem == SEM_FAILED)
|
||||
{
|
||||
int err = errno;
|
||||
if (err == EEXIST)
|
||||
{
|
||||
// Semaphore already exists - try another one
|
||||
continue;
|
||||
}
|
||||
// Otherwise, it's a probably-fatal error
|
||||
debug_printf("errno: %d (%s)\n", err, strerror(err));
|
||||
debug_warn("sem_open failed");
|
||||
break;
|
||||
}
|
||||
// Succeeded - use this semaphore
|
||||
m_Semaphore = sem;
|
||||
m_SemaphoreName = name;
|
||||
break;
|
||||
}
|
||||
|
||||
if (! m_Semaphore)
|
||||
{
|
||||
debug_warn("Failed to create semaphore for Atlas - giving up");
|
||||
// We will probably crash later - maybe we could fall back on sem_init, if this
|
||||
// ever fails in practice
|
||||
}
|
||||
}
|
||||
|
||||
MessagePasserImpl::~MessagePasserImpl()
|
||||
{
|
||||
if (m_Semaphore)
|
||||
{
|
||||
// Clean up
|
||||
sem_close(m_Semaphore);
|
||||
sem_unlink(m_SemaphoreName);
|
||||
}
|
||||
}
|
||||
|
||||
void MessagePasserImpl::Add(IMessage* msg)
|
||||
@ -57,23 +98,8 @@ void MessagePasserImpl::Query(QueryMessage* qry, void(* UNUSED(timeoutCallback)
|
||||
if (m_Trace)
|
||||
debug_printf("%8.3f add query: %s\n", get_time(), qry->GetName());
|
||||
|
||||
// Initialise a semaphore, so we can block until the query has been handled
|
||||
int err = 0;
|
||||
sem_t* psem = (sem_t*) SEM_FAILED;
|
||||
//sem_t sem;
|
||||
//psem = &sem;
|
||||
//err = sem_init(psem, 0, 0);
|
||||
const char* sem_name = "/tmp/atlas";
|
||||
psem = sem_open(sem_name, O_CREAT, 0777, 0);
|
||||
err = errno;
|
||||
if (psem == (sem_t*) SEM_FAILED)
|
||||
{
|
||||
// Probably-fatal error
|
||||
debug_printf("errno: %d (%s)\n", err, strerror(err));
|
||||
debug_warn("sem_init failed");
|
||||
return;
|
||||
}
|
||||
qry->m_Semaphore = (void*)psem;
|
||||
// Set the semaphore, so we can block until the query has been handled
|
||||
qry->m_Semaphore = static_cast<void*>(m_Semaphore);
|
||||
|
||||
m_Mutex.Lock();
|
||||
m_Queue.push(qry);
|
||||
@ -113,7 +139,8 @@ void MessagePasserImpl::Query(QueryMessage* qry, void(* UNUSED(timeoutCallback)
|
||||
// while (0 != (err = sem_timedwait(psem, &abs_timeout)))
|
||||
// #endif
|
||||
|
||||
while (0 != (err = sem_wait(psem)))
|
||||
int err;
|
||||
while (0 != (err = sem_wait(m_Semaphore)))
|
||||
{
|
||||
// If timed out, call callback and try again
|
||||
// if (errno == ETIMEDOUT)
|
||||
@ -129,9 +156,6 @@ void MessagePasserImpl::Query(QueryMessage* qry, void(* UNUSED(timeoutCallback)
|
||||
|
||||
// Clean up
|
||||
qry->m_Semaphore = NULL;
|
||||
//sem_destroy(psem);
|
||||
sem_close(psem);
|
||||
sem_unlink(sem_name);
|
||||
}
|
||||
|
||||
bool MessagePasserImpl::IsEmpty()
|
||||
|
@ -1,12 +1,14 @@
|
||||
#include "MessagePasser.h"
|
||||
|
||||
#include "ps/ThreadUtil.h"
|
||||
#include "ps/CStr.h"
|
||||
#include <queue>
|
||||
|
||||
class MessagePasserImpl : public AtlasMessage::MessagePasser
|
||||
class MessagePasserImpl : public AtlasMessage::MessagePasser, boost::noncopyable
|
||||
{
|
||||
public:
|
||||
MessagePasserImpl();
|
||||
~MessagePasserImpl();
|
||||
virtual void Add(AtlasMessage::IMessage* msg);
|
||||
virtual AtlasMessage::IMessage* Retrieve();
|
||||
virtual void Query(AtlasMessage::QueryMessage* qry, void(*timeoutCallback)());
|
||||
@ -17,6 +19,8 @@ public:
|
||||
|
||||
private:
|
||||
CMutex m_Mutex;
|
||||
CStr m_SemaphoreName;
|
||||
sem_t* m_Semaphore;
|
||||
std::queue<AtlasMessage::IMessage*> m_Queue;
|
||||
bool m_Trace;
|
||||
};
|
||||
|
Loading…
Reference in New Issue
Block a user