TPIE

11a2c2d
tokens.h
Go to the documentation of this file.
1 // -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*-
2 // vi:set ts=4 sts=4 sw=4 noet :
3 // Copyright 2012, The TPIE development team
4 //
5 // This file is part of TPIE.
6 //
7 // TPIE is free software: you can redistribute it and/or modify it under
8 // the terms of the GNU Lesser General Public License as published by the
9 // Free Software Foundation, either version 3 of the License, or (at your
10 // option) any later version.
11 //
12 // TPIE is distributed in the hope that it will be useful, but WITHOUT ANY
13 // WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 // FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
15 // License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with TPIE. If not, see <http://www.gnu.org/licenses/>
19 
70 
71 #ifndef __TPIE_PIPELINING_TOKENS_H__
72 #define __TPIE_PIPELINING_TOKENS_H__
73 
74 #include <tpie/tpie_export.h>
75 #include <tpie/exception.h>
76 #include <tpie/pipelining/exception.h>
78 #include <tpie/pipelining/container.h>
79 #include <tpie/types.h>
80 #include <tpie/tpie_assert.h>
81 #include <map>
82 #include <vector>
83 #include <iostream>
84 #include <boost/intrusive_ptr.hpp>
85 #include <boost/optional.hpp>
86 #include <unordered_map>
87 #include <unordered_set>
88 
89 namespace tpie::pipelining {
90 namespace bits {
91 
/// Kind of edge recorded between two node ids in a node_map
/// (see node_map::add_relation and the in_degree/out_degree queries).
///
/// The enumerators have no explicit values, so they number 0..4 in
/// declaration order.
/// NOTE(review): the per-enumerator descriptions below are inferred from the
/// names only; confirm against the code that consumes the relations.
enum node_relation {
	/// Presumably: the source node pushes items to the target node.
	pushes,
	/// Presumably: the source node pulls items from the target node.
	pulls,
	/// Presumably: a plain dependency between the two nodes.
	depends,
	/// Presumably: a dependency that forwarded values do not travel along.
	no_forward_depends,
	/// Presumably: a dependency between nodes that share memory.
	memory_share_depends
};
99 
100 class TPIE_EXPORT node_map {
101 public:
102  typedef uint64_t id_t;
103  typedef node * val_t;
104 
105  typedef std::map<id_t, val_t> map_t;
106  typedef map_t::const_iterator mapit;
107 
108  typedef std::multimap<id_t, std::pair<id_t, node_relation> > relmap_t;
109  typedef relmap_t::const_iterator relmapit;
110 
111  typedef std::unordered_map<std::string, std::pair<memory_size_type, any_noncopyable> > datastructuremap_t;
112 
113  typedef boost::optional<any_noncopyable &> maybeany_t;
114  typedef std::unordered_map<std::string, any_noncopyable> forwardmap_t;
115 
116  typedef boost::intrusive_ptr<node_map> ptr;
117  typedef std::unique_ptr<node> owned_ptr;
118 
120  id_t from;
121  std::string key;
122  any_noncopyable value;
123  };
124 
125  static ptr create() {
126  return ptr(new node_map);
127  }
128 
129  id_t add_token(val_t token) {
130  id_t id = nextId++;
131  set_token(id, token);
132  return id;
133  }
134 
135  void set_token(id_t id, val_t token) {
136  assert_authoritative();
137  m_tokens[id] = token;
138  }
139 
140  // union-find link
141  void link(ptr target);
142 
143  void union_set(ptr target) {
144  find_authority()->link(target->find_authority());
145  }
146 
147  val_t get(id_t id) const {
148  mapit i = m_tokens.find(id);
149  if (i == m_tokens.end()) return 0;
150  return i->second;
151  }
152 
153  mapit begin() const {
154  return m_tokens.begin();
155  }
156 
157  mapit end() const {
158  return m_tokens.end();
159  }
160 
161  size_t size() const {
162  return m_tokens.size();
163  }
164 
165  void no_forward_through(id_t id) {
166  m_noForwardThrough.insert(id);
167  }
168 
169  // union-find
170  ptr find_authority();
171 
172  void add_relation(id_t from, id_t to, node_relation rel);
173 
174  const relmap_t & get_relations() const {
175  return m_relations;
176  }
177 
178  const datastructuremap_t & get_datastructures() const {
179  return m_datastructures;
180  }
181 
182  datastructuremap_t & get_datastructures() {
183  return m_datastructures;
184  }
185 
186  size_t in_degree(id_t from, node_relation rel) const {
187  return out_degree(m_relationsInv, from, rel);
188  }
189 
190  size_t out_degree(id_t from, node_relation rel) const {
191  return out_degree(m_relations, from, rel);
192  }
193 
194  size_t out_degree(id_t from) const {
195  return out_degree(m_relations, from);
196  }
197 
198  void assert_authoritative() const {
199  if (m_authority) throw non_authoritative_node_map();
200  }
201 
202  void dump(std::ostream & os = std::cout) const;
203 
208  void get_successors(id_t from, std::vector<id_t> & successors, memory_size_type k, bool forward_only=false);
209 
210  void forward(std::string key, any_noncopyable value) {
211  m_pipelineForwards[key] = std::move(value);
212  }
213 
214  maybeany_t fetch_maybe(std::string key) {
215  auto it = m_pipelineForwards.find(key);
216  if (it == m_pipelineForwards.end()) {
217  return maybeany_t();
218  }
219  return maybeany_t(it->second);
220  }
221 
222  void forward_from_pipe_base(id_t from, std::string key, any_noncopyable value) {
223  m_pipeBaseForwards.push_back({from, key, std::move(value)});
224  }
225 
226  void forward_pipe_base_forwards();
227 
228  friend void intrusive_ptr_add_ref(node_map * m) {
229  m->m_refCnt++;
230  }
231 
232  friend void intrusive_ptr_release(node_map * m) {
233  m->m_refCnt--;
234  if (m->m_refCnt == 0) delete m;
235  }
236 
237  void increment_pipeline_ref() {
238  assert_authoritative();
239  m_pipelineRefCnt++;
240  }
241 
242  void decrement_pipeline_ref() {
243  assert_authoritative();
244  m_pipelineRefCnt--;
245  if (m_pipelineRefCnt == 0) {
246  // Make sure we are not dealloc while cleaning up owned nodes
247  ptr thisPtr(this);
248 
249  // Dealloc owned nodes as no pipeline references this node map anymore
250  m_ownedNodes.clear();
251  }
252  }
253 
254  void add_owned_node(owned_ptr p) {
255  m_ownedNodes.push_back(std::move(p));
256  }
257 
258 private:
259  map_t m_tokens;
260  size_t m_refCnt;
261  size_t m_pipelineRefCnt;
262  relmap_t m_relations;
263  relmap_t m_relationsInv;
264  datastructuremap_t m_datastructures;
265  forwardmap_t m_pipelineForwards;
266  std::unordered_set<id_t> m_noForwardThrough;
267  std::vector<pipe_base_forward_t> m_pipeBaseForwards;
268  std::vector<owned_ptr> m_ownedNodes;
269 
270  size_t out_degree(const relmap_t & map, id_t from, node_relation rel) const;
271  size_t out_degree(const relmap_t & map, id_t from) const;
272 
273  // union rank structure
274  ptr m_authority;
275  size_t m_rank;
276 
277  node_map()
278  : m_refCnt(0)
279  , m_pipelineRefCnt(0)
280  , m_rank(0)
281  {
282  }
283 
284  inline node_map(const node_map &);
285  inline node_map & operator=(const node_map &);
286 
287  static id_t nextId;
288 };
289 
290 } // namespace bits
291 
292 class TPIE_EXPORT node_token {
293 public:
294  typedef bits::node_map::id_t id_t;
296 
297  // Use for the simple case in which a node owns its own token
298  explicit node_token(val_t owner)
299  : m_tokens(bits::node_map::create())
300  , m_id(m_tokens->add_token(owner))
301  , m_free(false)
302  {
303  }
304 
305  // This copy constructor has two uses:
306  // 1. Simple case when a node is copied (freshToken = false)
307  // 2. Advanced case when a node is being constructed with a specific token (freshToken = true)
308  inline node_token(const node_token & other, val_t newOwner, bool freshToken = false)
309  : m_tokens(other.m_tokens->find_authority())
310  , m_id(other.id())
311  , m_free(false)
312  {
313  if (freshToken) {
314  if (!other.m_free)
315  throw exception("Trying to take ownership of a non-free token");
316  if (m_tokens->get(m_id) != 0)
317  throw exception("A token already has an owner, but m_free is true - contradiction");
318  } else {
319  if (other.m_free)
320  throw exception("Trying to copy a free token");
321  }
322  m_tokens->set_token(m_id, newOwner);
323  }
324 
325  void assign(const node_token & other, val_t newOwner, bool freshToken = false) {
326  m_tokens = other.m_tokens->find_authority();
327  m_id = other.id();
328  m_free = false;
329  if (freshToken) {
330  if (!other.m_free)
331  throw exception("Trying to take ownership of a non-free token");
332  if (m_tokens->get(m_id) != 0)
333  throw exception("A token already has an owner, but m_free is true - contradiction");
334  } else {
335  if (other.m_free)
336  throw exception("Trying to copy a free token");
337  }
338  m_tokens->set_token(m_id, newOwner);
339  }
340 
341  // Use for the advanced case when a node_token is allocated before the node
342  node_token()
343  : m_tokens(bits::node_map::create())
344  , m_id(m_tokens->add_token(0))
345  , m_free(true)
346  {
347  }
348 
349  id_t id() const { return m_id; }
350 
351  bits::node_map::ptr map_union(const node_token & with) {
352  if (m_tokens != with.m_tokens)
353  m_tokens->union_set(with.m_tokens);
354  return m_tokens = m_tokens->find_authority();
355  }
356 
357  bits::node_map::ptr get_map() const {
358  return m_tokens;
359  }
360 
361  val_t get() const {
362  return m_tokens->get(m_id);
363  }
364 
365 private:
366  bits::node_map::ptr m_tokens;
367  id_t m_id;
368  bool m_free;
369 };
370 
371 } // namespace tpie::pipelining
372 
373 #endif // __TPIE_PIPELINING_TOKENS_H__
tpie::pipelining
pipelining/factory_base.h Base class of pipelining factories
Definition: ami_glue.h:23
tpie_assert.h
types.h
tpie::exception
Definition: exception.h:33
tpie::pipelining::bits::node_map::pipe_base_forward_t
Definition: tokens.h:119
tpie::pipelining::map
pipe_middle< tfactory< bits::map_t, Args< F >, F > > map(const F &functor)
Pipelining node that applies the given functor to items in the stream.
Definition: map.h:154
predeclare.h
tpie::pipelining::node
Base class of all nodes.
Definition: node.h:77
exception.h
tpie::pipelining::bits::node_map
Definition: tokens.h:100
tpie::pipelining::node_token
Definition: tokens.h:292
tpie::pipelining::any_noncopyable
Definition: container.h:195