Logo ROOT   6.10/00
Reference Guide
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
TDFNodes.cxx
Go to the documentation of this file.
1 // Author: Enrico Guiraud, Danilo Piparo CERN 03/2017
2 
3 /*************************************************************************
4  * Copyright (C) 1995-2016, Rene Brun and Fons Rademakers. *
5  * All rights reserved. *
6  * *
7  * For the licensing terms see $ROOTSYS/LICENSE. *
8  * For the list of contributors see $ROOTSYS/README/CREDITS. *
9  *************************************************************************/
10 
11 #include "RConfigure.h" // R__USE_IMT
12 #include "ROOT/TDFNodes.hxx"
13 #include "ROOT/TSpinMutex.hxx"
15 #ifdef R__USE_IMT
16 #include "ROOT/TThreadExecutor.hxx"
17 #endif
18 #include "RtypesCore.h" // Long64_t
19 #include "TROOT.h" // IsImplicitMTEnabled
20 #include "TTreeReader.h"
21 
22 #include <cassert>
23 #include <mutex>
24 #include <numeric> // std::accumulate
25 #include <string>
26 class TDirectory;
27 class TTree;
28 using namespace ROOT::Detail::TDF;
29 using namespace ROOT::Internal::TDF;
30 
31 namespace ROOT {
32 namespace Internal {
33 namespace TDF {
34 
35 TActionBase::TActionBase(TLoopManager *implPtr, const ColumnNames_t &tmpBranches)
36  : fImplPtr(implPtr), fTmpBranches(tmpBranches)
37 {
38 }
39 
40 } // end NS TDF
41 } // end NS Internal
42 } // end NS ROOT
43 
44 TCustomColumnBase::TCustomColumnBase(TLoopManager *implPtr, const ColumnNames_t &tmpBranches, std::string_view name)
45  : fImplPtr(implPtr), fTmpBranches(tmpBranches), fName(name){};
46 
47 ColumnNames_t TCustomColumnBase::GetTmpBranches() const
48 {
49  return fTmpBranches;
50 }
51 
52 std::string TCustomColumnBase::GetName() const
53 {
54  return fName;
55 }
56 
58 {
59  return fImplPtr;
60 }
61 
62 TFilterBase::TFilterBase(TLoopManager *implPtr, const ColumnNames_t &tmpBranches, std::string_view name)
63  : fImplPtr(implPtr), fTmpBranches(tmpBranches), fName(name){};
64 
66 {
67  return fImplPtr;
68 }
69 
70 ColumnNames_t TFilterBase::GetTmpBranches() const
71 {
72  return fTmpBranches;
73 }
74 
76 {
77  return !fName.empty();
78 };
79 
81 {
82  if (fName.empty()) // PrintReport is no-op for unnamed filters
83  return;
84  const auto accepted = std::accumulate(fAccepted.begin(), fAccepted.end(), 0ULL);
85  const auto all = accepted + std::accumulate(fRejected.begin(), fRejected.end(), 0ULL);
86  double perc = accepted;
87  if (all > 0) perc /= all;
88  perc *= 100.;
89  Printf("%-10s: pass=%-10lld all=%-10lld -- %8.3f %%", fName.c_str(), accepted, all, perc);
90 }
91 
92 // This is an helper class to allow to pick a slot without resorting to a map
93 // indexed by thread ids.
94 // WARNING: this class does not work as a regular stack. The size is
95 // fixed at construction time and no blocking is foreseen.
96 class TSlotStack {
97 private:
98  unsigned int fCursor;
99  std::vector<unsigned int> fBuf;
100  ROOT::TSpinMutex fMutex;
101 
102 public:
103  TSlotStack() = delete;
104  TSlotStack(unsigned int size) : fCursor(size), fBuf(size) { std::iota(fBuf.begin(), fBuf.end(), 0U); }
105  void Push(unsigned int slotNumber);
106  unsigned int Pop();
107 };
108 
109 void TSlotStack::Push(unsigned int slotNumber)
110 {
111  std::lock_guard<ROOT::TSpinMutex> guard(fMutex);
112  fBuf[fCursor++] = slotNumber;
113  assert(fCursor <= fBuf.size() && "TSlotStack assumes that at most a fixed number of values can be present in the "
114  "stack. fCursor is greater than the size of the internal buffer. This violates "
115  "such assumption.");
116 }
117 
118 unsigned int TSlotStack::Pop()
119 {
120  assert(fCursor > 0 &&
121  "TSlotStack assumes that a value can be always popped. fCursor is <=0 and this violates such assumption.");
122  std::lock_guard<ROOT::TSpinMutex> guard(fMutex);
123  return fBuf[--fCursor];
124 }
125 
126 TLoopManager::TLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
127  : fTree(std::shared_ptr<TTree>(tree, [](TTree *) {})), fDefaultBranches(defaultBranches),
129 {
130 }
131 
132 TLoopManager::TLoopManager(Long64_t nEmptyEntries) : fNEmptyEntries(nEmptyEntries), fNSlots(TDFInternal::GetNSlots())
133 {
134 }
135 
136 void TLoopManager::RunAndCheckFilters(unsigned int slot, Long64_t entry)
137 {
138  for (auto &actionPtr : fBookedActions) actionPtr->Run(slot, entry);
139  for (auto &namedFilterPtr : fBookedNamedFilters) namedFilterPtr->CheckFilters(slot, entry);
140 }
141 
143 {
144 #ifdef R__USE_IMT
146  TSlotStack slotStack(fNSlots);
148 
149  if (fNEmptyEntries > 0) {
150  // Working with an empty tree.
151  // Evenly partition the entries according to fNSlots
152  const auto nEntriesPerSlot = fNEmptyEntries / fNSlots;
153  auto remainder = fNEmptyEntries % fNSlots;
154  std::vector<std::pair<Long64_t, Long64_t>> entryRanges;
155  Long64_t start = 0;
156  while (start < fNEmptyEntries) {
157  Long64_t end = start + nEntriesPerSlot;
158  if (remainder > 0) {
159  ++end;
160  --remainder;
161  }
162  entryRanges.emplace_back(start, end);
163  start = end;
164  }
165 
166  // Each task will generate a subrange of entries
167  auto genFunction = [this, &slotStack](const std::pair<Long64_t, Long64_t> &range) {
168  auto slot = slotStack.Pop();
169  InitAllNodes(nullptr, slot);
170  for (auto currEntry = range.first; currEntry < range.second; ++currEntry) {
171  RunAndCheckFilters(slot, currEntry);
172  }
173  slotStack.Push(slot);
174  };
175 
176  ROOT::TThreadExecutor pool;
177  pool.Foreach(genFunction, entryRanges);
178  } else {
179  using ttpmt_t = ROOT::TTreeProcessorMT;
180  std::unique_ptr<ttpmt_t> tp;
181  tp.reset(new ttpmt_t(*fTree));
182 
183  tp->Process([this, &slotStack](TTreeReader &r) -> void {
184  auto slot = slotStack.Pop();
185  InitAllNodes(&r, slot);
186  // recursive call to check filters and conditionally execute actions
187  while (r.Next()) {
189  }
190  slotStack.Push(slot);
191  });
192  }
193  } else {
194 #endif // R__USE_IMT
195  CreateSlots(1);
196  if (fNEmptyEntries > 0) {
197  InitAllNodes(nullptr, 0);
198  for (Long64_t currEntry = 0; currEntry < fNEmptyEntries && fNStopsReceived < fNChildren; ++currEntry) {
199  RunAndCheckFilters(0, currEntry);
200  }
201  } else {
202  TTreeReader r(fTree.get());
203  InitAllNodes(&r, 0);
204 
205  // recursive call to check filters and conditionally execute actions
206  // in the non-MT case processing can be stopped early by ranges, hence the check on fNStopsReceived
207  while (r.Next() && fNStopsReceived < fNChildren) {
208  RunAndCheckFilters(0, r.GetCurrentEntry());
209  }
210  }
211 #ifdef R__USE_IMT
212  }
213 #endif // R__USE_IMT
214 
215  fHasRunAtLeastOnce = true;
216  // forget actions
217  fBookedActions.clear();
218  // make all TResultProxies ready
219  for (auto readiness : fResProxyReadiness) {
220  *readiness.get() = true;
221  }
222  // forget TResultProxies
223  fResProxyReadiness.clear();
224 }
225 
226 /// Build TTreeReaderValues for all nodes
227 ///
228 /// This method loops over all filters, actions and other booked objects and
229 /// calls their `BuildReaderValues` methods. It is called once per node per slot, before
230 /// running the event loop. It also informs each node of the TTreeReader that
231 /// a particular slot will be using.
232 void TLoopManager::InitAllNodes(TTreeReader *r, unsigned int slot)
233 {
234  // booked branches must be initialized first
235  // because actions and filters might need to point to the values encapsulate
236  for (auto &bookedBranch : fBookedBranches) bookedBranch.second->Init(r, slot);
237  for (auto &ptr : fBookedActions) ptr->Init(r, slot);
238  for (auto &ptr : fBookedFilters) ptr->Init(r, slot);
239 }
240 
241 /// Initialize all nodes of the functional graph before running the event loop
242 ///
243 /// This method loops over all filters, actions and other booked objects and
244 /// calls their `CreateSlots` methods. It is called once per node before running the
245 /// event loop. The main effect is to inform all nodes of the number of slots
246 /// (i.e. workers) that will be used to perform the event loop.
247 void TLoopManager::CreateSlots(unsigned int nSlots)
248 {
249  for (auto &ptr : fBookedActions) ptr->CreateSlots(nSlots);
250  for (auto &ptr : fBookedFilters) ptr->CreateSlots(nSlots);
251  for (auto &bookedBranch : fBookedBranches) bookedBranch.second->CreateSlots(nSlots);
252 }
253 
255 {
256  return this;
257 }
258 
259 const ColumnNames_t &TLoopManager::GetDefaultBranches() const
260 {
261  return fDefaultBranches;
262 }
263 
264 TTree *TLoopManager::GetTree() const
265 {
266  return fTree.get();
267 }
268 
270 {
271  auto it = fBookedBranches.find(name);
272  return it == fBookedBranches.end() ? nullptr : it->second.get();
273 }
274 
276 {
277  return fDirPtr;
278 }
279 
280 void TLoopManager::Book(const ActionBasePtr_t &actionPtr)
281 {
282  fBookedActions.emplace_back(actionPtr);
283 }
284 
285 void TLoopManager::Book(const FilterBasePtr_t &filterPtr)
286 {
287  fBookedFilters.emplace_back(filterPtr);
288  if (filterPtr->HasName()) {
289  fBookedNamedFilters.emplace_back(filterPtr);
290  }
291 }
292 
294 {
295  fBookedBranches[branchPtr->GetName()] = branchPtr;
296 }
297 
298 void TLoopManager::Book(const std::shared_ptr<bool> &readinessPtr)
299 {
300  fResProxyReadiness.emplace_back(readinessPtr);
301 }
302 
303 void TLoopManager::Book(const RangeBasePtr_t &rangePtr)
304 {
305  fBookedRanges.emplace_back(rangePtr);
306 }
307 
308 // dummy call, end of recursive chain of calls
309 bool TLoopManager::CheckFilters(int, unsigned int)
310 {
311  return true;
312 }
313 
314 unsigned int TLoopManager::GetNSlots() const
315 {
316  return fNSlots;
317 }
318 
319 /// Call `PrintReport` on all booked filters
321 {
322  for (const auto &fPtr : fBookedNamedFilters) fPtr->PrintReport();
323 }
324 
325 TRangeBase::TRangeBase(TLoopManager *implPtr, const ColumnNames_t &tmpBranches, unsigned int start, unsigned int stop,
326  unsigned int stride)
327  : fImplPtr(implPtr), fTmpBranches(tmpBranches), fStart(start), fStop(stop), fStride(stride)
328 {
329 }
330 
332 {
333  return fImplPtr;
334 }
335 
336 ColumnNames_t TRangeBase::GetTmpBranches() const
337 {
338  return fTmpBranches;
339 }
FilterBaseVec_t fBookedFilters
Definition: TDFNodes.hxx:52
long long Long64_t
Definition: RtypesCore.h:69
TTreeReader is a simple, robust and fast interface to read values from a TTree, TChain or TNtuple...
Definition: TTreeReader.h:42
TTree()
Default constructor and I/O constructor.
Definition: TTree.cxx:652
ColumnNames_t GetTmpBranches() const
Definition: TDFNodes.cxx:336
void Report() const
Call PrintReport on all booked filters.
Definition: TDFNodes.cxx:320
::TDirectory * GetDirectory() const
Definition: TDFNodes.cxx:275
TLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
Definition: TDFNodes.cxx:126
std::map< std::string, TmpBranchBasePtr_t > fBookedBranches
Definition: TDFNodes.hxx:54
TRangeBase(TLoopManager *implPtr, const ColumnNames_t &tmpBranches, unsigned int start, unsigned int stop, unsigned int stride)
Definition: TDFNodes.cxx:325
A spin mutex class which respects the STL interface for mutexes.
Definition: TSpinMutex.hxx:40
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
Definition: TDFNodes.hxx:63
void InitAllNodes(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes.
Definition: TDFNodes.cxx:232
ColumnNames_t GetTmpBranches() const
Definition: TDFNodes.cxx:70
void Book(const ActionBasePtr_t &actionPtr)
Definition: TDFNodes.cxx:280
unsigned int GetNSlots() const
Definition: TDFNodes.cxx:314
std::vector< ULong64_t > fAccepted
Definition: TDFNodes.hxx:372
std::vector< ULong64_t > fRejected
Definition: TDFNodes.hxx:373
std::shared_ptr< TFilterBase > FilterBasePtr_t
Definition: TDFNodes.hxx:43
std::string GetName() const
Definition: TDFNodes.cxx:52
Long64_t GetCurrentEntry() const
Returns the index of the current entry being read.
Definition: TTreeReader.h:208
bool CheckFilters(int, unsigned int)
Definition: TDFNodes.cxx:309
const ColumnNames_t fTmpBranches
Definition: TDFNodes.hxx:369
TLoopManager * GetImplPtr() const
Definition: TDFNodes.cxx:57
ActionBaseVec_t fBookedActions
Definition: TDFNodes.hxx:51
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Definition: TDFNodes.cxx:136
TRandom2 r(17)
unsigned int fNStopsReceived
Number of times that a children node signaled to stop processing entries.
Definition: TDFNodes.hxx:64
void CreateSlots(unsigned int nSlots)
Initialize all nodes of the functional graph before running the event loop.
Definition: TDFNodes.cxx:247
std::shared_ptr< TCustomColumnBase > TmpBranchBasePtr_t
Definition: TDFNodes.hxx:41
TLoopManager * fImplPtr
A raw pointer to the TLoopManager at the root of this functional graph.
Definition: TDFNodes.hxx:367
const unsigned int fNSlots
Definition: TDFNodes.hxx:61
const ColumnNames_t & GetDefaultBranches() const
Definition: TDFNodes.cxx:259
FilterBaseVec_t fBookedNamedFilters
Definition: TDFNodes.hxx:53
#define Printf
Definition: TGeoToOCC.h:18
Describe directory structure in memory.
Definition: TDirectory.h:34
fNSlots(TDFInternal::GetNSlots())
Definition: TDFNodes.cxx:132
TLoopManager * fImplPtr
A raw pointer to the TLoopManager at the root of this functional graph.
Definition: TDFNodes.hxx:472
const ColumnNames_t fDefaultBranches
Definition: TDFNodes.hxx:59
TLoopManager * GetImplPtr() const
Definition: TDFNodes.cxx:331
TFilterBase(TLoopManager *df, const ColumnNames_t &tmpBranches, std::string_view name)
Definition: TDFNodes.cxx:62
std::shared_ptr< TDFInternal::TActionBase > ActionBasePtr_t
Definition: TDFNodes.hxx:38
TLoopManager * fImplPtr
A raw pointer to the TLoopManager at the root of this functional graph.
Definition: TDFNodes.hxx:263
fDefaultBranches(defaultBranches)
TCustomColumnBase * GetBookedBranch(const std::string &name) const
Definition: TDFNodes.cxx:269
TLoopManager * GetImplPtr() const
Definition: TDFNodes.cxx:65
Bool_t Next()
Move to the next entry (or index of the TEntryList if that is set).
Definition: TTreeReader.h:160
std::shared_ptr< TTree > fTree
Definition: TDFNodes.hxx:58
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition: TROOT.cxx:552
unsigned int GetNSlots()
Definition: TDFUtils.cxx:125
ColumnNames_t GetTmpBranches() const
Definition: TDFNodes.cxx:47
std::vector< std::shared_ptr< bool > > fResProxyReadiness
Definition: TDFNodes.hxx:56
A class to process the entries of a TTree in parallel.
char name[80]
Definition: TGX11.cxx:109
std::shared_ptr< TRangeBase > RangeBasePtr_t
Definition: TDFNodes.hxx:46