LArSoft  v06_85_00
Liquid Argon Software toolkit - http://larsoft.org/
RootOutputFile.cc
Go to the documentation of this file.
2 // vim: set sw=2:
3 
4 #include "Rtypes.h"
5 #include "TBranchElement.h"
6 #include "TClass.h"
7 #include "TFile.h"
8 #include "TTree.h"
27 #include "boost/date_time/posix_time/posix_time.hpp"
44 #include "canvas_root_io/Utilities/DictionaryChecker.h"
45 #include "cetlib/canonical_string.h"
46 #include "cetlib/container_algorithms.h"
47 #include "cetlib/exempt_ptr.h"
48 #include "cetlib/sqlite/Ntuple.h"
49 #include "cetlib/sqlite/Transaction.h"
50 #include "cetlib/sqlite/create_table.h"
51 #include "cetlib/sqlite/exec.h"
52 #include "cetlib/sqlite/insert.h"
53 #include "fhiclcpp/ParameterSet.h"
56 
57 #include <algorithm>
58 #include <utility>
59 #include <vector>
60 
61 using namespace cet;
62 using namespace std;
63 using art::BranchType;
66 
67 namespace {
68 
69  void
70  create_table(sqlite3* const db,
71  std::string const& name,
72  std::vector<std::string> const& columns,
73  std::string const& suffix = {})
74  {
75  if (columns.empty())
77  << "Number of sqlite columns specified for table: " << name << '\n'
78  << "is zero.\n";
79 
80  std::string ddl = "DROP TABLE IF EXISTS " + name +
81  "; "
82  "CREATE TABLE " +
83  name + "(" + columns.front();
84  std::for_each(columns.begin() + 1, columns.end(), [&ddl](auto const& col) {
85  ddl += "," + col;
86  });
87  ddl += ") ";
88  ddl += suffix;
89  ddl += ";";
90  sqlite::exec(db, ddl);
91  }
92 
93  void
94  insert_eventRanges_row(sqlite3_stmt* stmt,
95  art::SubRunNumber_t const sr,
96  art::EventNumber_t const b,
97  art::EventNumber_t const e)
98  {
99  sqlite3_bind_int64(stmt, 1, sr);
100  sqlite3_bind_int64(stmt, 2, b);
101  sqlite3_bind_int64(stmt, 3, e);
102  sqlite3_step(stmt);
103  sqlite3_reset(stmt);
104  }
105 
106  void
107  insert_rangeSets_eventSets_row(sqlite3_stmt* stmt,
108  unsigned const rsid,
109  unsigned const esid)
110  {
111  sqlite3_bind_int64(stmt, 1, rsid);
112  sqlite3_bind_int64(stmt, 2, esid);
113  sqlite3_step(stmt);
114  sqlite3_reset(stmt);
115  }
116 
117  unsigned
118  getNewRangeSetID(sqlite3* db,
119  art::BranchType const bt,
120  art::RunNumber_t const r)
121  {
122  sqlite::insert_into(db, art::BranchTypeToString(bt) + "RangeSets")
123  .values(r);
124  return sqlite3_last_insert_rowid(db);
125  }
126 
127  vector<unsigned>
128  getExistingRangeSetIDs(sqlite3* db, art::RangeSet const& rs)
129  {
130  vector<unsigned> rangeSetIDs;
131  cet::transform_all(
132  rs, std::back_inserter(rangeSetIDs), [db](auto const& range) {
133  sqlite::query_result<unsigned> r;
134  r << sqlite::select("ROWID")
135  .from(db, "EventRanges")
136  .where("SubRun=" + std::to_string(range.subRun()) +
137  " AND "
138  "begin=" +
139  std::to_string(range.begin()) +
140  " AND "
141  "end=" +
142  std::to_string(range.end()));
143  return unique_value(r);
144  });
145  return rangeSetIDs;
146  }
147 
148  void
149  insertIntoEventRanges(sqlite3* db, art::RangeSet const& rs)
150  {
151  sqlite::Transaction txn{db};
152  sqlite3_stmt* stmt{nullptr};
153  std::string const ddl{"INSERT INTO EventRanges(SubRun, begin, end) "
154  "VALUES(?, ?, ?);"};
155  sqlite3_prepare_v2(db, ddl.c_str(), -1, &stmt, nullptr);
156  for (auto const& range : rs) {
157  insert_eventRanges_row(stmt, range.subRun(), range.begin(), range.end());
158  }
159  sqlite3_finalize(stmt);
160  txn.commit();
161  }
162 
163  void
164  insertIntoJoinTable(sqlite3* db,
165  art::BranchType const bt,
166  unsigned const rsID,
167  vector<unsigned> const& eventRangesIDs)
168  {
169  sqlite::Transaction txn{db};
170  sqlite3_stmt* stmt{nullptr};
171  std::string const ddl{
172  "INSERT INTO " + art::BranchTypeToString(bt) +
173  "RangeSets_EventRanges(RangeSetsID, EventRangesID) Values(?,?);"};
174  sqlite3_prepare_v2(db, ddl.c_str(), -1, &stmt, nullptr);
175  cet::for_all(eventRangesIDs, [stmt, rsID](auto const eventRangeID) {
176  insert_rangeSets_eventSets_row(stmt, rsID, eventRangeID);
177  });
178  sqlite3_finalize(stmt);
179  txn.commit();
180  }
181 
182  void
183  maybeInvalidateRangeSet(BranchType const bt,
184  art::RangeSet const& principalRS,
185  art::RangeSet& productRS)
186  {
187  assert(principalRS.is_sorted());
188  assert(productRS.is_sorted());
189 
190  if (!productRS.is_valid())
191  return;
192  if (bt == art::InRun && productRS.is_full_run())
193  return;
194  if (bt == art::InSubRun && productRS.is_full_subRun())
195  return;
196  if (productRS.ranges().empty())
197  return;
198 
199  auto const r = productRS.run();
200  auto const& productFront = productRS.ranges().front();
201  if (!principalRS.contains(r, productFront.subRun(), productFront.begin()))
202  productRS = art::RangeSet::invalid();
203  }
204 
206 
207  // The purpose of 'maybeInvalidateRangeSet' is to support the
208  // following situation. Suppose process 1 creates three files with
209  // one Run product each, all corresponding to the same Run. Let's
  // refer to the individual Run product instances in the three separate
211  // files as A, B, and C. Now suppose that the three files serve as
212  // inputs to process 2, where a concatenation is being performed AND
213  // ALSO an output file switch. Process 2 results in two output
214  // files, and now, in process 3, we concatenate the outputs from
215  // process 2. The situation would look like this:
216  //
  //   Process 1:   [A]   [B]   [C]
  //                   \  /  \  /
  //   Process 2:  [A + B]  [B + C]
  //                    \    /
  //         D=agg(A,B)  \  /  E=agg(B,C)
  //                      \/
  //   Process 3:       [D + E]
224  //
225  // Notice the complication in process 3: product 'B' will be
226  // aggregated twice: once with A, and once with C. Whenever the
227  // output from process 3 is read as input to another process, the
228  // fetched product will be equivalent to A+2B+C.
229  //
230  // To avoid this situation, we compare the RangeSet of the product
231  // with the RangeSet of the in-memory RunAuxiliary. If the
232  // beginning of B's RangeSet is not contained within the auxiliary's
233  // RangeSet, then a dummy product with an invalid RangeSet is
234  // written to disk. Instead of the diagram above, we have:
235  //
  //   Process 1:   [A]   [B]   [C]
  //                   \  /  \  /
  //   Process 2:  [A + B]  [x + C]
  //                    \    /
  //         D=agg(A,B)  \  /  E=agg(x,C)=C
  //                      \/
  //   Process 3:       [D + E]
243  //
  // where 'x' represents a dummy product. Upon aggregating D and E,
245  // we obtain the correctly formed A+B+C product.
246  template <BranchType BT>
248  getRangeSet(art::OutputHandle const& oh,
249  art::RangeSet const& principalRS,
250  bool const producedInThisProcess)
251  {
252  auto rs = oh.isValid() ? oh.rangeOfValidity() : art::RangeSet::invalid();
253  // Because a user can specify (e.g.):
254  // r.put(std::move(myProd), art::runFragment(myRangeSet));
255  // products that are produced in this process can have valid, yet
256  // arbitrary RangeSets. We therefore never invalidate a RangeSet
257  // that corresponds to a product produced in this process.
258  //
259  // It is possible for a user to specify a RangeSet which does not
260  // correspond AT ALL to the in-memory auxiliary RangeSet. In that
261  // case, users should not expect to be able to retrieve products
262  // for which no corresponding events or sub-runs were processed.
263  if (!producedInThisProcess) {
264  maybeInvalidateRangeSet(BT, principalRS, rs);
265  }
266  return rs;
267  }
268 
269  template <BranchType BT>
271  getRangeSet(art::OutputHandle const&,
272  art::RangeSet const& /*principalRS*/,
273  bool const /*producedInThisProcess*/)
274  {
275  return art::RangeSet::invalid();
276  }
277 
278  template <BranchType BT>
280  setProductRangeSetID(art::RangeSet const& /*rs*/,
281  sqlite3*,
283  std::map<unsigned, unsigned>& /*checksumToIndexLookup*/)
284  {}
285 
286  template <BranchType BT>
288  setProductRangeSetID(art::RangeSet const& rs,
289  sqlite3* db,
290  art::EDProduct* product,
291  std::map<unsigned, unsigned>& checksumToIndexLookup)
292  {
293  if (!rs.is_valid()) // Invalid range-sets not written to DB
294  return;
295 
296  // Set range sets for SubRun and Run products
297  auto it = checksumToIndexLookup.find(rs.checksum());
298  if (it != checksumToIndexLookup.cend()) {
299  product->setRangeSetID(it->second);
300  } else {
301  unsigned const rsID = getNewRangeSetID(db, BT, rs.run());
302  product->setRangeSetID(rsID);
303  checksumToIndexLookup.emplace(rs.checksum(), rsID);
304  insertIntoEventRanges(db, rs);
305  auto const& eventRangesIDs = getExistingRangeSetIDs(db, rs);
306  insertIntoJoinTable(db, BT, rsID, eventRangesIDs);
307  }
308  }
309 
310 } // unnamed namespace
311 
313  string const& fileName,
314  ClosingCriteria const& fileSwitchCriteria,
315  int const compressionLevel,
316  int64_t const saveMemoryObjectThreshold,
317  int64_t const treeMaxVirtualSize,
318  int const splitLevel,
319  int const basketSize,
320  DropMetaData dropMetaData,
321  bool const dropMetaDataForDroppedData,
322  bool const fastCloningRequested)
323  : om_{om}
324  , file_{fileName}
325  , fileSwitchCriteria_{fileSwitchCriteria}
326  , compressionLevel_{compressionLevel}
327  , saveMemoryObjectThreshold_{saveMemoryObjectThreshold}
328  , treeMaxVirtualSize_{treeMaxVirtualSize}
329  , splitLevel_{splitLevel}
330  , basketSize_{basketSize}
331  , dropMetaData_{dropMetaData}
332  , dropMetaDataForDroppedData_{dropMetaDataForDroppedData}
333  , fastCloningEnabledAtConstruction_{fastCloningRequested}
334  , filePtr_{TFile::Open(file_.c_str(), "recreate", "", compressionLevel)}
335  , treePointers_{{// Order (and number) must match BranchTypes.h!
336  std::make_unique<RootOutputTree>(
337  filePtr_.get(),
338  InEvent,
339  pEventAux_,
341  basketSize,
342  splitLevel,
343  treeMaxVirtualSize,
344  saveMemoryObjectThreshold),
345  std::make_unique<RootOutputTree>(
346  filePtr_.get(),
347  InSubRun,
348  pSubRunAux_,
350  basketSize,
351  splitLevel,
352  treeMaxVirtualSize,
353  saveMemoryObjectThreshold),
354  std::make_unique<RootOutputTree>(
355  filePtr_.get(),
356  InRun,
357  pRunAux_,
359  basketSize,
360  splitLevel,
361  treeMaxVirtualSize,
362  saveMemoryObjectThreshold),
363  std::make_unique<RootOutputTree>(
364  filePtr_.get(),
365  InResults,
366  pResultsAux_,
368  basketSize,
369  splitLevel,
370  treeMaxVirtualSize,
371  saveMemoryObjectThreshold)}}
373  "RootFileDB",
374  filePtr_.get(),
375  SQLITE_OPEN_CREATE | SQLITE_OPEN_READWRITE)}
376 {
377  // Don't split metadata tree or event description tree
378  metaDataTree_ =
384  // Create the tree that will carry (event) History objects.
386  filePtr_.get(), rootNames::eventHistoryTreeName(), splitLevel);
387  if (!eventHistoryTree_) {
389  << "Failed to create the tree for History objects\n";
390  }
391 
392  pHistory_ = new History;
393  if (!eventHistoryTree_->Branch(rootNames::eventHistoryBranchName().c_str(),
394  &pHistory_,
395  basketSize,
396  0)) {
398  << "Failed to create a branch for History in the output file\n";
399  }
400  delete pHistory_;
401  pHistory_ = nullptr;
402 
403  // Check that dictionaries for the auxiliaries exist
404  root::DictionaryChecker checker{};
405  checker.checkDictionaries<EventAuxiliary>();
406  checker.checkDictionaries<SubRunAuxiliary>();
407  checker.checkDictionaries<RunAuxiliary>();
408  checker.checkDictionaries<ResultsAuxiliary>();
409  checker.reportMissingDictionaries();
410 
412 }
413 
414 void
416 {
417  // Event ranges
418  create_table(rootFileDB_,
419  "EventRanges",
420  {"SubRun INTEGER",
421  "begin INTEGER",
422  "end INTEGER",
423  "UNIQUE (SubRun,begin,end) ON CONFLICT IGNORE"});
424 
425  // SubRun range sets
426  using namespace cet::sqlite;
427  create_table(rootFileDB_, "SubRunRangeSets", column<int>{"Run"});
428  create_table(rootFileDB_,
429  "SubRunRangeSets_EventRanges",
430  {"RangeSetsID INTEGER",
431  "EventRangesID INTEGER",
432  "PRIMARY KEY(RangeSetsID,EventRangesID)"},
433  "WITHOUT ROWID");
434 
435  // Run range sets
436  create_table(rootFileDB_, "RunRangeSets", column<int>{"Run"});
437  create_table(rootFileDB_,
438  "RunRangeSets_EventRanges",
439  {"RangeSetsID INTEGER",
440  "EventRangesID INTEGER",
441  "PRIMARY KEY(RangeSetsID,EventRangesID)"},
442  "WITHOUT ROWID");
443 }
444 
445 void
447 {
448  for (int i = InEvent; i < NumBranchTypes; ++i) {
449  auto const bt = static_cast<BranchType>(i);
450  auto& items = selectedOutputItemList_[bt];
451 
452  for (auto const& pd : om_->keptProducts()[bt]) {
453  // Persist Results products only if they have been produced by
454  // the current process.
455  if (bt == InResults && !pd->produced())
456  continue;
457  checkDictionaries(*pd);
458 
459  // Although the transient flag is already checked when
460  // OutputModule::doSelectProducts is called, it can be flipped
461  // to 'true' after the BranchDescription transients have been
462  // fluffed, which happens during the checkDictionaries call.
463  if (pd->transient()) {
464  continue;
465  }
466 
467  items.emplace(pd);
468  }
469 
470  for (auto const& val : items) {
471  treePointers_[bt]->addOutputBranch(*val.branchDescription_, val.product_);
472  }
473  }
474 }
475 
476 void
478  bool const fastCloneFromOutputModule)
479 {
480  // FIXME: the logic here is nasty.
482  fastCloneFromOutputModule && rfb};
483  // Create output branches, and then redo calculation to determine if
484  // fast cloning should be done.
485  selectProducts();
486  if (shouldFastClone &&
487  !treePointers_[InEvent]->checkSplitLevelAndBasketSize(rfb->tree())) {
488  mf::LogWarning("FastCloning")
489  << "Fast cloning deactivated for this input file due to "
490  << "splitting level and/or basket size.";
491  shouldFastClone = false;
492  } else if (rfb && rfb->tree() &&
493  rfb->tree()->GetCurrentFile()->GetVersion() < 60001) {
494  mf::LogWarning("FastCloning")
495  << "Fast cloning deactivated for this input file due to "
496  << "ROOT version used to write it (< 6.00/01)\n"
497  "having a different splitting policy.";
498  shouldFastClone = false;
499  }
500 
501  if (shouldFastClone && rfb->fileFormatVersion().value_ < 10) {
502  mf::LogWarning("FastCloning")
503  << "Fast cloning deactivated for this input file due to "
504  << "reading in file that has a different ProductID schema.";
505  shouldFastClone = false;
506  }
507 
509  mf::LogWarning("FastCloning")
510  << "Fast cloning reactivated for this input file.";
511  }
512  treePointers_[InEvent]->beginInputFile(shouldFastClone);
513  auto tree = (rfb && rfb->tree()) ? rfb->tree() : nullptr;
514  wasFastCloned_ = treePointers_[InEvent]->fastCloneTree(tree);
515 }
516 
517 void
519 {
521 }
522 
523 void
525 {
526  cet::for_all(treePointers_, [](auto const& p) { p->setEntries(); });
527 }
528 
529 bool
531 {
532  using namespace std::chrono;
533  unsigned int constexpr oneK{1024u};
534  fp_.updateSize(filePtr_->GetSize() / oneK);
535  fp_.updateAge(duration_cast<seconds>(steady_clock::now() - beginTime_));
537 }
538 
539 void
541 {
542  // Auxiliary branch.
543  // Note: pEventAux_ must be set before calling fillBranches
544  // since it gets written out in that routine.
545  pEventAux_ = &e.aux();
546  // Because getting the data may cause an exception to be
547  // thrown we want to do that first before writing anything
548  // to the file about this event.
549  fillBranches<InEvent>(e, pEventProductProvenanceVector_);
550  // History branch.
551  History historyForOutput{e.history()};
552  historyForOutput.addEventSelectionEntry(om_->selectorConfig());
553  pHistory_ = &historyForOutput;
554  int sz = eventHistoryTree_->Fill();
555  if (sz <= 0) {
557  << "Failed to fill the History tree for event: " << e.id()
558  << "\nTTree::Fill() returned " << sz << " bytes written." << endl;
559  }
560  // Add the dataType to the job report if it hasn't already been done
561  if (!dataTypeReported_) {
562  string dataType{"MC"};
563  if (pEventAux_->isRealData()) {
564  dataType = "Data";
565  }
566  dataTypeReported_ = true;
567  }
568  pHistory_ = &e.history();
569  // Add event to index
572 }
573 
574 void
576 {
577  pSubRunAux_ = &sr.aux();
579  fillBranches<InSubRun>(sr, pSubRunProductProvenanceVector_);
583 }
584 
585 void
587 {
588  pRunAux_ = &r.aux();
590  fillBranches<InRun>(r, pRunProductProvenanceVector_);
592  fp_.runEntryNumber());
594 }
595 
596 void
598 {
599  auto pid = root::getObjectRequireDict<ParentageID>();
600  ParentageID const* hash = &pid;
601  if (!parentageTree_->Branch(
602  rootNames::parentageIDBranchName().c_str(), &hash, basketSize_, 0)) {
604  << "Failed to create a branch for ParentageIDs in the output file";
605  }
606  hash = nullptr;
607 
608  auto par = root::getObjectRequireDict<Parentage>();
609  Parentage const* desc = &par;
610  if (!parentageTree_->Branch(
611  rootNames::parentageBranchName().c_str(), &desc, basketSize_, 0)) {
613  << "Failed to create a branch for Parentages in the output file";
614  }
615  desc = nullptr;
616 
617  for (auto const& pr : ParentageRegistry::get()) {
618  hash = &pr.first;
619  desc = &pr.second;
620  parentageTree_->Fill();
621  }
622  parentageTree_->SetBranchAddress(rootNames::parentageIDBranchName().c_str(),
623  nullptr);
624  parentageTree_->SetBranchAddress(rootNames::parentageBranchName().c_str(),
625  nullptr);
626 }
627 
628 void
630 {
632  auto const* pver = &ver;
633  TBranch* b = metaDataTree_->Branch(
634  metaBranchRootName<FileFormatVersion>(), &pver, basketSize_, 0);
635  // FIXME: Turn this into a throw!
636  assert(b);
637  b->Fill();
638 }
639 
640 void
642 {
644  FileIndex::Element elem{};
645  auto const* findexElemPtr = &elem;
646  TBranch* b = fileIndexTree_->Branch(
647  metaBranchRootName<FileIndex::Element>(), &findexElemPtr, basketSize_, 0);
648  // FIXME: Turn this into a throw!
649  assert(b);
650  for (auto& entry : fileIndex_) {
651  findexElemPtr = &entry;
652  b->Fill();
653  }
654  b->SetAddress(0);
655 }
656 
657 void
659 {
661 }
662 
663 void
665 {
666  // We don't do this yet; currently we're storing a slightly
667  // bloated ProcessHistoryRegistry.
668 }
669 
670 void
672 {
673  ProcessHistoryMap pHistMap;
674  for (auto const& pr : ProcessHistoryRegistry::get()) {
675  pHistMap.emplace(pr);
676  }
677  auto const* p = &pHistMap;
678  TBranch* b = metaDataTree_->Branch(
679  metaBranchRootName<ProcessHistoryMap>(), &p, basketSize_, 0);
680  if (b != nullptr) {
681  b->Fill();
682  } else {
683  throw Exception(errors::LogicError) << "Unable to locate required "
684  "ProcessHistoryMap branch in output "
685  "metadata tree.\n";
686  }
687 }
688 
689 void
691  FileStatsCollector const& stats,
694 {
695  using namespace cet::sqlite;
696  Ntuple<std::string, std::string> fileCatalogMetadata{
697  rootFileDB_, "FileCatalog_metadata", {{"Name", "Value"}}, true};
698  Transaction txn{rootFileDB_};
699  for (auto const& kv : md) {
700  fileCatalogMetadata.insert(kv.first, kv.second);
701  }
702  // Add our own specific information: File format and friends.
703  fileCatalogMetadata.insert("file_format", "\"artroot\"");
704  fileCatalogMetadata.insert("file_format_era",
705  cet::canonical_string(getFileFormatEra()));
706  fileCatalogMetadata.insert("file_format_version",
708 
709  // File start time.
710  namespace bpt = boost::posix_time;
711  auto formatted_time = [](auto const& t) {
712  return cet::canonical_string(bpt::to_iso_extended_string(t));
713  };
714  fileCatalogMetadata.insert("start_time",
715  formatted_time(stats.outputFileOpenTime()));
716  // File "end" time: now, since file is not actually closed yet.
717  fileCatalogMetadata.insert(
718  "end_time",
719  formatted_time(boost::posix_time::second_clock::universal_time()));
720 
721  // Run/subRun information.
722  if (!stats.seenSubRuns().empty()) {
723 
724  auto I = find_if(md.crbegin(), md.crend(), [](auto const& p) {
725  return p.first == "run_type";
726  });
727 
728  if (I != md.crend()) {
729  ostringstream buf;
730  buf << "[ ";
731  for (auto const& srid : stats.seenSubRuns()) {
732  buf << "[ " << srid.run() << ", " << srid.subRun() << ", "
733  << cet::canonical_string(I->second) << " ], ";
734  }
735  // Rewind over last delimiter.
736  buf.seekp(-2, ios_base::cur);
737  buf << " ]";
738  fileCatalogMetadata.insert("runs", buf.str());
739  }
740  }
741  // Number of events.
742  fileCatalogMetadata.insert("event_count", to_string(stats.eventsThisFile()));
743  // first_event and last_event.
744  auto eidToTuple = [](EventID const& eid) -> string {
745  ostringstream eidStr;
746  eidStr << "[ " << eid.run() << ", " << eid.subRun() << ", " << eid.event()
747  << " ]";
748  return eidStr.str();
749  };
750  fileCatalogMetadata.insert("first_event", eidToTuple(stats.lowestEventID()));
751  fileCatalogMetadata.insert("last_event", eidToTuple(stats.highestEventID()));
752  // File parents.
753  if (!stats.parents().empty()) {
754  ostringstream pstring;
755  pstring << "[ ";
756  for (auto const& parent : stats.parents()) {
757  pstring << cet::canonical_string(parent) << ", ";
758  }
759  // Rewind over last delimiter.
760  pstring.seekp(-2, ios_base::cur);
761  pstring << " ]";
762  fileCatalogMetadata.insert("parents", pstring.str());
763  }
764  // Incoming stream-specific metadata overrides.
765  for (auto const& kv : ssmd) {
766  fileCatalogMetadata.insert(kv.first, kv.second);
767  }
768  txn.commit();
769 }
770 
771 void
773 {
775 }
776 
777 void
779 {
780  // Make a local copy of the MasterProductRegistry's ProductList,
781  // removing any transient or pruned products.
782  auto end = branchesWithStoredHistory_.end();
783 
784  ProductRegistry reg;
785  for (auto const& pr : ProductMetaData::instance().productList()) {
786  if (branchesWithStoredHistory_.find(pr.second.productID()) == end) {
787  continue;
788  }
789  reg.productList_.emplace_hint(reg.productList_.end(), pr);
790  }
791 
792  ProductRegistry const* regp = &reg;
793  TBranch* b = metaDataTree_->Branch(
794  metaBranchRootName<ProductRegistry>(), &regp, basketSize_, 0);
795  // FIXME: Turn this into a throw!
796  assert(b);
797  b->Fill();
798 }
799 
800 void
802 {
803  BranchChildren const* ppDeps = &om_->branchChildren();
804  TBranch* b = metaDataTree_->Branch(
805  metaBranchRootName<BranchChildren>(), &ppDeps, basketSize_, 0);
806  // FIXME: Turn this into a throw!
807  assert(b);
808  b->Fill();
809 }
810 
811 void
813 {
814  pResultsAux_ = &resp.aux();
815  fillBranches<InResults>(resp, pResultsProductProvenanceVector_);
816 }
817 
818 void
820 {
824  // Write out the tree corresponding to each BranchType
825  for (int i = InEvent; i < NumBranchTypes; ++i) {
826  auto const branchType = static_cast<BranchType>(i);
827  treePointers_[branchType]->writeTree();
828  }
829 }
830 
831 template <art::BranchType BT>
832 void
834  vector<ProductProvenance>* vpp)
835 {
836  bool const fastCloning = (BT == InEvent) && wasFastCloned_;
837  detail::KeptProvenance keptProvenance{
839  map<unsigned, unsigned> checksumToIndex;
840 
841  auto const& principalRS = principal.seenRanges();
842 
843  for (auto const& val : selectedOutputItemList_[BT]) {
844  auto const* pd = val.branchDescription_;
845  auto const pid = pd->productID();
846  branchesWithStoredHistory_.insert(pid);
847  bool const produced{pd->produced()};
848  bool const resolveProd = (produced || !fastCloning ||
849  treePointers_[BT]->uncloned(pd->branchName()));
850 
851  // Update the kept provenance
852  bool const keepProvenance =
854  (dropMetaData_ == DropMetaData::DropPrior && produced));
855  auto const& oh = principal.getForOutput(pid, resolveProd);
856 
857  unique_ptr<ProductProvenance> prov{nullptr};
858  if (keepProvenance) {
859  if (oh.productProvenance()) {
860  prov = std::make_unique<ProductProvenance>(
861  keptProvenance.insert(*oh.productProvenance()));
862  keptProvenance.insertAncestors(*oh.productProvenance(), principal);
863  } else {
864  // No provenance, product was either not produced, or was
865  // dropped, create provenance to remember that.
866  auto const status =
868  prov = std::make_unique<ProductProvenance>(
869  keptProvenance.emplace(pid, status));
870  }
871  }
872 
873  // Resolve the product if necessary
874  if (resolveProd) {
875  auto const& rs = getRangeSet<BT>(oh, principalRS, produced);
877  // Unfortunately, 'unknown' is the only viable product status
878  // when this condition is triggered (due to the assert
879  // statement in ProductStatus::setNotPresent). Whenever the
880  // metadata revolution comes, this should be revised.
881  keptProvenance.setStatus(*prov, productstatus::unknown());
882  }
883 
884  auto const* product = getProduct<BT>(oh, rs, pd->wrappedName());
885  setProductRangeSetID<BT>(
886  rs, rootFileDB_, const_cast<EDProduct*>(product), checksumToIndex);
887  val.product_ = product;
888  }
889  }
890  vpp->assign(keptProvenance.begin(), keptProvenance.end());
891  treePointers_[BT]->fillTree();
892  vpp->clear();
893 }
894 
895 void
897 {
898  subRunRSID_ = getNewRangeSetID(rootFileDB_, InSubRun, ranges.run());
899  insertIntoEventRanges(rootFileDB_, ranges);
900  auto const& eventRangesIDs = getExistingRangeSetIDs(rootFileDB_, ranges);
901  insertIntoJoinTable(rootFileDB_, InSubRun, subRunRSID_, eventRangesIDs);
902 }
903 
904 void
906 {
907  runRSID_ = getNewRangeSetID(rootFileDB_, InRun, ranges.run());
908  insertIntoEventRanges(rootFileDB_, ranges);
909  auto const& eventRangesIDs = getExistingRangeSetIDs(rootFileDB_, ranges);
910  insertIntoJoinTable(rootFileDB_, InRun, runRSID_, eventRangesIDs);
911 }
912 
913 template <BranchType BT>
916  art::RangeSet const& /*prunedProductRS*/,
917  std::string const& wrappedName)
918 {
919  return oh.isValid() ? oh.wrapper() : dummyProductCache_.product(wrappedName);
920 }
921 
922 template <BranchType BT>
925  art::RangeSet const& prunedProductRS,
926  std::string const& wrappedName)
927 {
928  return (oh.isValid() && prunedProductRS.is_valid()) ?
929  oh.wrapper() :
930  dummyProductCache_.product(wrappedName);
931 }
bool isRealData() const
SubRunAuxiliary const * pSubRunAux_
EventAuxiliary const * pEventAux_
RunID const & id() const
Definition: RunAuxiliary.h:51
std::set< ProductID > branchesWithStoredHistory_
std::vector< std::pair< std::string, std::string >> collection_type
int const compressionLevel_
static constexpr EventID invalidEvent()
Definition: EventID.h:203
cet::sqlite::Connection rootFileDB_
bool fastCloningEnabledAtConstruction_
ProductProvenance const * productProvenance() const
Definition: OutputHandle.h:90
RootOutputFile(OutputModule *, std::string const &fileName, ClosingCriteria const &fileSwitchCriteria, int const compressionLevel, int64_t const saveMemoryObjectThreshold, int64_t const treeMaxVirtualSize, int const splitLevel, int const basketSize, DropMetaData dropMetaData, bool dropMetaDataForDroppedData, bool fastCloningRequested)
std::enable_if_t<!detail::RangeSetsSupported< BT >::value, art::EDProduct const * > getProduct(OutputHandle const &, RangeSet const &productRS, std::string const &wrappedName)
ProductProvenances * pRunProductProvenanceVector_
int64_t const treeMaxVirtualSize_
BranchChildren const & branchChildren() const
Definition: OutputModule.h:349
T * get() const
Definition: ServiceHandle.h:71
std::chrono::steady_clock::time_point beginTime_
std::unique_ptr< TFile > filePtr_
std::string const & parentageTreeName()
Definition: rootNames.cc:22
void beginInputFile(RootFileBlock const *, bool fastClone)
SubRunID const & id() const
std::string const & eventHistoryTreeName()
Definition: rootNames.cc:55
EventAuxiliary const & aux() const
DropMetaData dropMetaData_
STL namespace.
ProductStatus unknown()
Definition: ProductStatus.h:31
void writeOne(EventPrincipal const &)
bool is_sorted() const
Definition: RangeSet.cc:259
virtual RangeSet seenRanges() const =0
void setRunAuxiliaryRangeSetID(RangeSet const &)
RangeSet const & rangeOfValidity() const
Definition: OutputHandle.h:78
RunNumber_t run() const
Definition: RangeSet.h:36
std::map< ProcessHistoryID const, ProcessHistory > ProcessHistoryMap
std::vector< std::string > parents(bool want_basename=true) const
static void writeTTree(TTree *) noexcept(false)
ProductProvenances * pResultsProductProvenanceVector_
void writeFileCatalogMetadata(FileStatsCollector const &stats, FileCatalogMetadata::collection_type const &, FileCatalogMetadata::collection_type const &)
OutputFileStatus status_
void setSubRunAuxiliaryRangeSetID(RangeSet const &)
ProductList productList_
void addEventSelectionEntry(EventSelectionID const &eventSelection)
Definition: History.cc:12
std::vector< EventRange > const & ranges() const
Definition: RangeSet.h:42
std::string const & metaDataTreeName()
Definition: rootNames.cc:41
OutputHandle getForOutput(ProductID const, bool resolveProd) const
Definition: Principal.cc:424
bool should_close(FileProperties const &) const
static collection_type const & get()
void updateAge(std::chrono::seconds const age)
FileProperties fp_
static ProductMetaData const & instance()
auto eventEntryNumber() const
char const * metaBranchRootName()
Definition: rootNames.h:61
void writeSubRun(SubRunPrincipal const &)
std::string const & parentageBranchName()
Definition: rootNames.cc:34
void sortBy_Run_SubRun_Event()
Definition: FileIndex.cc:44
detail::DummyProductCache dummyProductCache_
Int_t col[ntarg]
Definition: Style.C:29
bool is_full_run() const
Definition: RangeSet.cc:246
boost::posix_time::ptime outputFileOpenTime() const
int getFileFormatVersion()
void writeResults(ResultsPrincipal &resp)
IDNumber_t< Level::SubRun > SubRunNumber_t
Definition: IDNumber.h:118
bool isValid() const
Definition: OutputHandle.h:72
RootOutputTreePtrArray treePointers_
auto runEntryNumber() const
std::string const & parentageIDBranchName()
Definition: rootNames.cc:28
SelectionsArray const & keptProducts() const
Definition: OutputModule.h:336
void respondToCloseInputFile(FileBlock const &)
EDProduct const * product(std::string const &wrappedName)
void addEntry(EventID const &eID, EntryNumber_t entry)
Definition: FileIndex.cc:29
ProductProvenances * pEventProductProvenanceVector_
SubRunAuxiliary const & aux() const
EventID const & highestEventID() const
ProductStatus dropped()
Definition: ProductStatus.h:26
void setRangeSetID(unsigned const id) const
void writeProductDescriptionRegistry()
std::string const & getFileFormatEra()
std::string const & BranchTypeToString(BranchType const bt)
Definition: BranchType.cc:65
bool is_valid() const
Definition: RangeSet.cc:230
EDProduct const * wrapper() const
Definition: OutputHandle.h:84
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
bool is_full_subRun() const
Definition: RangeSet.cc:253
static TTree * makeTTree(TFile *, std::string const &name, int splitLevel)
std::string const & fileIndexTreeName()
Definition: rootNames.cc:48
std::enable_if_t< B!=Granularity::InputFile > update(OutputFileStatus const status)
fhicl::ParameterSetID selectorConfig() const
std::string const & eventHistoryBranchName()
Definition: rootNames.cc:62
std::string value(boost::any const &)
void checkDictionaries(BranchDescription const &productDesc)
std::string to_string(Flag_t< Storage > const flag)
Convert a flag into a stream (shows its index).
Definition: BitMask.h:187
void writeProcessConfigurationRegistry()
bool contains(RunNumber_t, SubRunNumber_t, EventNumber_t) const
Definition: RangeSet.cc:214
OutputModule const * om_
auto subRunEntryNumber() const
int64_t const saveMemoryObjectThreshold_
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
static RangeSet invalid()
Definition: RangeSet.cc:46
RunAuxiliary const & aux() const
Definition: RunPrincipal.h:41
void writeRun(RunPrincipal const &)
IDNumber_t< Level::Event > EventNumber_t
Definition: IDNumber.h:117
BranchType
Definition: BranchType.h:18
void writeProcessHistoryRegistry()
ResultsAuxiliary const & aux() const
ResultsAuxiliary const * pResultsAux_
std::vector< evd::details::RawDigitInfo_t >::const_iterator end(RawDigitCacheDataClass const &cache)
std::set< SubRunID > const & seenSubRuns() const
ProductProvenances * pSubRunProductProvenanceVector_
std::string to_string(ModuleType mt)
Definition: ModuleType.h:32
RunAuxiliary const * pRunAux_
OutputItemListArray selectedOutputItemList_
Float_t e
Definition: plot.C:34
void setRangeSetID(unsigned const id)
Definition: EDProduct.h:73
History const * pHistory_
bool shouldFastClone(bool fastCloningSet, bool fastCloning, bool wantAllEvents, ClosingCriteria const &fileProperties)
std::size_t eventsThisFile() const
ClosingCriteria fileSwitchCriteria_
void updateSize(unsigned const size)
unsigned checksum() const
Definition: RangeSet.cc:184
ProductStatus neverCreated()
Definition: ProductStatus.h:21
EventID const & lowestEventID() const
void fillBranches(Principal const &, std::vector< ProductProvenance > *)
static void exportTo(sqlite3 *db)
EventID const & id() const
void setRangeSetID(unsigned const id) const
Definition: RunAuxiliary.h:87
IDNumber_t< Level::Run > RunNumber_t
Definition: IDNumber.h:119