Code for my message processor (if anyone who knows more C++ than I do can shoot holes in this, please do, I'm still hobbying).
The message processor is the base class for every worker thread: it owns an action (message) queue and a main loop, and on every loop iteration it processes its whole action queue before calling tick().
#pragma once
#include "TAction.h"
#include "TActionQueue.h"
#include <boost/thread/thread_time.hpp>
#include <boost/thread/thread.hpp>
class TController;
// Base class for objects that run on their own worker thread. A processor owns
// an action queue and a frame loop: each iteration processes the queued
// actions, calls tick(), then waits for the next frame.
class TMessageProcessor {
public:
    TMessageProcessor();
    // Virtual because this is a polymorphic base (start() and tick() are pure
    // virtual): destroying a derived processor through a TMessageProcessor*
    // is undefined behaviour with a non-virtual destructor.
    virtual ~TMessageProcessor();

    void pushAction(TAction_ptr a);    // push action onto our list
    void processQueue();               // process our entire current queue
    void processAction(TAction_ptr a); // process single action

    void startThread();           // launch the worker thread; it runs startPoint()
    void startPoint();            // this is called from the new thread. It calls start() and startLoop()
    virtual void start() = 0;     // one-time hook, called by startPoint() before the loop
    virtual void startLoop();     // processQueue()/tick()/waitForFrame() until the simulation reports SHUTDOWN
    virtual void waitForFrame();  // sleep until the next frame is due, then schedule the following one

    TController* getController();
    void setController(TController* t);

    virtual void tick() = 0; // "do stuff", override this function, gets called every frame

protected:
    TAction_ptr popAction(); // pop an action from the list (forwards to the queue's pop())
    TController* controller; // not owned; assigned through setController()

private:
    boost::thread thread;                       // worker thread started by startThread()
    TActionQueue actions;                       // pending actions for this processor
    boost::posix_time::time_duration waitTime;  // time to wait between frames
    boost::posix_time::ptime nextFrame;         // next time frame
    boost::posix_time::ptime getCurrentTime();
};
#include "TMessageProcessor.h"
#include "TAction.h"
#include "TController.h"
// Sets up frame timing (5 ms between frames, first frame due right away) and
// starts with no controller attached. setController() must be called before
// the loop runs, because startLoop() dereferences the controller.
TMessageProcessor::TMessageProcessor()
    : controller(0), // previously left uninitialized, so getController() could return garbage
      waitTime(boost::posix_time::millisec(5)),
      nextFrame(getCurrentTime()) {
}
// Nothing is released explicitly here; the members clean up after themselves.
// NOTE(review): the worker thread is not joined in this destructor -- confirm
// that owners keep the object alive until the loop has exited.
TMessageProcessor::~TMessageProcessor() {}
// Launches the worker thread. The new thread's entry point is startPoint(),
// invoked on this object.
void TMessageProcessor::startThread()
{
    thread = boost::thread(&TMessageProcessor::startPoint, this);
}
// Worker-thread entry point: runs the one-time start() hook first, then
// enters startLoop().
void TMessageProcessor::startPoint()
{
    start();
    startLoop();
}
// Queues an action for this processor by handing it to the action queue.
void TMessageProcessor::pushAction(TAction_ptr a)
{
    actions.push(a);
}
// Takes the next action off the queue. This simply forwards to the action
// queue's pop(), so the result for an empty queue is whatever pop() yields.
TAction_ptr TMessageProcessor::popAction()
{
    return actions.pop();
}
// process our entire current queue
void TMessageProcessor::processQueue(){
TAction_ptr a;
actions.startRead();
while(!actions.isEmpty()){
a = popAction();
processAction(a);
}
}
// Executes one action, passing this processor as the target it acts on.
void TMessageProcessor::processAction(TAction_ptr a)
{
    a->execute(this);
}
// Main loop of the processor. Before every iteration the simulation state is
// read through the controller; the loop ends once it is SHUTDOWN. Each
// iteration processes the action queue, calls tick(), then waits for the frame.
void TMessageProcessor::startLoop()
{
    for (;;) {
        if (controller->getSimulation()->getCurrentState() == TSimulation::SHUTDOWN) {
            break;
        }
        processQueue();
        tick();
        waitForFrame();
    }
}
// wait for the next frame, if we're too fast
void TMessageProcessor::waitForFrame(){
boost::posix_time::time_duration d = nextFrame - getCurrentTime();
if(d.total_milliseconds() > 0){
boost::this_thread::sleep(d);
}
nextFrame = getCurrentTime() + waitTime;
}
// Returns the current time as reported by boost::get_system_time().
boost::posix_time::ptime TMessageProcessor::getCurrentTime()
{
    return boost::get_system_time();
}
// Attaches the controller this processor reports to. Only the pointer is
// stored.
void TMessageProcessor::setController(TController* t)
{
    controller = t;
}
// Returns the controller pointer last stored by setController().
TController* TMessageProcessor::getController()
{
    return controller;
}
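The action queue is a pair of deques of actions: it reads from one deque and writes to the other, and swaps them when you start reading (you explicitly tell it startRead()). This means you can read from and write to the same TActionQueue at the same time without the reader and the writers holding each other up while the queue is drained (push() and startRead() only take a short lock), and that reading a queue is guaranteed to end, because actions pushed during a read pass land in the other deque.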
#pragma once
#include "TAction.h"
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <deque>
// Double-buffered queue of actions. push() appends to the write-side deque,
// pop()/isEmpty() work on the read-side deque, and startRead() swaps the two.
// Only push() and startRead() take the mutex.
class TActionQueue {
public:
    TActionQueue();

    void push(TAction_ptr a);  // append to the write side (locked)
    TAction_ptr pop();         // remove and return the front of the read side
    void startRead();          // swap read and write sides (locked)
    bool isEmpty();            // true when the read side has nothing left

private:
    typedef std::deque<TAction_ptr> Queue;

    boost::interprocess::interprocess_mutex mutex;  // guards writeQueue and the swap
    Queue actions1;
    Queue actions2;
    Queue* writeQueue;  // points at whichever deque currently receives pushes
    Queue* readQueue;   // points at whichever deque is currently being drained
};
#include "TActionQueue.h"
#include <deque>
#include <boost/interprocess/sync/scoped_lock.hpp>
// Initially pushes go to actions1 and reads come from actions2; the first
// startRead() swaps those roles.
TActionQueue::TActionQueue()
    : writeQueue(&actions1),
      readQueue(&actions2)
{
}
// Removes and returns the action at the front of the read-side deque.
// Returns an empty (null) TAction_ptr when the read side has nothing left;
// calling front()/pop_front() on an empty deque would be undefined behaviour.
// No lock is taken: only the write side and the swap are guarded by the mutex.
TAction_ptr TActionQueue::pop()
{
    if (readQueue->empty()) {
        return TAction_ptr();
    }
    TAction_ptr t = readQueue->front();
    readQueue->pop_front();
    return t;
}
void TActionQueue::startRead(){
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(mutex);
// switch the queues.
std::deque<TAction_ptr>* t = readQueue;
readQueue = writeQueue;
writeQueue = t;
}
// Appends an action to the write-side deque while holding the mutex.
void TActionQueue::push(TAction_ptr a)
{
    boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> guard(mutex);
    writeQueue->push_back(a);
}
// Reports whether the read-side deque has been fully drained. No lock is
// taken here.
bool TActionQueue::isEmpty()
{
    return readQueue->empty();
}
The action Base Class. Pretty empty, but we need it for the polymorphism.
#pragma once
#include <stdexcept>
#include <boost/shared_ptr.hpp>
#include <boost/function.hpp>
class TMessageProcessor;
// Thrown when a value is requested from an action before it has been produced.
// It is a std::runtime_error with a fixed message.
class NotReadyException : public std::runtime_error
{
public:
    NotReadyException()
        : std::runtime_error("Requested value was not ready yet.")
    {
    }
};
// Abstract base for everything that can be queued on a message processor.
// Concrete actions implement execute(), which receives the processor that is
// running the action.
class TAction {
public:
    TAction() {}
    // Virtual so that a derived action destroyed through a TAction pointer
    // runs the derived destructor; with a non-virtual destructor that is
    // undefined behaviour.
    virtual ~TAction() {}

    // Perform the action. t is the processor executing it.
    virtual void execute(TMessageProcessor* t) = 0;
};
// Always pass actions around through this shared-ownership handle, so an action
// stays alive until the last holder drops its pointer.
// NOTE(review): this covers the action's lifetime only -- access to the
// pointed-to action from several threads presumably still needs its own
// synchronization; confirm per action type.
typedef boost::shared_ptr<TAction> TAction_ptr;
Action example: a generic get-value action. Check isReady() to see whether it's done yet.
#pragma once
#include "TAction.h"
// Action that calls a stored getter when executed and keeps the result until
// the requester collects it with getValue().
// NOTE(review): returnReady is a plain bool. If isReady()/getValue() are polled
// from a different thread than the one running execute(), that access is
// unsynchronized -- confirm whether a stronger guarantee is needed.
template <class R>
class TActionGetValue : public TAction {
public:
    // f is the getter that execute() will call to produce the value.
    TActionGetValue(boost::function<R()> f) : returnReady(false), getFunc(f) {}

    // Returns the stored value; throws NotReadyException if execute() has not
    // completed yet.
    R getValue()
    {
        if (returnReady) {
            return returnVal;
        }
        throw NotReadyException();
    }

    // Calls the getter, stores its result, then marks the value as ready.
    void execute(TMessageProcessor* t)
    {
        returnVal = getFunc();
        returnReady = true;
    }

    // True once execute() has stored a value.
    bool isReady()
    {
        return returnReady;
    }

private:
    bool returnReady;              // is the return value ready?
    boost::function<R()> getFunc;  // getter called by execute()
    R returnVal;                   // result of the getter, valid once returnReady is true
};
So for example the GraphicsManager and the World class extend TMessageProcessor, are members of the controller and are started through that controller (in main) like this:
void TController::start(){
world.setController(this);
graphics.setController(this);
world.startThread();
graphics.startThread();
}
And they each have their own main loop.
An example of an Action:
// Action that carries one object and, when executed, adds it to the graphics
// manager.
class TGraphicActionAddObject : public TAction
{
public:
    TGraphicActionAddObject(NObject_ptr t);  // t is the object to add
    void execute(TMessageProcessor* t);      // adds the object if t is an NGraphicsManager

    NObject_ptr a;  // the object this action will add
};
/* Add Object: remember the object that execute() will add later. */
TGraphicActionAddObject::TGraphicActionAddObject(NObject_ptr t)
    : a(t)
{
}
// Adds the carried object to the processor, but only when that processor is
// an NGraphicsManager. For any other processor type the action does nothing.
void TGraphicActionAddObject::execute(TMessageProcessor* t)
{
    if (NGraphicsManager* graphics = dynamic_cast<NGraphicsManager*>(t)) {
        graphics->addObject(a);
    }
}
To send a message I do:
// Create the object that the action will carry.
NObject_ptr t(new Player());
TAction_ptr a(new TGraphicActionAddObject(t)); // create the action, wrapping the object
getController()->getGraphics()->pushAction(a); // send the action to the graphics manager's queue