Developer documentation
Version 3.0.3-105-gd3941f44
MR::Thread::Queue< T > Class Template Reference

A first-in first-out thread-safe item queue. More...

#include "thread_queue.h"

Classes

class  Reader
 This class is used to register a reader with the queue. More...
 
class  Writer
 This class is used to register a writer with the queue. More...
 

Public Member Functions

 Queue (const std::string &description="unnamed", size_t buffer_size=128)
 Construct a Queue of items of type T. More...
 
 Queue (const Queue &)=delete
 
 Queue (Queue &&)=default
 
Queueoperator= (const Queue &)=delete
 
Queueoperator= (Queue &&)=default
 
 ~Queue ()
 
void status ()
 Print out a status report for debugging purposes. More...
 

Detailed Description

template<class T>
class MR::Thread::Queue< T >

A first-in first-out thread-safe item queue.

This class implements a thread-safe means of pushing data items into a queue, so that they can each be processed in one or more separate threads.

Note
In practice, it is almost always simpler to use the convenience function Thread::run_queue(). You should never need to use the Thread::Queue directly unless you have a very unusual situation.

Usage overview

Thread::Queue has somewhat unusual usage, which consists of the following steps:

  • Create an instance of a Thread::Queue
  • Create one or more instances of the corresponding Thread::Queue::Writer class, each constructed with a reference to the queue. Each of these instances will automatically notify the queue that its corresponding thread will be writing to the queue.
  • Create one or more instances of the corresponding Thread::Queue::Reader class, each constructed with a reference to the queue. Each of these instances will automatically notify the queue that its corresponding thread will be reading from the queue.
  • Launch all threads, one per instance of Thread::Queue::Writer or Thread::Queue::Reader. Note that one of these threads can be the current thread - simply invoke the respective functor's execute() method directly once all other threads have been launched.
  • Within the execute() method of each thread with a Thread::Queue::Writer:
    • create an instance of Thread::Queue::Writer::Item, constructed from the corresponding Thread::Queue::Writer;
    • perform processing in a loop:
      • prepare the item using pointer semantics (i.e. *item or item->method());
      • use the write() method of this class to write to the queue;
      • break out of loop if write() returns false.
    • when the execute() method returns, the destructor of the Thread::Queue::Writer::Item class will notify the queue that its thread has finished writing to the queue.
  • Within the execute() method of each thread with a Thread::Queue::Reader:
    • create an instance of Thread::Queue::Reader::Item, constructed from the corresponding Thread::Queue::Reader;
    • perform processing in a loop:
      • use the read() method of this class to read the next item from the queue;
      • break out of the loop if read() returns false;
      • process the item using pointer semantics (i.e. *item or item->method()).
    • when the execute() method returns, the destructor of the Thread::Queue::Reader::Item class will notify the queue that its thread has finished reading from the queue.
  • If all reader threads have returned, the queue will notify all writer threads that processing should stop, by returning false from the next write attempt.
  • If all writer threads have returned and no items remain in the queue, the queue will notify all reader threads that processing should stop, by returning false from the next read attempt.

The additional member classes are designed to be used in conjunction with the MRtrix multi-threading interface. In this system, each thread corresponds to an instance of a functor class, and its execute() method is the function that will be run within the thread (see Thread::Exec for details). For this reason:

  • The Thread::Queue instance is designed to be created before any of the threads.
  • The Thread::Queue::Writer and Thread::Queue::Reader classes are designed to be used as members of each functor, so that each functor must construct these classes from a reference to the queue within their own constructor. This ensures each thread registers their intention to read or write with the queue before their thread is launched.
  • The Thread::Queue::Writer::Item and Thread::Queue::Reader::Item classes are designed to be instantiated within each functor's execute() method. They must be constructed from a reference to a Thread::Queue::Writer or Thread::Queue::Reader respectively, ensuring no reads or write can take place without having registered with the queue. Their destructors will also unregister from the queue, ensuring that each thread unregisters as soon as the execute() method returns, and hence before the thread exits.

The Queue class performs all memory management for the items in the queue. For this reason, the items are accessed via the Writer::Item & Reader::Item classes. This allows items to be recycled once they have been processed, reducing overheads associated with memory allocation/deallocation.

Note
It is important that all instances of Thread::Queue::Writer and Thread::Queue::Reader are created before any of the threads are launched, to avoid any race conditions at startup.

The use of Thread::Queue is best illustrated with an example:

// the type of objects that will be sent through the queue:
class Item {
public:
...
// data members
...
};
// The use a typedef is recommended to help with readability (and typing!):
typedef Thread::Queue<Item> MyQueue;
// this class will write to the queue:
class Sender {
public:
// construct the 'writer' member in the constructor:
Sender (MyQueue& queue) : writer (queue) { }
void execute () {
// use a local instance of Thread::Queue<Item>::Writer::Item to write to the queue:
MyQueue::Writer::Item item (writer);
while (need_more_items()) {
...
// prepare item
*item = something();
item->set (something_else);
...
if (!item.write()) break; // break if write() returns false
}
}
private:
MyQueue::Writer writer;
};
// this class will read from the queue:
class Receiver {
public:
// construct the 'reader' member in the constructor:
Receiver (MyQueue& queue) : reader (queue) { }
void execute () {
// use a local instance of Thread::Queue<Item>::Reader::Item to read from the queue:
MyQueue::Reader::Item item (reader);
while ((item.read())) { // break when read() returns false
...
// process item
do_something (*item);
if (item->status()) report_error();
...
if (enough_items()) return;
}
}
private:
MyQueue::Reader reader;
};
// this is where the queue and threads are created:
void my_function () {
// create an instance of the queue:
MyQueue queue;
// create all functors from a reference to the queue:
Sender sender (queue);
Receiver receiver (queue);
// once all functors are created, launch their corresponding threads:
Thread::Exec sender_thread (sender);
Thread::Exec receiver_thread (receiver);
}
Item item

Rationale for the Writer, Reader, and Item member classes

The motivation for the use of additional member classes to perform the actual process of writing and reading to and from the queue is related to the need to keep track of the number of processes currently using the queue. This is essential to ensure that threads are notified when the queue is closed. This happens either when all readers have finished reading; or when all writers have finished writing and no items are left in the queue. This is complicated by the need to ensure that the various operations are called in the right order to avoid deadlocks.

There are essentially 4 operations that need to take place:

  • registering an intention to read/write from/to the queue
  • launching the corresponding thread
  • unregistering from the queue
  • terminating the thread

For proper multi-threaded operations, these operations must take place in the order above. Moreover, each operation must be completed for all users of the queue before any of them can perform the next operation. The use of additional member classes ensures that threads have to register their intention to read or write from the queue, and that they unregister from the queue once their processing is done.

While this could have been achieved simply with the appropriate member functions (i.e. register(), unregister(), read() & write() methods in the main Queue class), this places a huge burden on the developer to get it right. Using these member functions reduces the chance of coding errors, and in fact reduces the total amount of code that needs to be written to use the Queue in a safe manner.

The Item classes additionally simplify the memory management of the items in the queue, by preventing direct access to the underlying pointers, and ensuring the Queue itself is responsible for all allocation and deallocation of items as needed.

See also
Thread::run_queue()

Definition at line 334 of file thread_queue.h.

Constructor & Destructor Documentation

◆ Queue() [1/3]

template<class T >
MR::Thread::Queue< T >::Queue ( const std::string &  description = "unnamed",
size_t  buffer_size = 128 
)
inline

Construct a Queue of items of type T.

Parameters
descriptiona string identifying the queue for degugging purposes
buffer_sizethe maximum number of items that can be pushed onto the queue before blocking. If a thread attempts to push more data onto the queue when the queue already contains this number of items, the thread will block until at least one item has been popped. By default, the buffer size is MRTRIX_QUEUE_DEFAULT_CAPACITY items.

Definition at line 344 of file thread_queue.h.

◆ Queue() [2/3]

template<class T >
MR::Thread::Queue< T >::Queue ( const Queue< T > &  )
delete

◆ Queue() [3/3]

template<class T >
MR::Thread::Queue< T >::Queue ( Queue< T > &&  )
default

◆ ~Queue()

template<class T >
MR::Thread::Queue< T >::~Queue ( )
inline

Definition at line 361 of file thread_queue.h.

Member Function Documentation

◆ operator=() [1/2]

template<class T >
Queue & MR::Thread::Queue< T >::operator= ( const Queue< T > &  )
delete

◆ operator=() [2/2]

template<class T >
Queue & MR::Thread::Queue< T >::operator= ( Queue< T > &&  )
default

◆ status()

template<class T >
void MR::Thread::Queue< T >::status ( )
inline

Print out a status report for debugging purposes.

Definition at line 516 of file thread_queue.h.


The documentation for this class was generated from the following file: