Developer documentation
Version 3.0.3-105-gd3941f44
Thread-safe queue

Functionality for thread-safe parallel processing of queued items. More...

Classes

class  MR::Thread::Queue< T >
 A first-in first-out thread-safe item queue. More...
 

Functions

template<class Item >
__Batch< Item > MR::Thread::batch (const Item &, size_t number=128)
 used to request batched processing of items More...
 
template<class Source , class Item , class Sink >
void MR::Thread::run_queue (Source &&source, const Item &item, Sink &&sink, size_t capacity=128)
 convenience function to set up and run a 2-stage multi-threaded pipeline. More...
 
template<class Source , class Item1 , class Pipe , class Item2 , class Sink >
void MR::Thread::run_queue (Source &&source, const Item1 &item1, Pipe &&pipe, const Item2 &item2, Sink &&sink, size_t capacity=128)
 convenience functions to set up and run a 3-stage multi-threaded pipeline. More...
 
template<class Source , class Item1 , class Pipe1 , class Item2 , class Pipe2 , class Item3 , class Sink >
void MR::Thread::run_queue (Source &&source, const Item1 &item1, Pipe1 &&pipe1, const Item2 &item2, Pipe2 &&pipe2, const Item3 &item3, Sink &&sink, size_t capacity=128)
 convenience functions to set up and run a 4-stage multi-threaded pipeline. More...
 

Detailed Description

Functionality for thread-safe parallel processing of queued items.

These functions and classes provide functionality for one or more source threads to feed items into a first-in first-out queue, and one or more sink threads to consume items. This pipeline can also extend over two queues, with one or more pipe threads consuming items of one type from the first queue, and feeding items of another type onto the second queue.

As a graphical representation of the pipeline, the following workflows can be achieved:

[source] \               / [sink]
[source] -- queue<item> -- [sink]
[source] /               \ [sink]
   ..                        ..
N_source                   N_sink

or for a deeper pipeline:

[source] \                / [pipe] \                 / [sink]
[source] -- queue<item1> -- [pipe] -- queue<item2> -- [sink]
[source] /                \ [pipe] /                 \ [sink]
   ..                         ..                         ..
N_source                    N_pipe                     N_sink

By default, items are pushed to and pulled from the queue one at a time. In situations where the amount of processing per item is small, items can be sent in batches to reduce the overhead of thread management (mutex locking/unlocking, etc.).

The simplest way to use this functionality is via the Thread::run_queue() and associated Thread::multi() and Thread::batch() functions. In complex situations, it may be necessary to use the Thread::Queue class directly, although that should very rarely (if ever) be needed.

See also
Thread::run_queue()
Thread::Queue

Function Documentation

◆ batch()

template<class Item >
__Batch< Item > MR::Thread::batch ( const Item &  ,
size_t  number = 128 
)
inline

used to request batched processing of items

This function is used in combination with Thread::run_queue to request that the items object be processed in batches of number items (defaults to MRTRIX_QUEUE_DEFAULT_BATCH_SIZE).

See also
Thread::run_queue()

Definition at line 858 of file thread_queue.h.

◆ run_queue() [1/3]

template<class Source , class Item , class Sink >
void MR::Thread::run_queue ( Source &&  source,
const Item &  item,
Sink &&  sink,
size_t  capacity = 128 
)
inline

convenience function to set up and run a 2-stage multi-threaded pipeline.

This function (and its 3-stage equivalent Thread::run_queue(const Source&, const Item1&, const Pipe&, const Item2&, const Sink&, size_t)) simplifies the process of setting up a multi-threaded processing chain that should meet most users' needs.

The arguments to this function correspond to an instance of the Source, the Sink, and optionally the Pipe functors, in addition to an instance of the Items to be passed through each stage of the pipeline - these are provided purely to specify the type of object to pass through the queue(s).

Functors

The 3 types of functors each have a specific purpose, and corresponding requirements as described below:

Source: the input functor
The Source class must at least provide the method:
bool operator() (Item& item);
This function prepares the item passed to it, and should return true if further items need to be processed, or false to signal that no further items are to be sent through the queue (at which point the corresponding thread(s) will exit).
Sink: the output functor
The Sink class must at least provide the method:
bool operator() (const Item& item);
This function processes the item passed to it, and should return true when ready to process further items, or false to signal the end of processing (at which point the corresponding thread(s) will exit).
Pipe: the processing functor (for 3-stage pipeline only)
The Pipe class must at least provide the method:
bool operator() (const Item1& item_in, Item2& item_out);
This function processes the item_in passed to it, and prepares item_out for the next stage of the pipeline. It should return true if the item processed is to be sent to the next stage in the pipeline, and false if it is to be discarded - note that this is very different from the other functors, where returning false signals end of processing.

Simple example

This is a simple demo application that generates a linear sequence of numbers and sums them up:

const size_t max_count = 1000000;
// the functor that will generate the items:
class Source {
  public:
    Source () : count (0) { }
    bool operator() (size_t& item) {
      item = count++;
      return count < max_count; // stop when max_count is reached
    }
  protected:
    size_t count;
};
// the functor that will consume the items:
class Sink {
  public:
    Sink (size_t& grand_total) :
      grand_total (grand_total),
      total (0) { }
    ~Sink () { // update grand_total in destructor
      grand_total += total;
    }
    bool operator() (const size_t& item) {
      total += item;
      return true;
    }
  protected:
    size_t& grand_total;
    size_t total;
};
void run ()
{
size_t grand_total = 0;
Source source;
Sink sink (grand_total);
// run a single-source => single-sink pipeline:
Thread::run_queue (source, size_t(), sink);
}

Parallel execution of functors

If a functor is to be run over multiple parallel threads of execution, it should be wrapped in a call to Thread::multi() before being passed to the Thread::run_queue() functions. The Thread::run_queue() functions will then create additional instances of the relevant functor using its copy constructor; care should therefore be taken to ensure that the functor's copy constructor behaves appropriately.

For example, using the code above:

...
void run ()
{
...
// run a single-source => multi-sink pipeline:
Thread::run_queue (source, size_t(), Thread::multi (sink));
}

For the functor that is being multi-threaded, the default number of threads instantiated will depend on the "NumberOfThreads" entry in the MRtrix configuration file, or can be set at the command-line using the -nthreads option. This number can also be set as an additional optional argument to Thread::multi().

Note that any functor can be parallelised in this way. In the example above, the Source functor could have been wrapped in Thread::multi() instead if this was the behaviour required:

...
void run ()
{
...
// run a multi-source => single-sink pipeline:
Thread::run_queue (Thread::multi (source), size_t(), sink);
}

Batching items

In cases where the amount of processing per item is small, the overhead of managing the concurrent access to the various queues from all the threads may become prohibitive (see Writing multi-threaded applications for details). In this case, it is a good idea to process the items in batches, which drastically reduces the number of accesses to the queue. This can be done by wrapping the items in a call to Thread::batch():

...
void run ()
{
...
// run a single-source => multi-sink pipeline on batches of size_t items:
Thread::run_queue (source, Thread::batch (size_t()), Thread::multi (sink));
}

By default, batches consist of MRTRIX_QUEUE_DEFAULT_BATCH_SIZE items (defined as 128). This can be set explicitly by providing the desired size as an additional argument to Thread::batch():

...
void run ()
{
...
// run a single-source => multi-sink pipeline on batches of 1024 size_t items:
Thread::run_queue (source, Thread::batch (size_t(), 1024), Thread::multi (sink));
}

Obviously, Thread::multi() and Thread::batch() can be used in any combination to perform the operations required.

Definition at line 1050 of file thread_queue.h.

◆ run_queue() [2/3]

template<class Source , class Item1 , class Pipe , class Item2 , class Sink >
void MR::Thread::run_queue ( Source &&  source,
const Item1 &  item1,
Pipe &&  pipe,
const Item2 &  item2,
Sink &&  sink,
size_t  capacity = 128 
)
inline

convenience functions to set up and run a 3-stage multi-threaded pipeline.

This function extends the 2-stage Thread::run_queue() function to allow a 3-stage pipeline. For example, using the example from Thread::run_queue(), the following would add an additional stage to the pipeline to double the numbers as they come through:

...
class Pipe {
public:
bool operator() (const size_t& item_in, size_t& item_out) {
item_out = 2 * item_in;
return true;
}
};
...
void run ()
{
...
// run a single-source => multi-pipe => single-sink pipeline on batches of size_t items:
Thread::run_queue (
    source,
    Thread::batch (size_t()),
    Thread::multi (pipe),
    Thread::batch (size_t()),
    sink);
}

Note that the return value of the Pipe functor's operator() method is used in this case to signal whether or not the corresponding item should be sent through to the next stage (true) or discarded (false). This differs from the Source & Sink functors where the corresponding return value is used to signal end of processing.

As with the 2-stage pipeline, any functor can be executed in parallel (i.e. wrapped in Thread::multi()); items do not need to be of the same type, and can be batched independently with any desired size.

Definition at line 1124 of file thread_queue.h.

◆ run_queue() [3/3]

template<class Source , class Item1 , class Pipe1 , class Item2 , class Pipe2 , class Item3 , class Sink >
void MR::Thread::run_queue ( Source &&  source,
const Item1 &  item1,
Pipe1 &&  pipe1,
const Item2 &  item2,
Pipe2 &&  pipe2,
const Item3 &  item3,
Sink &&  sink,
size_t  capacity = 128 
)
inline

convenience functions to set up and run a 4-stage multi-threaded pipeline.

This function extends the 2-stage Thread::run_queue() function to allow a 4-stage pipeline.

Definition at line 1168 of file thread_queue.h.