35 #include "canvas_root_io/Streamers/ProductIDStreamer.h" 36 #include "canvas_root_io/Utilities/DictionaryChecker.h" 37 #include "cetlib/compiler_macros.h" 38 #include "cetlib/container_algorithms.h" 67 have_table(sqlite3* db, std::string
const& table, std::string
const& filename)
70 sqlite3_stmt* stmt =
nullptr;
71 std::string
const ddl{
72 "select 1 from sqlite_master where type='table' and name='" + table +
74 auto rc = sqlite3_prepare_v2(db, ddl.c_str(), -1, &stmt,
nullptr);
75 if (rc == SQLITE_OK) {
76 switch (rc = sqlite3_step(stmt)) {
87 rc = sqlite3_finalize(stmt);
88 if (rc != SQLITE_OK) {
90 <<
"Error interrogating SQLite3 DB in file " << filename <<
".\n";
98 RootInputFile::RootInputFile(
string const& fileName,
99 string const& catalogName,
100 ProcessConfiguration
const& processConfiguration,
101 string const& logicalFileName,
102 unique_ptr<TFile>&& filePtr,
103 EventID
const& origEventID,
104 unsigned int eventsToSkip,
105 bool const compactSubRunRanges,
106 FastCloningInfoProvider
const& fcip,
107 unsigned int treeCacheSize,
108 int64_t treeMaxVirtualSize,
109 int64_t saveMemoryObjectThreshold,
110 bool delayedReadEventProducts,
111 bool delayedReadSubRunProducts,
112 bool delayedReadRunProducts,
113 InputSource::ProcessingMode processingMode,
116 GroupSelectorRules
const& groupSelectorRules,
117 shared_ptr<DuplicateChecker> duplicateChecker,
118 bool dropDescendants,
119 bool const readIncomingParameterSets,
120 exempt_ptr<RootInputFile> primaryFile,
121 vector<string>
const& secondaryFileNames,
122 RootInputFileSequence* rifSequence,
123 MasterProductRegistry& mpr)
124 : fileName_{fileName}
127 , logicalFileName_{logicalFileName}
128 , filePtr_{std::move(filePtr)}
129 , origEventID_{origEventID}
130 , eventsToSkip_{eventsToSkip}
131 , compactSubRunRanges_{compactSubRunRanges}
133 std::make_unique<RootInputTree>(filePtr_.get(),
135 saveMemoryObjectThreshold,
137 compactSubRunRanges_,
139 std::make_unique<RootInputTree>(filePtr_.get(),
141 saveMemoryObjectThreshold,
143 compactSubRunRanges_,
145 std::make_unique<RootInputTree>(filePtr_.get(),
147 saveMemoryObjectThreshold,
149 compactSubRunRanges_,
151 std::make_unique<RootInputTree>(filePtr_.get(),
153 saveMemoryObjectThreshold,
155 compactSubRunRanges_,
157 , delayedReadEventProducts_{delayedReadEventProducts}
158 , delayedReadSubRunProducts_{delayedReadSubRunProducts}
159 , delayedReadRunProducts_{delayedReadRunProducts}
161 , forcedRunOffset_{forcedRunOffset}
162 , noEventSort_{noEventSort}
163 , duplicateChecker_{duplicateChecker}
164 , primaryFile_{primaryFile ? primaryFile :
this}
165 , secondaryFileNames_{secondaryFileNames}
166 , rifSequence_{rifSequence}
168 secondaryFiles_.resize(secondaryFileNames_.size());
169 eventTree().setCacheSize(treeCacheSize);
170 eventTree().setTreeMaxVirtualSize(treeMaxVirtualSize);
171 subRunTree().setTreeMaxVirtualSize(treeMaxVirtualSize);
172 subRunTree().setCacheSize(treeCacheSize);
173 runTree().setTreeMaxVirtualSize(treeMaxVirtualSize);
174 runTree().setCacheSize(treeCacheSize);
176 resultsTree().setTreeMaxVirtualSize(treeMaxVirtualSize);
177 resultsTree().setCacheSize(treeCacheSize);
187 metaDataTree->SetCacheSize(static_cast<Long64_t>(treeCacheSize));
191 fileFormatVersion_ = detail::readMetadata<FileFormatVersion>(metaDataTree);
194 auto findexPtr = &fileIndex_;
198 BranchIDLists branchIDLists{};
201 std::make_unique<BranchIDLists>(std::move(branchIDLists));
202 configureProductIDStreamer(branchIDLists_.get());
208 if (readIncomingParameterSets &&
211 for (
auto const& psEntry : psetMap) {
224 auto pHistMap = detail::readMetadata<ProcessHistoryMap>(metaDataTree);
235 if (fileFormatVersion_.era_ != expected_era) {
237 <<
"Can only read files written during the \"" << expected_era
240 <<
"\"" << fileName_ <<
"\" was " 241 << (fileFormatVersion_.era_.empty() ?
243 (
"set to \"" + fileFormatVersion_.era_ +
"\" "))
248 if (fileFormatVersion_.value_ >= 5) {
249 sqliteDB_ = ServiceHandle<DatabaseConnection> {}
250 ->get<TKeyVFSOpenPolicy>(
"RootFileDB", filePtr_.get());
251 if (readIncomingParameterSets &&
252 have_table(sqliteDB_,
"ParameterSets", fileName_)) {
255 if (art::ServiceRegistry::isAvailable<art::FileCatalogMetadata>() &&
256 have_table(sqliteDB_,
"FileCatalog_metadata", fileName_)) {
257 sqlite3_stmt* stmt{
nullptr};
258 sqlite3_prepare_v2(sqliteDB_,
259 "SELECT Name, Value from FileCatalog_metadata;",
264 std::vector<std::pair<std::string, std::string>> md;
265 while (sqlite3_step(stmt) == SQLITE_ROW) {
266 std::string
const name{
267 reinterpret_cast<char const*
>(sqlite3_column_text(stmt, 0))};
268 std::string
const value{
269 reinterpret_cast<char const*
>(sqlite3_column_text(stmt, 1))};
270 md.emplace_back(name,
value);
272 int const finalize_status = sqlite3_finalize(stmt);
273 if (finalize_status != SQLITE_OK) {
275 <<
"Unexpected status from DB status cleanup: " 276 << sqlite3_errmsg(sqliteDB_) <<
" (0x" << finalize_status <<
").\n";
279 ->setMetadataFromInput(md);
286 readParentageTree(treeCacheSize);
287 initializeDuplicateChecker();
289 fileIndex_.sortBy_Run_SubRun_EventEntry();
291 fiIter_ = fileIndex_.begin();
292 fiBegin_ = fileIndex_.begin();
293 fiEnd_ = fileIndex_.end();
294 readEventHistoryTree(treeCacheSize);
297 productListHolder_ = detail::readMetadata<ProductRegistry>(metaDataTree);
298 auto& prodList = productListHolder_.productList_;
299 dropOnInput(groupSelectorRules, dropDescendants, prodList);
301 auto availableProducts = fillPerBranchTypePresenceFlags(prodList);
303 for (
auto& prod : prodList) {
304 auto& pd = prod.second;
305 auto const& presenceLookup = availableProducts[pd.branchType()];
306 bool const present{presenceLookup.find(pd.productID()) !=
307 cend(presenceLookup)};
308 auto const validity =
present ?
311 pd.setValidity(validity);
312 treePointers_[pd.branchType()]->addBranch(prod.first, pd);
315 mpr.updateFromInputFile(prodList);
317 presentProducts_ = ProductTables{descriptions, availableProducts};
320 fastClonable_ = setIfFastClonable(fcip);
324 root::DictionaryChecker checker{};
325 checker.checkDictionaries<EventAuxiliary>();
326 checker.checkDictionaries<SubRunAuxiliary>();
327 checker.checkDictionaries<RunAuxiliary>();
328 checker.checkDictionaries<ResultsAuxiliary>();
329 checker.reportMissingDictionaries();
332 configureProductIDStreamer();
341 auto parentageTree =
static_cast<TTree*
>(
343 if (!parentageTree) {
347 parentageTree->SetCacheSize(static_cast<Long64_t>(treeCacheSize));
348 auto idBuffer = root::getObjectRequireDict<ParentageID>();
349 auto pidBuffer = &idBuffer;
353 auto parentageBuffer = root::getObjectRequireDict<Parentage>();
354 auto pParentageBuffer = &parentageBuffer;
359 for (
EntryNumber i = 0, numEntries = parentageTree->GetEntries();
363 if (idBuffer != parentageBuffer.id()) {
365 <<
"Corruption of Parentage tree detected.\n";
379 if (fiIter_ == fiEnd_) {
382 return fiIter_->eventID_;
391 if (secondaryFileNames_.size() != 0) {
394 if (!fileIndex_.allEventsInEntryOrder()) {
397 if (eventsToSkip_ != 0) {
411 if (forcedRunOffset_ != 0) {
422 if (it->eventID_ < origEventID_) {
431 if (fiBegin_ == fiEnd_) {
434 forcedRunOffset_ = 0;
435 if (!
RunID(forcedRunNumber).isValid()) {
438 forcedRunOffset_ = forcedRunNumber - fiBegin_->eventID_.run();
439 if (forcedRunOffset_ != 0) {
440 fastClonable_ =
false;
442 return forcedRunOffset_;
445 std::unique_ptr<FileBlock>
448 return std::make_unique<RootFileBlock>(
452 cet::make_exempt_ptr(eventTree().tree()),
459 if (fiIter_ == fiEnd_) {
462 return fiIter_->getEntryType();
468 auto entryType = getEntryType();
472 RunID currentRun(fiIter_->eventID_.runID());
473 if (!currentRun.isValid()) {
478 if (currentRun < origEventID_.runID()) {
479 fiIter_ = fileIndex_.findPosition(origEventID_.runID(),
false);
480 return getNextEntryTypeWanted();
485 fiIter_ = fileIndex_.findPosition(
486 currentRun.isValid() ? currentRun.next() : currentRun,
false);
487 return getNextEntryTypeWanted();
489 SubRunID const& currentSubRun = fiIter_->eventID_.subRunID();
492 if ((currentRun == origEventID_.runID()) &&
493 (currentSubRun < origEventID_.subRunID())) {
494 fiIter_ = fileIndex_.findSubRunOrRunPosition(origEventID_.subRunID());
495 return getNextEntryTypeWanted();
500 fiIter_ = fileIndex_.findSubRunOrRunPosition(currentSubRun.
next());
501 return getNextEntryTypeWanted();
505 if (fiIter_->eventID_ < origEventID_) {
506 fiIter_ = fileIndex_.findPosition(origEventID_);
507 return getNextEntryTypeWanted();
509 if (duplicateChecker_.get() && duplicateChecker_->isDuplicateAndCheckActive(
510 fiIter_->eventID_, fileName_)) {
512 return getNextEntryTypeWanted();
514 if (eventsToSkip_ == 0) {
520 while ((eventsToSkip_ != 0) && (fiIter_ != fiEnd_) &&
524 while ((eventsToSkip_ != 0) && (fiIter_ != fiEnd_) &&
526 duplicateChecker_.get() &&
527 duplicateChecker_->isDuplicateAndCheckActive(fiIter_->eventID_,
532 return getNextEntryTypeWanted();
538 if (!fileFormatVersion_.isValid()) {
539 fileFormatVersion_.value_ = 0;
541 if (!eventTree().isValid()) {
543 <<
"'Events' tree is corrupted or not present\n" 544 <<
"in the input file.\n";
546 if (fileIndex_.empty()) {
548 <<
"FileIndex information is missing for the input file.\n";
563 for (
auto const& sf : secondaryFiles_) {
567 sf->filePtr_->Close();
577 auto pHistory = history_.get();
578 auto eventHistoryBranch =
580 if (!eventHistoryBranch) {
582 <<
"Failed to find history branch in event history tree.\n";
584 eventHistoryBranch->SetAddress(&pHistory);
591 while ((offset > 0) && (fiIter_ != fiEnd_)) {
597 while ((offset < 0) && (fiIter_ != fiBegin_)) {
603 while ((fiIter_ != fiEnd_) &&
623 unique_ptr<EventPrincipal>
626 assert(fiIter_ != fiEnd_);
628 assert(fiIter_->eventID_.runID().isValid());
630 auto const& entryNumbers = getEntryNumbers(
InEvent);
631 if (!eventTree().current(entryNumbers.first)) {
636 auto ep = readCurrentEvent(entryNumbers);
638 assert(eventAux().run() == fiIter_->eventID_.run() + forcedRunOffset_);
639 assert(eventAux().subRunID() == fiIter_->eventID_.subRunID());
646 unique_ptr<EventPrincipal>
648 std::pair<EntryNumbers, bool>
const& entryNumbers)
650 assert(entryNumbers.first.size() == 1ull);
651 fillAuxiliary<InEvent>(entryNumbers.first.front());
652 assert(eventAux().
id() == fiIter_->eventID_);
654 overrideRunNumber(const_cast<EventID&>(eventAux().
id()),
655 eventAux().isRealData());
656 auto ep = std::make_unique<EventPrincipal>(
658 processConfiguration_,
659 &presentProducts_.get(
InEvent),
661 eventTree().makeBranchMapper(),
662 eventTree().makeDelayedReader(
664 branchIDLists_.get(),
668 entryNumbers.second);
669 eventTree().fillGroups(*ep);
670 if (!delayedReadEventProducts_) {
673 primaryEP_ = make_exempt_ptr(ep.get());
683 if (!setEntry<InEvent>(eID,
true)) {
687 auto const& entryNumbers = getEntryNumbers(
InEvent);
688 assert(entryNumbers.first.size() == 1ull);
689 fillAuxiliary<InEvent>(entryNumbers.first.front());
691 overrideRunNumber(const_cast<EventID&>(eventAux().
id()),
692 eventAux().isRealData());
693 auto ep = std::make_unique<EventPrincipal>(
695 processConfiguration_,
696 &presentProducts_.get(
InEvent),
698 eventTree().makeBranchMapper(),
699 eventTree().makeDelayedReader(
701 branchIDLists_.get(),
705 entryNumbers.second);
706 eventTree().fillGroups(*ep);
707 primaryFile_->primaryEP_->addSecondaryPrincipal(move(ep));
711 unique_ptr<RangeSetHandler>
714 return std::move(runRangeSetHandler_);
717 unique_ptr<RunPrincipal>
720 assert(fiIter_ != fiEnd_);
722 assert(fiIter_->eventID_.runID().isValid());
724 auto const& entryNumbers = getEntryNumbers(
InRun).first;
725 if (!runTree().current(entryNumbers)) {
730 auto rp = readCurrentRun(entryNumbers);
731 advanceEntry(entryNumbers.size());
735 unique_ptr<RunPrincipal>
738 runRangeSetHandler_ = fillAuxiliary<InRun>(entryNumbers);
739 assert(runAux().
id() == fiIter_->eventID_.runID());
740 overrideRunNumber(runAux().id_);
744 if (eventTree().next()) {
745 fillAuxiliary<InEvent>(eventTree().entryNumber());
747 eventTree().previous();
749 runAux().beginTime_ = eventAux().time();
752 auto rp = std::make_unique<RunPrincipal>(
754 processConfiguration_,
755 &presentProducts_.get(
InRun),
756 runTree().makeBranchMapper(),
757 runTree().makeDelayedReader(fileFormatVersion_,
763 runTree().fillGroups(*rp);
764 if (!delayedReadRunProducts_) {
767 primaryRP_ = make_exempt_ptr(rp.get());
777 if (!setEntry<InRun>(rID)) {
781 auto const& entryNumbers = getEntryNumbers(
InRun).first;
782 assert(fiIter_ != fiEnd_);
784 assert(fiIter_->eventID_.runID().isValid());
785 runRangeSetHandler_ = fillAuxiliary<InRun>(entryNumbers);
786 assert(runAux().
id() == fiIter_->eventID_.runID());
787 overrideRunNumber(runAux().id_);
791 if (eventTree().next()) {
792 fillAuxiliary<InEvent>(eventTree().entryNumber());
794 eventTree().previous();
796 runAux().beginTime_ = eventAux().time();
799 auto rp = std::make_unique<RunPrincipal>(
801 processConfiguration_,
802 &presentProducts_.get(
InRun),
803 runTree().makeBranchMapper(),
804 runTree().makeDelayedReader(fileFormatVersion_,
810 runTree().fillGroups(*rp);
811 if (!delayedReadRunProducts_) {
814 primaryFile_->primaryRP_->addSecondaryPrincipal(move(rp));
818 unique_ptr<RangeSetHandler>
821 return std::move(subRunRangeSetHandler_);
824 unique_ptr<SubRunPrincipal>
827 assert(fiIter_ != fiEnd_);
830 auto const& entryNumbers = getEntryNumbers(
InSubRun).first;
831 if (!subRunTree().current(entryNumbers)) {
836 auto srp = readCurrentSubRun(entryNumbers, rp);
837 advanceEntry(entryNumbers.size());
841 unique_ptr<SubRunPrincipal>
844 cet::exempt_ptr<RunPrincipal> rp[[gnu::unused]])
846 subRunRangeSetHandler_ = fillAuxiliary<InSubRun>(entryNumbers);
847 assert(subRunAux().
id() == fiIter_->eventID_.subRunID());
848 overrideRunNumber(subRunAux().id_);
849 assert(subRunAux().runID() == rp->id());
853 if (eventTree().next()) {
854 fillAuxiliary<InEvent>(eventTree().entryNumber());
856 eventTree().previous();
858 subRunAux().beginTime_ = eventAux().time();
862 auto srp = std::make_unique<SubRunPrincipal>(
864 processConfiguration_,
866 subRunTree().makeBranchMapper(),
867 subRunTree().makeDelayedReader(fileFormatVersion_,
873 subRunTree().fillGroups(*srp);
874 if (!delayedReadSubRunProducts_) {
875 srp->readImmediate();
877 primarySRP_ = make_exempt_ptr(srp.get());
887 if (!setEntry<InSubRun>(srID)) {
891 auto const& entryNumbers = getEntryNumbers(
InSubRun).first;
892 assert(fiIter_ != fiEnd_);
894 subRunRangeSetHandler_ = fillAuxiliary<InSubRun>(entryNumbers);
895 assert(subRunAux().
id() == fiIter_->eventID_.subRunID());
896 overrideRunNumber(subRunAux().id_);
900 if (eventTree().next()) {
901 fillAuxiliary<InEvent>(eventTree().entryNumber());
903 eventTree().previous();
905 subRunAux().beginTime_ = eventAux().time();
908 auto srp = std::make_unique<SubRunPrincipal>(
910 processConfiguration_,
912 subRunTree().makeBranchMapper(),
913 subRunTree().makeDelayedReader(fileFormatVersion_,
919 subRunTree().fillGroups(*srp);
920 if (!delayedReadSubRunProducts_) {
921 srp->readImmediate();
923 primaryFile_->primarySRP_->addSecondaryPrincipal(move(srp));
930 if (forcedRunOffset_ != 0) {
931 id =
RunID(
id.run() + forcedRunOffset_);
941 if (forcedRunOffset_ != 0) {
942 id =
SubRunID(
id.run() + forcedRunOffset_,
id.subRun());
949 if (forcedRunOffset_ == 0) {
954 "RootInputFile::overrideRunNumber()"}
955 <<
"The 'setRunNumber' parameter of RootInput cannot " 956 <<
"be used with real data.\n";
958 id =
EventID(
id.run() + forcedRunOffset_,
id.subRun(),
id.
event());
965 eventHistoryTree_ =
static_cast<TTree*
>(
967 if (!eventHistoryTree_) {
969 <<
"Failed to find the event history tree.\n";
971 eventHistoryTree_->SetCacheSize(static_cast<Long64_t>(treeCacheSize));
977 if (duplicateChecker_.get() ==
nullptr) {
980 if (eventTree().next()) {
981 fillAuxiliary<InEvent>(eventTree().entryNumber());
982 duplicateChecker_->init(eventAux().isRealData(), fileIndex_);
984 eventTree().setEntryNumber(-1);
987 std::pair<RootInputFile::EntryNumbers, bool>
993 return std::pair<EntryNumbers, bool>{entries,
true};
995 auto const eid = it->eventID_;
996 auto const subrun = eid.subRun();
997 for (; it != fiEnd_ && eid == it->eventID_; ++it) {
998 entries.push_back(it->entry_);
1001 if (t ==
InEvent && entries.size() > 1ul) {
1003 <<
" has multiple entries for\n" 1007 bool const lastInSubRun{it == fiEnd_ || it->eventID_.subRun() != subrun};
1008 return std::pair<EntryNumbers, bool>{entries, lastInSubRun};
1011 std::array<AvailableProducts_t, NumBranchTypes>
1014 std::array<AvailableProducts_t, NumBranchTypes> result{{}};
1015 for (
auto const& prodpr : prodList) {
1016 auto const& desc = prodpr.second;
1017 if (treePointers_[desc.branchType()]->hasBranch(desc.branchName())) {
1018 result[desc.branchType()].emplace(desc.productID());
1026 bool const dropDescendants,
1035 set<ProductID> branchesToDrop;
1036 for (
auto const& prod : prodList) {
1037 auto const& pd = prod.second;
1038 if (!groupSelector.selected(pd)) {
1039 if (dropDescendants) {
1042 branchesToDrop.insert(pd.productID());
1047 auto branchesToDropEnd = branchesToDrop.cend();
1048 for (
auto I = prodList.begin(),
E = prodList.end(); I !=
E;) {
1049 auto const& pd = I->second;
1050 bool drop = branchesToDrop.find(pd.productID()) != branchesToDropEnd;
1056 if (groupSelector.selected(pd)) {
1058 <<
"Branch '" << pd.branchName()
1059 <<
"' is being dropped from the input\n" 1060 <<
"of file '" << fileName_
1061 <<
"' because it is dependent on a branch\n" 1062 <<
"that was explicitly dropped.\n";
1064 treePointers_[pd.branchType()]->dropBranch(pd.branchName());
1066 prodList.erase(icopy);
1073 secondaryFiles_[idx] =
1074 rifSequence_->openSecondaryFile(secondaryFileNames_[idx],
this);
1077 std::unique_ptr<art::ResultsPrincipal>
1080 std::unique_ptr<art::ResultsPrincipal> resp;
1081 if (resultsTree()) {
1082 resultsTree().rewind();
1083 EntryNumbers const& entryNumbers{resultsTree().entryNumber()};
1084 assert(entryNumbers.size() == 1ull);
1085 fillAuxiliary<InResults>(entryNumbers.front());
1086 resp = std::make_unique<ResultsPrincipal>(
1088 processConfiguration_,
1090 resultsTree().makeBranchMapper(),
1091 resultsTree().makeDelayedReader(
1093 resultsTree().fillGroups(*resp);
1095 resp = std::make_unique<ResultsPrincipal>(
void appendToDescendants(ProductID parent, ProductIDSet &descendants) const
static ParameterSetID const & put(ParameterSet const &ps)
static void put(C const &container)
void readFileIndex(TFile *file, TTree *metaDataTree, FileIndex *&findexPtr)
std::string const & parentageTreeName()
std::map< BranchKey, BranchDescription > ProductList
std::string const & eventHistoryTreeName()
void make_ParameterSet(intermediate_table const &tbl, ParameterSet &ps)
bool fastCloningPermitted() const
std::string couldNotFindTree(std::string const &treename)
std::string const & metaDataTreeName()
off_t remainingSubRuns() const
std::string const & parentageBranchName()
std::string const & parentageIDBranchName()
ParameterSetID id() const
std::string const & getFileFormatEra()
static constexpr Timestamp invalidTimestamp()
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
off_t remainingEvents() const
std::string const & eventHistoryBranchName()
std::string value(boost::any const &)
void checkDictionaries(BranchDescription const &productDesc)
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
std::map< fhicl::ParameterSetID, ParameterSetBlob > ParameterSetMap
auto make_product_descriptions(ProductList const &productList)
static auto emplace(value_type const &value)
T readMetadata(TTree *md, bool const requireDict=true)
Event finding and building.
IDNumber_t< Level::Run > RunNumber_t
static void importFrom(sqlite3 *db)