20 #ifndef __TPIE_PIPELINING_HELPERS_H__
21 #define __TPIE_PIPELINING_HELPERS_H__
24 #include <tpie/pipelining/node.h>
25 #include <tpie/pipelining/pipe_base.h>
26 #include <tpie/pipelining/factory_helpers.h>
33 template <
typename dest_t>
38 ostream_logger_t(dest_t dest, std::ostream & log) : dest(std::move(dest)), log(log), begun(
false), ended(
false) {
40 set_name(
"Log", PRIORITY_INSIGNIFICANT);
50 void push(
const item_type & item) {
52 log <<
"WARNING: push() called before begin(). Calling begin on rest of pipeline." << std::endl;
56 log <<
"WARNING: push() called after end()." << std::endl;
59 log <<
"pushing " << item << std::endl;
69 template <
typename source_t>
74 pull_peek_t(source_t source) : source(std::move(source)) {
80 could_pull = source.can_pull();
81 if (could_pull) item=source.pull();
86 could_pull = source.can_pull();
87 if (could_pull) item=source.pull();
95 bool can_pull()
const {
105 template <
typename T>
111 std::shared_ptr<T>
buffer;
112 void push(
const T & el) {
120 template <
typename dest_t,
typename fact2_t>
123 typedef typename fact2_t::constructed_type dest2_t;
126 fork_t(dest_t dest, fact2_t fact2) : dest(std::move(dest)), dest2(fact2.construct()) {
131 void push(
const item_type & item) {
142 template <
typename source_t,
typename dest_fact_t>
147 pull_fork_t(source_t source, dest_fact_t dest_fact)
148 : dest(dest_fact.construct())
149 , source(std::move(source)) {
// Report whether another item can be pulled. The answer is taken directly
// from source.can_pull(); nothing else is consulted or modified here.
154 bool can_pull() {
return source.can_pull();}
157 item_type i=source.pull();
163 typename dest_fact_t::constructed_type dest;
168 template <
typename T>
// Accept an item by const reference and discard it: the parameter is unnamed
// and the body is empty, so nothing is stored or forwarded.
173 void push(
const T &) {}
177 template <
typename T>
// The body consists solely of tpie_unreachable(); no T value is returned.
// NOTE(review): presumably never meant to be called, since the can_pull()
// declared immediately after it always returns false - confirm with callers.
183 T pull() {tpie_unreachable();}
// Always reports that nothing can be pulled (unconditionally returns false).
184 bool can_pull() {
return false;}
188 template <
typename IT>
193 typedef typename IT::value_type item_type;
209 template <
typename dest_t,
typename IT>
218 , dest(std::move(dest)) {
230 template <
typename Iterator,
typename Item =
void>
233 template <
typename Iterator>
237 typedef typename Iterator::value_type item_type;
243 void push(
const item_type & item) {
249 template <
typename Iterator,
typename Item>
259 void push(
const item_type & item) {
265 template <
typename dest_t,
typename IT>
272 , dest(std::move(dest)) {
277 while (dest.can_pull()) {
284 template <
typename dest_t,
typename F>
291 preparer_t(dest_t dest,
const F & functor): functor(functor), dest(std::move(dest)) {
296 functor(*
static_cast<node*
>(
this));
// Pass-through: hands the item, unchanged, to dest.push(). The functor stored
// by the constructor is not invoked from here.
299 void push(
const item_type & item) {dest.push(item);}
302 template <
typename dest_t,
typename F>
309 propagater_t(dest_t dest,
const F & functor): functor(functor), dest(std::move(dest)) {
314 functor(*
static_cast<node*
>(
this));
// Pass-through: hands the item, unchanged, to dest.push(). The functor stored
// by the constructor is not invoked from here.
317 void push(
const item_type & item) {dest.push(item);}
320 template <
typename dest_t,
typename src_fact_t>
322 typedef typename src_fact_t::constructed_type src_t;
325 zip_t(dest_t dest, src_fact_t src_fact)
326 : src(src_fact.construct()), dest(std::move(dest)) {
331 void push(
const item_type & item) {
332 tp_assert(src.can_pull(),
"We should be able to pull");
333 dest.push(std::make_pair(item, src.pull()));
340 template <
typename dest1_t,
typename fact2_t>
342 typedef typename fact2_t::constructed_type dest2_t;
343 typedef typename push_type<dest1_t>::type first_type;
344 typedef typename push_type<dest2_t>::type second_type;
345 typedef std::pair<first_type, second_type> item_type;
347 unzip_t(dest1_t dest1, fact2_t fact2) : dest1(std::move(dest1)), dest2(fact2.construct()) {
352 void push(
const item_type & item) {
353 dest2.push(item.second);
354 dest1.push(item.first);
361 template <
typename dest_t,
typename T>
// Pass-through: forwards the item, unchanged, to dest.push().
368 void push(
const item_type & item) {dest.push(item);}
372 template <
typename dest_t,
typename fact_t>
375 typedef typename fact_t::constructed_type source_t;
379 : dest(std::move(dest)), src(fact.construct()) {
385 size_t size = fetch<stream_size_type>(
"items");
390 while (src.can_pull()) {
391 dest.push(src.pull());
401 template <
typename dest_t,
typename equal_t>
// Constructor: copies the `equal` argument into the member of the same name
// (the callable later invoked as equal(prev, item) in push()) and moves
// `dest` into its member. The constructor body itself is empty.
406 unique_t(dest_t dest, equal_t equal)
407 : equal(equal), dest(std::move(dest)) {}
413 void push(
const item_type & item) {
414 if (!first && equal(prev, item))
428 template <
typename src_t,
typename equal_t>
434 : equal(equal), src(std::move(src)) {}
437 has_item = src.can_pull();
438 if (!has_item)
return;
452 if (!src.can_pull())
return over();
455 do next = src.pull();
456 while (src.can_pull() && equal(item, next));
458 if (equal(item, next))
return over();
459 std::swap(next, item);
476 inline pipe_middle<factory<bits::ostream_logger_t, std::ostream &> >
493 template <
typename fact_t>
496 return {std::move(to.factory)};
505 template <
typename dest_fact_t>
506 pullpipe_middle<tfactory<bits::pull_fork_t, Args<dest_fact_t>, dest_fact_t> >
508 return {std::move(dest_fact)};
517 template <
typename fact_t>
518 pipe_middle<tfactory<bits::unzip_t, Args<fact_t>, fact_t> >
520 return {std::move(to.factory)};
530 template <
typename fact_t>
531 pipe_middle<tfactory<bits::zip_t, Args<fact_t>, fact_t> >
533 return {std::move(from.factory)};
541 template <
typename T>
542 inline pipe_end<termfactory<bits::null_sink_t<T>>>
550 template <
typename T>
551 inline pullpipe_begin<termfactory<bits::zero_source_t<T>>>
555 template <
template <
typename dest_t>
class Fact,
typename... T>
556 pipe_begin<factory<Fact, T...> > make_pipe_begin(T... t) {
560 template <
template <
typename dest_t>
class Fact,
typename... T>
561 pipe_middle<factory<Fact, T...> > make_pipe_middle(T... t) {
565 template <
typename Fact,
typename... T>
566 pipe_end<termfactory<Fact, T...> > make_pipe_end(T ... t) {
575 template <
typename IT>
586 template <
typename IT>
597 template <
typename IT>
609 template <
typename Item,
typename IT>
620 template <
typename IT>
631 template <
typename F>
642 template <
typename F>
653 template <
typename T>
663 template <
typename fact_t>
664 pipe_begin<tfactory<bits::pull_source_t, Args<fact_t>, fact_t> >
666 return {std::move(from.factory)};
675 template <
typename equal_t>
686 template <
typename equal_t>
692 #endif //__TPIE_PIPELINING_HELPERS_H__