#include "precompiled.h" #include "MessagePasserImpl.h" #include "Messages.h" #include "lib/timer.h" using namespace AtlasMessage; MessagePasserImpl::MessagePasserImpl() : m_Trace(false) { } void MessagePasserImpl::Add(IMessage* msg) { debug_assert(msg); debug_assert(msg->GetType() == IMessage::Message); if (m_Trace) debug_printf("%8.3f add message: %s\n", get_time(), msg->GetName()); m_Mutex.Lock(); m_Queue.push(msg); m_Mutex.Unlock(); } IMessage* MessagePasserImpl::Retrieve() { // (It should be fairly easy to use a more efficient thread-safe queue, // since there's only one thread adding items and one thread consuming; // but it's not worthwhile yet.) m_Mutex.Lock(); IMessage* msg = NULL; if (! m_Queue.empty()) { msg = m_Queue.front(); m_Queue.pop(); } m_Mutex.Unlock(); // if (m_Trace && msg) debug_printf("%8.3f retrieved message: %s\n", get_time(), msg->GetType()); return msg; } void MessagePasserImpl::Query(QueryMessage* qry) { debug_assert(qry); debug_assert(qry->GetType() == IMessage::Query); if (m_Trace) debug_printf("%8.3f add query: %s\n", get_time(), qry->GetName()); // Initialise a semaphore, so we can block until the query has been handled int err; sem_t sem; err = sem_init(&sem, 0, 0); if (err != 0) { // Probably-fatal error debug_warn("sem_init failed"); return; } qry->m_Semaphore = (void*)&sem; m_Mutex.Lock(); m_Queue.push(qry); m_Mutex.Unlock(); // Wait until the query handler has handled the query and called sem_post while (0 != (err = sem_wait(&sem))) { // Keep retrying while EINTR if (errno != EINTR) { // Other errors are probably fatal debug_warn("sem_wait failed"); return; } } // Clean up qry->m_Semaphore = NULL; sem_destroy(&sem); } bool MessagePasserImpl::IsEmpty() { m_Mutex.Lock(); bool empty = m_Queue.empty(); m_Mutex.Unlock(); return empty; } void MessagePasserImpl::SetTrace(bool t) { m_Trace = t; }