parallelizer.h
Go to the documentation of this file.
1 // (C) Copyright Renaud Detry 2007-2015.
2 // Distributed under the GNU General Public License and under the
3 // BSD 3-Clause License (See accompanying file LICENSE.txt).
4 
5 /** @file */
6 
7 #ifndef NUKLEI_PARALLELIZER_H
8 #define NUKLEI_PARALLELIZER_H
9 
10 #include <nuklei/Random.h>
11 #include <nuklei/Common.h>
14 
15 #include <cstdlib>
16 #include <boost/filesystem.hpp>
17 #include <boost/asio.hpp>
18 #include <boost/thread.hpp>
19 
20 namespace nuklei {
21 
22  template<typename R, typename Callable, typename PrintAccessor>
23  std::vector<R> parallelizer::run_openmp(Callable callable,
24  PrintAccessor pa) const
25  {
26  std::vector<R> retv;
27 #ifdef _OPENMP
28 #pragma omp parallel for
29 #endif
30  for (int i = 0; i < n_; ++i)
31  {
32  R tmp = callable();
33 #ifdef _OPENMP
34 #pragma omp critical(nuklei_parallelizer_merge)
35 #endif
36  {
37  retv.push_back(tmp);
38  NUKLEI_INFO("Finished OpenMP thread " << i << " with value "
39  << pa(tmp) << ".");
40  }
41  }
42  return retv;
43  }
44 
45  template<typename R, typename Callable, typename PrintAccessor>
46  std::vector<R> parallelizer::run_fork(Callable callable,
47  PrintAccessor pa) const
48  {
49  boost::filesystem::path endpoint_name = boost::filesystem::unique_path("/tmp/nuklei-%%%%-%%%%-%%%%-%%%%");
50  //std::vector<pid_t> pids(n_, 0);
51  std::vector<R> retv;
52  for (int i = 0; i < n_; i++)
53  {
54  pid_t pid = fork();
55  NUKLEI_ASSERT(pid >= 0);
56  if (pid == 0)
57  {
58  Random::seed(seed_+i); // unsigned overflow wraps around.
59 
60  using boost::asio::local::stream_protocol;
61 
62  R tmp = callable();
63 
64  {
65  stream_protocol::endpoint ep(endpoint_name.native());
66  stream_protocol::iostream stream(ep);
67  NUKLEI_SERIALIZATION_BINARY_OARCHIVE oa(stream);
68  oa & i & NUKLEI_SERIALIZATION_NVP(tmp);
69  }
70 
71  _exit(0);
72  }
73  else
74  {
75  //pids.at(i) = pid;
76  }
77  }
78 
79  using boost::asio::local::stream_protocol;
80  stream_protocol::endpoint ep(endpoint_name.native());
81  boost::asio::io_service io_service;
82  stream_protocol::acceptor acceptor(io_service, ep);
83 
84  for (int i = 0; i < n_; i++)
85  {
86  R tmp;
87  int fork_i = 0;
88  {
89  stream_protocol::iostream stream;
90  acceptor.accept(*stream.rdbuf());
91  NUKLEI_SERIALIZATION_BINARY_IARCHIVE ia(stream);
92  ia & fork_i & NUKLEI_SERIALIZATION_NVP(tmp);
93  }
94  retv.push_back(tmp);
95 
96  NUKLEI_INFO("Finished fork " << fork_i << " with value "
97  << pa(tmp) << ".");
98  }
99 
100  // clean up:
101  boost::filesystem::remove(endpoint_name);
102  reap();
103 
104  return retv;
105  }
106 
107  template<typename R, typename Callable, typename PrintAccessor>
108  std::vector<R> parallelizer::run_pthread(Callable callable,
109  PrintAccessor pa) const
110  {
111  std::vector<R> retv(n_);
112  std::vector< boost::shared_ptr<boost::thread> > threads;
113  for (int i = 0; i < n_; ++i)
114  {
115  // This does not work without the first boost::ref (even if bind<void>
116  // is used. The solution is to parametrize run_pthread_stub with
117  // boost::_bi::protected_bind_t<C>, and boost::protect(callable),
118  // but _bi is not public.
119  //boost::shared_ptr<boost::thread> thrd
120  //(new boost::thread(boost::bind(run_pthread_stub<R, Callable>,
121  // boost::ref(callable),
122  // boost::ref(retv.at(i)))));
123  // For future ref, here's the helper function:
124  //template<typename R, typename Callable>
125  //void run_pthread_stub(Callable callable, R& ret)
126  //{
127  // ret = callable();
128  //}
129 
130  boost::shared_ptr<boost::thread> thread
131  (new boost::thread
132  (boost::bind<void>(pthread_wrapper<R, Callable>(callable),
133  boost::ref(retv.at(i)))));
134  threads.push_back(thread);
135  }
136  for (int i = 0; i < n_; ++i)
137  {
138  threads.at(i)->join();
139  NUKLEI_INFO("Finished thread " << i << " with value "
140  << pa(retv.at(i)) << ".");
141  }
142  return retv;
143  }
144 
145  template<typename R, typename Callable, typename PrintAccessor>
146  std::vector<R> parallelizer::run_single(Callable callable,
147  PrintAccessor pa) const
148  {
149  std::vector<R> retv;
150  for (int i = 0; i < n_; ++i)
151  {
152  R tmp = callable();
153  retv.push_back(tmp);
154  NUKLEI_INFO("Finished slice " << i << " with value "
155  << pa(tmp) << ".");
156  }
157  return retv;
158  }
159 
160 }
161 
162 #endif
Public namespace.
Definition: Color.cpp:9
#define NUKLEI_ASSERT(expression)
Throws an Error if expression is not true.
Definition: Common.h:113
static void seed(unsigned s)
Seeds the random generators with s.
Definition: Random.cpp:135
© Copyright 2007-2013 Renaud Detry.
Distributed under the terms of the GNU General Public License (GPL).
(See accompanying file LICENSE.txt or copy at http://www.gnu.org/copyleft/gpl.html.)
Revised Sun Sep 13 2020 19:10:06.