47 :
TMPWorker(), fFileNames(), fTreeName(), fTree(nullptr), fFile(nullptr), fEntryList(nullptr), fFirstEntry(0),
48 fTreeCache(0), fTreeCacheIsLearning(
kFALSE), fUseTreeCache(
kTRUE), fCacheSize(-1)
55 :
TMPWorker(nWorkers, maxEntries), fFileNames(fileNames), fTreeName(treeName), fTree(nullptr), fFile(nullptr),
56 fEntryList(entries), fFirstEntry(firstEntry), fTreeCache(0), fTreeCacheIsLearning(
kFALSE), fUseTreeCache(
kTRUE),
64 :
TMPWorker(nWorkers, maxEntries), fTree(tree), fFile(nullptr), fEntryList(entries), fFirstEntry(firstEntry),
65 fTreeCache(0), fTreeCacheIsLearning(
kFALSE), fUseTreeCache(
kTRUE), fCacheSize(-1)
105 if (fp ==
nullptr || fp->
IsZombie()) {
106 std::stringstream ss;
107 ss <<
"could not open file " << fileName;
108 std::string errmsg = ss.str();
137 if (tree ==
nullptr) {
138 std::stringstream ss;
139 ss <<
"cannot find tree with name " <<
fTreeName <<
" in file " << fp->
GetName();
140 std::string errmsg = ss.str();
168 Info(
"SetupTreeCache",
"the tree cache is in learning phase");
171 Warning(
"SetupTreeCache",
"default tree does not have a file attached: corruption? Tree cache untouched");
220 std::string reply =
"S" + std::to_string(
GetNWorker());
221 reply +=
": unknown code received: " + std::to_string(code);
248 if (
LoadTree(code, msg, start, finish, &enl, errmsg) != 0) {
260 for (
Long64_t entry = start; entry < finish; ++entry) {
289 std::string mgroot =
"[S" + std::to_string(
GetNWorker()) +
"]: ";
294 mgroot +=
"MPCode::kProcTree: ";
297 if(
fTree ==
nullptr) {
298 errmsg = mgroot + std::string(
"tree undefined!");
303 nProcessed = ReadBuffer<UInt_t>(msg.second.get());
311 start = rangeN * nBunch;
313 finish = (rangeN+1)*nBunch;
325 errmsg = mgroot + std::string(
"unable to retrieve tree from open file ") +
342 mgroot +=
"MPCode::kProcRange: ";
344 nProcessed = ReadBuffer<UInt_t>(msg.second.get());
348 mgroot +=
"MPCode::kProcFile: ";
350 fileN = ReadBuffer<UInt_t>(msg.second.get());
352 errmsg +=
"MPCode undefined!";
360 if (
fFile ==
nullptr) {
362 errmsg = mgroot + std::string(
"unable to open file ") +
fFileNames[fileN];
370 if (tree ==
nullptr) {
372 errmsg = mgroot + std::string(
"unable to retrieve tree from open file ") +
fFileNames[fileN];
377 setupcache = (tree !=
fTree) ?
true :
false;
390 start = rangeN * nBunch;
391 if(rangeN < (fNWorkers-1))
392 finish = (rangeN+1)*nBunch;
415 start = rangeN * nBunch;
416 if (rangeN < (fNWorkers - 1))
417 finish = (rangeN + 1) * nBunch;
422 finish = (*enl)->GetN();
436 Info(
"LoadTree",
"%s %d %d file: %s %lld %lld", mgroot.c_str(), nProcessed, fileN,
fFile->
GetName(), start,
void CloseFile()
Handle file closing.
void Setup()
Auxilliary method for common initializations.
TFile * OpenFile(const std::string &fileName)
Handle file opening.
virtual Bool_t Notify()
This method must be overridden to handle object notification.
virtual TList * GetOutputList() const
virtual void SetCacheRead(TFileCacheRead *cache, TObject *tree=0, ECacheAction action=kDisconnect)
Set a pointer to the read cache.
This class represents a WWW compatible URL.
virtual TList * GetListOfKeys() const
std::string fTreeName
the name of the tree to be processed
A specialized TFileCacheRead object for a TTree.
TSelector & fSelector
pointer to the selector to be used to process the tree. It is null if we are not using a TSelector...
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format...
This class works in conjuction with TMPClient, reacting to messages received from it as specified by ...
virtual TObject * Get(const char *namecycle)
Return pointer to object identified by namecycle.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
void Process(UInt_t code, MPCodeBufPair &msg)
Selector specialization.
TMPWorkerTree()
Class constructors.
virtual const char * GetClassName() const
ULong64_t fProcessedEntries
the number of entries processed by this worker so far
TTree * fTree
pointer to the tree to be processed. It is only used if the tree is directly passed to TProcessExecut...
TTreeCache * fTreeCache
instance of the tree cache for the tree
TFile * GetCurrentFile() const
Return pointer to the current file.
Int_t LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start, Long64_t &finish, TEntryList **enl, std::string &errmsg)
Load the requierd tree and evaluate the processing range.
void SendError(const std::string &errmsg, unsigned int code=MPCode::kError)
Error sender.
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=1, Int_t netopt=0)
Create / open a file.
TFileCacheRead * GetCacheRead(TObject *tree=0) const
Return a pointer to the current read cache.
void SendResult()
Selector processing SendResult and Process overload.
unsigned GetNWorker() const
virtual Bool_t IsLearning() const
void Info(const char *location, const char *msgfmt,...)
virtual void SendResult()
virtual void ResetCache()
This will simply clear the cache.
TTree * RetrieveTree(TFile *fp)
Retrieve a tree from an open file.
Ask for a kFuncResult/kProcResult.
Bool_t fTreeCacheIsLearning
Whether cache is in learning phase.
Book space in a file, create I/O buffers, to fill them, (un)compress them.
The message contains the result of the processing of a TTree.
ULong64_t fMaxNEntries
the maximum number of entries to be processed by this worker
Tell a TMPWorkerTree which tree to process. The object sent is a TreeInfo.
ULong64_t EvalMaxEntries(ULong64_t maxEntries)
Max entries evaluation.
virtual void Process(UInt_t, MPCodeBufPair &)
void SetupTreeCache(TTree *tree)
Tree cache handling.
Tell a TMPWorkerTree which tree and entries range to process. The object sent is a TreeRangeInfo...
virtual Long64_t GetCacheSize() const
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
virtual void SlaveBegin(TTree *)
unsigned fNWorkers
the number of workers spawned
virtual Long64_t GetEntry(Int_t index)
Return the number of the entry #index of this TEntryList in the TTree or TChain See also Next()...
void Init(int fd, UInt_t workerN)
Init overload definign max entries.
Tell a TMPWorkerTree to process the tree that was passed to it at construction time.
virtual const char * GetName() const
Returns name of object.
void Warning(const char *location, const char *msgfmt,...)
virtual void SlaveTerminate()
std::pair< unsigned, std::unique_ptr< TBufferFile >> MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
unsigned long long ULong64_t
Bool_t fUseTreeCache
Control usage of the tree cache.
TEntryList * fEntryList
entrylist
you should not use this method at all Int_t Int_t Double_t Double_t Double_t e
virtual Bool_t Process(Long64_t)
Long64_t fCacheSize
Cache size.
virtual Int_t SetCacheSize(Long64_t cachesize=-1)
Set maximum size of the file cache .
std::vector< std::string > fFileNames
the files to be processed by all workers
void HandleInput(MPCodeBufPair &msg)
Execute instructions received from a MP client.
virtual TEntryList * GetEntryList(const char *treename, const char *filename, Option_t *opt="")
Return the entry list, corresponding to treename and filename By default, the filename is first tried...
virtual void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
virtual Long64_t GetEntries() const
A TTree object has a header with a name and a title.
Tell the client there was an error while processing.
A List of entry numbers in a TTree or TChain.
virtual void UpdateBranches(TTree *tree)
Update pointer to current Tree and recompute pointers to the branches in the cache.
We are ready for the next task.
TFile * fFile
last open file
virtual void Init(TTree *)