TPIE

11a2c2d
node.h
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
20 #ifndef __TPIE_PIPELINING_NODE_H__
21 #define __TPIE_PIPELINING_NODE_H__
22 
23 #include <tpie/tpie_export.h>
24 #include <tpie/pipelining/exception.h>
25 #include <tpie/pipelining/tokens.h>
28 #include <tpie/pipelining/priority_type.h>
30 #include <tpie/pipelining/node_name.h>
31 #include <tpie/pipelining/node_traits.h>
32 #include <tpie/flags.h>
33 #include <tpie/memory.h>
34 #include <limits>
35 #include <tpie/resources.h>
36 
37 namespace tpie::pipelining {
38 namespace bits {
39 
41  node & m_node;
42 
43 public:
45 
46  void refresh() override;
47 };
48 
49 } // namespace bits
50 
52  memory_size_type minimum = 0;
53  memory_size_type maximum = std::numeric_limits<memory_size_type>::max();
54  double fraction = 0.0;
55 
56  memory_size_type available = 0;
57 };
58 
60  node_resource_parameters resource_parameters[resource_type::TOTAL_RESOURCE_TYPES];
61 
62  std::string name;
63  priority_type namePriority = PRIORITY_NO_NAME;
64 
65  std::string phaseName;
66  priority_type phaseNamePriority = PRIORITY_NO_NAME;
67 
68  stream_size_type stepsTotal = 0;
69 };
70 
77 class TPIE_EXPORT node {
78 public:
79  typedef boost::optional<any_noncopyable &> maybeany_t;
80 
84  enum PLOT {
85  PLOT_SIMPLIFIED_HIDE=1,
86  PLOT_BUFFERED=2,
87  PLOT_PARALLEL=4
88  };
89 
93  enum STATE {
94  STATE_FRESH,
95  STATE_IN_PREPARE,
96  STATE_AFTER_PREPARE,
97  STATE_IN_PROPAGATE,
98  STATE_AFTER_PROPAGATE,
99  STATE_IN_BEGIN,
100  STATE_AFTER_BEGIN,
101  STATE_IN_GO,
102  STATE_IN_END,
103  STATE_AFTER_END
104  };
105 
109  virtual ~node() {}
110 
115  memory_size_type get_minimum_resource_usage(resource_type type) const {
116  return m_parameters.resource_parameters[type].minimum;
117  }
118 
123  memory_size_type get_maximum_resource_usage(resource_type type) const {
124  return m_parameters.resource_parameters[type].maximum;
125  }
126 
130  double get_resource_fraction(resource_type type) const {
131  return m_parameters.resource_parameters[type].fraction;
132  }
133 
137  memory_size_type get_available_of_resource(resource_type type) const {
138  return m_parameters.resource_parameters[type].available;
139  }
140 
144  void set_minimum_resource_usage(resource_type type, memory_size_type usage);
145 
152  void set_maximum_resource_usage(resource_type type, memory_size_type usage);
153 
159  void set_resource_fraction(resource_type type, double f);
160 
165  virtual void resource_available_changed(resource_type, memory_size_type) {
166  }
167 
171  void _internal_set_available_of_resource(resource_type type, memory_size_type available);
172 
177  memory_size_type get_minimum_memory() const {
178  return get_minimum_resource_usage(MEMORY);
179  }
180 
185  memory_size_type get_maximum_memory() const {
186  return get_maximum_resource_usage(MEMORY);
187  }
188 
192  double get_memory_fraction() const {
193  return get_resource_fraction(MEMORY);
194  }
195 
199  memory_size_type get_available_memory() const {
200  return get_available_of_resource(MEMORY);
201  }
202 
206  void set_minimum_memory(memory_size_type minimumMemory) {
207  set_minimum_resource_usage(MEMORY, minimumMemory);
208  }
209 
216  void set_maximum_memory(memory_size_type maximumMemory) {
217  set_maximum_resource_usage(MEMORY, maximumMemory);
218  }
219 
224  void set_memory_fraction(double f) {
225  set_resource_fraction(MEMORY, f);
226  }
227 
232  virtual void set_available_memory(memory_size_type availableMemory) {
233  unused(availableMemory);
234  }
235 
239  memory_size_type get_used_memory() const {
240  memory_size_type ans=0;
241  for (const auto & p: m_buckets)
242  if (p) ans += p->count;
243  return ans;
244  }
245 
250  bits::node_map::ptr get_node_map() const {
251  return token.get_map();
252  }
253 
258  node_token::id_t get_id() const {
259  return token.id();
260  }
261 
268  virtual void prepare() {
269  }
270 
282  virtual void propagate() {
283  }
284 
298  virtual void begin() {
299  }
300 
301  virtual bool is_go_free() const {return false;}
302 
308  virtual void go() {
309  log_warning() << "node subclass " << typeid(*this).name()
310  << " is not an initiator node" << std::endl;
311  throw not_initiator_node();
312  }
313 
328  virtual void end() {
329  }
330 
334  virtual bool can_evacuate() {
335  return false;
336  }
337 
341  virtual void evacuate() {
342  }
343 
348  inline priority_type get_name_priority() {
349  return m_parameters.namePriority;
350  }
351 
356  const std::string & get_name();
357 
362  void set_name(const std::string & name, priority_type priority = PRIORITY_USER);
363 
368  priority_type get_phase_name_priority() {
369  return m_parameters.phaseNamePriority;
370  }
371 
376  const std::string & get_phase_name();
377 
382  void set_phase_name(const std::string & name, priority_type priority = PRIORITY_USER);
383 
384 
388  void set_breadcrumb(const std::string & breadcrumb) {
389  m_parameters.name = m_parameters.name.empty() ? breadcrumb : (breadcrumb + " | " + m_parameters.name);
390  }
391 
396  stream_size_type get_steps() {
397  return m_parameters.stepsTotal;
398  }
399 
400  stream_size_type get_steps_left() {
401  return m_stepsLeft;
402  }
403 
408  m_pi = pi;
409  }
410 
415  return m_pi;
416  }
417 
421  STATE get_state() const {
422  return m_state;
423  }
424 
428  void set_state(STATE s) {
429  m_state = s;
430  }
431 
435  resource_type get_resource_being_assigned() const {
436  return m_resourceBeingAssigned;
437  }
438 
442  void set_resource_being_assigned(resource_type type) {
443  m_resourceBeingAssigned = type;
444  }
445 
451  return m_plotOptions;
452  }
453 
459  m_plotOptions = options;
460  }
461 protected:
462 #ifdef _WIN32
463  // Disable warning C4355: 'this' : used in base member initializer list
464  // node_token does not access members of the `node *`,
465  // it merely uses it as a value in the node map.
466  // Only after this node object is completely constructed are node members accessed.
467 #pragma warning( push )
468 #pragma warning( disable : 4355 )
469 #endif // _WIN32
470  node();
474 
478  node(const node & other) = delete;
479  node & operator=(const node & other) = delete;
480 
485  node(node && other);
486  node & operator=(node && other);
487 
491  node(const node_token & token);
492 #ifdef _WIN32
493 #pragma warning( pop )
494 #endif // _WIN32
495 public:
499  void add_push_destination(const node_token & dest);
500 
504  void add_push_destination(const node & dest);
505 
509  void add_pull_source(const node_token & dest);
510 
514  void add_pull_source(const node & dest);
515 
521  void add_dependency(const node_token & dest);
522 
528  void add_dependency(const node & dest);
529 
536  void add_memory_share_dependency(const node_token & dest);
537 
544  void add_memory_share_dependency(const node & dest);
545 
556  // Implementation note: If the type of the `value` parameter is changed
557  // from `T` to `const T &`, this will yield linker errors if an application
558  // attempts to pass a const reference to a static data member inside a
559  // templated class.
560  // See http://stackoverflow.com/a/5392050
562  template <typename T>
563  void forward(std::string key, T value, memory_size_type k = std::numeric_limits<memory_size_type>::max()) {
564  forward_any(key, any_noncopyable(std::move(value)), k);
565  }
566 
570  void forward_any(std::string key, any_noncopyable value, memory_size_type k = std::numeric_limits<memory_size_type>::max());
571 
572 private:
576  void add_forwarded_data(std::string key, node_token::id_t from_node);
577 
581  maybeany_t get_forwarded_data_maybe(std::string key);
582 
583 public:
588  bool can_fetch(std::string key) {
589  return bool(fetch_maybe(key));
590  }
591 
597  maybeany_t fetch_maybe(std::string key);
598 
603  any_noncopyable & fetch_any(std::string key);
604 
608  template <typename T>
609  inline T & fetch(std::string key) {
610  any_noncopyable &item = fetch_any(key);
611  try {
612  return any_cast<T>(item);
613  } catch (const bad_any_noncopyable_cast & e) {
614  std::stringstream ss;
615  ss << "Trying to fetch key '" << key << "' of type "
616  << typeid(T).name() << " but forwarded data was of type "
617  << item.type().name() << ". Message was: " << e.what();
618  throw invalid_argument_exception(ss.str());
619  }
620  }
621 
625  void no_forward_through();
626 
631  const node_token & get_token() const {
632  return token;
633  }
634 
635 public:
640  void set_steps(stream_size_type steps);
641 
642 
643 private:
647  void step_overflow();
648 public:
653  void step(stream_size_type steps = 1) {
654  assert(get_state() == STATE_IN_END ||
655  get_state() == STATE_IN_BEGIN ||
656  get_state() == STATE_AFTER_BEGIN ||
657  get_state() == STATE_IN_END ||
658  get_state() == STATE_IN_GO);
659  if (m_stepsLeft < steps)
660  step_overflow();
661  else
662  m_stepsLeft -= steps;
663  m_pi->step(steps);
664  }
665 
673  progress_indicator_base * proxy_progress_indicator();
674 
675 #ifdef DOXYGEN
676  inline bool can_pull() const;
681 
685  inline item_type pull();
686 
690  inline void push(const item_type & item);
691 #endif
692 
699  void register_datastructure_usage(const std::string & name, double priority=1);
700 
707  void set_datastructure_memory_limits(const std::string & name, memory_size_type min, memory_size_type max=std::numeric_limits<memory_size_type>::max());
708 
713  memory_size_type get_datastructure_memory(const std::string & name);
714 
720  template<typename T>
721  void set_datastructure(const std::string & name, T datastructure) {
722  bits::node_map::datastructuremap_t & structures = get_node_map()->find_authority()->get_datastructures();
723  bits::node_map::datastructuremap_t::iterator i = structures.find(name);
724 
725  if(i == structures.end())
726  throw tpie::exception("attempted to set non-registered datastructure");
727 
728  i->second.second = move_if_movable<T>(datastructure);
729  }
730 
736  template<typename T>
737  T & get_datastructure(const std::string & name) {
738  bits::node_map::datastructuremap_t & structures = get_node_map()->find_authority()->get_datastructures();
739  bits::node_map::datastructuremap_t::iterator i = structures.find(name);
740 
741  if(i == structures.end())
742  throw tpie::exception("attempted to get non-registered datastructure");
743 
744  return any_cast<T>(i->second.second);
745  }
746 
747  void unset_datastructure(const std::string & name) {
748  bits::node_map::datastructuremap_t & structures = get_node_map()->find_authority()->get_datastructures();
749  bits::node_map::datastructuremap_t::iterator i = structures.find(name);
750 
751  if(i == structures.end()) return;
752  i->second.second.reset();
753  }
754 
755 private:
756  struct datastructure_info_t {
757  datastructure_info_t() : min(0), max(std::numeric_limits<memory_size_type>::max()) {}
758  memory_size_type min;
759  memory_size_type max;
760  double priority;
761  };
762 
763  typedef std::map<std::string, datastructure_info_t> datastructuremap_t;
764 
765  const datastructuremap_t & get_datastructures() const {
766  return m_datastructures;
767  }
768 
769 public:
773  size_t buckets() const {return m_buckets.size();}
774 
778  std::unique_ptr<memory_bucket> & bucket(size_t i) {
779  if (m_buckets.size() <= i) m_buckets.resize(i+1);
780  if (!m_buckets[i]) m_buckets[i].reset(new memory_bucket());
781  return m_buckets[i];
782  }
783 
788  return tpie::memory_bucket_ref(bucket(i).get());
789  }
790 
791  friend class bits::memory_runtime;
792 
793  friend class bits::datastructure_runtime;
794 
795  friend class factory_base;
796 
797  friend class bits::pipeline_base;
798 private:
799  node_token token;
800 
801  node_parameters m_parameters;
802  std::vector<std::unique_ptr<memory_bucket> > m_buckets;
803 
804  std::map<std::string, any_noncopyable> m_forwardedFromHere;
805  std::map<std::string, node_token::id_t> m_forwardedToHere;
806 
807  datastructuremap_t m_datastructures;
808  stream_size_type m_stepsLeft;
810  STATE m_state;
811  resource_type m_resourceBeingAssigned = NO_RESOURCE;
812  std::unique_ptr<progress_indicator_base> m_piProxy;
813  flags<PLOT> m_plotOptions;
814 
815  friend class bits::proxy_progress_indicator;
816 };
817 
818 
819 TPIE_DECLARE_OPERATORS_FOR_FLAGS(node::PLOT);
820 
821 } // namespace tpie::pipelining
822 
823 #endif // __TPIE_PIPELINING_NODE_H__
tpie::pipelining
pipelining/factory_base.h Base class of pipelining factories
Definition: ami_glue.h:23
tpie::pipelining::node::STATE
STATE
Used internally to check order of method calls.
Definition: node.h:93
tpie::pipelining::node::begin
virtual void begin()
Begin pipeline processing phase.
Definition: node.h:298
tpie::pipelining::node::evacuate
virtual void evacuate()
Overridden by nodes that have data to evacuate.
Definition: node.h:341
tpie::progress_indicator_base
The base class for indicating the progress of some task.
Definition: progress_indicator_base.h:62
tpie::pipelining::node::~node
virtual ~node()
Virtual dtor.
Definition: node.h:109
tpie::pipelining::node::set_resource_being_assigned
void set_resource_being_assigned(resource_type type)
Used internally to check order of method calls.
Definition: node.h:442
tpie::pipelining::node::prepare
virtual void prepare()
Called before memory assignment but after depending phases have executed and ended.
Definition: node.h:268
tpie::pipelining::not_initiator_node
Definition: exception.h:34
tpie::pipelining::node::go
virtual void go()
For initiator nodes, execute this phase by pushing all items to be pushed.
Definition: node.h:308
tpie::pipelining::node::get_plot_options
flags< PLOT > get_plot_options() const
Get options specified for plot(), as a combination of node::PLOT values.
Definition: node.h:450
tpie::pipelining::node::get_minimum_resource_usage
memory_size_type get_minimum_resource_usage(resource_type type) const
Get the minimum amount of the resource declared by this node.
Definition: node.h:115
tpie::pipelining::node::get_maximum_memory
memory_size_type get_maximum_memory() const
Get the maximum amount of memory declared by this node.
Definition: node.h:185
tpie::pipelining::node::get_datastructure
T & get_datastructure(const std::string &name)
Returns a previously declared datastructure.
Definition: node.h:737
tpie::pipelining::node::get_memory_fraction
double get_memory_fraction() const
Get the memory priority of this node.
Definition: node.h:192
tpie::pipelining::node::set_state
void set_state(STATE s)
Used internally to check order of method calls.
Definition: node.h:428
tpie::pipelining::node::get_steps
stream_size_type get_steps()
Used internally for progress indication.
Definition: node.h:396
tpie::pipelining::node_resource_parameters
Definition: node.h:51
tpie::pipelining::node::set_minimum_memory
void set_minimum_memory(memory_size_type minimumMemory)
Called by implementers to declare minimum memory requirements.
Definition: node.h:206
tpie::pipelining::node::buckets
size_t buckets() const
Count the number of memory buckets.
Definition: node.h:773
tpie::pipelining::node::get_node_map
bits::node_map::ptr get_node_map() const
Get the local node map, mapping node IDs to node pointers for all the nodes reachable from this one.
Definition: node.h:250
tpie::pipelining::node::get_available_of_resource
memory_size_type get_available_of_resource(resource_type type) const
Get the amount of the specific resource assigned to this node.
Definition: node.h:137
tpie::memory_bucket
Bucket used for memory counting.
Definition: memory.h:320
tpie::memory_bucket_ref
Class storring a reference to a memory bucket.
Definition: memory.h:334
tpie::pipelining::node::get_progress_indicator
progress_indicator_base * get_progress_indicator()
Used internally. Get the progress indicator used.
Definition: node.h:414
tpie::pipelining::node::set_plot_options
void set_plot_options(flags< PLOT > options)
Set options specified for plot(), as a combination of node::PLOT values.
Definition: node.h:458
tpie::pipelining::node::allocator
tpie::memory_bucket_ref allocator(size_t i=0)
Return an allocator that counts memory usage within the node.
Definition: node.h:787
tpie::pipelining::factory_base
Base class of all pipelining factories.
Definition: factory_base.h:73
tpie::exception
Definition: exception.h:33
tpie::pipelining::node::forward
void forward(std::string key, T value, memory_size_type k=std::numeric_limits< memory_size_type >::max())
Called by implementers to forward auxiliary data to successors.
Definition: node.h:563
tpie::pipelining::node_parameters
Definition: node.h:59
tpie::pipelining::node::set_breadcrumb
void set_breadcrumb(const std::string &breadcrumb)
Used internally when a pair_factory has a name set.
Definition: node.h:388
tpie::pipelining::node::set_progress_indicator
void set_progress_indicator(progress_indicator_base *pi)
Used internally. Set the progress indicator to use.
Definition: node.h:407
tpie::pipelining::node::get_name_priority
priority_type get_name_priority()
Get the priority of this node's name.
Definition: node.h:348
tpie::invalid_argument_exception
Definition: exception.h:41
tpie::pipelining::node::set_available_memory
virtual void set_available_memory(memory_size_type availableMemory)
Called by the memory manager to set the amount of memory assigned to this node.
Definition: node.h:232
tpie::unused
void unused(const T &x)
Declare that a variable is unused on purpose.
Definition: util.h:42
tpie::log_warning
logstream & log_warning()
Return logstream for writing warning log messages.
Definition: tpie_log.h:158
tpie::pipelining::node::set_maximum_memory
void set_maximum_memory(memory_size_type maximumMemory)
Called by implementers to declare maximum memory requirements.
Definition: node.h:216
tpie::pipelining::node::get_resource_fraction
double get_resource_fraction(resource_type type) const
Get the priority for the specific resource of this node.
Definition: node.h:130
tpie::flags< PLOT >
tpie::pipelining::bits::proxy_progress_indicator
Definition: node.h:40
tpie::pipelining::node::set_datastructure
void set_datastructure(const std::string &name, T datastructure)
Returns a previously declared datastructure.
Definition: node.h:721
tpie::pipelining::node::get_maximum_resource_usage
memory_size_type get_maximum_resource_usage(resource_type type) const
Get the maximum amount of the resource declared by this node.
Definition: node.h:123
progress_indicator_null.h
tpie::pipelining::bad_any_noncopyable_cast
Definition: container.h:190
tpie::pipelining::node::step
void step(stream_size_type steps=1)
Step the progress indicator.
Definition: node.h:653
tpie::pipelining::bits::pipeline_base
Definition: pipeline.h:109
tpie::pipelining::node::bucket
std::unique_ptr< memory_bucket > & bucket(size_t i)
Access a memory bucket.
Definition: node.h:778
tpie::pipelining::node::PLOT
PLOT
Options for how to plot this node.
Definition: node.h:84
tpie::pipelining::node::end
virtual void end()
End pipeline processing phase.
Definition: node.h:328
tpie::pipelining::node::set_memory_fraction
void set_memory_fraction(double f)
Set the memory priority of this node.
Definition: node.h:224
tokens.h
predeclare.h
resources.h
tpie::pipelining::node::resource_available_changed
virtual void resource_available_changed(resource_type, memory_size_type)
Called by the resource manager to notify the node's available amount of resource has changed.
Definition: node.h:165
tpie::pipelining::node
Base class of all nodes.
Definition: node.h:77
tpie::pipelining::node::get_phase_name_priority
priority_type get_phase_name_priority()
Get the priority of this node's pdane name.
Definition: node.h:368
tpie::pipelining::node::propagate
virtual void propagate()
Propagate stream metadata.
Definition: node.h:282
tpie::pipelining::node::get_state
STATE get_state() const
Used internally to check order of method calls.
Definition: node.h:421
progress_indicator_base.h
tpie::pipelining::node::fetch
T & fetch(std::string key)
Fetch piece of auxiliary data, expecting a given value type.
Definition: node.h:609
tpie::pipelining::node::get_minimum_memory
memory_size_type get_minimum_memory() const
Get the minimum amount of memory declared by this node.
Definition: node.h:177
tpie::pipelining::node::get_token
const node_token & get_token() const
Get the node_token that maps this node's ID to a pointer to this.
Definition: node.h:631
tpie::pipelining::node::get_used_memory
memory_size_type get_used_memory() const
Get the amount of memory currently used by this node.
Definition: node.h:239
tpie::pipelining::node::get_available_memory
memory_size_type get_available_memory() const
Get the amount of memory assigned to this node.
Definition: node.h:199
tpie::pipelining::node::get_resource_being_assigned
resource_type get_resource_being_assigned() const
Used internally to check order of method calls.
Definition: node.h:435
tpie::pipelining::node_token
Definition: tokens.h:292
tpie::pipelining::node::can_evacuate
virtual bool can_evacuate()
Overridden by nodes that have data to evacuate.
Definition: node.h:334
tpie::pipelining::item_type
pipe_middle< tfactory< bits::item_type_t, Args< T > > > item_type()
Create item type defining identity pipe node.
Definition: helpers.h:654
memory.h
tpie::pipelining::node::get_id
node_token::id_t get_id() const
Get the internal node ID of this node (mainly for debugging purposes).
Definition: node.h:258
tpie::pipelining::any_noncopyable
Definition: container.h:195
tpie::pipelining::node::can_fetch
bool can_fetch(std::string key)
Find out if there is a piece of auxiliary data forwarded with a given name.
Definition: node.h:588