20 #ifndef __TPIE_PIPELINING_NODE_H__
21 #define __TPIE_PIPELINING_NODE_H__
23 #include <tpie/tpie_export.h>
24 #include <tpie/pipelining/exception.h>
28 #include <tpie/pipelining/priority_type.h>
30 #include <tpie/pipelining/node_name.h>
31 #include <tpie/pipelining/node_traits.h>
32 #include <tpie/flags.h>
/// Override of a base-class virtual `refresh()`; takes no arguments and returns nothing.
/// NOTE(review): the enclosing class and the definition are outside this view —
/// confirm what state is refreshed.
46 void refresh()
override;
/// Minimum amount of the resource; defaults to 0.
52 memory_size_type minimum = 0;
/// Maximum amount of the resource; defaults to the largest memory_size_type
/// value, so no upper limit applies until one is set.
53 memory_size_type maximum = std::numeric_limits<memory_size_type>::max();
/// Fraction value for the resource; defaults to 0.0.
/// NOTE(review): presumably the relative share used when the resource is
/// divided between nodes — confirm against the runtime code.
54 double fraction = 0.0;
/// Amount of the resource currently recorded as available; defaults to 0.
56 memory_size_type available = 0;
/// Priority associated with the node name; starts at PRIORITY_NO_NAME.
63 priority_type namePriority = PRIORITY_NO_NAME;
/// Phase name; an empty string until one is assigned.
65 std::string phaseName;
/// Priority associated with the phase name; starts at PRIORITY_NO_NAME.
66 priority_type phaseNamePriority = PRIORITY_NO_NAME;
/// Total number of steps; starts at 0.
68 stream_size_type stepsTotal = 0;
/// Optional reference to an any_noncopyable: either refers to an existing
/// value or is empty.
79 using maybeany_t = boost::optional<any_noncopyable &>;
85 PLOT_SIMPLIFIED_HIDE=1,
98 STATE_AFTER_PROPAGATE,
116 return m_parameters.resource_parameters[type].minimum;
124 return m_parameters.resource_parameters[type].maximum;
131 return m_parameters.resource_parameters[type].fraction;
138 return m_parameters.resource_parameters[type].available;
/// Set the minimum usage of the given resource type.
/// \param type   Resource being configured.
/// \param usage  New minimum amount.
/// NOTE(review): definition not visible here; presumably stores into
/// m_parameters.resource_parameters[type].minimum — confirm.
144 void set_minimum_resource_usage(resource_type type, memory_size_type usage);
/// Set the maximum usage of the given resource type.
/// \param type   Resource being configured.
/// \param usage  New maximum amount.
/// NOTE(review): definition not visible here; presumably stores into
/// m_parameters.resource_parameters[type].maximum — confirm.
152 void set_maximum_resource_usage(resource_type type, memory_size_type usage);
/// Set the fraction value of the given resource type.
/// \param type  Resource being configured.
/// \param f     New fraction value.
159 void set_resource_fraction(resource_type type,
double f);
/// Record how much of the given resource is available to this node.
/// The `_internal_` prefix suggests this is not meant for user code —
/// NOTE(review): confirm who is expected to call it.
/// \param type       Resource being assigned.
/// \param available  Amount made available.
171 void _internal_set_available_of_resource(resource_type type, memory_size_type available);
178 return get_minimum_resource_usage(MEMORY);
186 return get_maximum_resource_usage(MEMORY);
193 return get_resource_fraction(MEMORY);
200 return get_available_of_resource(MEMORY);
207 set_minimum_resource_usage(MEMORY, minimumMemory);
217 set_maximum_resource_usage(MEMORY, maximumMemory);
225 set_resource_fraction(MEMORY, f);
240 memory_size_type ans=0;
241 for (
const auto & p: m_buckets)
242 if (p) ans += p->count;
251 return token.get_map();
/// Tells whether this node is "go free".
/// This implementation unconditionally answers false; because the function
/// is virtual, a subclass may override it to answer differently.
301 virtual bool is_go_free() const
{
	return false;
}
309 log_warning() <<
"node subclass " <<
typeid(*this).name()
310 <<
" is not an initiator node" << std::endl;
349 return m_parameters.namePriority;
/// Get the name of this node as a reference to a string.
/// NOTE(review): declared non-const although it returns a const reference;
/// the definition is not visible here, so it may fill in a name on demand — confirm.
356 const std::string & get_name();
/// Set the name of this node.
/// \param name      The new name.
/// \param priority  Priority of the name; PRIORITY_USER when omitted.
/// NOTE(review): how priorities of competing names are compared is decided in
/// the definition, which is not visible here.
362 void set_name(
const std::string & name, priority_type priority = PRIORITY_USER);
369 return m_parameters.phaseNamePriority;
/// Get the phase name associated with this node as a reference to a string.
/// NOTE(review): non-const accessor; definition not visible here.
376 const std::string & get_phase_name();
/// Set the phase name associated with this node.
/// \param name      The new phase name.
/// \param priority  Priority of the phase name; PRIORITY_USER when omitted.
382 void set_phase_name(
const std::string & name, priority_type priority = PRIORITY_USER);
389 m_parameters.name = m_parameters.name.empty() ? breadcrumb : (breadcrumb +
" | " + m_parameters.name);
397 return m_parameters.stepsTotal;
400 stream_size_type get_steps_left() {
436 return m_resourceBeingAssigned;
443 m_resourceBeingAssigned = type;
451 return m_plotOptions;
459 m_plotOptions = options;
467 #pragma warning( push )
468 #pragma warning( disable : 4355 )
/// Copy assignment is deleted: one node cannot be assigned from another.
479 node & operator=(
const node & other) =
delete;
493 #pragma warning( pop )
/// Declare that this node pushes to the node identified by the given token.
/// \param dest  Token of the destination node.
499 void add_push_destination(
const node_token & dest);
/// Overload of add_push_destination taking the destination node itself.
/// \param dest  The destination node.
504 void add_push_destination(
const node & dest);
/// Declare that this node pulls from the node identified by the given token.
/// \param dest  Token of the node pulled from (named `dest` in this
///              declaration even though it designates a source).
509 void add_pull_source(
const node_token & dest);
/// Overload of add_pull_source taking the source node itself.
/// \param dest  The node pulled from.
514 void add_pull_source(
const node & dest);
/// Declare a dependency between this node and the given node.
/// NOTE(review): the direction and effect of the dependency are defined
/// outside this view — confirm.
/// \param dest  The other node of the dependency.
528 void add_dependency(
const node & dest);
/// Declare a memory-share dependency on the node identified by the given token.
/// NOTE(review): exact semantics are defined outside this view — confirm.
/// \param dest  Token of the other node.
536 void add_memory_share_dependency(
const node_token & dest);
/// Overload of add_memory_share_dependency taking the node itself.
/// \param dest  The other node.
544 void add_memory_share_dependency(
const node & dest);
562 template <
typename T>
563 void forward(std::string key, T value, memory_size_type k = std::numeric_limits<memory_size_type>::max()) {
/// Forward a type-erased value under the given key.
/// \param key    Name under which the value is forwarded.
/// \param value  The value, wrapped in any_noncopyable (taken by value).
/// \param k      Defaults to the largest memory_size_type value.
///               NOTE(review): presumably bounds how far the value is
///               forwarded — confirm against the definition.
570 void forward_any(std::string key,
any_noncopyable value, memory_size_type k = std::numeric_limits<memory_size_type>::max());
/// Register that data with the given key is forwarded from another node.
/// \param key        Name of the forwarded item.
/// \param from_node  Id of the node that forwards it.
/// NOTE(review): presumably recorded in m_forwardedToHere (a key-to-node-id
/// map declared in this class) — confirm.
576 void add_forwarded_data(std::string key, node_token::id_t from_node);
/// Look up forwarded data by key.
/// \param key  Name of the forwarded item.
/// \return An optional reference (maybeany_t) to the stored value.
/// NOTE(review): presumably empty when this node holds no value under the
/// key — confirm against the definition.
581 maybeany_t get_forwarded_data_maybe(std::string key);
589 return bool(fetch_maybe(key));
/// Fetch a forwarded item by key without requiring that it exists.
/// \param key  Name of the forwarded item.
/// \return An optional reference (maybeany_t); other code in this header
///         converts the result to bool to test whether the key can be fetched.
597 maybeany_t fetch_maybe(std::string key);
608 template <
typename T>
612 return any_cast<T>(item);
614 std::stringstream ss;
615 ss <<
"Trying to fetch key '" << key <<
"' of type "
616 <<
typeid(T).name() <<
" but forwarded data was of type "
617 << item.type().name() <<
". Message was: " << e.what();
/// NOTE(review): going by the name, stops forwarded data from passing through
/// this node; the definition is not visible here — confirm.
625 void no_forward_through();
/// Set the number of steps for this node.
/// \param steps  The step count.
/// NOTE(review): presumably initialises both stepsTotal and m_stepsLeft — confirm.
640 void set_steps(stream_size_type steps);
/// Handler for stepping past the remaining step budget.
/// NOTE(review): presumably invoked from step() when more steps are requested
/// than m_stepsLeft holds — the calling line is not visible here; confirm.
647 void step_overflow();
653 void step(stream_size_type steps = 1) {
654 assert(get_state() == STATE_IN_END ||
655 get_state() == STATE_IN_BEGIN ||
656 get_state() == STATE_AFTER_BEGIN ||
657 get_state() == STATE_IN_END ||
658 get_state() == STATE_IN_GO);
659 if (m_stepsLeft < steps)
662 m_stepsLeft -= steps;
/// Report whether an item can currently be pulled; does not modify the object.
/// NOTE(review): the enclosing class is outside this view — confirm which
/// type this belongs to.
676 inline bool can_pull()
const;
/// Push one item.
/// \param item  The item to push, passed by const reference.
690 inline void push(
const item_type & item);
/// Register that this node uses the datastructure with the given name.
/// \param name      Name identifying the datastructure.
/// \param priority  Priority of the datastructure; 1 when omitted.
///                  NOTE(review): presumably weights its share of memory — confirm.
699 void register_datastructure_usage(
const std::string & name,
double priority=1);
/// Set the memory limits of a named datastructure.
/// \param name  Name identifying the datastructure.
/// \param min   Minimum memory.
/// \param max   Maximum memory; the largest memory_size_type value when omitted.
707 void set_datastructure_memory_limits(
const std::string & name, memory_size_type min, memory_size_type max=std::numeric_limits<memory_size_type>::max());
/// Get the memory figure for a named datastructure.
/// \param name  Name identifying the datastructure.
/// NOTE(review): presumably the amount assigned to it — confirm against the definition.
713 memory_size_type get_datastructure_memory(
const std::string & name);
722 bits::node_map::datastructuremap_t & structures = get_node_map()->find_authority()->get_datastructures();
723 bits::node_map::datastructuremap_t::iterator i = structures.find(name);
725 if(i == structures.end())
726 throw tpie::exception(
"attempted to set non-registered datastructure");
728 i->second.second = move_if_movable<T>(datastructure);
738 bits::node_map::datastructuremap_t & structures = get_node_map()->find_authority()->get_datastructures();
739 bits::node_map::datastructuremap_t::iterator i = structures.find(name);
741 if(i == structures.end())
742 throw tpie::exception(
"attempted to get non-registered datastructure");
744 return any_cast<T>(i->second.second);
747 void unset_datastructure(
const std::string & name) {
748 bits::node_map::datastructuremap_t & structures = get_node_map()->find_authority()->get_datastructures();
749 bits::node_map::datastructuremap_t::iterator i = structures.find(name);
751 if(i == structures.end())
return;
752 i->second.second.reset();
756 struct datastructure_info_t {
757 datastructure_info_t() : min(0), max(std::numeric_limits<memory_size_type>::max()) {}
758 memory_size_type min;
759 memory_size_type max;
/// Ordered map from a datastructure's name to its datastructure_info_t record.
763 using datastructuremap_t = std::map<std::string, datastructure_info_t>;
765 const datastructuremap_t & get_datastructures()
const {
766 return m_datastructures;
/// Number of slots currently held in m_buckets.
/// Every slot is counted, whether or not its pointer is set.
773 size_t buckets() const
{
	const size_t slotCount = m_buckets.size();
	return slotCount;
}
778 std::unique_ptr<memory_bucket> &
bucket(
size_t i) {
779 if (m_buckets.size() <= i) m_buckets.resize(i+1);
/// bits::memory_runtime is granted access to this class's non-public members.
791 friend class bits::memory_runtime;
/// bits::datastructure_runtime is granted access to this class's non-public members.
793 friend class bits::datastructure_runtime;
/// Owned memory buckets, indexed by position. Entries may be null; bucket(i)
/// grows the vector on demand so that index i exists.
802 std::vector<std::unique_ptr<memory_bucket> > m_buckets;

/// Values forwarded from this node, keyed by name.
804 std::map<std::string, any_noncopyable> m_forwardedFromHere;
/// For each key forwarded to this node, the id of a node.
/// NOTE(review): presumably the node that forwarded it — confirm.
805 std::map<std::string, node_token::id_t> m_forwardedToHere;

/// Per-name datastructure records; exposed read-only by get_datastructures().
807 datastructuremap_t m_datastructures;
/// Steps remaining; step() compares against it and subtracts the steps taken.
/// NOTE(review): no in-class initializer, unlike the neighbouring members —
/// confirm that every constructor initialises it.
808 stream_size_type m_stepsLeft;
/// Resource type currently being assigned; NO_RESOURCE by default.
811 resource_type m_resourceBeingAssigned = NO_RESOURCE;
/// Owned progress indicator.
/// NOTE(review): presumably a proxy that relays step() calls — confirm.
812 std::unique_ptr<progress_indicator_base> m_piProxy;
823 #endif // __TPIE_PIPELINING_NODE_H__