TPIE

11a2c2d
base.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef __TPIE_PIPELINING_PARALLEL_BASE_H__
21 #define __TPIE_PIPELINING_PARALLEL_BASE_H__
22 
23 #include <tpie/pipelining/node.h>
24 #include <tpie/pipelining/factory_base.h>
25 #include <tpie/array_view.h>
26 #include <memory>
28 #include <tpie/pipelining/parallel/options.h>
29 #include <tpie/pipelining/parallel/worker_state.h>
30 #include <tpie/pipelining/parallel/aligned_array.h>
31 #include <thread>
32 
33 namespace tpie::pipelining::parallel_bits {
34 
35 // predeclare
36 template <typename T>
37 class before;
38 template <typename dest_t>
39 class before_impl; // restored: this declaration was elided in extraction (definition below at base.h:39 per index)
40 template <typename T>
41 class after;
42 template <typename T1, typename T2>
43 class state;
44 
// Container of the per-worker node instances (one before<Input> chain per job).
// Owns one progress indicator per worker so progress can be aggregated by the
// producer via sum_steps().
53 template <typename Input, typename Output>
54 class threads {
55  typedef before<Input> before_t;
56 
57 protected:
// Cache-line size used to align per-worker data and avoid false sharing.
58  static const size_t alignment = 64;
59 
// NOTE(review): the pi_t typedef (progress_indicator_null per the index,
// base.h:61) and the head of the inner progress_indicator_hook class were
// elided from this listing; the members down to line 83 belong to that
// inner class -- confirm against upstream source.
62  aligned_array<pi_t, alignment> m_progressIndicators;
63 
70  threads * t;
71  public:
// NOTE(review): constructor signature elided here; presumably
// progress_indicator_hook(threads * t) -- confirm against upstream source.
73  : t(t)
74  , index(0)
75  {
76  }
77 
// Factory hook: give node number `index` its per-worker progress indicator.
78  virtual void init_node(node & r) override {
79  r.set_progress_indicator(t->m_progressIndicators.get(index));
80  }
81 
// Index of the worker whose node is currently being constructed.
82  size_t index;
83  };
84 
85  friend class progress_indicator_hook;
86 
// One before_t per worker; constructed by the threads_impl subclass.
87  std::vector<before_t *> m_dests;
88 
89 public:
// Access the before node for worker idx.
90  before_t & operator[](size_t idx) {
91  return *m_dests[idx];
92  }
93 
// Total progress steps recorded across all workers' indicators.
94  stream_size_type sum_steps() {
95  stream_size_type res = 0;
96  for (size_t i = 0; i < m_progressIndicators.size(); ++i) {
97  res += m_progressIndicators.get(i)->get_current();
98  }
99  return res;
100  }
101 
102  virtual ~threads() {}
103 };
104 
// Concrete threads container: uses the node factory fact_t to construct one
// worker pipeline (before -> worker nodes -> after) per job, placement-new'ed
// into cache-line-aligned storage.
108 template <typename Input, typename Output, typename fact_t>
109 class threads_impl : public threads<Input, Output> {
110 private:
111  typedef threads<Input, Output> p_t;
112 
114  typedef typename p_t::pi_t pi_t;
115 
116  typedef after<Output> after_t;
117  typedef typename fact_t::template constructed_type<after_t> worker_t;
118  typedef typename push_type<worker_t>::type T1;
119  typedef Output T2;
121  static const size_t alignment = p_t::alignment;
123 
// Number of worker jobs; fixed at construction from state::opts.
125  size_t numJobs;
126 
// NOTE(review): the before_t / aligned_before_t typedefs were elided from
// this listing; aligned_before_t is presumably
// aligned_array<before_t, alignment> -- confirm against upstream source.
128  aligned_before_t m_data;
129 
130 public:
131  threads_impl(fact_t && fact,
132  state<T1, T2> & st)
133  : numJobs(st.opts.numJobs)
134  {
// Route each constructed node's progress indicator through the hook.
135  typename p_t::progress_indicator_hook hook(this);
136  fact.hook_initialization(&hook);
137  fact.set_destination_kind_push();
138  // uninitialized allocation
139  m_data.realloc(numJobs);
140  this->m_progressIndicators.realloc(numJobs);
141  this->m_dests.resize(numJobs);
142 
143  // construct elements manually
144  for (size_t i = 0; i < numJobs; ++i) {
145  // for debugging: check that pointer is aligned.
146  if (((size_t) m_data.get(i)) % alignment != 0) {
147  log_warning() << "Thread " << i << " is not aligned: Address "
148  << m_data.get(i) << " is off by " <<
149  (((size_t) m_data.get(i)) % alignment) << " bytes"
150  << std::endl;
151  }
152 
153  hook.index = i;
// Placement-construct the per-worker progress indicator.
154  new (this->m_progressIndicators.get(i)) pi_t();
155 
// Build worker i's node chain, terminated by an after node bound to slot i.
156  auto n = fact.construct_copy(after_t(st, i));
// Only worker 0 is plotted in full; the rest are hidden in simplified plots.
157  if (i == 0)
158  n.set_plot_options(node::PLOT_PARALLEL);
159  else
160  n.set_plot_options(node::PLOT_PARALLEL | node::PLOT_SIMPLIFIED_HIDE);
161  this->m_dests[i] =
162  new(m_data.get(i))
163  before_t(st, i, std::move(n));
164  }
165  }
166 
// Destroy in reverse of construction: explicit dtor calls mirror the
// placement news above, then release the raw storage.
167  virtual ~threads_impl() {
168  for (size_t i = 0; i < numJobs; ++i) {
169  m_data.get(i)->~before_t();
170  this->m_progressIndicators.get(i)->~pi_t();
171  }
172  m_data.realloc(0);
173  this->m_progressIndicators.realloc(0);
174  }
175 };
176 
// Non-templated interface of the worker-side output node (after<T>).
// Lets state_base hold pointers to all after instances without knowing T.
180 class after_base : public node {
181 public:
// Called from the worker thread (before::worker) to set up thread-local
// output buffers before processing starts.
185  virtual void worker_initialize() = 0;
186 
// Called after a batch of items has been pushed; hands the filled output
// buffer over to the main thread.
191  virtual void flush_buffer() = 0;
192 
// Used during pipeline graph construction to record the downstream
// consumer as a push destination.
196  virtual void set_consumer(node *) = 0;
197 };
198 
// Shared synchronization state for one parallel pipeline instance.
// One mutex guards everything; the producer waits on producerCond, and each
// worker i waits on workerCond[i].
207 class state_base {
208 public:
209  typedef std::mutex mutex_t;
210  typedef std::condition_variable cond_t;
211  typedef std::unique_lock<std::mutex> lock_t;
212 
213  const options opts;
214 
// Single mutex protecting all shared state below.
216  mutex_t mutex;
217 
// Signalled by workers when output is ready, a worker starts/stops, or an
// exception is stored in eptr; the producer waits on it.
224  cond_t producerCond;
225 
// One condition variable per worker (array of opts.numJobs), signalled by
// the producer to wake worker idx.
235  cond_t * workerCond;
236 
// NOTE(review): the declaration of runningWorkers (size_t, per the index
// at base.h:238) was elided from this listing; it is initialized in the
// constructor below and read by producer/before -- confirm upstream.
239 
// Exception thrown by a worker, to be rethrown on the main thread.
241  std::exception_ptr eptr;
242 
// Must not be used concurrently (called during construction only).
244  void set_input_ptr(size_t idx, node * v) {
245  m_inputs[idx] = v;
246  }
247 
// Must not be used concurrently (called during construction only).
249  void set_output_ptr(size_t idx, after_base * v) {
250  m_outputs[idx] = v;
251  }
252 
// Get the before node for worker idx.
260  node & input(size_t idx) { return *m_inputs[idx]; }
261 
// Get the after node for worker idx.
272  after_base & output(size_t idx) { return *m_outputs[idx]; }
273 
// Shared state; caller must hold the mutex.
275  worker_state get_state(size_t idx) {
276  return m_states[idx];
277  }
278 
// Checked state transition; throws if worker idx is not in state `from`.
// Caller must hold the mutex.
280  void transition_state(size_t idx, worker_state from, worker_state to) {
281  if (m_states[idx] != from) {
282  std::stringstream ss;
283  ss << idx << " Invalid state transition " << from << " -> " << to << "; current state is " << m_states[idx];
284  log_error() << ss.str() << std::endl;
285  throw exception(ss.str());
286  }
287  m_states[idx] = to;
288  }
289 
290 protected:
291  std::vector<node *> m_inputs;
292  std::vector<after_base *> m_outputs;
293  std::vector<worker_state> m_states;
294 
295  state_base(const options opts)
296  : opts(opts)
297  , runningWorkers(0)
298  , m_inputs(opts.numJobs, 0)
299  , m_outputs(opts.numJobs, 0)
300  , m_states(opts.numJobs, INITIALIZING)
301  {
302  workerCond = new cond_t[opts.numJobs];
303  }
304 
305  virtual ~state_base() {
306  delete[] workerCond;
307  }
308 };
309 
// Per-worker input buffer; instantiated in each worker thread.
// Holds up to opts.bufSize items copied in from the producer.
313 template <typename T>
314 class parallel_input_buffer { // restored: class head was elided in extraction (definition at base.h:314 per index)
315  memory_size_type m_inputSize;
316  array<T> m_inputBuffer;
317 
318 public:
// View of the items currently stored (first m_inputSize elements).
319  array_view<T> get_input() {
320  return array_view<T>(&m_inputBuffer[0], m_inputSize);
321  }
322 
// Copy `input` into the buffer; throws if it exceeds the fixed capacity.
323  void set_input(array_view<T> input) {
324  if (input.size() > m_inputBuffer.size())
325  throw tpie::exception(m_inputBuffer.size() ? "Input too large" : "Input buffer not initialized");
326 
327  memory_size_type items =
328  std::copy(input.begin(), input.end(), m_inputBuffer.begin())
329  -m_inputBuffer.begin();
330 
331  m_inputSize = items;
332  }
333 
// Capacity is fixed at opts.bufSize.
334  parallel_input_buffer(const options & opts)
335  : m_inputSize(0)
336  , m_inputBuffer(opts.bufSize)
337  {
338  }
339 };
340 
// Per-worker output buffer; instantiated in each worker thread.
// after<T> fills it directly (hence the friend declaration).
344 template <typename T>
345 class parallel_output_buffer { // restored: class head was elided in extraction (definition at base.h:345 per index)
346  memory_size_type m_outputSize;
347  array<T> m_outputBuffer;
348  friend class after<T>;
349 
350 public:
// View of the items currently stored (first m_outputSize elements).
351  array_view<T> get_output() {
352  return array_view<T>(&m_outputBuffer[0], m_outputSize);
353  }
354 
// Capacity is fixed at opts.bufSize.
355  parallel_output_buffer(const options & opts)
356  : m_outputSize(0)
357  , m_outputBuffer(opts.bufSize)
358  {
359  }
360 };
361 
// Interface of the node running in the main thread that receives completed
// output buffers from the producer and forwards the items downstream.
369 template <typename T>
370 class consumer : public node {
371 public:
372  typedef T item_type;
373 
// Push every item in the view to the rest of the pipeline.
374  virtual void consume(array_view<T>) = 0;
375  // node has virtual dtor
376 };
377 
// Item-type-specific shared state: per-worker input/output buffer pointers,
// the consumer pointer, and ownership of the worker node chains (pipes).
382 template <typename T1, typename T2>
383 class state : public state_base {
384 public:
385  typedef std::shared_ptr<state> ptr;
386  typedef state_base::mutex_t mutex_t;
387  typedef state_base::cond_t cond_t;
388  typedef state_base::lock_t lock_t;
389 
// Registered by each worker thread in before::worker / after::worker_initialize.
390  array<parallel_input_buffer<T1> *> m_inputBuffers;
391  array<parallel_output_buffer<T2> *> m_outputBuffers;
392 
// Set by the producer in end() via set_consumer_ptr; null until then.
393  consumer<T2> * m_cons;
394 
// Owns the per-worker node chains; constructed from the factory below.
395  std::unique_ptr<threads<T1, T2> > pipes;
396 
397  template <typename fact_t>
398  state( options opts, fact_t && fact)
399  : state_base(opts)
400  , m_inputBuffers(opts.numJobs)
401  , m_outputBuffers(opts.numJobs)
402  , m_cons(0)
403  {
// threads_impl placement-constructs one before/worker/after chain per job.
404  typedef threads_impl<T1, T2, fact_t> pipes_impl_t;
405  pipes.reset(new pipes_impl_t(std::move(fact), *this));
406  }
407 
408  void set_consumer_ptr(consumer<T2> * cons) {
409  m_cons = cons;
410  }
411 
// Pointer-to-pointer so after instances can observe the consumer being set
// later without re-registration.
412  consumer<T2> * const * get_consumer_ptr_ptr() const {
413  return &m_cons;
414  }
415 };
416 
// Worker-side output node: accepts items pushed by the worker's pipeline,
// buffers them thread-locally, and hands full buffers to the main thread
// under the shared mutex via the worker_state protocol.
420 template <typename T>
421 class after : public after_base {
422 protected:
423  state_base & st;
// Index of this worker in the shared state arrays.
424  size_t parId;
425  std::unique_ptr<parallel_output_buffer<T> > m_buffer;
426  array<parallel_output_buffer<T> *> & m_outputBuffers;
427  typedef state_base::lock_t lock_t;
// Points at state::m_cons; dereferenced lazily since the consumer is only
// set by the producer in end().
428  consumer<T> * const * m_cons;
429 
430 public:
431  typedef T item_type;
432 
433  template <typename Input>
434  after(state<Input, T> & state,
435  size_t parId)
436  : st(state)
437  , parId(parId)
438  , m_outputBuffers(state.m_outputBuffers)
439  , m_cons(state.get_consumer_ptr_ptr())
440  {
441  state.set_output_ptr(parId, this);
442  set_name("Parallel after", PRIORITY_INSIGNIFICANT);
443  set_plot_options(PLOT_PARALLEL | PLOT_SIMPLIFIED_HIDE);
// Sanity: the indirection must be valid, but the consumer itself must not
// have been set yet at construction time.
444  if (m_cons == 0) throw tpie::exception("Unexpected nullptr");
445  if (*m_cons != 0) throw tpie::exception("Expected nullptr");
446  }
447 
448  void set_consumer(node * cons) override {
449  this->add_push_destination(*cons);
450  }
451 
// Move constructor re-registers this instance as the output pointer for
// slot parId. m_buffer is intentionally not moved: it is only created
// later, in worker_initialize().
452  after(after && other)
453  : after_base(std::move(other))
454  , st(other.st)
455  , parId(std::move(other.parId))
456  , m_outputBuffers(other.m_outputBuffers)
457  , m_cons(std::move(other.m_cons)) {
458  st.set_output_ptr(parId, this);
459  if (m_cons == 0) throw tpie::exception("Unexpected nullptr in move");
460  if (*m_cons != 0) throw tpie::exception("Expected nullptr in move");
461  }
462 
// Push to the thread-local buffer; when full, do a partial flush first.
466  void push(const T & item) {
467  if (m_buffer->m_outputSize >= m_buffer->m_outputBuffer.size())
468  flush_buffer_impl(false);
469 
470  m_buffer->m_outputBuffer[m_buffer->m_outputSize++] = item;
471  }
472 
// Final flush, then release the thread-local buffer.
473  void end() override {
474  flush_buffer_impl(true);
475  m_buffer.reset();
476  }
477 
// Called by before::worker to allocate the output buffer and register it
// in the shared array.
481  void worker_initialize() override {
482  m_buffer.reset(new parallel_output_buffer<T>(st.opts));
483  m_outputBuffers[parId] = m_buffer.get();
484  }
485 
// Invoked by before_impl::push_all once all input items have been pushed.
490  void flush_buffer() override {
491  flush_buffer_impl(true);
492  }
493 
494 private:
// Predicate for the wait loop in flush_buffer_impl; caller holds the mutex.
495  bool is_done() const {
496  switch (st.get_state(parId)) {
497  case INITIALIZING:
498  throw tpie::exception("INITIALIZING not expected in after::is_done");
499  case IDLE:
500  return true;
501  case PROCESSING:
502  // The main thread may transition us from Outputting to Idle to
503  // Processing without us noticing, or it may transition us from
504  // Partial_Output to Processing. In either case, we are done
505  // flushing the buffer.
506  return true;
507  case PARTIAL_OUTPUT:
508  case OUTPUTTING:
509  return false;
510  case DONE:
511  return true;
512  }
513  throw tpie::exception("Unknown state");
514  }
515 
// Hand the output buffer to the main thread. `complete` selects the
// OUTPUTTING (all input consumed) vs PARTIAL_OUTPUT (buffer merely full)
// transition. If the main thread already marked us DONE, the buffer is
// pushed directly to the consumer while holding the mutex.
531  void flush_buffer_impl(bool complete) {
532  // At this point, we could check if the output buffer is empty and
533  // short-circuit when it is without acquiring the lock; however, we
534  // must do a full PROCESSING -> OUTPUTTING -> IDLE transition in this
535  // case to let the main thread know that we are done processing the
536  // input.
537 
538  lock_t lock(st.mutex);
539  if (st.get_state(parId) == DONE) {
540  if (*m_cons == 0) throw tpie::exception("Unexpected nullptr in flush_buffer");
541  array_view<T> out = m_buffer->get_output();
542  (*m_cons)->consume(out);
543  } else {
544  st.transition_state(parId, PROCESSING, complete ? OUTPUTTING : PARTIAL_OUTPUT);
545  // notify producer that output is ready
546  st.producerCond.notify_one();
// Condition-variable wait loop: guards against spurious wakeups.
547  while (!is_done()) {
548  st.workerCond[parId].wait(lock);
549  }
550  }
551  m_buffer->m_outputSize = 0;
552  }
553 };
554 
// Worker-side input node: owns the worker thread, receives input buffers from
// the producer, and pushes each buffer's items through the worker pipeline
// (via the pure virtual push_all).
560 template <typename T>
561 class before : public node {
562 protected:
563  state_base & st;
564  size_t parId;
565  std::unique_ptr<parallel_input_buffer<T> > m_buffer;
566  array<parallel_input_buffer<T> *> & m_inputBuffers;
567  std::thread m_worker;
568 
// Implemented by before_impl: push every item in the view through the
// worker pipeline and then flush the corresponding after node.
572  virtual void push_all(array_view<T> items) = 0;
573 
574  template <typename Output>
575  before(state<T, Output> & st, size_t parId)
576  : st(st)
577  , parId(parId)
578  , m_inputBuffers(st.m_inputBuffers)
579  {
580  set_name("Parallel before", PRIORITY_INSIGNIFICANT);
581  set_plot_options(PLOT_PARALLEL | PLOT_SIMPLIFIED_HIDE);
582  }
583  // virtual dtor in node
584 
// Copy constructor does not copy the buffer or the thread; both are
// created per-instance when the worker starts.
585  before(const before & other)
586  : st(other.st)
587  , parId(other.parId)
588  , m_inputBuffers(other.m_inputBuffers)
589  {
590  }
591 
// Destructor: the worker must already have terminated (state INITIALIZING
// means it was never started; DONE means it has exited its loop), so the
// join below cannot block on a worker waiting for this mutex.
592  ~before() {
593  state_base::lock_t lock(st.mutex);
594 
595  auto s = st.get_state(parId);
596  tp_assert_release(s == INITIALIZING || s == DONE, "State should be INITIALIZING or DONE in before::~before");
597 
598  if (m_worker.joinable())
599  m_worker.join();
600  }
601 
602 public:
603  typedef T item_type;
604 
// Spawn the worker thread when the pipeline phase begins.
605  void begin() override {
606  node::begin();
607  std::thread t(run_worker, this);
608  m_worker.swap(t);
609  }
610 
611  void end() override {
612  m_buffer.reset();
613  }
614 private:
// Predicate for the worker's wait loop; caller holds the mutex.
618  bool ready() {
619  switch (st.get_state(parId)) {
620  case INITIALIZING:
621  throw tpie::exception("INITIALIZING not expected in before::ready");
622  case IDLE:
623  return false;
624  case PROCESSING:
625  return true;
626  case PARTIAL_OUTPUT:
627  throw tpie::exception("State 'partial_output' was not expected in before::ready");
628  case OUTPUTTING:
629  throw tpie::exception("State 'outputting' was not expected in before::ready");
630  case DONE:
631  return false;
632  }
633  throw tpie::exception("Unknown state");
634  }
635 
// RAII signal: increments the running-worker count on construction and
// decrements it on destruction, notifying the producer both times.
639  class running_signal {
640  typedef state_base::cond_t cond_t;
641  memory_size_type & sig;
642  cond_t & producerCond;
643  public:
644  running_signal(memory_size_type & sig, cond_t & producerCond)
645  : sig(sig)
646  , producerCond(producerCond)
647  {
648  ++sig;
649  producerCond.notify_one();
650  }
651 
652  ~running_signal() {
653  --sig;
654  producerCond.notify_one();
655  }
656  };
657 
// Thread entry point trampoline.
658  static void run_worker(before * self) {
659  self->worker();
660  }
661 
// Worker thread main loop: initialize buffers, go IDLE, then repeatedly
// wait for PROCESSING, push the input batch, and loop until DONE.
665  void worker() {
666  state_base::lock_t lock(st.mutex);
667 
668  m_buffer.reset(new parallel_input_buffer<T>(st.opts));
669  m_inputBuffers[parId] = m_buffer.get();
670 
671  // virtual invocation
672  st.output(parId).worker_initialize();
673 
674  st.transition_state(parId, INITIALIZING, IDLE);
675  running_signal _(st.runningWorkers, st.producerCond);
676  while (true) {
677  // wait for transition IDLE -> PROCESSING
678  while (!ready()) {
679  if (st.get_state(parId) == DONE) {
680  return;
681  }
682  st.workerCond[parId].wait(lock);
683  }
// Release the mutex while pushing items; flush_buffer_impl reacquires it.
684  lock.unlock();
685 
686  try {
687  // Virtual invocation; eventually calls flush_buffer_impl(true)
688  // to switch state to OUTPUTTING or PARTIAL_OUTPUT.
689  push_all(m_buffer->get_input());
690  } catch (...) {
691  // Some node in the pipeline threw an exception.
692  // Pass it to the main thread.
693  lock.lock();
694  st.transition_state(parId, PROCESSING, IDLE);
695  st.eptr = std::current_exception();
696  st.producerCond.notify_one();
697  continue;
698  }
699  lock.lock();
700  }
701  }
702 };
703 
// Concrete before class: pushes each buffered input item into the worker
// pipeline `dest` and then flushes the worker's output buffer.
707 template <typename dest_t>
708 class before_impl : public before<typename push_type<dest_t>::type> {
709  typedef typename push_type<dest_t>::type item_type;
710 
711  dest_t dest;
712 
713 public:
// Registers itself as the input node for worker parId.
714  template <typename Output>
715  before_impl(state<item_type, Output> & st,
716  size_t parId,
717  dest_t dest)
718  : before<item_type>(st, parId)
719  , dest(std::move(dest))
720  {
// NOTE(review): `dest` here names the constructor parameter, which was
// moved from in the initializer list above; presumably node move leaves
// the token usable by add_push_destination -- confirm against node's
// move semantics before changing.
721  this->add_push_destination(dest);
722  st.set_input_ptr(parId, this);
723  }
724 
// Push all items from the buffer, then flush the output buffer via the
// paired after node (runs on the worker thread).
// `override` added: this implements the pure virtual in before<item_type>,
// so the compiler now checks the signature.
731  void push_all(array_view<item_type> items) override {
732  for (size_t i = 0; i < items.size(); ++i) {
733  dest.push(items[i]);
734  }
735 
736  // virtual invocation
737  this->st.output(this->parId).flush_buffer();
738  }
739 };
740 
// Concrete consumer: runs in the main thread and forwards each received
// output buffer item-by-item to `dest`.
744 template <typename Input, typename Output, typename dest_t>
745 class consumer_impl : public consumer<typename push_type<dest_t>::type> {
// NOTE(review): the state_t typedef (base.h:746) was elided from this
// listing; presumably typedef state<Input, Output> state_t -- confirm.
747  typedef typename state_t::ptr stateptr;
748  dest_t dest;
749  stateptr st;
750 public:
751  typedef typename push_type<dest_t>::type item_type;
752 
753  consumer_impl(dest_t dest, stateptr st)
754  : dest(std::move(dest))
755  , st(st)
756  {
757  this->add_push_destination(dest);
758  this->set_name("Parallel output", PRIORITY_INSIGNIFICANT);
759  this->set_plot_options(node::PLOT_PARALLEL | node::PLOT_SIMPLIFIED_HIDE);
// Register this node as the consumer of every worker's after node so the
// pipeline graph is connected.
760  for (size_t i = 0; i < st->opts.numJobs; ++i) {
761  st->output(i).set_consumer(this);
762  }
763  }
764 
// Push all items from an output buffer to the rest of the pipeline.
768  void consume(array_view<item_type> a) override {
769  for (size_t i = 0; i < a.size(); ++i) {
770  dest.push(a[i]);
771  }
772  }
773 };
774 
// Producer node, running in the main thread: accumulates pushed items into
// an input buffer, dispatches full buffers to idle workers, collects output
// buffers, and manages worker shutdown and exception propagation.
780 template <typename T1, typename T2>
781 class producer : public node {
782 public:
783  typedef T1 item_type;
784 
785 private:
786  typedef state<T1, T2> state_t;
787  typedef typename state_t::ptr stateptr;
788  stateptr st;
789  array<T1> inputBuffer;
// Number of items currently accumulated in inputBuffer.
790  size_t written;
// Worker index found by the most recent has_*_pipe() call.
// NOTE(review): not initialized in the constructor; it appears to be read
// only after a has_*_pipe() call has set it -- confirm.
791  size_t readyIdx;
792  std::shared_ptr<consumer<T2> > cons;
// FIFO of worker indices, used only when opts.maintainOrder is set, so
// outputs are consumed in dispatch order.
793  internal_queue<memory_size_type> m_outputOrder;
// Progress steps already reported; see flush_steps().
794  stream_size_type m_steps;
795 
// Find a worker that can accept input (IDLE) or has output waiting
// (PARTIAL_OUTPUT/OUTPUTTING, subject to output ordering). Sets readyIdx
// and returns true on success. Caller must hold the mutex.
803  bool has_ready_pipe() {
804  for (size_t i = 0; i < st->opts.numJobs; ++i) {
805  switch (st->get_state(i)) {
806  case INITIALIZING:
807  case PROCESSING:
808  case DONE:
809  break;
810  case PARTIAL_OUTPUT:
811  case OUTPUTTING:
812  // If we have to maintain order of items, the only
813  // outputting worker we consider to be waiting is the
814  // "front worker".
815  if (st->opts.maintainOrder && m_outputOrder.front() != i)
816  break;
817  // fallthrough
818  case IDLE:
819  readyIdx = i;
820  return true;
821  }
822  }
823  return false;
824  }
825 
// Like has_ready_pipe, but only matches workers with output waiting.
// Caller must hold the mutex.
838  bool has_outputting_pipe(bool maintainOrder) {
839  for (size_t i = 0; i < st->opts.numJobs; ++i) {
840  switch (st->get_state(i)) {
841  case INITIALIZING:
842  case IDLE:
843  case PROCESSING:
844  case DONE:
845  break;
846  case PARTIAL_OUTPUT:
847  case OUTPUTTING:
848  if (maintainOrder && m_outputOrder.front() != i)
849  break;
850  readyIdx = i;
851  return true;
852  }
853  }
854  return false;
855  }
856 
// True if any worker is still PROCESSING. Caller must hold the mutex.
867  bool has_processing_pipe() {
868  for (size_t i = 0; i < st->opts.numJobs; ++i) {
869  switch (st->get_state(i)) {
870  case INITIALIZING:
871  case IDLE:
872  case PARTIAL_OUTPUT:
873  case OUTPUTTING:
874  case DONE:
875  break;
876  case PROCESSING:
877  return true;
878  }
879  }
880  return false;
881  }
882 
// Forward worker progress to our progress indicator.
886  void flush_steps() {
887  // The number of items has been forwarded along unchanged to all
888  // the workers (it is still a valid upper bound).
889  //
890  // This means the workers each expect to handle all the items,
891  // which means the number of steps reported in total is scaled up
892  // by the number of workers.
893  //
894  // Therefore, we similarly scale up the number of times we call step.
895  // In effect, every time step() is called once in a single worker,
896  // we process this as if all workers called step().
897 
898  stream_size_type steps = st->pipes->sum_steps();
899  if (steps != m_steps) {
900  this->get_progress_indicator()->step(st->opts.numJobs*(steps - m_steps));
901  m_steps = steps;
902  }
903  }
904 
// If a worker stored an exception, shut everything down and rethrow it
// on this (the main) thread. Caller must hold the mutex (via lock).
911  void handle_exceptions(state_base::lock_t & lock) {
912  if (!st->eptr) return;
913  stop_workers(lock);
914  std::rethrow_exception(st->eptr);
915  }
916 
// Drive all workers to DONE without consuming their output, then wait for
// the worker threads to exit. Caller must hold the mutex (via lock).
930  void stop_workers(state_base::lock_t & lock) {
931  if (st->runningWorkers == 0) return;
932 
933  bool done = false;
934  while (!done) {
935  // Force maintainOrder == false to avoid deadlock in case processor p
936  // has thrown an exception and processor p+1 is ready to output.
937  while (!has_outputting_pipe(false)) {
938  if (!has_processing_pipe()) {
939  done = true;
940  break;
941  }
942  // All items pushed; wait for processors to complete
943  st->producerCond.wait(lock);
944  }
945 
946  if (done) break;
947 
948  // At this point, we have a processor that waiting to output.
949  // Set its state to PROCESSING or IDLE without pushing its items
950  // (since we are currently handling an exception).
951  if (st->get_state(readyIdx) == PARTIAL_OUTPUT) {
952  st->transition_state(readyIdx, PARTIAL_OUTPUT, PROCESSING);
953  st->workerCond[readyIdx].notify_one();
954  continue;
955  }
956  st->transition_state(readyIdx, OUTPUTTING, IDLE);
957  }
958 
959  // Notify all workers that all processing is done
960  for (size_t i = 0; i < st->opts.numJobs; ++i) {
961  auto s = st->get_state(i);
962  switch (s) {
963  // Some workers could already be done
964  case DONE:
965  break;
966 
967  // If we get an exception before begin() has been called
968  case INITIALIZING:
969 
970  // If the worker is idle
971  case IDLE:
972  st->transition_state(i, s, DONE);
973  st->workerCond[i].notify_one();
974  break;
975 
976  // These cases should have been handled earlier in this function
977  case PROCESSING:
978  throw tpie::exception("State PROCESSING not expected in stop_workers()");
979  case PARTIAL_OUTPUT:
980  throw tpie::exception("State PARTIAL_OUTPUT not expected in stop_workers()");
981  case OUTPUTTING:
982  throw tpie::exception("State OUTPUTTING not expected in stop_workers()");
983  }
984  }
985 
986  // Wait for workers to stop
987  while (st->runningWorkers > 0)
988  st->producerCond.wait(lock);
989  }
990 
991 public:
// Takes shared ownership of the state and a copy of the consumer node.
992  template <typename consumer_t>
993  producer(stateptr st, consumer_t cons)
994  : st(st)
995  , written(0)
996  , cons(new consumer_t(std::move(cons)))
997  , m_steps(0)
998  {
999  for (size_t i = 0; i < st->opts.numJobs; ++i) {
1000  this->add_push_destination(st->input(i));
1001  }
1002  this->set_name("Parallel input", PRIORITY_INSIGNIFICANT);
1003  this->set_plot_options(PLOT_PARALLEL | PLOT_SIMPLIFIED_HIDE);
1004 
// Account for all worker input/output buffers plus our own input buffer.
1005  memory_size_type usage =
1006  st->opts.numJobs * st->opts.bufSize * (sizeof(T1) + sizeof(T2)) // workers
1007  + st->opts.bufSize * sizeof(item_type) // our buffer
1008  ;
1009  this->set_minimum_memory(usage);
1010 
1011  if (st->opts.maintainOrder) {
1012  m_outputOrder.resize(st->opts.numJobs);
1013  }
1014  }
1015 
1016  producer(const producer &) = delete;
1017  const producer & operator=(const producer &) = delete;
1018 
1019  producer(producer &&) = default;
1020  producer & operator=(producer &&) = default;
1021 
// Ensure workers are stopped even if end() was never reached.
1022  virtual ~producer() {
1023  if (st) {
1024  state_base::lock_t lock(st->mutex);
1025  stop_workers(lock);
1026  }
1027  }
1028 
// Allocate the input buffer and wait until every worker thread has
// started (or an exception was raised during startup).
1029  void begin() override {
1030  inputBuffer.resize(st->opts.bufSize);
1031 
1032  state_base::lock_t lock(st->mutex);
1033  while (st->runningWorkers != st->opts.numJobs) {
1034  st->producerCond.wait(lock);
1035  }
1036 
1037  handle_exceptions(lock);
1038  }
1039 
// Accumulate items; once the buffer is full, take the lock and hand the
// batch to a worker.
1049  void push(item_type item) {
1050  inputBuffer[written++] = item;
1051  if (written < st->opts.bufSize) {
1052  // Wait for more items before doing anything expensive such as
1053  // locking.
1054  return;
1055  }
1056  state_base::lock_t lock(st->mutex);
1057 
1058  handle_exceptions(lock);
1059 
1060  flush_steps();
1061 
1062  empty_input_buffer(lock);
1063  }
1064 
1065 private:
// Hand the current input buffer to an idle worker, consuming any finished
// output buffers along the way. Caller must hold the mutex (via lock).
1066  void empty_input_buffer(state_base::lock_t & lock) {
1067  while (written > 0) {
1068  while (!st->eptr && !has_ready_pipe()) {
1069  st->producerCond.wait(lock);
1070  }
1071 
1072  // Either st->eptr is set or we have a ready pipe.
1073  handle_exceptions(lock);
1074  // At this point, we have a ready pipe.
1075 
1076  switch (st->get_state(readyIdx)) {
1077  case INITIALIZING:
1078  throw tpie::exception("State 'INITIALIZING' not expected at this point");
1079  case IDLE:
1080  {
1081  // Send buffer to ready worker
1082  item_type * first = &inputBuffer[0];
1083  item_type * last = first + written;
1084  parallel_input_buffer<T1> & dest = *st->m_inputBuffers[readyIdx];
1085  dest.set_input(array_view<T1>(first, last));
1086  st->transition_state(readyIdx, IDLE, PROCESSING);
1087  st->workerCond[readyIdx].notify_one();
1088  written = 0;
1089  if (st->opts.maintainOrder)
1090  m_outputOrder.push(readyIdx);
1091  break;
1092  }
1093  case PROCESSING:
1094  throw tpie::exception("State 'processing' not expected at this point");
1095  case PARTIAL_OUTPUT:
1096  // Receive buffer (virtual invocation)
1097  cons->consume(st->m_outputBuffers[readyIdx]->get_output());
1098  st->transition_state(readyIdx, PARTIAL_OUTPUT, PROCESSING);
1099  st->workerCond[readyIdx].notify_one();
1100  break;
1101  case OUTPUTTING:
1102  // Receive buffer (virtual invocation)
1103  cons->consume(st->m_outputBuffers[readyIdx]->get_output());
1104 
1105  st->transition_state(readyIdx, OUTPUTTING, IDLE);
1106  st->workerCond[readyIdx].notify_one();
1107  if (st->opts.maintainOrder) {
1108  if (m_outputOrder.front() != readyIdx) {
1109  log_error() << "Producer: Expected " << readyIdx << " in front; got "
1110  << m_outputOrder.front() << std::endl;
1111  throw tpie::exception("Producer got wrong entry from has_ready_pipe");
1112  }
1113  m_outputOrder.pop();
1114  }
1115  break;
1116  case DONE:
1117  throw tpie::exception("State 'DONE' not expected at this point");
1118  }
1119  }
1120  }
1121 
1122 public:
// Drain: flush the remaining input, consume all remaining output, then
// drive every worker to DONE and wait for them to exit.
1123  void end() override {
1124  state_base::lock_t lock(st->mutex);
1125 
1126  flush_steps();
1127 
1128  empty_input_buffer(lock);
1129 
1130  inputBuffer.resize(0);
1131 
// From here on, workers may push output directly to the consumer.
1132  st->set_consumer_ptr(cons.get());
1133 
1134  bool done = false;
1135  while (!done) {
1136  while (!st->eptr && !has_outputting_pipe(st->opts.maintainOrder)) {
1137  if (!has_processing_pipe()) {
1138  done = true;
1139  break;
1140  }
1141  // All items pushed; wait for processors to complete
1142  st->producerCond.wait(lock);
1143  }
1144 
1145  handle_exceptions(lock);
1146 
1147  if (done) break;
1148 
1149  // virtual invocation
1150  cons->consume(st->m_outputBuffers[readyIdx]->get_output());
1151 
1152  if (st->get_state(readyIdx) == PARTIAL_OUTPUT) {
1153  st->transition_state(readyIdx, PARTIAL_OUTPUT, PROCESSING);
1154  st->workerCond[readyIdx].notify_one();
1155  continue;
1156  }
1157  st->transition_state(readyIdx, OUTPUTTING, IDLE);
1158  if (st->opts.maintainOrder) {
1159  if (m_outputOrder.front() != readyIdx) {
1160  log_error() << "Producer: Expected " << readyIdx << " in front; got "
1161  << m_outputOrder.front() << std::endl;
1162  throw tpie::exception("Producer got wrong entry from has_ready_pipe");
1163  }
1164  m_outputOrder.pop();
1165  }
1166  }
1167  // Notify all workers that all processing is done
1168  for (size_t i = 0; i < st->opts.numJobs; ++i) {
1169  st->transition_state(i, IDLE, DONE);
1170  st->workerCond[i].notify_one();
1171  }
1172  while (!st->eptr && st->runningWorkers > 0) {
1173  st->producerCond.wait(lock);
1174  }
1175  // All workers terminated
1176 
1177  handle_exceptions(lock);
1178  flush_steps();
1179 
// Release the order queue's memory by swapping with a fresh instance.
1180  free_structure_memory(m_outputOrder);
1181  }
1182 };
1183 
1184 } // namespace tpie::pipelining::parallel_bits
1185 
1186 #endif //__TPIE_PIPELINING_PARALLEL_BASE_H__
array_view.h
tpie::pipelining::parallel_bits::consumer
Node running in main thread, accepting an output buffer from the managing producer and forwards them ...
Definition: base.h:370
tpie::pipelining::node::begin
virtual void begin()
Begin pipeline processing phase.
Definition: node.h:298
tpie::pipelining::parallel_bits::state_base::get_state
worker_state get_state(size_t idx)
Shared state, must have mutex to use.
Definition: base.h:275
tpie::pipelining::parallel_bits::state_base::runningWorkers
size_t runningWorkers
Shared state, must have mutex to write.
Definition: base.h:238
tpie::pipelining::parallel_bits::state_base::set_input_ptr
void set_input_ptr(size_t idx, node *v)
Must not be used concurrently.
Definition: base.h:244
tpie::pipelining::parallel_bits::producer::end
void end() override
End pipeline processing phase.
Definition: base.h:1123
tpie::pipelining::parallel_bits::state_base::transition_state
void transition_state(size_t idx, worker_state from, worker_state to)
Shared state, must have mutex to use.
Definition: base.h:280
tpie::internal_queue::front
const T & front() const
Return the item that has been in the queue for the longest time.
Definition: internal_queue.h:76
tpie::pipelining::parallel_bits::consumer_impl
Concrete consumer implementation.
Definition: base.h:745
tpie::pipelining::parallel_bits::producer::push
void push(item_type item)
Accumulate input buffer and send off to workers.
Definition: base.h:1049
tpie::pipelining::parallel_bits::before_impl
Concrete before class.
Definition: base.h:39
tpie::internal_queue< memory_size_type >
tpie::array< T >
tpie::internal_queue::push
void push(T val)
Add an element to the front of the queue.
Definition: internal_queue.h:88
tpie::pipelining::parallel_bits::after_base::flush_buffer
virtual void flush_buffer()=0
Called by before::worker after a batch of items has been pushed.
tpie::array::begin
iterator begin()
Return an iterator to the beginning of the array.
Definition: array.h:312
tpie::free_structure_memory
void free_structure_memory(T &v)
Free the memory assosiated with a stl or tpie structure by swapping it with a default constructed str...
Definition: util.h:217
tpie::log_error
logstream & log_error()
Return logstream for writing error log messages.
Definition: tpie_log.h:148
tpie::pipelining::parallel_bits::state_base::producerCond
cond_t producerCond
Condition variable.
Definition: base.h:224
tpie::pipelining::parallel_bits::state_base::set_output_ptr
void set_output_ptr(size_t idx, after_base *v)
Must not be used concurrently.
Definition: base.h:249
tpie::pipelining::node::set_minimum_memory
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
Definition: node.h:206
tpie::pipelining::parallel_bits::after_base::worker_initialize
virtual void worker_initialize()=0
Called by before::worker to initialize buffers.
tpie::pipelining::push_type
Class to deduce the item_type of a node of type T.
Definition: node_traits.h:158
tpie::pipelining::parallel_bits::after
Accepts output items and sends them to the main thread.
Definition: base.h:41
tpie::internal_queue::pop
void pop()
Remove an element from the back of the queue.
Definition: internal_queue.h:93
tpie::pipelining::parallel_bits::after_base::set_consumer
virtual void set_consumer(node *)=0
For internal use in order to construct the pipeline graph.
tpie::pipelining::parallel_bits::state
State subclass containing the item type specific state, i.e.
Definition: base.h:43
tpie::pipelining::node::get_progress_indicator
progress_indicator_base * get_progress_indicator()
Used internally. Get the progress indicator used.
Definition: node.h:414
tpie::pipelining::parallel_bits::before_impl::push_all
virtual void push_all(array_view< item_type > items)
Push all items from buffer and flush output buffer afterwards.
Definition: base.h:731
tpie::pipelining::parallel_bits::threads::pi_t
progress_indicator_null pi_t
Progress indicator type.
Definition: base.h:61
tpie::pipelining::node::set_plot_options
void set_plot_options(flags< PLOT > options)
Set options specified for plot(), as a combination of node::PLOT values.
Definition: node.h:458
tpie::pipelining::parallel_bits::state_base::input
node & input(size_t idx)
Get the specified before instance.
Definition: base.h:260
tpie::exception
Definition: exception.h:33
tpie::pipelining::parallel_bits::after::flush_buffer
void flush_buffer() override
Invoked by before::push_all when all input items have been pushed.
Definition: base.h:490
tpie::pipelining::parallel_bits::parallel_output_buffer
Instantiated in each thread.
Definition: base.h:345
tpie::pipelining::node::add_push_destination
void add_push_destination(const node_token &dest)
Called by implementers to declare a push destination.
tpie::pipelining::parallel_bits::state_base::output
after_base & output(size_t idx)
Get the specified after instance.
Definition: base.h:272
tpie::array_view_base::size
size_t size() const
Get number of elements in the array.
Definition: array_view_base.h:108
tpie::pipelining::node::set_progress_indicator
void set_progress_indicator(progress_indicator_base *pi)
Used internally. Set the progress indicator to use.
Definition: node.h:407
tpie::progress_indicator_null
a dummy progress indicator that produces no output
Definition: progress_indicator_null.h:39
tpie::pipelining::parallel_bits::producer
Producer, running in main thread, managing the parallel execution.
Definition: base.h:781
tpie::pipelining::parallel_bits::parallel_input_buffer
Instantiated in each thread.
Definition: base.h:314
tpie::pipelining::node::set_name
void set_name(const std::string &name, priority_type priority=PRIORITY_USER)
Set this node's name.
tpie::log_warning
logstream & log_warning()
Return logstream for writing warning log messages.
Definition: tpie_log.h:158
tpie::pipelining::parallel_bits::after::push
void push(const T &item)
Push to thread-local buffer; flush it when full.
Definition: base.h:466
tpie::pipelining::parallel_bits::consumer_impl::consume
void consume(array_view< item_type > a) override
Push all items from output buffer to the rest of the pipeline.
Definition: base.h:768
tpie::progress_indicator_base::step
void step(stream_size_type step=1)
Record an increment to the indicator and advance the indicator.
Definition: progress_indicator_base.h:91
tpie::pipelining::parallel_bits::state_base
Common state in parallel pipelining library.
Definition: base.h:207
tpie::pipelining::parallel_bits::threads
Class containing an array of node instances.
Definition: base.h:54
maintain_order_type.h
tpie::pipelining::parallel_bits::state_base::mutex
mutex_t mutex
Single mutex.
Definition: base.h:216
tp_assert_release
#define tp_assert_release(condition, message)
Definition: tpie_assert.h:43
tpie::pipelining::parallel_bits::producer::begin
void begin() override
Begin pipeline processing phase.
Definition: base.h:1029
tpie::pipelining::parallel_bits::aligned_array
Aligned, uninitialized storage.
Definition: aligned_array.h:39
tpie::array::resize
void resize(size_t size, const T &elm)
Change the size of the array.
Definition: array.h:490
tpie::pipelining::input
pipe_begin< factory< bits::input_t, file_stream< T > &, stream_options > > input(file_stream< T > &fs, stream_options options=stream_options())
Pipelining nodes that pushes the contents of the given file stream to the next node in the pipeline.
Definition: file_stream.h:378
tpie::pipelining::node
Base class of all nodes.
Definition: node.h:77
tpie::pipelining::parallel_bits::threads::progress_indicator_hook
Factory hook that sets the progress indicator of the nodes run in parallel to the null progress indic...
Definition: base.h:69
tpie::pipelining::parallel_bits::threads_impl
Subclass of threads instantiating and managing the pipelines.
Definition: base.h:109
tpie::progress_indicator_base::get_current
stream_size_type get_current()
Get the current value of the step counter.
Definition: progress_indicator_base.h:152
tpie::pipelining::parallel_bits::options
User-supplied options to the parallelism framework.
Definition: options.h:30
tpie::pipelining::parallel_bits::before
Accepts input items from the main thread and sends them down the pipeline.
Definition: base.h:37
tpie::pipelining::parallel_bits::state_base::eptr
std::exception_ptr eptr
Exception thrown in worker thread to be rethrown in main thread.
Definition: base.h:241
tpie::internal_queue::resize
void resize(size_t size=0)
Resize the queue; all data is lost.
Definition: internal_queue.h:71
tpie::pipelining::item_type
pipe_middle< tfactory< bits::item_type_t, Args< T > > > item_type()
Create item type defining identity pipe node.
Definition: helpers.h:654
tpie::pipelining::factory_init_hook
Definition: factory_base.h:34
tpie::pipelining::parallel_bits::before::push_all
virtual void push_all(array_view< T > items)=0
Overridden in subclass to push a buffer of items.
tpie::pipelining::parallel_bits::after_base
Non-templated virtual base class of after.
Definition: base.h:180
tpie::pipelining::parallel_bits::after::worker_initialize
void worker_initialize() override
Invoked by before::worker (in worker thread context).
Definition: base.h:481
tpie::array::size
size_type size() const
Return the size of the array.
Definition: array.h:531
tpie::pipelining::parallel_bits::state_base::workerCond
cond_t * workerCond
Condition variable, one per worker.
Definition: base.h:235
tpie::array_view
Encapsulation of two pointers from any random access container.
Definition: array_view.h:47