Developer documentation
Version 3.0.3-105-gd3941f44
thread_queue.h
Go to the documentation of this file.
1/* Copyright (c) 2008-2022 the MRtrix3 contributors.
2 *
3 * This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
6 *
7 * Covered Software is provided under this License on an "as is"
8 * basis, without warranty of any kind, either expressed, implied, or
9 * statutory, including, without limitation, warranties that the
10 * Covered Software is free of defects, merchantable, fit for a
11 * particular purpose or non-infringing.
12 * See the Mozilla Public License v. 2.0 for more details.
13 *
14 * For more details, see http://www.mrtrix.org/.
15 */
16
17#ifndef __mrtrix_thread_queue_h__
18#define __mrtrix_thread_queue_h__
19
20#include <stack>
21#include <condition_variable>
22
23#include "exception.h"
24#include "memory.h"
25#include "thread.h"
26
27#define MRTRIX_QUEUE_DEFAULT_CAPACITY 128
28#define MRTRIX_QUEUE_DEFAULT_BATCH_SIZE 128
29
30namespace MR
31{
32 namespace Thread
33 {
34
35
36 //* \cond skip
37 namespace {
38
39 // to get multi/single job/functor seamlessly:
40 template <class X>
41 class __job { NOMEMALIGN
42 public:
43 using type = typename std::remove_reference<X>::type;
44 using member_type = typename std::remove_reference<X>::type&;
45 static X& functor (X& job) { return job; }
46
47 template <class SingleFunctor>
48 static SingleFunctor& get (X& /*f*/, SingleFunctor& functor) {
49 return functor;
50 }
51 };
52
53 template <class X>
54 class __job <__Multi<X>> { NOMEMALIGN
55 public:
56 using type = typename std::remove_reference<X>::type;
57 using member_type = typename std::remove_reference<X>::type;
58 static X& functor (__Multi<X>& job) { return job.functor; }
59
60 template <class SingleFunctor>
61 static __Multi<SingleFunctor> get (__Multi<X>& f, SingleFunctor& functor) {
62 return __Multi<SingleFunctor> (functor, f.num);
63 }
64 };
65
66
67 }
68
70
71
72
73
129
334 template <class T> class Queue { NOMEMALIGN
335 public:
337
344 Queue (const std::string& description = "unnamed", size_t buffer_size = MRTRIX_QUEUE_DEFAULT_CAPACITY) :
345 buffer (new T* [buffer_size]),
346 front (buffer),
347 back (buffer),
348 capacity (buffer_size),
349 writer_count (0),
350 reader_count (0),
351 name (description) {
352 assert (capacity > 0);
353 }
354
355 Queue (const Queue&) = delete;
356 Queue (Queue&&) = default;
357 Queue& operator= (const Queue&) = delete;
358 Queue& operator= (Queue&&) = default;
359
360
362 delete [] buffer;
363 }
364
366
376 public:
378
380 Writer (Queue<T>& queue) : Q (queue) {
381 Q.register_writer();
382 }
383 Writer (const Writer& W) : Q (W.Q) {
384 Q.register_writer();
385 }
386
388
396 public:
398
405 Item (const Writer& writer) : Q (writer.Q), p (Q.get_item()) { }
407 ~Item () {
408 Q.unregister_writer();
409 }
410
411 using item_type = T;
412
415 return Q.push (p);
416 }
417 FORCE_INLINE T& operator*() const throw () {
418 return *p;
419 }
420 FORCE_INLINE T* operator->() const throw () {
421 return p;
422 }
423 private:
424 Queue<T>& Q;
425 T* p;
426 };
427
428 Item placeholder () const { return Item (*this); }
429
430 private:
431 Queue<T>& Q;
432 };
433
434
436
446 public:
448
450 Reader (Queue<T>& queue) : Q (queue) {
451 Q.register_reader();
452 }
453 Reader (const Reader& reader) : Q (reader.Q) {
454 Q.register_reader();
455 }
456
458
466 public:
468
475 Item (const Reader& reader) : Q (reader.Q), p (nullptr) { }
477 ~Item () {
478 Q.unregister_reader();
479 }
480
481 using item_type = T;
482
485 return Q.pop (p);
486 }
487 FORCE_INLINE T* stash () throw () {
488 T* item = p;
489 p = nullptr;
490 return item;
491 }
492 FORCE_INLINE void recycle (T* item) const throw () {
493 Q.recycle (item);
494 }
495 FORCE_INLINE T& operator*() const throw () {
496 return *p;
497 }
498 FORCE_INLINE T* operator->() const throw () {
499 return p;
500 }
501 FORCE_INLINE bool operator! () const throw () {
502 return !p;
503 }
504 private:
505 Queue<T>& Q;
506 T* p;
507 };
508
509 Item placeholder () const { return Item (*this); }
510
511 private:
512 Queue<T>& Q;
513 };
514
516 void status () {
517 std::lock_guard<std::mutex> lock (mutex);
518 std::cerr << "Thread::Queue \"" + name + "\": "
519 << writer_count << " writer" << (writer_count > 1 ? "s" : "") << ", "
520 << reader_count << " reader" << (reader_count > 1 ? "s" : "") << ", items waiting: " << size() << "\n";
521 }
522
523
524 private:
525 std::mutex mutex;
526 std::condition_variable more_data, more_space;
527 T** buffer;
528 T** front;
529 T** back;
530 size_t capacity;
531 size_t writer_count, reader_count;
532 std::stack<T*,vector<T*> > item_stack;
534 std::string name;
535
536 void register_writer () {
537 std::lock_guard<std::mutex> lock (mutex);
538 ++writer_count;
539 }
540 void unregister_writer () {
541 std::lock_guard<std::mutex> lock (mutex);
542 assert (writer_count);
543 --writer_count;
544 if (!writer_count) {
545 DEBUG ("no writers left on queue \"" + name + "\"");
546 more_data.notify_all();
547 }
548 }
549 void register_reader () {
550 std::lock_guard<std::mutex> lock (mutex);
551 ++reader_count;
552 }
553 void unregister_reader () {
554 std::lock_guard<std::mutex> lock (mutex);
555 assert (reader_count);
556 --reader_count;
557 if (!reader_count) {
558 DEBUG ("no readers left on queue \"" + name + "\"");
559 more_space.notify_all();
560 }
561 }
562
563 FORCE_INLINE bool empty () const {
564 return (front == back);
565 }
566 FORCE_INLINE bool full () const {
567 return (inc (back) == front);
568 }
569 FORCE_INLINE size_t size () const {
570 return ( (back < front ? back+capacity : back) - front);
571 }
572
573 FORCE_INLINE T* get_item () {
574 std::lock_guard<std::mutex> lock (mutex);
575 T* item (new T);
576 items.push_back (std::unique_ptr<T> (item));
577 return item;
578 }
579
580 FORCE_INLINE bool push (T*& item) {
581 std::unique_lock<std::mutex> lock (mutex);
582 more_space.wait (lock, [this]{ return !(full() && reader_count); });
583 if (!reader_count) return false;
584 *back = item;
585 back = inc (back);
586 if (item_stack.empty()) {
587 item = new T;
588 items.push_back (std::unique_ptr<T> (item));
589 }
590 else {
591 item = item_stack.top();
592 item_stack.pop();
593 }
594 more_data.notify_one();
595 return true;
596 }
597
598 FORCE_INLINE bool pop (T*& item) {
599 std::unique_lock<std::mutex> lock (mutex);
600 if (item)
601 item_stack.push (item);
602 item = nullptr;
603 more_data.wait (lock, [this]{ return !(empty() && writer_count); });
604 if (empty() && !writer_count)
605 return false;
606 item = *front;
607 front = inc (front);
608 more_space.notify_one();
609 return true;
610 }
611
612 FORCE_INLINE void recycle (T*& item) {
613 std::unique_lock<std::mutex> lock (mutex);
614 if (item)
615 item_stack.push (item);
616 }
617
618 FORCE_INLINE T** inc (T** p) const {
619 ++p;
620 if (p >= buffer + capacity) p = buffer;
621 return p;
622 }
623 };
624
625
626
627
628
629
630 //* \cond skip
631
632 namespace {
633 /********************************************************************
634 * convenience Functor classes for use in Thread::run_queue()
635 ********************************************************************/
636 template <class Item>
637 struct __Batch { NOMEMALIGN
638 __Batch (size_t number) : num (number) { }
639 size_t num;
640 };
641
642
643
644 template <class Item> struct __batch_size { NOMEMALIGN
645 __batch_size (const Item&) { }
646 operator size_t () const { return 0; }
647 };
648 template <class Item> struct __batch_size <__Batch<Item>> { NOMEMALIGN
649 __batch_size (const __Batch<Item>& item) : n (item.num) { }
650 operator size_t () const { return n; }
651 const size_t n;
652 };
653
654
655
659 template <class Item> struct Type { NOMEMALIGN
660 using item = Item;
661 using queue = Queue<Item>;
662 using reader = typename queue::Reader;
663 using writer = typename queue::Writer;
664 using read_item = typename reader::Item;
665 using write_item = typename writer::Item;
666 };
667
668 template <class Item> struct Type<__Batch<Item>> { NOMEMALIGN
669 using item = Item;
670 using queue = Queue<vector<Item>>;
671 using reader = typename queue::Reader;
672 using writer = typename queue::Writer;
673 using read_item = typename reader::Item;
674 using write_item = typename writer::Item;
675 };
676
677
678
679 template <class Item>
680 struct FetchItem { NOMEMALIGN
681 FetchItem (typename Type<Item>::reader& item) : in (item.placeholder()) { }
682 bool read () { return in.read(); }
683 Item& value () { return (*in); }
684 typename Type<Item>::read_item in;
685 };
686
687 template <class Item>
688 struct FetchItem<__Batch<Item>> { NOMEMALIGN
689 FetchItem (typename Type<__Batch<Item>>::reader& in) : in (in.placeholder()), n (0) { }
690 bool read () {
691 if (!in)
692 return in.read();
693 ++n;
694 if (n >= in->size()) {
695 if (!in.read())
696 return false;
697 n = 0;
698 }
699 return true;
700 }
701 Item& value () { return (*in)[n]; }
702 typename Type<__Batch<Item>>::read_item in;
703 size_t n;
704 };
705
706
707
708
709
710 template <class Item>
711 struct StoreItem { NOMEMALIGN
712 StoreItem (size_t, typename Type<Item>::writer& item) : out (item.placeholder()) { }
713 bool write () { return out.write(); }
714 Item& value () { return (*out); }
715 bool flush () { return true; }
716 typename Type<Item>::write_item out;
717 };
718
719 template <class Item>
720 struct StoreItem<__Batch<Item>> { NOMEMALIGN
721 StoreItem (size_t batch_size, typename Type<__Batch<Item>>::writer& item) :
722 out (item.placeholder()), batch_size (batch_size), n(0) { out->resize (batch_size); }
723 bool write () {
724 ++n;
725 if (n >= batch_size) {
726 n = 0;
727 if (!out.write())
728 return false;
729 out->resize (batch_size);
730 }
731 return true;
732 }
733 Item& value () { return (*out)[n]; }
734 void flush () { if (n) { out->resize (n); out.write(); } }
735 typename Type<__Batch<Item>>::write_item out;
736 const size_t batch_size;
737 size_t n;
738 };
739
740
741
742
743 template <class Item, class Functor>
744 struct __Source { MEMALIGN(__Source<Item,Functor>)
745 using item_t = typename Type<Item>::item;
746 using queue_t = typename Type<Item>::queue;
747 using writer_t = typename Type<Item>::writer;
748 using functor_t = typename __job<Functor>::member_type;
749
750 writer_t writer;
751 functor_t func;
752 size_t batch_size;
753
754 __Source (queue_t& queue, Functor& functor, const Item& item) :
755 writer (queue),
756 func (__job<Functor>::functor (functor)),
757 batch_size (__batch_size<Item> (item)) { }
758
759 void execute () {
760 auto out = StoreItem<Item> (batch_size, writer);
761 do {
762 if (!func (out.value()))
763 break;
764 } while (out.write());
765 out.flush();
766 }
767 };
768
769
770
771
772
773
774
775 template <class Item1, class Functor, class Item2>
776 struct __Pipe { MEMALIGN(__Pipe<Item1,Functor,Item2>)
777 using item1_t = typename Type<Item1>::item;
778 using item2_t = typename Type<Item2>::item;
779 using queue1_t = typename Type<Item1>::queue;
780 using queue2_t = typename Type<Item2>::queue;
781 using reader_t = typename Type<Item1>::reader;
782 using writer_t = typename Type<Item2>::writer;
783 using functor_t = typename __job<Functor>::member_type;
784
785 reader_t reader;
786 writer_t writer;
787 functor_t func;
788 const size_t batch_size;
789
790 __Pipe (queue1_t& queue_in, Functor& functor, queue2_t& queue_out, const Item2& item2) :
791 reader (queue_in),
792 writer (queue_out),
793 func (__job<Functor>::functor (functor)),
794 batch_size (__batch_size<Item2> (item2)) { }
795
796 void execute () {
797 auto in = FetchItem<Item1> (reader);
798 auto out = StoreItem<Item2> (batch_size, writer);
799 while (in.read()) {
800 if (func (in.value(), out.value())) {
801 if (!out.write())
802 break;
803 }
804 }
805 out.flush();
806 }
807
808 };
809
810
811
812
813
814
815 template <class Item, class Functor>
816 struct __Sink { MEMALIGN(__Sink<Item,Functor>)
817 using item_t = typename Type<Item>::item;
818 using queue_t = typename Type<Item>::queue;
819 using reader_t = typename Type<Item>::reader;
820 using functor_t = typename __job<Functor>::member_type;
821
822 reader_t reader;
823 functor_t func;
824
825 __Sink (queue_t& queue, Functor& functor) :
826 reader (queue),
827 func (__job<Functor>::functor (functor)) { }
828
829 void execute () {
830 auto in = FetchItem<Item> (reader);
831 while (in.read()) {
832 if (!func (in.value()))
833 return;
834 }
835 }
836 };
837
838
839
840
841
842 }
843
844
846
847
848
849
850
851
853
857 template <class Item>
858 inline __Batch<Item> batch (const Item&, size_t number = MRTRIX_QUEUE_DEFAULT_BATCH_SIZE)
859 {
860 return __Batch<Item> (number);
861 }
862
863
864
865
866
867
869
1049 template <class Source, class Item, class Sink>
1050 inline void run_queue (
1051 Source&& source,
1052 const Item& item,
1053 Sink&& sink,
1054 size_t capacity = MRTRIX_QUEUE_DEFAULT_CAPACITY)
1055 {
1056 if (threads_to_execute() == 0) {
1057 typename Type<Item>::item item;
1058 while (__job<Source>::functor (source) (item))
1059 if (!__job<Sink>::functor (sink) (item))
1060 return;
1061 return;
1062 }
1063
1064 typename Type<Item>::queue queue ("source->sink", capacity);
1065 __Source<Item,Source> source_functor (queue, source, item);
1066 __Sink<Item,Sink> sink_functor (queue, sink);
1067
1068 auto t1 = run (__job<Source>::get (source, source_functor), "source");
1069 auto t2 = run (__job<Sink>::get (sink, sink_functor), "sink");
1070
1071 t1.wait();
1072 t2.wait();
1073
1075 }
1076
1077
1078
1080
1123 template <class Source, class Item1, class Pipe, class Item2, class Sink>
1124 inline void run_queue (
1125 Source&& source,
1126 const Item1& item1,
1127 Pipe&& pipe,
1128 const Item2& item2,
1129 Sink&& sink,
1130 size_t capacity = MRTRIX_QUEUE_DEFAULT_CAPACITY)
1131 {
1132 if (threads_to_execute() == 0) {
1133 typename Type<Item1>::item item1;
1134 typename Type<Item2>::item item2;
1135 while (__job<Source>::functor (source) (item1)) {
1136 if (__job<Pipe>::functor (pipe) (item1, item2))
1137 if (!__job<Sink>::functor (sink) (item2))
1138 return;
1139 }
1140 return;
1141 }
1142
1143
1144 typename Type<Item1>::queue queue1 ("source->pipe", capacity);
1145 typename Type<Item2>::queue queue2 ("pipe->sink", capacity);
1146
1147 __Source<Item1,Source> source_functor (queue1, source, item1);
1148 __Pipe<Item1,Pipe,Item2> pipe_functor (queue1, pipe, queue2, item2);
1149 __Sink<Item2,Sink> sink_functor (queue2, sink);
1150
1151 auto t1 = run (__job<Source>::get (source, source_functor), "source");
1152 auto t2 = run (__job<Pipe>::get (pipe, pipe_functor), "pipe");
1153 auto t3 = run (__job<Sink>::get (sink, sink_functor), "sink");
1154
1155 t1.wait();
1156 t2.wait();
1157 t3.wait();
1158
1160 }
1161
1162
1163
1165
1167 template <class Source, class Item1, class Pipe1, class Item2, class Pipe2, class Item3, class Sink>
1168 inline void run_queue (
1169 Source&& source,
1170 const Item1& item1,
1171 Pipe1&& pipe1,
1172 const Item2& item2,
1173 Pipe2&& pipe2,
1174 const Item3& item3,
1175 Sink&& sink,
1176 size_t capacity = MRTRIX_QUEUE_DEFAULT_CAPACITY)
1177 {
1178 if (threads_to_execute() == 0) {
1179 typename Type<Item1>::item item1;
1180 typename Type<Item2>::item item2;
1181 typename Type<Item3>::item item3;
1182 while (__job<Source>::functor (source) (item1)) {
1183 if (__job<Pipe1>::functor (pipe1) (item1, item2))
1184 if (__job<Pipe2>::functor (pipe2) (item2, item3))
1185 if (!__job<Sink>::functor (sink) (item3))
1186 return;
1187 }
1188 return;
1189 }
1190
1191
1192 typename Type<Item1>::queue queue1 ("source->pipe", capacity);
1193 typename Type<Item2>::queue queue2 ("pipe->pipe", capacity);
1194 typename Type<Item3>::queue queue3 ("pipe->sink", capacity);
1195
1196 __Source<Item1,Source> source_functor (queue1, source, item1);
1197 __Pipe<Item1,Pipe1,Item2> pipe1_functor (queue1, pipe1, queue2, item2);
1198 __Pipe<Item2,Pipe2,Item3> pipe2_functor (queue2, pipe2, queue3, item3);
1199 __Sink<Item3,Sink> sink_functor (queue3, sink);
1200
1201 auto t1 = run (__job<Source>::get (source, source_functor), "source");
1202 auto t2 = run (__job<Pipe1>::get (pipe1, pipe1_functor), "pipe1");
1203 auto t3 = run (__job<Pipe2>::get (pipe2, pipe2_functor), "pipe2");
1204 auto t4 = run (__job<Sink>::get (sink, sink_functor), "sink");
1205
1206 t1.wait();
1207 t2.wait();
1208 t3.wait();
1209 t4.wait();
1210
1212 }
1213
1214
1217 }
1218}
1219
1220#endif
1221
1222
This class is used to read items from the queue.
Definition: thread_queue.h:465
void recycle(T *item) const
Definition: thread_queue.h:492
Item(const Reader &reader)
Construct a Reader::Item object.
Definition: thread_queue.h:475
bool read()
Get next item from the queue.
Definition: thread_queue.h:484
~Item()
Unregister the parent Reader from the queue.
Definition: thread_queue.h:477
This class is used to register a reader with the queue.
Definition: thread_queue.h:445
Reader(Queue< T > &queue)
Register a Reader object with the queue.
Definition: thread_queue.h:450
Reader(const Reader &reader)
Definition: thread_queue.h:453
This class is used to write items to the queue.
Definition: thread_queue.h:395
bool write()
Push the item onto the queue.
Definition: thread_queue.h:414
Item(const Writer &writer)
Construct a Writer::Item object.
Definition: thread_queue.h:405
~Item()
Unregister the parent Writer from the queue.
Definition: thread_queue.h:407
This class is used to register a writer with the queue.
Definition: thread_queue.h:375
Writer(const Writer &W)
Definition: thread_queue.h:383
Writer(Queue< T > &queue)
Register a Writer object with the queue.
Definition: thread_queue.h:380
A first-in first-out thread-safe item queue.
Definition: thread_queue.h:334
void status()
Print out a status report for debugging purposes.
Definition: thread_queue.h:516
Queue & operator=(const Queue &)=delete
Queue(Queue &&)=default
Queue(const std::string &description="unnamed", size_t buffer_size=128)
Construct a Queue of items of type T.
Definition: thread_queue.h:344
Queue(const Queue &)=delete
#define DEBUG(msg)
Definition: exception.h:75
VectorType::Scalar value(const VectorType &coefs, typename VectorType::Scalar cos_elevation, typename VectorType::Scalar cos_azimuth, typename VectorType::Scalar sin_azimuth, int lmax)
Definition: SH.h:233
__run< Functor >::type run(Functor &&functor, const std::string &name="unnamed")
Execute the functor's execute method in a separate thread.
Definition: thread.h:373
size_t threads_to_execute()
void run_queue(Source &&source, const Item &item, Sink &&sink, size_t capacity=128)
convenience function to set up and run a 2-stage multi-threaded pipeline.
__Batch< Item > batch(const Item &, size_t number=128)
used to request batched processing of items
Definition: thread_queue.h:858
#define NOMEMALIGN
Definition: memory.h:22
void write(const KeyValues &keyval, nlohmann::json &json)
KeyValues read(const nlohmann::json &json, const KeyValues &preexisting=KeyValues())
List get(const HeaderType &header)
return the strides of header as a vector<ssize_t>
Definition: stride.h:125
Definition: base.h:24
void check_app_exit_code()
Item item
#define MEMALIGN(...)
Definition: types.h:185
#define FORCE_INLINE
Definition: types.h:156
size_t num
Definition: thread.h:216
std::remove_reference< Functor >::type & functor
Definition: thread.h:215
#define MRTRIX_QUEUE_DEFAULT_CAPACITY
Definition: thread_queue.h:27
#define MRTRIX_QUEUE_DEFAULT_BATCH_SIZE
Definition: thread_queue.h:28