Logo ROOT   6.10/00
Reference Guide
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
TProcessExecutor.hxx
Go to the documentation of this file.
1 /* @(#)root/multiproc:$Id$ */
2 // Author: Enrico Guiraud July 2015
3 // Modified: G Ganis Jan 2017
4 
5 /*************************************************************************
6  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7  * All rights reserved. *
8  * *
9  * For the licensing terms see $ROOTSYS/LICENSE. *
10  * For the list of contributors see $ROOTSYS/README/CREDITS. *
11  *************************************************************************/
12 
13 #ifndef ROOT_TProcessExecutor
14 #define ROOT_TProcessExecutor
15 
16 #include "MPCode.h"
17 #include "MPSendRecv.h"
18 #include "PoolUtils.h"
19 #include "TChain.h"
20 #include "TChainElement.h"
21 #include "TError.h"
22 #include "TFileCollection.h"
23 #include "TFileInfo.h"
24 #include "THashList.h"
25 #include "TMPClient.h"
26 #include "ROOT/TExecutor.hxx"
27 #include "TMPWorkerExecutor.h"
28 #include <algorithm> //std::generate
29 #include <numeric> //std::iota
30 #include <string>
31 #include <type_traits> //std::result_of, std::enable_if
32 #include <functional> //std::reference_wrapper
33 #include <vector>
34 
35 namespace ROOT {
36 
37 class TProcessExecutor : public TExecutor<TProcessExecutor>, private TMPClient {
38 public:
39  explicit TProcessExecutor(unsigned nWorkers = 0); //default number of workers is the number of processors
40  ~TProcessExecutor() = default;
41  //it doesn't make sense for a TProcessExecutor to be copied
42  TProcessExecutor(const TProcessExecutor &) = delete;
43  TProcessExecutor &operator=(const TProcessExecutor &) = delete;
44 
45  // Map
47  template<class F, class Cond = noReferenceCond<F>>
48  auto Map(F func, unsigned nTimes) -> std::vector<typename std::result_of<F()>::type>;
49  template<class F, class INTEGER, class Cond = noReferenceCond<F, INTEGER>>
51  template<class F, class T, class Cond = noReferenceCond<F, T>>
52  auto Map(F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>;
53 
54  void SetNWorkers(unsigned n) { TMPClient::SetNWorkers(n); }
55  unsigned GetNWorkers() const { return TMPClient::GetNWorkers(); }
56 
58  template<class T, class R> T Reduce(const std::vector<T> &objs, R redfunc);
59 
60 private:
61  template<class T> void Collect(std::vector<T> &reslist);
62  template<class T> void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector<T> &reslist);
63 
64  void Reset();
65  void ReplyToFuncResult(TSocket *s);
66  void ReplyToIdle(TSocket *s);
67 
68  unsigned fNProcessed; ///< number of arguments already passed to the workers
69  unsigned fNToProcess; ///< total number of arguments to pass to the workers
70 
71  /// A collection of the types of tasks that TProcessExecutor can execute.
72  /// It is used to interpret in the right way and properly reply to the
73  /// messages received (see, for example, TProcessExecutor::HandleInput)
74  enum class ETask : unsigned char {
75  kNoTask, ///< no task is being executed
76  kMap, ///< a Map method with no arguments is being executed
77  kMapWithArg ///< a Map method with arguments is being executed
78  };
79 
80  ETask fTaskType = ETask::kNoTask; ///< the kind of task that is being executed, if any
81 };
82 
83 
84 /************ TEMPLATE METHODS IMPLEMENTATION ******************/
85 
86 //////////////////////////////////////////////////////////////////////////
87 /// Execute func (with no arguments) nTimes in parallel.
88 /// A vector containg executions' results is returned.
89 /// Functions that take more than zero arguments can be executed (with
90 /// fixed arguments) by wrapping them in a lambda or with std::bind.
91 template<class F, class Cond>
93 {
94  using retType = decltype(func());
95  //prepare environment
96  Reset();
97  fTaskType = ETask::kMap;
98 
99  //fork max(nTimes, fNWorkers) times
100  unsigned oldNWorkers = GetNWorkers();
101  if (nTimes < oldNWorkers)
102  SetNWorkers(nTimes);
103  TMPWorkerExecutor<F> worker(func);
104  bool ok = Fork(worker);
105  SetNWorkers(oldNWorkers);
106  if (!ok)
107  {
108  Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
109  return std::vector<retType>();
110  }
111 
112  //give out tasks
113  fNToProcess = nTimes;
114  std::vector<retType> reslist;
115  reslist.reserve(fNToProcess);
116  fNProcessed = Broadcast(MPCode::kExecFunc, fNToProcess);
117 
118  //collect results, give out other tasks if needed
119  Collect(reslist);
120 
121  //clean-up and return
122  ReapWorkers();
123  fTaskType = ETask::kNoTask;
124  return reslist;
125 }
126 
127 //////////////////////////////////////////////////////////////////////////
128 /// Execute func in parallel, taking an element of an
129 /// std::vector as argument.
130 /// A vector containg executions' results is returned.
131 // actual implementation of the Map method. all other calls with arguments eventually
132 // call this one
133 template<class F, class T, class Cond>
135 {
136  //check whether func is callable
137  using retType = decltype(func(args.front()));
138  //prepare environment
139  Reset();
140  fTaskType = ETask::kMapWithArg;
141 
142  //fork max(args.size(), fNWorkers) times
143  //N.B. from this point onwards, args is filled with undefined (but valid) values, since TMPWorkerExecutor moved its content away
144  unsigned oldNWorkers = GetNWorkers();
145  if (args.size() < oldNWorkers)
146  SetNWorkers(args.size());
147  TMPWorkerExecutor<F, T> worker(func, args);
148  bool ok = Fork(worker);
149  SetNWorkers(oldNWorkers);
150  if (!ok)
151  {
152  Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
153  return std::vector<retType>();
154  }
155 
156  //give out tasks
157  fNToProcess = args.size();
158  std::vector<retType> reslist;
159  reslist.reserve(fNToProcess);
160  std::vector<unsigned> range(fNToProcess);
161  std::iota(range.begin(), range.end(), 0);
162  fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
163 
164  //collect results, give out other tasks if needed
165  Collect(reslist);
166 
167  //clean-up and return
168  ReapWorkers();
169  fTaskType = ETask::kNoTask;
170  return reslist;
171 }
172 
173 //////////////////////////////////////////////////////////////////////////
174 /// Execute func in parallel, taking an element of a
175 /// sequence as argument.
176 /// A vector containg executions' results is returned.
177 template<class F, class INTEGER, class Cond>
179 {
180  std::vector<INTEGER> vargs(args.size());
181  std::copy(args.begin(), args.end(), vargs.begin());
182  const auto &reslist = Map(func, vargs);
183  return reslist;
184 }
185 
186 //////////////////////////////////////////////////////////////////////////
187 /// "Reduce" an std::vector into a single object by passing a
188 /// function as the second argument defining the reduction operation.
189 template<class T, class R>
190 T TProcessExecutor::Reduce(const std::vector<T> &objs, R redfunc)
191 {
192  // check we can apply reduce to objs
193  static_assert(std::is_same<decltype(redfunc(objs)), T>::value, "redfunc does not have the correct signature");
194  return redfunc(objs);
195 }
196 
197 //////////////////////////////////////////////////////////////////////////
198 /// Handle message and reply to the worker
199 template<class T>
200 void TProcessExecutor::HandlePoolCode(MPCodeBufPair &msg, TSocket *s, std::vector<T> &reslist)
201 {
202  unsigned code = msg.first;
203  if (code == MPCode::kFuncResult) {
204  reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
206  } else if (code == MPCode::kIdling) {
207  ReplyToIdle(s);
208  } else if(code == MPCode::kProcResult) {
209  if(msg.second != nullptr)
210  reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
212  } else if(code == MPCode::kProcError) {
213  const char *str = ReadBuffer<const char*>(msg.second.get());
214  Error("TProcessExecutor::HandlePoolCode", "[E][C] a worker encountered an error: %s\n"
215  "Continuing execution ignoring these entries.", str);
216  ReplyToIdle(s);
217  delete [] str;
218  } else {
219  // UNKNOWN CODE
220  Error("TProcessExecutor::HandlePoolCode", "[W][C] unknown code received from server. code=%d", code);
221  }
222 }
223 
224 //////////////////////////////////////////////////////////////////////////
225 /// Listen for messages sent by the workers and call the appropriate handler function.
226 /// TProcessExecutor::HandlePoolCode is called on messages with a code < 1000 and
227 /// TMPClient::HandleMPCode is called on messages with a code >= 1000.
228 template<class T>
229 void TProcessExecutor::Collect(std::vector<T> &reslist)
230 {
231  TMonitor &mon = GetMonitor();
232  mon.ActivateAll();
233  while (mon.GetActive() > 0) {
234  TSocket *s = mon.Select();
235  MPCodeBufPair msg = MPRecv(s);
236  if (msg.first == MPCode::kRecvError) {
237  Error("TProcessExecutor::Collect", "[E][C] Lost connection to a worker");
238  Remove(s);
239  } else if (msg.first < 1000)
240  HandlePoolCode(msg, s, reslist);
241  else
242  HandleMPCode(msg, s);
243  }
244 }
245 
246 } // ROOT namespace
247 
248 #endif
TProcessExecutor(unsigned nWorkers=0)
Class constructor.
void SetNWorkers(unsigned n)
This class works together with TProcessExecutor to allow the execution of functions in server process...
double T(double x)
Definition: ChebyshevPol.h:34
TProcessExecutor & operator=(const TProcessExecutor &)=delete
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
Error while reading from the socket.
Definition: MPCode.h:51
This class defines an interface to execute the same task multiple times in parallel, possibly with different arguments every time.
Definition: TExecutor.hxx:61
unsigned fNProcessed
number of arguments already passed to the workers
unsigned GetNWorkers() const
Definition: TMPClient.h:40
void Remove(TSocket *s)
Remove a certain socket from the monitor.
Definition: TMPClient.cxx:295
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
a Map method with arguments is being executed
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
The message contains the result of the processing of a TTree.
Definition: MPCode.h:42
void Reset()
Reset TProcessExecutor's state.
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition: TMonitor.cxx:322
#define F(x, y, z)
The message contains the result of a function execution.
Definition: MPCode.h:33
This class provides a simple interface to execute the same task multiple times in parallel...
Execute function with the argument contained in the message.
Definition: MPCode.h:32
unsigned GetNWorkers() const
void Reset(Detail::TBranchProxy *x)
a Map method with no arguments is being executed
ETask fTaskType
the kind of task that is being executed, if any
no task is being executed
std::pair< unsigned, std::unique_ptr< TBufferFile >> MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:31
Used by the client to tell servers to shutdown.
Definition: MPCode.h:49
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition: TMonitor.cxx:438
unsigned fNToProcess
total number of arguments to pass to the workers
A pseudo container class which is a generator of indices.
Definition: TSeq.hxx:66
int type
Definition: TGX11.cxx:120
double func(double *x, double *p)
Definition: stressTF1.cxx:213
Base class for multiprocess applications' clients.
Definition: TMPClient.h:23
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
Definition: TMPClient.h:39
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
Definition: TMPClient.cxx:329
auto Map(F func, unsigned nTimes) -> std::vector< typename std::result_of< F()>::type >
Execute func (with no arguments) nTimes in parallel.
virtual void ActivateAll()
Activate all de-activated sockets.
Definition: TMonitor.cxx:268
Tell the client there was an error while processing.
Definition: MPCode.h:44
ETask
A collection of the types of tasks that TProcessExecutor can execute.
We are ready for the next task.
Definition: MPCode.h:35
const Int_t n
Definition: legend1.C:16
TMonitor & GetMonitor()
Definition: TMPClient.h:36
TRandom3 R
a TMatrixD.
Definition: testIO.cxx:28
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
Definition: MPSendRecv.cxx:54
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
void Error(ErrorHandler_t func, int code, const char *va_(fmt),...)
Write error message and call a handler, if required.
Execute function without arguments.
Definition: MPCode.h:31
T Reduce(const std::vector< T > &objs, R redfunc)
"Reduce" an std::vector into a single object by passing a function as the second argument defining th...