20 #ifndef __TPIE_PIPELINING_PARALLEL_BASE_H__
21 #define __TPIE_PIPELINING_PARALLEL_BASE_H__
23 #include <tpie/pipelining/node.h>
24 #include <tpie/pipelining/factory_base.h>
28 #include <tpie/pipelining/parallel/options.h>
29 #include <tpie/pipelining/parallel/worker_state.h>
30 #include <tpie/pipelining/parallel/aligned_array.h>
33 namespace tpie::pipelining::parallel_bits {
38 template <
typename dest_t>
42 template <
typename T1,
typename T2>
53 template <
typename Input,
typename Output>
58 static const size_t alignment = 64;
78 virtual void init_node(
node & r)
override {
87 std::vector<before_t *> m_dests;
94 stream_size_type sum_steps() {
95 stream_size_type res = 0;
96 for (
size_t i = 0; i < m_progressIndicators.size(); ++i) {
102 virtual ~threads() {}
108 template <
typename Input,
typename Output,
typename fact_t>
117 typedef typename fact_t::template constructed_type<after_t> worker_t;
118 typedef typename push_type<worker_t>::type T1;
121 static const size_t alignment = p_t::alignment;
133 : numJobs(st.opts.numJobs)
136 fact.hook_initialization(&hook);
137 fact.set_destination_kind_push();
139 m_data.realloc(numJobs);
140 this->m_progressIndicators.realloc(numJobs);
141 this->m_dests.resize(numJobs);
144 for (
size_t i = 0; i < numJobs; ++i) {
146 if (((
size_t) m_data.get(i)) % alignment != 0) {
147 log_warning() <<
"Thread " << i <<
" is not aligned: Address "
148 << m_data.get(i) <<
" is off by " <<
149 (((size_t) m_data.get(i)) % alignment) <<
" bytes"
154 new (this->m_progressIndicators.get(i))
pi_t();
156 auto n = fact.construct_copy(
after_t(st, i));
158 n.set_plot_options(node::PLOT_PARALLEL);
160 n.set_plot_options(node::PLOT_PARALLEL | node::PLOT_SIMPLIFIED_HIDE);
168 for (
size_t i = 0; i < numJobs; ++i) {
169 m_data.get(i)->~before_t();
170 this->m_progressIndicators.get(i)->~pi_t();
173 this->m_progressIndicators.realloc(0);
209 typedef std::mutex mutex_t;
210 typedef std::condition_variable cond_t;
211 typedef std::unique_lock<std::mutex> lock_t;
276 return m_states[idx];
281 if (m_states[idx] != from) {
282 std::stringstream ss;
283 ss << idx <<
" Invalid state transition " << from <<
" -> " << to <<
"; current state is " << m_states[idx];
291 std::vector<node *> m_inputs;
292 std::vector<after_base *> m_outputs;
293 std::vector<worker_state> m_states;
298 , m_inputs(opts.numJobs, 0)
299 , m_outputs(opts.numJobs, 0)
300 , m_states(opts.numJobs, INITIALIZING)
305 virtual ~state_base() {
313 template <
typename T>
315 memory_size_type m_inputSize;
325 throw tpie::exception(m_inputBuffer.
size() ?
"Input too large" :
"Input buffer not initialized");
327 memory_size_type items =
329 -m_inputBuffer.
begin();
336 , m_inputBuffer(opts.bufSize)
344 template <
typename T>
346 memory_size_type m_outputSize;
348 friend class after<T>;
357 , m_outputBuffer(opts.bufSize)
369 template <
typename T>
382 template <
typename T1,
typename T2>
385 typedef std::shared_ptr<state> ptr;
386 typedef state_base::mutex_t mutex_t;
387 typedef state_base::cond_t cond_t;
388 typedef state_base::lock_t lock_t;
395 std::unique_ptr<threads<T1, T2> > pipes;
397 template <
typename fact_t>
400 , m_inputBuffers(opts.numJobs)
401 , m_outputBuffers(opts.numJobs)
405 pipes.reset(
new pipes_impl_t(std::move(fact), *
this));
408 void set_consumer_ptr(consumer<T2> * cons) {
412 consumer<T2> *
const * get_consumer_ptr_ptr()
const {
420 template <
typename T>
421 class after :
public after_base {
425 std::unique_ptr<parallel_output_buffer<T> > m_buffer;
426 array<parallel_output_buffer<T> *> & m_outputBuffers;
427 typedef state_base::lock_t lock_t;
428 consumer<T> *
const * m_cons;
433 template <
typename Input>
434 after(state<Input, T> & state,
438 , m_outputBuffers(state.m_outputBuffers)
439 , m_cons(state.get_consumer_ptr_ptr())
441 state.set_output_ptr(parId,
this);
442 set_name(
"Parallel after", PRIORITY_INSIGNIFICANT);
443 set_plot_options(PLOT_PARALLEL | PLOT_SIMPLIFIED_HIDE);
448 void set_consumer(node * cons)
override {
449 this->add_push_destination(*cons);
452 after(after && other)
453 : after_base(std::move(other))
455 , parId(std::move(other.parId))
456 , m_outputBuffers(other.m_outputBuffers)
457 , m_cons(std::move(other.m_cons)) {
458 st.set_output_ptr(parId,
this);
467 if (m_buffer->m_outputSize >= m_buffer->m_outputBuffer.size())
468 flush_buffer_impl(
false);
470 m_buffer->m_outputBuffer[m_buffer->m_outputSize++] = item;
473 void end()
override {
474 flush_buffer_impl(
true);
483 m_outputBuffers[parId] = m_buffer.get();
491 flush_buffer_impl(
true);
495 bool is_done()
const {
496 switch (st.get_state(parId)) {
531 void flush_buffer_impl(
bool complete) {
538 lock_t lock(st.mutex);
539 if (st.get_state(parId) == DONE) {
540 if (*m_cons == 0)
throw tpie::exception(
"Unexpected nullptr in flush_buffer");
542 (*m_cons)->consume(out);
544 st.transition_state(parId, PROCESSING, complete ? OUTPUTTING : PARTIAL_OUTPUT);
546 st.producerCond.notify_one();
548 st.workerCond[parId].wait(lock);
551 m_buffer->m_outputSize = 0;
560 template <
typename T>
561 class before :
public node {
565 std::unique_ptr<parallel_input_buffer<T> > m_buffer;
566 array<parallel_input_buffer<T> *> & m_inputBuffers;
567 std::thread m_worker;
572 virtual void push_all(array_view<T> items) = 0;
574 template <
typename Output>
575 before(state<T, Output> & st,
size_t parId)
578 , m_inputBuffers(st.m_inputBuffers)
580 set_name(
"Parallel before", PRIORITY_INSIGNIFICANT);
581 set_plot_options(PLOT_PARALLEL | PLOT_SIMPLIFIED_HIDE);
585 before(
const before & other)
588 , m_inputBuffers(other.m_inputBuffers)
593 state_base::lock_t lock(st.mutex);
595 auto s = st.get_state(parId);
596 tp_assert_release(s == INITIALIZING || s == DONE,
"State should be INITIALIZING or DONE in before::~before");
598 if (m_worker.joinable())
605 void begin()
override {
607 std::thread t(run_worker,
this);
611 void end()
override {
619 switch (st.get_state(parId)) {
627 throw tpie::exception(
"State 'partial_output' was not expected in before::ready");
629 throw tpie::exception(
"State 'outputting' was not expected in before::ready");
639 class running_signal {
640 typedef state_base::cond_t cond_t;
641 memory_size_type & sig;
642 cond_t & producerCond;
644 running_signal(memory_size_type & sig, cond_t & producerCond)
646 , producerCond(producerCond)
649 producerCond.notify_one();
654 producerCond.notify_one();
658 static void run_worker(before *
self) {
666 state_base::lock_t lock(st.mutex);
668 m_buffer.reset(
new parallel_input_buffer<T>(st.opts));
669 m_inputBuffers[parId] = m_buffer.get();
672 st.output(parId).worker_initialize();
674 st.transition_state(parId, INITIALIZING, IDLE);
675 running_signal _(st.runningWorkers, st.producerCond);
679 if (st.get_state(parId) == DONE) {
682 st.workerCond[parId].wait(lock);
694 st.transition_state(parId, PROCESSING, IDLE);
695 st.eptr = std::current_exception();
696 st.producerCond.notify_one();
707 template <
typename dest_t>
708 class before_impl :
public before<typename push_type<dest_t>::type> {
709 typedef typename push_type<dest_t>::type
item_type;
714 template <
typename Output>
715 before_impl(state<item_type, Output> & st,
719 , dest(std::move(dest))
721 this->add_push_destination(dest);
722 st.set_input_ptr(parId,
this);
732 for (
size_t i = 0; i < items.
size(); ++i) {
737 this->st.output(this->parId).flush_buffer();
744 template <
typename Input,
typename Output,
typename dest_t>
747 typedef typename state_t::ptr stateptr;
754 : dest(std::move(dest))
758 this->
set_name(
"Parallel output", PRIORITY_INSIGNIFICANT);
760 for (
size_t i = 0; i < st->opts.numJobs; ++i) {
761 st->output(i).set_consumer(
this);
769 for (
size_t i = 0; i < a.
size(); ++i) {
780 template <
typename T1,
typename T2>
783 typedef T1 item_type;
787 typedef typename state_t::ptr stateptr;
792 std::shared_ptr<consumer<T2> > cons;
794 stream_size_type m_steps;
803 bool has_ready_pipe() {
804 for (
size_t i = 0; i < st->opts.numJobs; ++i) {
805 switch (st->get_state(i)) {
815 if (st->opts.maintainOrder && m_outputOrder.
front() != i)
838 bool has_outputting_pipe(
bool maintainOrder) {
839 for (
size_t i = 0; i < st->opts.numJobs; ++i) {
840 switch (st->get_state(i)) {
848 if (maintainOrder && m_outputOrder.
front() != i)
867 bool has_processing_pipe() {
868 for (
size_t i = 0; i < st->opts.numJobs; ++i) {
869 switch (st->get_state(i)) {
898 stream_size_type steps = st->pipes->sum_steps();
899 if (steps != m_steps) {
911 void handle_exceptions(state_base::lock_t & lock) {
912 if (!st->eptr)
return;
914 std::rethrow_exception(st->eptr);
930 void stop_workers(state_base::lock_t & lock) {
931 if (st->runningWorkers == 0)
return;
937 while (!has_outputting_pipe(
false)) {
938 if (!has_processing_pipe()) {
943 st->producerCond.wait(lock);
951 if (st->get_state(readyIdx) == PARTIAL_OUTPUT) {
952 st->transition_state(readyIdx, PARTIAL_OUTPUT, PROCESSING);
953 st->workerCond[readyIdx].notify_one();
956 st->transition_state(readyIdx, OUTPUTTING, IDLE);
960 for (
size_t i = 0; i < st->opts.numJobs; ++i) {
961 auto s = st->get_state(i);
972 st->transition_state(i, s, DONE);
973 st->workerCond[i].notify_one();
978 throw tpie::exception(
"State PROCESSING not expected in stop_workers()");
980 throw tpie::exception(
"State PARTIAL_OUTPUT not expected in stop_workers()");
982 throw tpie::exception(
"State OUTPUTTING not expected in stop_workers()");
987 while (st->runningWorkers > 0)
988 st->producerCond.wait(lock);
992 template <
typename consumer_t>
993 producer(stateptr st, consumer_t cons)
996 , cons(
new consumer_t(std::move(cons)))
999 for (
size_t i = 0; i < st->opts.numJobs; ++i) {
1002 this->
set_name(
"Parallel input", PRIORITY_INSIGNIFICANT);
1005 memory_size_type usage =
1006 st->opts.numJobs * st->opts.bufSize * (
sizeof(T1) +
sizeof(T2))
1007 + st->opts.bufSize *
sizeof(item_type)
1011 if (st->opts.maintainOrder) {
1012 m_outputOrder.
resize(st->opts.numJobs);
1024 state_base::lock_t lock(st->mutex);
1030 inputBuffer.
resize(st->opts.bufSize);
1032 state_base::lock_t lock(st->mutex);
1033 while (st->runningWorkers != st->opts.numJobs) {
1034 st->producerCond.wait(lock);
1037 handle_exceptions(lock);
1050 inputBuffer[written++] = item;
1051 if (written < st->opts.bufSize) {
1056 state_base::lock_t lock(st->mutex);
1058 handle_exceptions(lock);
1062 empty_input_buffer(lock);
1066 void empty_input_buffer(state_base::lock_t & lock) {
1067 while (written > 0) {
1068 while (!st->eptr && !has_ready_pipe()) {
1069 st->producerCond.wait(lock);
1073 handle_exceptions(lock);
1076 switch (st->get_state(readyIdx)) {
1078 throw tpie::exception(
"State 'INITIALIZING' not expected at this point");
1084 parallel_input_buffer<T1> & dest = *st->m_inputBuffers[readyIdx];
1086 st->transition_state(readyIdx, IDLE, PROCESSING);
1087 st->workerCond[readyIdx].notify_one();
1089 if (st->opts.maintainOrder)
1090 m_outputOrder.
push(readyIdx);
1094 throw tpie::exception(
"State 'processing' not expected at this point");
1095 case PARTIAL_OUTPUT:
1097 cons->consume(st->m_outputBuffers[readyIdx]->get_output());
1098 st->transition_state(readyIdx, PARTIAL_OUTPUT, PROCESSING);
1099 st->workerCond[readyIdx].notify_one();
1103 cons->consume(st->m_outputBuffers[readyIdx]->get_output());
1105 st->transition_state(readyIdx, OUTPUTTING, IDLE);
1106 st->workerCond[readyIdx].notify_one();
1107 if (st->opts.maintainOrder) {
1108 if (m_outputOrder.
front() != readyIdx) {
1109 log_error() <<
"Producer: Expected " << readyIdx <<
" in front; got "
1110 << m_outputOrder.
front() << std::endl;
1111 throw tpie::exception(
"Producer got wrong entry from has_ready_pipe");
1113 m_outputOrder.
pop();
1124 state_base::lock_t lock(st->mutex);
1128 empty_input_buffer(lock);
1132 st->set_consumer_ptr(cons.get());
1136 while (!st->eptr && !has_outputting_pipe(st->opts.maintainOrder)) {
1137 if (!has_processing_pipe()) {
1142 st->producerCond.wait(lock);
1145 handle_exceptions(lock);
1150 cons->consume(st->m_outputBuffers[readyIdx]->get_output());
1152 if (st->get_state(readyIdx) == PARTIAL_OUTPUT) {
1153 st->transition_state(readyIdx, PARTIAL_OUTPUT, PROCESSING);
1154 st->workerCond[readyIdx].notify_one();
1157 st->transition_state(readyIdx, OUTPUTTING, IDLE);
1158 if (st->opts.maintainOrder) {
1159 if (m_outputOrder.
front() != readyIdx) {
1160 log_error() <<
"Producer: Expected " << readyIdx <<
" in front; got "
1161 << m_outputOrder.
front() << std::endl;
1162 throw tpie::exception(
"Producer got wrong entry from has_ready_pipe");
1164 m_outputOrder.
pop();
1168 for (
size_t i = 0; i < st->opts.numJobs; ++i) {
1169 st->transition_state(i, IDLE, DONE);
1170 st->workerCond[i].notify_one();
1172 while (!st->eptr && st->runningWorkers > 0) {
1173 st->producerCond.wait(lock);
1177 handle_exceptions(lock);
1186 #endif //__TPIE_PIPELINING_PARALLEL_BASE_H__