LArSoft  v06_85_00
Liquid Argon Software toolkit - http://larsoft.org/
RootOutput_module.cc
Go to the documentation of this file.
28 #include "fhiclcpp/ParameterSet.h"
29 #include "fhiclcpp/types/Atom.h"
32 #include "fhiclcpp/types/Table.h"
34 
35 #include <iomanip>
36 #include <memory>
37 #include <sstream>
38 #include <string>
39 #include <utility>
40 
41 using std::string;
42 
// File-local sentinel path. The file-name helpers further down compare
// filePattern_ against it and return it unchanged on a match.
43 namespace {
44  std::string const dev_null{"/dev/null"};
45 }
46 
// Forward declarations. RootOutput is defined below; RootOutputFile is
// the pointee type of the rootOutputFile_ member.
47 namespace art {
48  class RootOutput;
49  class RootOutputFile;
50 }
51 
// Output module class, declared final and derived from OutputModule.
// It declares its FHiCL configuration (Config), the OutputModule hooks
// it overrides, and the settings it keeps for RootOutputFile.
// NOTE(review): the embedded line numbers skip values (59, 61, 63, 65,
// 67, ...), so some declarations in this listing are truncated or
// missing; consult the original source before relying on any of them.
52 class art::RootOutput final : public OutputModule {
53 public:
 // Placeholder value for the tmpDir parameter: the constructor compares
 // the configured tmpDir against it and, on a match, uses
 // parent_path(filePattern_) instead.
54  static constexpr char const* default_tmpDir{"<parent-path-of-filename>"};
55 
 // FHiCL configuration description. Each visible Atom is constructed
 // from a parameter Name and a second value (presumably the default --
 // confirm against the fhiclcpp Atom documentation).
56  struct Config {
57 
58  using Name = fhicl::Name;
 // NOTE(review): the declarations that followed these two template
 // headers (lines 61 and 63) are absent from this listing.
60  template <typename T>
62  template <typename T>
64 
66  Atom<std::string> catalog{Name("catalog"), ""};
68  Atom<bool> dropAllSubRuns{Name("dropAllSubRuns"), false};
71  Atom<int> compressionLevel{Name("compressionLevel"), 7};
72  Atom<int64_t> saveMemoryObjectThreshold{Name("saveMemoryObjectThreshold"),
73  -1l};
74  Atom<int64_t> treeMaxVirtualSize{Name("treeMaxVirtualSize"), -1};
75  Atom<int> splitLevel{Name("splitLevel"), 99};
76  Atom<int> basketSize{Name("basketSize"), 16384};
77  Atom<bool> dropMetaDataForDroppedData{Name("dropMetaDataForDroppedData"),
78  false};
79  Atom<std::string> dropMetaData{Name("dropMetaData"), "NONE"};
80  Atom<bool> writeParameterSets{Name("writeParameterSets"), true};
82  Name("fileProperties")};
83 
 // NOTE(review): the line introducing this body (84) and the statement
 // that uses adjustFilename (94) are absent from this listing.
85  {
86  // Both RootOutput module and OutputModule use the "fileName"
87  // FHiCL parameter. However, whereas in OutputModule the
88  // parameter has a default, for RootOutput the parameter should
89  // not. We therefore have to change the default flag setting
90  // for 'OutputModule::Config::fileName'.
91  using namespace fhicl::detail;
92  ParameterBase* adjustFilename{
93  const_cast<fhicl::Atom<std::string>*>(&omConfig().fileName)};
95  }
96 
 // Function object returning a set of key names; "results" is always
 // inserted. The initializer of `keys` (line 102) is absent from this
 // listing.
97  struct KeysToIgnore {
98  std::set<std::string>
99  operator()() const
100  {
101  std::set<std::string> keys{
103  keys.insert("results");
104  return keys;
105  }
106  };
107  };
108 
110 
111  explicit RootOutput(Parameters const&);
112 
113  void postSelectProducts() override;
114 
115  void beginJob() override;
116  void endJob() override;
117 
118  void event(EventPrincipal const&) override;
119 
120  void beginSubRun(SubRunPrincipal const&) override;
121  void endSubRun(SubRunPrincipal const&) override;
122 
123  void beginRun(RunPrincipal const&) override;
124  void endRun(RunPrincipal const&) override;
125 
126 private:
127  std::string fileNameAtOpen() const;
128  std::string fileNameAtClose(std::string const& currentFileName);
129  std::string const& lastClosedFileName() const override;
130  void openFile(FileBlock const&) override;
131  void respondToOpenInputFile(FileBlock const&) override;
132  void readResults(ResultsPrincipal const& resp) override;
133  void respondToCloseInputFile(FileBlock const&) override;
134  void incrementInputFileNumber() override;
135  Granularity fileGranularity() const override;
136  void write(EventPrincipal&) override;
137  void writeSubRun(SubRunPrincipal&) override;
138  void writeRun(RunPrincipal&) override;
139  void setSubRunAuxiliaryRangeSetID(RangeSet const&) override;
140  void setRunAuxiliaryRangeSetID(RangeSet const&) override;
141  bool isFileOpen() const override;
142  void setFileStatus(OutputFileStatus) override;
143  bool requestsToCloseFile() const override;
144  void doOpenFile();
145  void startEndFile() override;
146  void writeFileFormatVersion() override;
147  void writeFileIndex() override;
148  void writeEventHistory() override;
149  void writeProcessConfigurationRegistry() override;
150  void writeProcessHistoryRegistry() override;
151  void writeParameterSetRegistry() override;
152  void writeProductDescriptionRegistry() override;
153  void writeParentageRegistry() override;
 // NOTE(review): the start of this declaration (lines 154-155) is absent
 // from this listing.
156  FileCatalogMetadata::collection_type const& ssmd) override;
157  void writeProductDependencies() override;
158  void finishEndFile() override;
 // NOTE(review): the start of this declaration (line 159) is absent from
 // this listing.
160  ProductDescriptions& producedProducts,
161  ModuleDescription const& md) override;
162 
163 private:
164  std::string const catalog_;
165  bool dropAllEvents_{false};
167  std::string const moduleLabel_;
 // Null while no output file is open; isFileOpen() tests this pointer.
169  std::unique_ptr<RootOutputFile> rootOutputFile_{nullptr};
172  std::string const filePattern_;
173  std::string tmpDir_;
174  std::string lastClosedFileName_{};
175 
176  // We keep this set of data members for the use of RootOutputFile.
177  int const compressionLevel_;
179  int64_t const treeMaxVirtualSize_;
180  int const splitLevel_;
181  int const basketSize_;
184 
185  // We keep this for the use of RootOutputFile and we also use it
186  // during file open to make some choices.
188 
189  // Set false only for cases where we are guaranteed never to need
190  // historical ParameterSet information in the downstream file
191  // (e.g. mixing).
194 
195  // ResultsProducer management.
198 };
199 
// Constructor: member-initializer list and body.
// NOTE(review): the signature (line 200) and lines 205, 218, 225-226
// and 234-235 are absent from this listing.
201  : OutputModule{config().omConfig, config.get_PSet()}
202  , catalog_{config().catalog()}
203  , dropAllSubRuns_{config().dropAllSubRuns()}
204  , moduleLabel_{config.get_PSet().get<string>("module_label")}
206  , filePattern_{config().omConfig().fileName()}
 // When the configured tmpDir equals the default_tmpDir placeholder,
 // use the parent path of the output file pattern instead.
207  , tmpDir_{config().tmpDir() == default_tmpDir ? parent_path(filePattern_) :
208  config().tmpDir()}
209  , compressionLevel_{config().compressionLevel()}
210  , saveMemoryObjectThreshold_{config().saveMemoryObjectThreshold()}
211  , treeMaxVirtualSize_{config().treeMaxVirtualSize()}
212  , splitLevel_{config().splitLevel()}
213  , basketSize_{config().basketSize()}
214  , dropMetaData_{config().dropMetaData()}
215  , dropMetaDataForDroppedData_{config().dropMetaDataForDroppedData()}
216  , writeParameterSets_{config().writeParameterSets()}
 // Comma-operator expression: the call on the missing line 218 (given
 // the has_key(...) result and filePattern_) is evaluated first, and
 // fileProperties_ is then initialized from config().fileProperties().
217  , fileProperties_{(
219  config.get_PSet().has_key(config().fileProperties.name()),
220  filePattern_), // comma operator!
221  config().fileProperties())}
222  , rpm_{config.get_PSet()}
223 {
 // NOTE(review): presumably the call fills dropAllEvents_ and returns
 // whether the parameter was supplied -- confirm against the fhiclcpp
 // OptionalAtom documentation. The code using the flag (225-226) is not
 // shown here.
224  bool const dropAllEventsSet{config().dropAllEvents(dropAllEvents_)};
227 
228  // N.B. Any time file switching is enabled at a boundary other than
229  // InputFile, fastCloningEnabled_ ***MUST*** be deactivated. This is
230  // to ensure that the Event tree from the InputFile is not
231  // accidentally cloned to the output file before the output
232  // module has seen the events that are going to be processed.
233  bool const fastCloningSet{config().fastCloning(fastCloningEnabled_)};
236 
 // Warn when parameter-set writing has been switched off.
237  if (!writeParameterSets_) {
238  mf::LogWarning("PROVENANCE")
239  << "Output module " << moduleLabel_
240  << " has parameter writeParameterSets set to false.\n"
241  << "Parameter set provenance will not be available in subsequent jobs.\n"
242  << "Check your experiment's policy on this issue to avoid future "
243  "problems\n"
244  << "with analysis reproducibility.\n";
245  }
246 }
247 
// Calls doOpenFile() when no output file is open yet.
// NOTE(review): the function-name line (249) and line 256 are absent
// from this listing; presumably art::RootOutput::openFile(FileBlock
// const&) -- confirm against the original source.
248 void
250 {
251  // Note: The file block here refers to the currently open input
252  // file, so we can find out about the available products by
253  // looping over the branches of the input file data trees.
254  if (!isFileOpen()) {
255  doOpenFile();
257  }
258 }
259 
// Forwards to rootOutputFile_->selectProducts() when a file is open;
// otherwise does nothing.
// NOTE(review): the function-name line (261) is absent from this
// listing; presumably art::RootOutput::postSelectProducts() -- confirm.
260 void
262 {
263  if (isFileOpen()) {
264  rootOutputFile_->selectProducts();
265  }
266 }
267 
// Counts the newly opened input file and, when an output file is open,
// decides whether fast cloning applies before calling beginInputFile().
// NOTE(review): the function-name line (269) and line 293 are absent
// from this listing; presumably
// art::RootOutput::respondToOpenInputFile(FileBlock const& fb) -- confirm.
268 void
270 {
 // The counter is incremented even when no output file is open.
271  ++inputFileCount_;
272  if (!isFileOpen())
273  return;
274 
 // rfb is null when fb is not a RootFileBlock.
275  auto const* rfb = dynamic_cast<RootFileBlock const*>(&fb);
276 
 // Fast cloning needs: the feature enabled, a RootFileBlock with a
 // non-null tree, and remainingEvents() either negative or at least
 // the number of entries in that tree.
277  bool fastCloneThisOne = fastCloningEnabled_ && rfb &&
278  (rfb->tree() != nullptr) &&
279  ((remainingEvents() < 0) ||
280  (remainingEvents() >= rfb->tree()->GetEntries()));
281  if (fastCloningEnabled_ && !fastCloneThisOne) {
282  mf::LogWarning("FastCloning")
283  << "Fast cloning deactivated for this input file due to "
284  << "empty event tree and/or event limits.";
285  }
 // rfb is non-null here because fastCloneThisOne implies it.
286  if (fastCloneThisOne && !rfb->fastClonable()) {
287  mf::LogWarning("FastCloning")
288  << "Fast cloning deactivated for this input file due to "
289  << "information in FileBlock.";
290  fastCloneThisOne = false;
291  }
292  rootOutputFile_->beginInputFile(rfb, fastCloneThisOne);
294 }
295 
// Calls doReadResults(resp) on the ResultsProducer of every RPWorker
// managed by rpm_.
// NOTE(review): the function-name line (297) is absent from this
// listing; presumably art::RootOutput::readResults(ResultsPrincipal
// const& resp) -- confirm.
296 void
298 {
299  rpm_.for_each_RPWorker([&resp](RPWorker& w) { w.rp().doReadResults(resp); });
300 }
301 
// Forwards fb to rootOutputFile_->respondToCloseInputFile() when a file
// is open; otherwise does nothing.
// NOTE(review): the function-name line (303) is absent from this
// listing; presumably
// art::RootOutput::respondToCloseInputFile(FileBlock const& fb) -- confirm.
302 void
304 {
305  if (isFileOpen()) {
306  rootOutputFile_->respondToCloseInputFile(fb);
307  }
308 }
309 
// Writes one event through rootOutputFile_ and records its id in
// fstats_; returns immediately when dropAllEvents_ is set.
// NOTE(review): the function-name line (311) and the condition that
// guards addToProcessHistory() (line 316) are absent from this listing;
// presumably art::RootOutput::write(EventPrincipal& ep) -- confirm.
310 void
312 {
313  if (dropAllEvents_) {
314  return;
315  }
317  ep.addToProcessHistory();
318  }
319  rootOutputFile_->writeOne(ep);
320  fstats_.recordEvent(ep.id());
321 }
322 
// Forwards rs to rootOutputFile_->setSubRunAuxiliaryRangeSetID(). No
// open-file check is made here.
// NOTE(review): the function-name line (324) is absent from this
// listing; presumably
// art::RootOutput::setSubRunAuxiliaryRangeSetID(RangeSet const& rs).
323 void
325 {
326  rootOutputFile_->setSubRunAuxiliaryRangeSetID(rs);
327 }
328 
// Writes one subrun through rootOutputFile_ and records its id in
// fstats_; returns immediately when dropAllSubRuns_ is set.
// NOTE(review): the function-name line (330) and the condition that
// guards addToProcessHistory() (line 335) are absent from this listing;
// presumably art::RootOutput::writeSubRun(SubRunPrincipal& sr) -- confirm.
329 void
331 {
332  if (dropAllSubRuns_) {
333  return;
334  }
336  sr.addToProcessHistory();
337  }
338  rootOutputFile_->writeSubRun(sr);
339  fstats_.recordSubRun(sr.id());
340 }
341 
// Forwards rs to rootOutputFile_->setRunAuxiliaryRangeSetID(). No
// open-file check is made here.
// NOTE(review): the function-name line (343) is absent from this
// listing; presumably
// art::RootOutput::setRunAuxiliaryRangeSetID(RangeSet const& rs).
342 void
344 {
345  rootOutputFile_->setRunAuxiliaryRangeSetID(rs);
346 }
347 
// Writes one run through rootOutputFile_ and records its id in fstats_.
// NOTE(review): the function-name line (349) and the statement inside
// the hasNewlyDroppedBranch()[InRun] branch (line 352) are absent from
// this listing; presumably art::RootOutput::writeRun(RunPrincipal& r).
348 void
350 {
351  if (hasNewlyDroppedBranch()[InRun]) {
353  }
354  rootOutputFile_->writeRun(r);
355  fstats_.recordRun(r.id());
356 }
357 
// Builds a ResultsPrincipal, lets every RPWorker's ResultsProducer
// write into it via doWriteResults(), then hands it to
// rootOutputFile_->writeResults().
// NOTE(review): the function-name line (359) and lines 362, 365 and 368
// are absent from this listing, so the constructor arguments, the second
// half of the condition and the for_each_RPWorker call head are not
// visible; presumably art::RootOutput::startEndFile() -- confirm.
358 void
360 {
361  auto resp = std::make_unique<ResultsPrincipal>(
363  resp->setProducedProducts(producedResultsProducts_);
364  if (ProductMetaData::instance().productProduced(InResults) ||
366  resp->addToProcessHistory();
367  }
369  [&resp](RPWorker& w) { w.rp().doWriteResults(*resp); });
370  rootOutputFile_->writeResults(*resp);
371 }
372 
// Forwards to rootOutputFile_->writeFileFormatVersion().
// NOTE(review): the function-name line (374) is absent from this
// listing; presumably art::RootOutput::writeFileFormatVersion().
373 void
375 {
376  rootOutputFile_->writeFileFormatVersion();
377 }
378 
// Forwards to rootOutputFile_->writeFileIndex().
// NOTE(review): the function-name line (380) is absent from this
// listing; presumably art::RootOutput::writeFileIndex().
379 void
381 {
382  rootOutputFile_->writeFileIndex();
383 }
384 
// Forwards to rootOutputFile_->writeEventHistory().
// NOTE(review): the function-name line (386) is absent from this
// listing; presumably art::RootOutput::writeEventHistory().
385 void
387 {
388  rootOutputFile_->writeEventHistory();
389 }
390 
// Forwards to rootOutputFile_->writeProcessConfigurationRegistry().
// NOTE(review): the function-name line (392) is absent from this
// listing; presumably
// art::RootOutput::writeProcessConfigurationRegistry().
391 void
393 {
394  rootOutputFile_->writeProcessConfigurationRegistry();
395 }
396 
// Forwards to rootOutputFile_->writeProcessHistoryRegistry().
// NOTE(review): the function-name line (398) is absent from this
// listing; presumably art::RootOutput::writeProcessHistoryRegistry().
397 void
399 {
400  rootOutputFile_->writeProcessHistoryRegistry();
401 }
402 
// Forwards to rootOutputFile_->writeParameterSetRegistry(), but only
// when writeParameterSets_ is true.
// NOTE(review): the function-name line (404) is absent from this
// listing; presumably art::RootOutput::writeParameterSetRegistry().
403 void
405 {
406  if (writeParameterSets_) {
407  rootOutputFile_->writeParameterSetRegistry();
408  }
409 }
410 
// Forwards to rootOutputFile_->writeProductDescriptionRegistry().
// NOTE(review): the function-name line (412) is absent from this
// listing; presumably art::RootOutput::writeProductDescriptionRegistry().
411 void
413 {
414  rootOutputFile_->writeProductDescriptionRegistry();
415 }
416 
// Forwards to rootOutputFile_->writeParentageRegistry().
// NOTE(review): the function-name line (418) is absent from this
// listing; presumably art::RootOutput::writeParentageRegistry().
417 void
419 {
420  rootOutputFile_->writeParentageRegistry();
421 }
422 
// Passes the collected file statistics (fstats_) together with md and
// ssmd to rootOutputFile_->writeFileCatalogMetadata().
// NOTE(review): the function name and parameter list (lines 424-426)
// are absent from this listing; presumably
// art::RootOutput::doWriteFileCatalogMetadata(md, ssmd) -- confirm.
423 void
427 {
428  rootOutputFile_->writeFileCatalogMetadata(fstats_, md, ssmd);
429 }
430 
// Forwards to rootOutputFile_->writeProductDependencies().
// NOTE(review): the function-name line (432) is absent from this
// listing; presumably art::RootOutput::writeProductDependencies().
431 void
433 {
434  rootOutputFile_->writeProductDependencies();
435 }
436 
// Finishes the current output file: writes the trees, destroys the
// RootOutputFile, derives the final file name with fileNameAtClose()
// and logs the close.
// NOTE(review): the function-name line (438) and lines 443 and 446 are
// absent from this listing; presumably art::RootOutput::finishEndFile().
437 void
439 {
 // The name is copied before reset() destroys the object that owns it.
440  std::string const currentFileName{rootOutputFile_->currentFileName()};
441  rootOutputFile_->writeTTrees();
442  rootOutputFile_.reset();
444  lastClosedFileName_ = fileNameAtClose(currentFileName);
445  detail::logFileAction("Closed output file ", lastClosedFileName_);
447 }
448 
// Lets every RPWorker's ResultsProducer register its products, then
// builds producedResultsProducts_ from producedProducts for InResults.
// NOTE(review): the function-name line (450) and lines 457, 461 and 462
// are absent from this listing, so the call that receives the
// ModuleDescription and its remaining arguments are not visible;
// presumably art::RootOutput::doRegisterProducts(mpr, producedProducts,
// md) -- confirm.
449 void
451  ProductDescriptions& producedProducts,
452  ModuleDescription const& md)
453 {
454  // Register Results products from ResultsProducers.
455  rpm_.for_each_RPWorker([&mpr, &producedProducts, &md](RPWorker& w) {
456  auto const& params = w.params();
 // The worker's label is the module label, '#', then the producer label.
458  ModuleDescription{params.rpPSetID,
459  params.rpPluginType,
460  md.moduleLabel() + '#' + params.rpLabel,
463  w.rp().registerProducts(mpr, producedProducts, w.moduleDescription());
464  });
465 
466  // Form product table for Results products. We do this here so we
467  // can appropriately set the product tables for the
468  // ResultsPrincipal.
469  producedResultsProducts_ = ProductTable{producedProducts, InResults};
470 }
471 
// Forwards ofs to rootOutputFile_->setFileStatus() when a file is open;
// otherwise does nothing.
// NOTE(review): the function-name line (473) is absent from this
// listing; presumably
// art::RootOutput::setFileStatus(OutputFileStatus ofs) -- confirm.
472 void
474 {
475  if (isFileOpen())
476  rootOutputFile_->setFileStatus(ofs);
477 }
478 
// Returns true exactly when rootOutputFile_ holds a non-null pointer.
// NOTE(review): the function-name line (480) is absent from this
// listing; presumably art::RootOutput::isFileOpen() const -- confirm.
479 bool
481 {
482  return rootOutputFile_.get() != nullptr;
483 }
484 
// Forwards to rootOutputFile_->incrementInputFileNumber() when a file
// is open; otherwise does nothing.
// NOTE(review): the function-name line (486) is absent from this
// listing; presumably art::RootOutput::incrementInputFileNumber().
485 void
487 {
488  if (isFileOpen())
489  rootOutputFile_->incrementInputFileNumber();
490 }
491 
// Returns rootOutputFile_->requestsToCloseFile() when a file is open,
// and false otherwise.
// NOTE(review): the function-name line (493) is absent from this
// listing; presumably art::RootOutput::requestsToCloseFile() const.
492 bool
494 {
495  return isFileOpen() ? rootOutputFile_->requestsToCloseFile() : false;
496 }
497 
// Returns fileProperties_.granularity().
// NOTE(review): the return-type and function-name lines (498-499) are
// absent from this listing; presumably
// art::RootOutput::fileGranularity() const -- confirm.
500 {
501  return fileProperties_.granularity();
502 }
503 
// Creates the RootOutputFile, named by fileNameAtOpen(), and logs the
// open together with the configured file pattern.
// NOTE(review): the function-name line (505), the statement that emits
// the message when inputFileCount_ == 0 (line 508), the assignment
// target (line 512) and several constructor arguments (515-518,
// 521-524) are absent from this listing; presumably
// art::RootOutput::doOpenFile() -- confirm.
504 void
506 {
 // Opening an output file before any input file has been counted is
 // treated as a framework error.
507  if (inputFileCount_ == 0) {
509  << "Attempt to open output file before input file. "
510  << "Please report this to the core framework developers.\n";
511  }
513  std::make_unique<RootOutputFile>(this,
514  fileNameAtOpen(),
519  splitLevel_,
520  basketSize_,
525  detail::logFileAction("Opened output file with pattern ", filePattern_);
526 }
527 
// Returns dev_null when filePattern_ equals dev_null; otherwise returns
// unique_filename(tmpDir_ + "/RootOutput").
// NOTE(review): the function-name line (529) is absent from this
// listing; presumably art::RootOutput::fileNameAtOpen() const -- confirm.
528 string
530 {
531  return filePattern_ == dev_null ? dev_null :
532  unique_filename(tmpDir_ + "/RootOutput");
533 }
534 
// Returns the name the output file has after closing.
// currentFileName is the name the file was written under. When
// filePattern_ equals dev_null, dev_null is returned and nothing else
// happens; otherwise the result of
// fRenamer_.maybeRenameFile(currentFileName, filePattern_) is returned.
535 string
536 art::RootOutput::fileNameAtClose(std::string const& currentFileName)
537 {
538  return filePattern_ == dev_null ?
539  dev_null :
540  fRenamer_.maybeRenameFile(currentFileName, filePattern_);
541 }
542 
// Returns a reference to lastClosedFileName_, the name stored by the
// most recent file close.
// Throws Exception(errors::LogicError) while lastClosedFileName_ is
// still empty. The exception context string now names this function;
// it previously said "RootOutput::currentFileName()", which this class
// does not declare.
// NOTE(review): the function-name line (544) is absent from this
// listing; presumably art::RootOutput::lastClosedFileName() const,
// the only member declared with this return type -- confirm.
543 string const&
545 {
546  if (lastClosedFileName_.empty()) {
547  throw Exception(errors::LogicError, "RootOutput::lastClosedFileName(): ")
548  << "called before meaningful.\n";
549  }
550  return lastClosedFileName_;
551 }
552 
// NOTE(review): both the function-name line (554) and the body
// statement (556) are absent from this listing, so nothing about this
// definition can be read here. Going by the declaration order in the
// class it is presumably art::RootOutput::beginJob() -- confirm against
// the original source.
553 void
555 {
557 }
558 
// NOTE(review): both the function-name line (560) and the body
// statements (562-563) are absent from this listing, so nothing about
// this definition can be read here. Going by the declaration order in
// the class it is presumably art::RootOutput::endJob() -- confirm
// against the original source.
559 void
561 {
564 }
565 
// Calls doEvent(ep) on the ResultsProducer of every RPWorker managed by
// rpm_.
// NOTE(review): the function-name line (567) is absent from this
// listing; presumably art::RootOutput::event(EventPrincipal const& ep).
566 void
568 {
569  rpm_.for_each_RPWorker([&ep](RPWorker& w) { w.rp().doEvent(ep); });
570 }
571 
// Calls doBeginSubRun(srp) on the ResultsProducer of every RPWorker
// managed by rpm_.
// NOTE(review): the function-name line (573) and the first statement of
// the lambda (line 576) are absent from this listing; presumably
// art::RootOutput::beginSubRun(SubRunPrincipal const& srp) -- confirm.
572 void
574 {
575  rpm_.for_each_RPWorker([&srp](RPWorker& w) {
577  w.rp().doBeginSubRun(srp);
578  });
579 }
580 
// Calls doEndSubRun(srp) on the ResultsProducer of every RPWorker
// managed by rpm_.
// NOTE(review): the function-name line (582) is absent from this
// listing; presumably
// art::RootOutput::endSubRun(SubRunPrincipal const& srp) -- confirm.
581 void
583 {
584  rpm_.for_each_RPWorker([&srp](RPWorker& w) { w.rp().doEndSubRun(srp); });
585 }
586 
// Calls doBeginRun(rp) on the ResultsProducer of every RPWorker managed
// by rpm_.
// NOTE(review): the function-name line (588) is absent from this
// listing; presumably art::RootOutput::beginRun(RunPrincipal const& rp).
587 void
589 {
590  rpm_.for_each_RPWorker([&rp](RPWorker& w) { w.rp().doBeginRun(rp); });
591 }
592 
// Calls doEndRun(rp) on the ResultsProducer of every RPWorker managed
// by rpm_.
// NOTE(review): the function-name line (594) is absent from this
// listing; presumably art::RootOutput::endRun(RunPrincipal const& rp).
593 void
595 {
596  rpm_.for_each_RPWorker([&rp](RPWorker& w) { w.rp().doEndRun(rp); });
597 }
598 
600 
601 // vim: set sw=2:
void writeParentageRegistry() override
std::vector< std::pair< std::string, std::string >> collection_type
void respondToCloseInputFile(FileBlock const &) override
void startEndFile() override
static constexpr ModuleDescriptionID invalidID()
std::string const filePattern_
bool shouldDropEvents(bool dropAllEventsSet, bool dropAllEvents, bool dropAllSubRuns)
void writeProductDescriptionRegistry() override
void endSubRun(SubRunPrincipal const &) override
void doEvent(EventPrincipal const &)
auto const & get_PSet() const
void doBeginSubRun(SubRunPrincipal const &)
Atom< bool > dropMetaDataForDroppedData
void recordRun(RunID const &id)
ClosingCriteria fileProperties_
std::string fileNameAtOpen() const
void doWriteFileCatalogMetadata(FileCatalogMetadata::collection_type const &md, FileCatalogMetadata::collection_type const &ssmd) override
void doWriteResults(ResultsPrincipal &)
ModuleDescription const & moduleDescription() const
Definition: RPWorker.h:65
std::string const & fileName() const
Definition: FileBlock.h:38
FileStatsCollector fstats_
void writeFileIndex() override
static cet::exempt_ptr< Consumer > non_module_context()
Definition: Consumer.cc:76
void registerProducts(MasterProductRegistry &mpr, ProductDescriptions &producedProducts, ModuleDescription const &md)
PostCloseFileRenamer fRenamer_
std::string const moduleLabel_
void doReadResults(ResultsPrincipal const &)
void validateFileNamePattern(bool do_check, std::string const &pattern)
void logFileAction(const char *msg, std::string const &file)
Definition: logFileAction.cc:9
void writeFileFormatVersion() override
void setFileStatus(OutputFileStatus) override
void recordEvent(EventID const &id)
static constexpr char const * default_tmpDir
std::vector< BranchDescription > ProductDescriptions
bool isFileOpen() const override
void openFile(FileBlock const &) override
void writeProcessConfigurationRegistry() override
void doEndRun(RunPrincipal const &)
void incrementInputFileNumber() override
void for_each_RPWorker(on_rpworker_t wfunc)
Definition: RPManager.h:76
int64_t const treeMaxVirtualSize_
std::string lastClosedFileName_
OptionalAtom< bool > fastCloning
std::string unique_filename(std::string stem, std::string extension=".root")
void doBeginRun(RunPrincipal const &)
void doRegisterProducts(MasterProductRegistry &mpr, ProductDescriptions &producedProducts, ModuleDescription const &md) override
void writeParameterSetRegistry() override
OutputFileStatus
Atom< int64_t > saveMemoryObjectThreshold
#define DEFINE_ART_MODULE(klass)
Definition: ModuleMacros.h:42
SubRunID id() const
DropMetaData dropMetaData_
void doEndSubRun(SubRunPrincipal const &)
bool requestsToCloseFile() const override
void addToProcessHistory()
Definition: Principal.cc:80
static ProductMetaData const & instance()
ProductTable producedResultsProducts_
auto granularity() const
void write(EventPrincipal &) override
std::unique_ptr< RootOutputFile > rootOutputFile_
std::string const & lastClosedFileName() const override
TFile fb("Li6.root")
void setModuleDescription(ModuleDescription const &)
Definition: RPWorker.h:71
void recordSubRun(SubRunID const &id)
RootOutput(Parameters const &)
int const compressionLevel_
fhicl::TableFragment< art::OutputModule::Config > omConfig
Atom< std::string > tmpDir
void recordInputFile(std::string const &inputFileName)
void writeProductDependencies() override
std::array< bool, NumBranchTypes > const & hasNewlyDroppedBranch() const
Definition: OutputModule.h:342
void writeSubRun(SubRunPrincipal &) override
ModuleDescription const & description() const
Definition: OutputModule.h:311
void beginRun(RunPrincipal const &) override
Atom< std::string > catalog
int64_t const saveMemoryObjectThreshold_
void invoke(invoke_function_t< void, ARGS... > mfunc, ARGS &&...args)
Definition: RPManager.h:66
std::string const & moduleLabel() const
int remainingEvents() const
Definition: OutputModule.h:323
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
void beginJob() override
void setRunAuxiliaryRangeSetID(RangeSet const &) override
void respondToOpenInputFile(FileBlock const &) override
ResultsProducer & rp()
Definition: RPWorker.h:47
std::set< std::string > operator()() const
std::string parent_path(std::string const &path)
Definition: parent_path.cc:15
RunID const & id() const
Definition: RunPrincipal.h:46
void readResults(ResultsPrincipal const &resp) override
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
void setSubRunAuxiliaryRangeSetID(RangeSet const &) override
std::string fileNameAtClose(std::string const &currentFileName)
void endRun(RunPrincipal const &) override
void showMissingConsumes() const
Definition: Consumer.cc:125
std::string maybeRenameFile(std::string const &inPath, std::string const &toPattern)
HLT enums.
void endJob() override
EventID const & id() const
Granularity fileGranularity() const override
Atom< int64_t > treeMaxVirtualSize
void beginSubRun(SubRunPrincipal const &) override
void writeProcessHistoryRegistry() override
void postSelectProducts() override
void writeRun(RunPrincipal &) override
fhicl::Table< ClosingCriteria::Config > fileProperties
void writeEventHistory() override
Float_t w
Definition: plot.C:23
bool shouldFastClone(bool fastCloningSet, bool fastCloning, bool wantAllEvents, ClosingCriteria const &fileProperties)
std::string const catalog_
std::string const & processName() const
RPParams const & params() const
Definition: RPWorker.h:59
void set_par_style(par_style const vt)
OptionalAtom< bool > dropAllEvents
void event(EventPrincipal const &) override
ProcessConfiguration const & processConfiguration() const
Atom< std::string > dropMetaData
void finishEndFile() override