Below I implemented an example (with synchronization). It looks more complicated than it is, because I had to build some scaffolding around it so that it actually does something, but the core idea is really as described above. Specifically, I followed the idea that the main thread delegates "path finding" to the worker threads. Here, path finding simply means filling up a queue of integers: for instance, the main thread requests a path from "5" to "8" by putting a queue containing those two numbers on the to_do_queue, and the worker thread pushes a queue containing "5,6,7,8" onto the done_queue as the result, after doing some bogus work to keep the worker threads busy. The complete code follows below, but I'll break it down and describe it first.
I start with the main function/thread.
int main(int argc, char* argv[])
{
work_queue<queue<int>, queue<int> > the_work_queue;
This instantiates the work_queue. The latter is a template class, and the two type parameters define the data objects on the to_do_queue and on the done_queue, respectively. In this case, the data items are paths, and both input and output paths are realised as standard integer queues.
Next, I am actually using multiple worker threads here (see below).
int nWorkers=3; //how many workers and threads
vector<queue_worker> workers;
queue_worker is the class that defines a worker thread. It provides a start function that will be passed to the actual thread, making the thread wait for work/paths to be pushed onto the work_queue. Then, it has a compute_path method that does the actual work. The latter is the only bit that would have to be reimplemented for a different application.
I use the above vector to store the multiple queue_workers that all get their own thread (in this case I could have used a single queue_worker instance, but never mind).
Next, having multiple threads, I use boost::thread_group for easy handling:
boost::thread_group workerThreads;
for (int iWorker=1;iWorker<=nWorkers;iWorker++)
{
In this loop I create nWorkers queue_worker instances and threads
queue_worker worker(&the_work_queue);
workers.push_back(worker);
The queue_worker constructor takes as argument a pointer to the work_queue, so that both main thread and worker thread can access the latter.
workerThreads.create_thread(boost::bind(&queue_worker::start, workers.back()));
//note that boost::bind stores a copy of the worker, so each thread actually
//runs on its own copy; that is fine here, because a queue_worker only holds
//a pointer to the shared work_queue
}
workerThreads is the thread_group. One complication here: we had boost::thread my_thread(&hello_world) before, but now I actually want to pass an argument to the thread function, which I can do with boost::bind, e.g.
void hello_world(int x);
boost::thread my_thread(boost::bind(&hello_world,1234));
In the above case, I pass the start method of the queue_worker class, and a queue_worker instance as the argument (taken from the vector that stores all the queue_workers). Each create_thread call creates a new thread and "starts" the queue_worker by calling its start method. Note that boost::bind stores a copy of the worker, so each thread runs on its own copy; that is fine here, because a queue_worker only holds a pointer to the shared work_queue. Starting makes each worker wait on the work_queue for work to do.
Now comes the part where the main thread creates work for the worker threads; this part would obviously have to be reimplemented for the specific application at hand. Here, I just have the main thread create 5 paths per cycle over 4 cycles. The main thread pushes the 5 paths onto the work queue and then waits until they have been processed. In "real life", it could do other work instead.
int nCycles=4;
int nPaths=5;
for (int iCycle=1;iCycle<=nCycles;iCycle++)
{
Now five paths are created (just by selecting arbitrary pairs of integers). The paths are then pushed onto the work queue (the to_do_queue, specifically) and will immediately be picked up by the waiting worker threads.
for (int iPath=1;iPath<=nPaths;iPath++)
{
queue<int> path;
path.push(iCycle*10);
path.push(iCycle*10+iPath);
the_work_queue.to_do_push(path);
}
Next, the main thread waits until the current batch of work is done. To this end I added a wait_all_done() function to the work_queue class.
the_work_queue.wait_all_done(); //wait for workers to finish batch
Essentially, the main thread now waits on a condition variable (internally). The work_queue wakes the main thread up once all work is done, which it detects by checking whether the count of to_do items pushed so far equals the count of done items pushed so far.
Below is just some output from the main thread whenever it has received a batch of results:
cout<<"Retrieved paths:"<<endl;
queue<int> resultPath;
while(the_work_queue.done_try_pop(resultPath)) //false when empty
{
while(!resultPath.empty())
{
cout<<resultPath.front()<<",";
resultPath.pop();
}
cout<<endl;
}
}
After the cycles are over, this (part of the) program is supposed to finish. Boost currently offers no way to kill a running thread other than ending the program (?) or waiting for the thread to finish; here, however, the workers would never finish on their own, because they are just waiting on the empty work_queue. As a workaround, I implemented the queue_worker such that it returns whenever an empty path is pushed onto the to_do_queue (and the push itself wakes it up from its waiting).
queue<int> dummy;
while(the_work_queue.to_do_try_pop(dummy)){}; //emptying queue
queue<int> empty_path;
the_work_queue.to_do_push(empty_path); //signals the worker to finish
// wait for the worker threads to finish
workerThreads.join_all();
return 0;
}
That's it for the main thread. I haven't talked about the implementation of the work_queue and the queue_worker (code comes below), but I hope they are usable in the above fashion even without knowing the details. The work_queue can be used as is. For the queue_worker, you would have to reimplement the compute_path(...) function so that it does the actual work you want it to do, and change the data type of the work_queue pointer accordingly if you don't work on queue<int> ("paths" here). Also, if you use a data type that has no .empty() method, you would have to modify my workaround that stops the thread when an empty datum is pushed (see the start function).
work_queue class, copy&paste into work_queue.h:
#include <queue>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
/* based on
http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html
*/
/* Provides two concurrently accessible queues for uncompleted and
completed work objects, and a condition variable based method to
wait until all the work is done. */
template<typename to_do_data, typename done_data>
class work_queue
{
private:
std::queue<to_do_data> to_do_queue;
std::queue<done_data> done_queue;
mutable boost::mutex to_do_mutex;
mutable boost::mutex done_mutex;
boost::condition to_do_empty_condition;
boost::condition done_empty_condition;
boost::condition all_done_condition;
long to_do_count; //items pushed onto to_do_queue so far
long done_count; //items pushed onto done_queue so far
public:
work_queue()
{
to_do_count=0;
done_count=0;
}
void to_do_push(to_do_data const& data)
{
boost::mutex::scoped_lock lock(to_do_mutex);
bool const was_empty=to_do_queue.empty();
to_do_queue.push(data);
{
//done_mutex guards both counters, so that done_push
//can compare them safely
boost::mutex::scoped_lock count_lock(done_mutex);
to_do_count++;
}
lock.unlock();
if(was_empty)
{
to_do_empty_condition.notify_all();
}
}
bool to_do_empty() const
{
boost::mutex::scoped_lock lock(to_do_mutex);
return to_do_queue.empty();
}
bool to_do_try_pop(to_do_data& popped_value)
{
boost::mutex::scoped_lock lock(to_do_mutex);
if(to_do_queue.empty())
{
return false;
}
popped_value=to_do_queue.front();
to_do_queue.pop();
return true;
}
void to_do_wait_and_pop(to_do_data& popped_value)
{
boost::mutex::scoped_lock lock(to_do_mutex);
while(to_do_queue.empty())
{
to_do_empty_condition.wait(lock);
}
popped_value=to_do_queue.front();
to_do_queue.pop();
}
void done_push(done_data const& data)
{
boost::mutex::scoped_lock lock(done_mutex);
bool const was_empty=done_queue.empty();
done_queue.push(data);
done_count++;
//compare and reset the counters while still holding done_mutex,
//so that no other thread can modify them in between
bool const all_done=(done_count==to_do_count);
if(all_done)
{
done_count=0;
to_do_count=0;
}
lock.unlock();
if(all_done)
{
all_done_condition.notify_all();
}
if(was_empty)
{
done_empty_condition.notify_all();
}
}
bool done_empty() const
{
boost::mutex::scoped_lock lock(done_mutex);
return done_queue.empty();
}
bool done_try_pop(done_data& popped_value)
{
boost::mutex::scoped_lock lock(done_mutex);
if(done_queue.empty())
{
return false;
}
popped_value=done_queue.front();
done_queue.pop();
return true;
}
void done_wait_and_pop(done_data& popped_value)
{
boost::mutex::scoped_lock lock(done_mutex);
while(done_queue.empty())
{
done_empty_condition.wait(lock);
}
popped_value=done_queue.front();
done_queue.pop();
}
void wait_all_done()
{
boost::mutex::scoped_lock lock(done_mutex);
//wait until as many items are done as were pushed (the counters
//are reset to zero once they match); the loop guards against
//spurious wakeups
while(to_do_count!=done_count)
{
all_done_condition.wait(lock);
}
}
};
The above example, including the queue_worker class:
#include "work_queue.h"
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <iostream>
#include <queue>
using namespace std;
class queue_worker
{
public:
queue_worker(work_queue<queue<int>, queue<int> >* ptr)
{
work_queue_ptr = ptr;
}
void start()
{
queue<int> path;
while(1)
{
work_queue_ptr->to_do_wait_and_pop(path); //call by reference!
//can't think of a better way to gracefully end the thread,
//because otherwise it will wait over an empty queue forever.
//Thus, I'm using the path itself as a signal. Pushing an empty
//path onto the work queue ends the worker thread.
if(path.empty())
{
work_queue_ptr->to_do_push(path); //for other worker threads
break;
}
//got data? Work!
compute_path(path);
work_queue_ptr->done_push(path);
}
}
private:
work_queue<queue<int>, queue<int> >* work_queue_ptr;
void compute_path(queue<int>& path)
{
//bogus work to keep the worker busy for a while
float bogus_work=42;
for(long j=1; j<=100000;j++)
{
for(int k=1; k<=10000;k++)
{
bogus_work=bogus_work+0.00000001;
}
}
//"finding" the path by filling in the intermediate integers,
//so that e.g. [5,8] becomes [5,6,7,8]
int p1=path.front();
path.pop();
int p2=path.front();
path.pop();
path.push(p1);
if (p2>p1)
{
for (int i=p1+1;i<p2;i++) path.push(i);
}
else
{
for (int i=p1-1;i>p2;i--) path.push(i);
}
path.push(p2);
}
};
int main(int argc, char* argv[])
{
work_queue<queue<int>, queue<int> > the_work_queue;
int nWorkers=3; //how many workers and threads
//here: could use one worker instance, but be careful if member data
//is changing in a worker specific way etc.
vector<queue_worker> workers;
boost::thread_group workerThreads;
for (int iWorker=1;iWorker<=nWorkers;iWorker++)
{
queue_worker worker(&the_work_queue);
workers.push_back(worker);
workerThreads.create_thread(boost::bind(&queue_worker::start, workers.back()));
//note that boost::bind stores a copy of the worker, so each thread actually
//runs on its own copy; that is fine here, because a queue_worker only holds
//a pointer to the shared work_queue
}
int nCycles=4;
int nPaths=5;
for (int iCycle=1;iCycle<=nCycles;iCycle++)
{
for (int iPath=1;iPath<=nPaths;iPath++)
{
queue<int> path;
path.push(iCycle*10);
path.push(iCycle*10+iPath);
the_work_queue.to_do_push(path);
}
the_work_queue.wait_all_done(); //wait for workers to finish batch
cout<<"Retrieved paths:"<<endl;
queue<int> resultPath;
while(the_work_queue.done_try_pop(resultPath)) //false when empty
{
while(!resultPath.empty())
{
cout<<resultPath.front()<<",";
resultPath.pop();
}
cout<<endl;
}
}
queue<int> dummy;
while(the_work_queue.to_do_try_pop(dummy)){}; //emptying queue
queue<int> empty_path;
the_work_queue.to_do_push(empty_path); //signals the worker to finish
// wait for the worker threads to finish
workerThreads.join_all();
return 0;
}
Performance-wise, of course you don't want to create arbitrarily many worker threads, because managing threads causes a certain overhead. However, from my tests it seems that having more threads than processors can be worthwhile: I went up to as many as 10 worker threads in the above example (5 of which can't even do any work, because there are only 5 paths per cycle), and it still ran at maximum speed.
Conclusion
Okay, I know this all sounds complicated. But I'd recommend anyone who's interested in the matter to download the above example and try it out. To make it work on something you want, all you have to do is change compute_path in the queue_worker, some data types, and main() accordingly.
For DF... well, I don't really know. I don't know whether Toady has the nerve to try this stuff out, maybe not now. Maybe if he ever does an experiment analogous to Battle Champs, or describes the code sections he's using, I or others could do the implementation for him so he can just try it out (again, he seems to be busy with other stuff right now, so adding another non-content task to his workload might not be in his interest). Btw, I would have tested the thing in BC, but there wasn't a CPU-intensive part analogous to the above problem around...
That's it from me, for now. Maybe this will be useful to someone at some point - if only to myself.