LArSoft  v07_13_02
Liquid Argon Software toolkit - http://larsoft.org/
RootInputFileSequence.cc
Go to the documentation of this file.
2 // vim: set sw=2:
3 
4 #include "TFile.h"
14 #include "cetlib/container_algorithms.h"
15 #include "fhiclcpp/ParameterSet.h"
17 
18 #include <ctime>
19 #include <map>
20 #include <set>
21 #include <stack>
22 #include <string>
23 #include <utility>
24 
25 using namespace cet;
26 using namespace std;
27 
28 namespace art {
29 
// Constructor: caches the source configuration in members, builds the
// per-primary-file list of secondary file names, computes the starting
// EventID, creates the duplicate checker, and opens the first primary input
// file that can be opened.
//
// NOTE(review): this listing omits several original lines. Examples are 31 and
// 34-35 (presumably the declarations of the config, pMode and mpr parameters
// used below), 83 (the type of `stk`), 161 (the statement that opens each
// candidate file), and the `throw ...` heads that precede the `<<` message
// continuations. Consult the complete file before editing.
30  RootInputFileSequence::RootInputFileSequence(
32  InputFileCatalog& catalog,
33  FastCloningInfoProvider const& fcip,
36  ProcessConfiguration const& processConfig)
37  : catalog_{catalog}
38  , fileIndexes_(fileCatalogItems().size())
39  , eventsToSkip_{config().skipEvents()}
40  , compactSubRunRanges_{config().compactSubRunRanges()}
41  , noEventSort_{config().noEventSort()}
42  , skipBadFiles_{config().skipBadFiles()}
43  , treeCacheSize_{config().cacheSize()}
44  , treeMaxVirtualSize_{config().treeMaxVirtualSize()}
45  , saveMemoryObjectThreshold_{config().saveMemoryObjectThreshold()}
46  , delayedReadEventProducts_{config().delayedReadEventProducts()}
47  , delayedReadSubRunProducts_{config().delayedReadSubRunProducts()}
48  , delayedReadRunProducts_{config().delayedReadRunProducts()}
49  , groupSelectorRules_{config().inputCommands(),
50  "inputCommands",
51  "InputSource"}
52  , dropDescendants_{config().dropDescendantsOfDroppedBranches()}
53  , readParameterSets_{config().readParameterSets()}
54  , fastCloningInfo_{fcip}
55  , processingMode_{pMode}
56  , processConfiguration_{processConfig}
57  , mpr_{mpr}
58  {
59  auto const& primaryFileNames = catalog_.fileSources();
60 
61  map<string const, vector<string> const> secondaryFilesMap;
62 
// Read the optional secondaryFileNames configuration. Each entry maps one file
// name (a) to its list of secondary file names (b). Empty names are rejected.
// std::map::emplace does not overwrite, so if the same "a" is listed more than
// once, the first entry wins.
63  std::vector<Config::SecondaryFile> secondaryFiles;
64  if (config().secondaryFileNames(secondaryFiles)) {
65  for (auto const& val : secondaryFiles) {
66  auto const a = val.a();
67  auto const b = val.b();
68  if (a.empty()) {
70  << "Empty filename found as value of an \"a\" parameter!\n";
71  }
72  for (auto const& name : b) {
73  if (name.empty()) {
75  << "Empty secondary filename found as value of an \"b\" "
76  "parameter!\n";
77  }
78  }
79  secondaryFilesMap.emplace(a, b);
80  }
81  }
82 
// stk is a stack (LIFO via back/pop_back) of [next, end) iterator ranges over
// secondary-name lists. Its declared type is on original line 83, which is not
// shown here.
84  stk;
// For every primary file, flatten its secondaries depth-first in pre-order.
// Each secondary name is appended, then that secondary's own list (if it has
// one) is expanded before its remaining siblings. This works because the
// sibling range is pushed first and the child range last. Exactly one entry is
// pushed onto secondaryFileNames_ per primary file, in catalog order; it is
// empty when the primary has no secondaries.
// NOTE(review): there is no cycle detection. A configuration in which a file
// lists one of its own ancestors (e.g. a -> b -> a) never terminates.
85  for (auto const& primaryFileName : primaryFileNames) {
86  vector<string> secondaries;
87  auto SFMI = secondaryFilesMap.find(primaryFileName);
88  if (SFMI == secondaryFilesMap.end()) {
89  // This primary has no secondaries.
90  secondaryFileNames_.push_back(std::move(secondaries));
91  continue;
92  }
93  if (!SFMI->second.size()) {
94  // Has an empty secondary list.
95  secondaryFileNames_.push_back(std::move(secondaries));
96  continue;
97  }
98  stk.emplace_back(SFMI->second.cbegin(), SFMI->second.cend());
99  while (stk.size()) {
100  auto val = stk.back();
101  stk.pop_back();
102  if (val.first == val.second) {
103  // Reached end of this filename list.
104  continue;
105  }
106  auto const& fn = *val.first;
107  ++val.first;
108  secondaries.push_back(fn);
109  auto SI = secondaryFilesMap.find(fn);
110  if (SI == secondaryFilesMap.end()) {
111  // Has no secondary list.
112  if (val.first == val.second) {
113  // Reached end of this filename list.
114  continue;
115  }
116  stk.emplace_back(val.first, val.second);
117  continue;
118  }
119  if (!SI->second.size()) {
120  // Has an empty secondary list.
121  if (val.first == val.second) {
122  // Reached end of this filename list.
123  continue;
124  }
125  stk.emplace_back(val.first, val.second);
126  continue;
127  }
// Push the remaining siblings first so that fn's own secondaries (pushed
// last) are processed next.
128  stk.emplace_back(val.first, val.second);
129  stk.emplace_back(SI->second.cbegin(), SI->second.cend());
130  }
131  secondaryFileNames_.push_back(std::move(secondaries));
132  }
// Compute the EventID at which reading starts. Each of firstRun, firstSubRun
// and firstEvent falls back to the corresponding first*() value when it is not
// configured.
133  RunNumber_t firstRun{};
134  bool const haveFirstRun{config().hasFirstRun(firstRun)};
135  SubRunNumber_t firstSubRun{};
136  bool const haveFirstSubRun{config().hasFirstSubRun(firstSubRun)};
137  EventNumber_t firstEvent{};
138  bool const haveFirstEvent{config().hasFirstEvent(firstEvent)};
139 
140  RunID const firstRunID{haveFirstRun ? RunID{firstRun} : RunID::firstRun()};
141  SubRunID const firstSubRunID{haveFirstSubRun ?
142  SubRunID{firstRunID.run(), firstSubRun} :
143  SubRunID::firstSubRun(firstRunID)};
144 
145  origEventID_ = haveFirstEvent ? EventID{firstSubRunID, firstEvent} :
146  EventID::firstEvent(firstSubRunID);
147 
// noEventSort and firstEvent are rejected as a combination.
148  if (noEventSort_ && haveFirstEvent) {
150  << "Illegal configuration options passed to RootInput\n"
151  << "You cannot request \"noEventSort\" and also set \"firstEvent\".\n";
152  }
// The duplicate checker is only created when primary() is true (the
// primary() definition in this file returns true unconditionally).
153  if (primary()) {
154  duplicateChecker_ = std::make_shared<DuplicateChecker>(config().dc);
155  }
156  if (pendingClose_) {
158  << "RootInputFileSequence looking for next file with a pending close!";
159  }
// Advance through the catalog until a file is opened (rootFile_ becomes
// non-null). The open call on original line 161 is not shown; presumably it is
// initFile() with the configured skipBadFiles_.
160  while (catalog_.getNextFile()) {
162  if (rootFile_) {
163  // We found one, good, stop now.
164  break;
165  }
166  }
167  if (!rootFile_) {
168  // We could not open any input files, stop.
169  return;
170  }
// Optional setRunNumber: the first open file computes a run-number offset.
// Invalid run numbers and offsets below zero are reported as configuration
// errors; any other art::Exception is rethrown unchanged.
171  if (config().setRunNumber(setRun_)) {
172  try {
173  forcedRunOffset_ = rootFile_->setForcedRunOffset(setRun_);
174  }
175  catch (art::Exception& e) {
176  if (e.categoryCode() == errors::InvalidNumber) {
178  << "setRunNumber " << setRun_
179  << " does not correspond to a valid run number in ["
180  << RunID::firstRun().run() << ", " << RunID::maxRun().run()
181  << "]\n";
182  } else {
183  throw; // Rethrow.
184  }
185  }
186  if (forcedRunOffset_ < 0) {
188  << "The value of the 'setRunNumber' parameter must not be\n"
189  << "less than the first run number in the first input file.\n"
190  << "'setRunNumber' was " << setRun_ << ", while the first run was "
191  << setRun_ - forcedRunOffset_ << ".\n";
192  }
193  }
194  if (!readParameterSets_) {
195  mf::LogWarning("PROVENANCE")
196  << "Source parameter readParameterSets was set to false: parameter set "
197  "provenance\n"
198  << "will NOT be available in this or subsequent jobs using output from "
199  "this job.\n"
200  << "Check your experiment's policy on this issue to avoid future "
201  "problems\n"
202  << "with analysis reproducibility.\n";
203  }
// NOTE(review): the warning names the parameter "compactEventRanges", while the
// member and accessor are called compactSubRunRanges. Confirm which name users
// actually set in their configuration.
204  if (compactSubRunRanges_) {
205  mf::LogWarning("PROVENANCE")
206  << "Source parameter compactEventRanges was set to true: enabling "
207  "compact event ranges\n"
208  << "creates a history that can cause file concatenation problems if a "
209  "given SubRun spans\n"
210  << "multiple input files. Use with care.\n";
211  }
212  }
213 
214  EventID
216  {
217  // Attempt to find event in currently open input file.
218  bool found = rootFile_->setEntry<InEvent>(eID, true);
219  // found in the current file
220  if (found) {
221  return rootFile_->eventIDForFileIndexPosition();
222  }
223  // fail if not searchable
224  if (!catalog_.isSearchable()) {
225  return EventID();
226  }
227  // Look for event in files previously opened without reopening unnecessary
228  // files.
229  for (auto itBegin = fileIndexes_.cbegin(),
230  itEnd = fileIndexes_.cend(),
231  it = itBegin;
232  (!found) && it != itEnd;
233  ++it) {
234  if (*it && (*it)->contains(eID, exact)) {
235  // We found it. Close the currently open file, and open the correct one.
236  catalog_.rewindTo(std::distance(itBegin, it));
237  initFile(/*skipBadFiles=*/false);
238  // Now get the event from the correct file.
239  found = rootFile_->setEntry<InEvent>(eID, exact);
240  assert(found);
241  return rootFile_->eventIDForFileIndexPosition();
242  }
243  }
244  // Look for event in files not yet opened.
245  while (catalog_.getNextFile()) {
246  initFile(/*skipBadFiles=*/false);
247  found = rootFile_->setEntry<InEvent>(eID, exact);
248  }
249  return (found) ? rootFile_->eventIDForFileIndexPosition() : EventID();
250  }
251 
// Skips `offset` events via skip() and returns the EventID at the resulting
// position of the open file. The signature line (original 253) is not shown;
// presumably this is the offset-taking seekToEvent overload.
// NOTE(review): skip() returns early when there is no next/previous file, but
// rootFile_ is still dereferenced unconditionally here.
252  EventID
254  {
255  skip(offset);
256  return rootFile_->eventIDForFileIndexPosition();
257  }
258 
// Returns the catalog's list of file items (forwarded from catalog_). The
// constructor uses its size to pre-size fileIndexes_. The signature line
// (original 260) is not shown.
259  vector<FileCatalogItem> const&
261  {
262  return catalog_.fileCatalogItems();
263  }
264 
// Closes the currently open input file, if any, by delegating to closeFile_().
// The function name (original line 266) is not shown in this listing.
265  void
267  {
268  closeFile_();
269  }
270 
// Produces the FileBlock for the next input file. On the first call it uses
// whatever file is already open (e.g. the one opened by the constructor) and
// clears firstFile_. Later calls advance with nextFile(). If no file is open,
// an empty RootFileBlock is returned.
// The signature (original 272) and the statement inside `if (!rootFile_)`
// (original 278) are not shown.
// NOTE(review): running out of files on a non-first call is only caught by
// assert(false). Under NDEBUG execution continues with whatever rootFile_ is
// still set (see the FIXME).
271  std::unique_ptr<FileBlock>
273  {
274  if (firstFile_) {
275  // We are at the first file in the sequence of files.
276  firstFile_ = false;
277  if (!rootFile_) {
279  }
280  } else if (!nextFile()) {
281  // FIXME: Turn this into a throw!
282  assert(false);
283  }
284  if (!rootFile_) {
285  return std::make_unique<RootFileBlock>();
286  }
287  return rootFile_->createFileBlock();
288  }
288  }
289 
// closeFile_() (its signature, original line 291, is not shown).
// First, if a close was requested via pendingClose_, finish the catalog and
// clear the flag. Then, if a file is open:
//   - carry its remaining eventsToSkip count forward,
//   - close it,
//   - log the action,
//   - release rootFile_,
//   - notify the duplicate checker (when one exists).
290  void
292  {
293  if (pendingClose_) {
294  catalog_.finish(); // We were expecting this
295  pendingClose_ = false;
296  }
297  if (!rootFile_)
298  return;
299 
300  // Account for events skipped in the file.
301  eventsToSkip_ = rootFile_->eventsToSkip();
302  rootFile_->close(primary());
303  detail::logFileAction("Closed input file ", rootFile_->fileName());
304  rootFile_.reset();
305  if (duplicateChecker_.get() != nullptr) {
306  duplicateChecker_->inputFileClosed();
307  }
308  }
308  }
309 
// Sets pendingClose_ so that the next closeFile_() call also finishes the
// catalog. The function name (original line 311) is not shown.
310  void
312  {
313  pendingClose_ = true;
314  }
315 
// Opens the catalog's current file as the new rootFile_ and caches its
// FileIndex in fileIndexes_ at the catalog's current index.
//
// skipBadFiles: when true, a file that cannot be opened is logged as a
// warning and skipped, leaving rootFile_ null (closeFile_() has already reset
// it). When false, the failure is reported through the (not shown) throw
// statements.
//
// NOTE(review): many argument lines of the calls below (e.g. 324, 349,
// 352-373) are missing from this listing.
// NOTE(review): `catch (cet::exception e)` catches by value. Catching by
// const& would avoid copying and slicing derived exception types.
316  void
317  RootInputFileSequence::initFile(bool const skipBadFiles)
318  {
319  // close the currently open file, if any, and release the RootInputFile object.
320  closeFile_();
321  std::unique_ptr<TFile> filePtr;
322  try {
323  detail::logFileAction("Initiating request to open input file ",
325  filePtr.reset(TFile::Open(catalog_.currentFile().fileName().c_str()));
326  }
327  catch (cet::exception e) {
328  if (!skipBadFiles) {
330  << e.explain_self()
331  << "\nRootInputFileSequence::initFile(): Input file "
333  << " was not found or could not be opened.\n";
334  }
335  }
// A null pointer or a "zombie" TFile both mean the open failed.
336  if (!filePtr || filePtr->IsZombie()) {
337  if (!skipBadFiles) {
339  << "RootInputFileSequence::initFile(): Input file "
341  << " was not found or could not be opened.\n";
342  }
343  mf::LogWarning("")
344  << "Input file: " << catalog_.currentFile().fileName()
345  << " was not found or could not be opened, and will be skipped.\n";
346  return;
347  }
348  detail::logFileAction("Opened input file ",
350  vector<string> empty_vs;
351  rootFile_ = make_shared<RootInputFile>(
353  catalog_.url(),
356  std::move(filePtr),
357  origEventID_,
369  noEventSort_,
374  /*primaryFile*/ exempt_ptr<RootInputFile>{nullptr},
375  secondaryFileNames_.empty() ?
376  empty_vs :
378  this,
379  mpr_);
380 
// Grow the FileIndex cache if needed, then remember this file's index so that
// later seek/readIt calls can find entries without reopening every file.
382  if (catalog_.currentIndex() + 1 > fileIndexes_.size()) {
383  fileIndexes_.resize(catalog_.currentIndex() + 1);
384  }
385  fileIndexes_[catalog_.currentIndex()] = rootFile_->fileIndexSharedPtr();
386  }
387 
// Opens the secondary input file `name` and wraps it in a RootInputFile
// associated with `primaryFile`. A secondary file has an empty url and
// logical file name, no duplicate checker, and no secondaries of its own.
// Failure to open is reported through the (not shown) throw statements.
//
// NOTE(review): the signature line (389) and several constructor-argument
// lines are missing from this listing.
// NOTE(review): `catch (cet::exception e)` catches by value. Catching by
// const& would avoid copying and slicing derived exception types.
388  std::unique_ptr<RootInputFile>
390  string const& name,
391  exempt_ptr<RootInputFile> primaryFile)
392  {
393  std::unique_ptr<TFile> filePtr;
394  try {
395  detail::logFileAction("Attempting to open secondary input file ", name);
396  filePtr.reset(TFile::Open(name.c_str()));
397  }
398  catch (cet::exception e) {
400  << e.explain_self()
401  << "\nRootInputFileSequence::openSecondaryFile(): Input file " << name
402  << " was not found or could not be opened.\n";
403  }
// A null pointer or a "zombie" TFile both mean the open failed.
404  if (!filePtr || filePtr->IsZombie()) {
406  << "RootInputFileSequence::openSecondaryFile(): Input file " << name
407  << " was not found or could not be opened.\n";
408  }
409  detail::logFileAction("Opened secondary input file ", name);
410  vector<string> empty_secondary_filenames;
411  return std::make_unique<RootInputFile>(name,
412  /*url*/ "",
414  /*logicalFileName*/ "",
415  std::move(filePtr),
416  origEventID_,
428  noEventSort_,
430  /*duplicateChecker_*/ nullptr,
433  primaryFile,
434  empty_secondary_filenames,
435  this,
436  mpr_);
437  }
438 
// Advances the catalog to its next file. Returns false when there is none and
// true otherwise. The signature (original 440) and the statement on original
// line 446 are not shown; presumably that statement opens the new file.
// readFile_() and skip() call this as nextFile().
439  bool
441  {
442  if (!catalog_.getNextFile()) {
443  // no more files
444  return false;
445  }
447  return true;
448  }
449 
// Steps back to the previous catalog file, reopens it, and positions it at its
// last entry. Returns false without moving when the catalog is not searchable,
// when there is no current file, or when the current file is already the
// first one. The signature (451), the "no file" condition (458) and the rewind
// statement (465) are not shown in this listing.
450  bool
452  {
453  // no going back for non-persistent files
454  if (!catalog_.isSearchable()) {
455  return false;
456  }
457  // no file in the catalog
459  return false;
460  }
461  // already at the first file in the catalog: there is no previous file
462  if (catalog_.currentIndex() == 0) {
463  return false;
464  } else {
466  }
467  initFile(/*skipBadFiles=*/false);
468  if (rootFile_) {
469  rootFile_->setToLastEntry();
470  }
471  return true;
472  }
473 
// Reads the event `id` and returns its EventPrincipal, or a null pointer if it
// cannot be found.
//
// Search order: the open file first; then, only for a searchable catalog,
// previously opened files (via their cached FileIndex); then files not yet
// opened. `exact` is forwarded to every lookup.
//
// NOTE(review): original lines 480, 499 and 509 (one before each
// readEvent_() call) are not shown. Presumably they record the file the event
// came from, e.g. in rootFileForLastReadEvent_; confirm against the full file.
// NOTE(review): `return 0;` relies on 0 being a null-pointer constant;
// `return nullptr;` would state the intent more clearly.
474  unique_ptr<EventPrincipal>
475  RootInputFileSequence::readIt(EventID const& id, bool exact)
476  {
477  // Attempt to find event in currently open input file.
478  bool found = rootFile_->setEntry<InEvent>(id, exact);
479  if (found) {
481  unique_ptr<EventPrincipal> eptr(readEvent_());
482  return eptr;
483  }
484  if (!catalog_.isSearchable()) {
485  // return unique_ptr<EventPrincipal>();
486  return 0;
487  }
488  // Look for event in cached files
489  for (auto IB = fileIndexes_.cbegin(), IE = fileIndexes_.cend(), I = IB;
490  I != IE;
491  ++I) {
492  if (*I && (*I)->contains(id, exact)) {
493  // We found it. Close the currently open file, and open the correct one.
494  catalog_.rewindTo(std::distance(IB, I));
495  initFile(/*skipBadFiles=*/false);
496  // Now get the event from the correct file.
497  found = rootFile_->setEntry<InEvent>(id, exact);
498  assert(found);
500  unique_ptr<EventPrincipal> ep(readEvent_());
501  return ep;
502  }
503  }
504  // Look for event in files not yet opened.
505  while (catalog_.getNextFile()) {
506  initFile(/*skipBadFiles=*/false);
507  found = rootFile_->setEntry<InEvent>(id, exact);
508  if (found) {
510  unique_ptr<EventPrincipal> ep(readEvent_());
511  return ep;
512  }
513  }
514  // Not found
515  return 0;
516  }
517 
// readEvent_(): delegates to rootFile_->readEvent() for the entry the open
// file is currently positioned at. rootFile_ must be non-null. The signature
// (original 519) and original line 533 are not shown.
518  unique_ptr<EventPrincipal>
520  {
521  // Create and setup the EventPrincipal.
522  //
523  // 1. create an EventPrincipal with a unique EventID
524  // 2. For each entry in the provenance, put in one Group,
525  // holding the Provenance for the corresponding EDProduct.
526  // 3. set up the caches in the EventPrincipal to know about this
527  // Group.
528  //
529  // We do *not* create the EDProduct instance (the equivalent of reading
530  // the branch containing this EDProduct). That will be done by the
531  // Delayed Reader when it is asked to do so.
532  //
534  return rootFile_->readEvent();
535  }
536 
// Forwards to the open file's runRangeSetHandler(); rootFile_ must be
// non-null. The signature (original 538) is not shown.
537  std::unique_ptr<RangeSetHandler>
539  {
540  return rootFile_->runRangeSetHandler();
541  }
542 
// Forwards to the open file's subRunRangeSetHandler(); rootFile_ must be
// non-null. The signature (original 544) is not shown.
543  std::unique_ptr<RangeSetHandler>
545  {
546  return rootFile_->subRunRangeSetHandler();
547  }
548 
// Reads the subrun `id` and returns its SubRunPrincipal, or a null pointer if
// it cannot be found. `rp` is passed unchanged to readSubRun_().
// Search order: the open file first; then, only for a searchable catalog,
// previously opened files (exact FileIndex match); then files not yet opened.
// The first line of the signature (original 550) is not shown.
549  std::unique_ptr<SubRunPrincipal>
551  cet::exempt_ptr<RunPrincipal> rp)
552  {
553  // Attempt to find subRun in currently open input file.
554  bool found = rootFile_->setEntry<InSubRun>(id);
555  if (found) {
556  return readSubRun_(rp);
557  }
558  if (!catalog_.isSearchable()) {
559  return std::unique_ptr<SubRunPrincipal>{nullptr};
560  }
561  // Look for subRun in cached files
562  typedef vector<std::shared_ptr<FileIndex>>::const_iterator Iter;
563  for (Iter itBegin = fileIndexes_.begin(),
564  itEnd = fileIndexes_.end(),
565  it = itBegin;
566  it != itEnd;
567  ++it) {
568  if (*it && (*it)->contains(id, true)) {
569  // We found it. Close the currently open file, and open the correct one.
570  catalog_.rewindTo(std::distance(itBegin, it));
571  initFile(/*skipBadFiles=*/false);
572  // Now get the subRun from the correct file.
573  found = rootFile_->setEntry<InSubRun>(id);
574  assert(found);
575  return readSubRun_(rp);
576  }
577  }
578  // Look for subRun in files not yet opened.
579  while (catalog_.getNextFile()) {
580  initFile(/*skipBadFiles=*/false);
581  found = rootFile_->setEntry<InSubRun>(id);
582  if (found) {
583  return readSubRun_(rp);
584  }
585  }
586  // not found
587  return std::unique_ptr<SubRunPrincipal>{nullptr};
588  }
589 
590  std::unique_ptr<SubRunPrincipal>
591  RootInputFileSequence::readSubRun_(cet::exempt_ptr<RunPrincipal> rp)
592  {
593  return rootFile_->readSubRun(rp);
594  }
595 
// Reads the run `id` and returns its RunPrincipal, or a null pointer if it
// cannot be found.
// Search order: the open file first; then, only for a searchable catalog,
// previously opened files (exact FileIndex match); then files not yet opened.
// The signature (original 597) is not shown.
596  std::unique_ptr<RunPrincipal>
598  {
599  // Attempt to find run in current file.
600  bool found = rootFile_->setEntry<InRun>(id);
601  if (found) {
602  // Got it, read the run.
603  return readRun_();
604  }
605  if (!catalog_.isSearchable()) {
606  // Cannot random access files, give up.
607  return std::unique_ptr<RunPrincipal>{nullptr};
608  }
609  // Look for the run in the opened files.
610  for (auto B = fileIndexes_.cbegin(), E = fileIndexes_.cend(), I = B; I != E;
611  ++I) {
612  if (*I && (*I)->contains(id, true)) {
613  // We found it, open the file.
614  catalog_.rewindTo(std::distance(B, I));
615  initFile(/*skipBadFiles=*/false);
616  // Now read the run.
617  found = rootFile_->setEntry<InRun>(id);
618  assert(found);
619  return readRun_();
620  }
621  }
622  // Look for run in files not yet opened.
623  while (catalog_.getNextFile()) {
624  initFile(/*skipBadFiles=*/false);
625  found = rootFile_->setEntry<InRun>(id);
626  if (found) {
627  // Got it, read the run.
628  return readRun_();
629  }
630  }
631  // Not found.
632  return std::unique_ptr<RunPrincipal>{nullptr};
633  }
634 
// readRun_(): delegates to rootFile_->readRun() for the entry the open file is
// currently positioned at. rootFile_ must be non-null. The signature (original
// 636) is not shown.
635  std::unique_ptr<RunPrincipal>
637  {
638  return rootFile_->readRun();
639  }
640 
// Decides what the source should deliver next: IsStop, IsFile, IsRun,
// IsSubRun or IsEvent. The return type and name lines (original 641-642) are
// not shown in this listing.
643  {
644  // marked as the first file but failed to find a valid
645  // root file. we should make it stop.
646  if (firstFile_ && !rootFile_) {
647  return input::IsStop;
648  }
649  if (firstFile_) {
650  return input::IsFile;
651  }
// Map the open file's next wanted FileIndex entry type to an item type.
652  if (rootFile_) {
653  FileIndex::EntryType entryType = rootFile_->getNextEntryTypeWanted();
654  if (entryType == FileIndex::kEvent) {
655  return input::IsEvent;
656  } else if (entryType == FileIndex::kSubRun) {
657  return input::IsSubRun;
658  } else if (entryType == FileIndex::kRun) {
659  return input::IsRun;
660  }
661  assert(entryType == FileIndex::kEnd);
662  }
663  // now we are either at the end of a root file
664  // or no root file is currently open
665  if (!catalog_.hasNextFile()) {
666  return input::IsStop;
667  }
668  return input::IsFile;
669  }
670 
671  // Rewind to before the first event that was read.
// Only possible for a searchable catalog; otherwise an error is raised (the
// throw head, original line 676, is not shown). Marks the next file as the
// first one again, rewinds the catalog, and rewinds the duplicate checker when
// one exists.
672  void
674  {
675  if (!catalog_.isSearchable()) {
677  << "RootInputFileSequence::rewind_() "
678  << "cannot rollback on non-searchable file catalogs.";
679  }
680  firstFile_ = true;
681  catalog_.rewind();
682  if (duplicateChecker_.get() != nullptr) {
683  duplicateChecker_->rewind();
684  }
685  }
686 
687  // Rewind to the beginning of the current file
// Delegates to rootFile_->rewind(); rootFile_ must be non-null. The function
// name (original line 689) is not shown.
688  void
690  {
691  rootFile_->rewind();
692  }
693 
694  // Advance "offset" events. Offset can be positive or negative (or zero).
// rootFile_->skipEvents() returns the part of the offset it could not satisfy
// within the current file. That remainder is carried into the next file
// (positive) or the previous file (negative). The loop stops silently when no
// such file exists. The signature (original 696) is not shown.
695  void
697  {
698  while (offset != 0) {
699  offset = rootFile_->skipEvents(offset);
700  if (offset > 0 && !nextFile()) {
701  return;
702  }
703  if (offset < 0 && !previousFile()) {
704  return;
705  }
706  }
707  rootFile_->skipEvents(0);
708  }
709 
// Always returns true. The name line (original 711) is not shown; presumably
// this is primary(), which the constructor and closeFile_() call.
710  bool
712  {
713  return true;
714  }
715 
// Returns the ProcessConfiguration stored by the constructor. The signature
// line (original 717) is not shown.
716  ProcessConfiguration const&
718  {
719  return processConfiguration_;
720  }
721 
722 } // namespace art
size_t currentIndex() const
std::unique_ptr< RangeSetHandler > runRangeSetHandler()
bool hasNextFile(int attempts=5)
std::vector< std::shared_ptr< FileIndex > > fileIndexes_
Float_t E
Definition: plot.C:23
ProcessConfiguration const & processConfiguration_
static constexpr size_t indexEnd
std::vector< FileCatalogItem > const & fileCatalogItems() const
Int_t B
Definition: plot.C:25
bool getNextFile(int attempts=5)
std::unique_ptr< RangeSetHandler > subRunRangeSetHandler()
std::unique_ptr< RunPrincipal > readRun_()
std::unique_ptr< RootInputFile > openSecondaryFile(std::string const &name, cet::exempt_ptr< RootInputFile > primaryFile)
void logFileAction(const char *msg, std::string const &file)
Definition: logFileAction.cc:9
STL namespace.
std::string & url()
Definition: FileCatalog.h:98
std::vector< FileCatalogItem > const & fileCatalogItems() const
std::string const & logicalFileName() const
Definition: FileCatalog.h:30
ProcessConfiguration const & processConfiguration() const
std::shared_ptr< DuplicateChecker > duplicateChecker_
RunNumber_t run() const
Definition: RunID.h:63
EventID seekToEvent(EventID const &, bool exact=false)
std::vector< std::vector< std::string > > const & secondaryFileNames() const
FileCatalogItem const & currentFile() const
intermediate_table::const_iterator const_iterator
GroupSelectorRules groupSelectorRules_
std::unique_ptr< RunPrincipal > readIt(RunID const &)
static RunID maxRun()
Definition: RunID.h:109
RunNumber_t run() const
Definition: SubRunID.h:84
FastCloningInfoProvider fastCloningInfo_
IDNumber_t< Level::SubRun > SubRunNumber_t
Definition: IDNumber.h:118
RootInputFileSharedPtr rootFile_
static SubRunID firstSubRun()
Definition: SubRunID.h:152
void rewindTo(size_t index)
std::unique_ptr< FileBlock > readFile_()
std::vector< std::vector< std::string > > secondaryFileNames_
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
InputSource::ProcessingMode processingMode_
MasterProductRegistry & mpr_
std::unique_ptr< EventPrincipal > readEvent_()
void initFile(bool skipBadFiles)
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
IDNumber_t< Level::Event > EventNumber_t
Definition: IDNumber.h:117
HLT enums.
static RunID firstRun()
Definition: RunID.h:115
std::unique_ptr< SubRunPrincipal > readSubRun_(cet::exempt_ptr< RunPrincipal >)
std::vector< std::string > const & fileSources() const
Float_t e
Definition: plot.C:34
static EventID firstEvent()
Definition: EventID.h:191
RootInputFileSharedPtr rootFileForLastReadEvent_
std::string const & fileName() const
Definition: FileCatalog.h:25
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
IDNumber_t< Level::Run > RunNumber_t
Definition: IDNumber.h:119