5 #include "TBranchElement.h" 27 #include "boost/date_time/posix_time/posix_time.hpp" 44 #include "canvas_root_io/Utilities/DictionaryChecker.h" 45 #include "cetlib/canonical_string.h" 46 #include "cetlib/container_algorithms.h" 47 #include "cetlib/exempt_ptr.h" 48 #include "cetlib/sqlite/Ntuple.h" 49 #include "cetlib/sqlite/Transaction.h" 50 #include "cetlib/sqlite/create_table.h" 51 #include "cetlib/sqlite/exec.h" 52 #include "cetlib/sqlite/insert.h" 70 create_table(sqlite3*
const db,
71 std::string
const& name,
72 std::vector<std::string>
const& columns,
73 std::string
const& suffix = {})
77 <<
"Number of sqlite columns specified for table: " << name <<
'\n' 80 std::string ddl =
"DROP TABLE IF EXISTS " + name +
83 name +
"(" + columns.front();
84 std::for_each(columns.begin() + 1, columns.end(), [&ddl](
auto const&
col) {
90 sqlite::exec(db, ddl);
94 insert_eventRanges_row(sqlite3_stmt* stmt,
99 sqlite3_bind_int64(stmt, 1, sr);
100 sqlite3_bind_int64(stmt, 2, b);
101 sqlite3_bind_int64(stmt, 3, e);
107 insert_rangeSets_eventSets_row(sqlite3_stmt* stmt,
111 sqlite3_bind_int64(stmt, 1, rsid);
112 sqlite3_bind_int64(stmt, 2, esid);
118 getNewRangeSetID(sqlite3* db,
124 return sqlite3_last_insert_rowid(db);
130 vector<unsigned> rangeSetIDs;
132 rs, std::back_inserter(rangeSetIDs), [db](
auto const& range) {
133 sqlite::query_result<unsigned> r;
134 r << sqlite::select(
"ROWID")
135 .from(db,
"EventRanges")
143 return unique_value(r);
151 sqlite::Transaction txn{db};
152 sqlite3_stmt* stmt{
nullptr};
153 std::string
const ddl{
"INSERT INTO EventRanges(SubRun, begin, end) " 155 sqlite3_prepare_v2(db, ddl.c_str(), -1, &stmt,
nullptr);
156 for (
auto const& range : rs) {
157 insert_eventRanges_row(stmt, range.subRun(), range.begin(), range.end());
159 sqlite3_finalize(stmt);
164 insertIntoJoinTable(sqlite3* db,
167 vector<unsigned>
const& eventRangesIDs)
169 sqlite::Transaction txn{db};
170 sqlite3_stmt* stmt{
nullptr};
171 std::string
const ddl{
173 "RangeSets_EventRanges(RangeSetsID, EventRangesID) Values(?,?);"};
174 sqlite3_prepare_v2(db, ddl.c_str(), -1, &stmt,
nullptr);
175 cet::for_all(eventRangesIDs, [stmt, rsID](
auto const eventRangeID) {
176 insert_rangeSets_eventSets_row(stmt, rsID, eventRangeID);
178 sqlite3_finalize(stmt);
196 if (productRS.
ranges().empty())
199 auto const r = productRS.
run();
200 auto const& productFront = productRS.
ranges().front();
201 if (!principalRS.
contains(r, productFront.subRun(), productFront.begin()))
246 template <BranchType BT>
250 bool const producedInThisProcess)
263 if (!producedInThisProcess) {
264 maybeInvalidateRangeSet(BT, principalRS, rs);
269 template <BranchType BT>
278 template <BranchType BT>
283 std::map<unsigned, unsigned>& )
286 template <BranchType BT>
291 std::map<unsigned, unsigned>& checksumToIndexLookup)
297 auto it = checksumToIndexLookup.find(rs.
checksum());
298 if (it != checksumToIndexLookup.cend()) {
301 unsigned const rsID = getNewRangeSetID(db, BT, rs.
run());
303 checksumToIndexLookup.emplace(rs.
checksum(), rsID);
304 insertIntoEventRanges(db, rs);
305 auto const& eventRangesIDs = getExistingRangeSetIDs(db, rs);
306 insertIntoJoinTable(db, BT, rsID, eventRangesIDs);
313 string const& fileName,
315 int const compressionLevel,
316 int64_t
const saveMemoryObjectThreshold,
317 int64_t
const treeMaxVirtualSize,
318 int const splitLevel,
319 int const basketSize,
321 bool const dropMetaDataForDroppedData,
322 bool const fastCloningRequested)
336 std::make_unique<RootOutputTree>(
344 saveMemoryObjectThreshold),
345 std::make_unique<RootOutputTree>(
353 saveMemoryObjectThreshold),
354 std::make_unique<RootOutputTree>(
362 saveMemoryObjectThreshold),
363 std::make_unique<RootOutputTree>(
371 saveMemoryObjectThreshold)}}
375 SQLITE_OPEN_CREATE | SQLITE_OPEN_READWRITE)}
387 if (!eventHistoryTree_) {
389 <<
"Failed to create the tree for History objects\n";
398 <<
"Failed to create a branch for History in the output file\n";
404 root::DictionaryChecker checker{};
409 checker.reportMissingDictionaries();
423 "UNIQUE (SubRun,begin,end) ON CONFLICT IGNORE"});
427 create_table(
rootFileDB_,
"SubRunRangeSets", column<int>{
"Run"});
429 "SubRunRangeSets_EventRanges",
430 {
"RangeSetsID INTEGER",
431 "EventRangesID INTEGER",
432 "PRIMARY KEY(RangeSetsID,EventRangesID)"},
436 create_table(
rootFileDB_,
"RunRangeSets", column<int>{
"Run"});
438 "RunRangeSets_EventRanges",
439 {
"RangeSetsID INTEGER",
440 "EventRangesID INTEGER",
441 "PRIMARY KEY(RangeSetsID,EventRangesID)"},
463 if (pd->transient()) {
470 for (
auto const& val : items) {
471 treePointers_[bt]->addOutputBranch(*val.branchDescription_, val.product_);
478 bool const fastCloneFromOutputModule)
482 fastCloneFromOutputModule && rfb};
489 <<
"Fast cloning deactivated for this input file due to " 490 <<
"splitting level and/or basket size.";
492 }
else if (rfb && rfb->tree() &&
493 rfb->tree()->GetCurrentFile()->GetVersion() < 60001) {
495 <<
"Fast cloning deactivated for this input file due to " 496 <<
"ROOT version used to write it (< 6.00/01)\n" 497 "having a different splitting policy.";
503 <<
"Fast cloning deactivated for this input file due to " 504 <<
"reading in file that has a different ProductID schema.";
510 <<
"Fast cloning reactivated for this input file.";
513 auto tree = (rfb && rfb->tree()) ? rfb->tree() :
nullptr;
526 cet::for_all(
treePointers_, [](
auto const& p) { p->setEntries(); });
532 using namespace std::chrono;
533 unsigned int constexpr oneK{1024u};
551 History historyForOutput{e.history()};
557 <<
"Failed to fill the History tree for event: " << e.id()
558 <<
"\nTTree::Fill() returned " << sz <<
" bytes written." << endl;
562 string dataType{
"MC"};
599 auto pid = root::getObjectRequireDict<ParentageID>();
604 <<
"Failed to create a branch for ParentageIDs in the output file";
608 auto par = root::getObjectRequireDict<Parentage>();
613 <<
"Failed to create a branch for Parentages in the output file";
632 auto const* pver = &ver;
634 metaBranchRootName<FileFormatVersion>(), &pver,
basketSize_, 0);
645 auto const* findexElemPtr = &elem;
647 metaBranchRootName<FileIndex::Element>(), &findexElemPtr,
basketSize_, 0);
651 findexElemPtr = &entry;
675 pHistMap.emplace(pr);
677 auto const* p = &pHistMap;
679 metaBranchRootName<ProcessHistoryMap>(), &p,
basketSize_, 0);
684 "ProcessHistoryMap branch in output " 696 Ntuple<std::string, std::string> fileCatalogMetadata{
697 rootFileDB_,
"FileCatalog_metadata", {{
"Name",
"Value"}},
true};
699 for (
auto const& kv : md) {
700 fileCatalogMetadata.insert(kv.first, kv.second);
703 fileCatalogMetadata.insert(
"file_format",
"\"artroot\"");
704 fileCatalogMetadata.insert(
"file_format_era",
706 fileCatalogMetadata.insert(
"file_format_version",
710 namespace bpt = boost::posix_time;
711 auto formatted_time = [](
auto const& t) {
712 return cet::canonical_string(bpt::to_iso_extended_string(t));
714 fileCatalogMetadata.insert(
"start_time",
717 fileCatalogMetadata.insert(
719 formatted_time(boost::posix_time::second_clock::universal_time()));
724 auto I = find_if(md.crbegin(), md.crend(), [](
auto const& p) {
725 return p.first ==
"run_type";
728 if (I != md.crend()) {
732 buf <<
"[ " << srid.run() <<
", " << srid.subRun() <<
", " 733 << cet::canonical_string(I->second) <<
" ], ";
736 buf.seekp(-2, ios_base::cur);
738 fileCatalogMetadata.insert(
"runs", buf.str());
744 auto eidToTuple = [](
EventID const& eid) ->
string {
745 ostringstream eidStr;
746 eidStr <<
"[ " << eid.run() <<
", " << eid.subRun() <<
", " << eid.event()
750 fileCatalogMetadata.insert(
"first_event", eidToTuple(stats.
lowestEventID()));
751 fileCatalogMetadata.insert(
"last_event", eidToTuple(stats.
highestEventID()));
753 if (!stats.
parents().empty()) {
754 ostringstream pstring;
756 for (
auto const& parent : stats.
parents()) {
757 pstring << cet::canonical_string(parent) <<
", ";
760 pstring.seekp(-2, ios_base::cur);
762 fileCatalogMetadata.insert(
"parents", pstring.str());
765 for (
auto const& kv : ssmd) {
766 fileCatalogMetadata.insert(kv.first, kv.second);
794 metaBranchRootName<ProductRegistry>(), ®p,
basketSize_, 0);
805 metaBranchRootName<BranchChildren>(), &ppDeps,
basketSize_, 0);
826 auto const branchType =
static_cast<BranchType>(i);
831 template <art::BranchType BT>
834 vector<ProductProvenance>* vpp)
839 map<unsigned, unsigned> checksumToIndex;
841 auto const& principalRS = principal.
seenRanges();
844 auto const* pd = val.branchDescription_;
845 auto const pid = pd->productID();
846 branchesWithStoredHistory_.insert(
pid);
847 bool const produced{pd->produced()};
848 bool const resolveProd = (produced || !fastCloning ||
852 bool const keepProvenance =
857 unique_ptr<ProductProvenance> prov{
nullptr};
858 if (keepProvenance) {
860 prov = std::make_unique<ProductProvenance>(
868 prov = std::make_unique<ProductProvenance>(
869 keptProvenance.emplace(
pid, status));
875 auto const& rs = getRangeSet<BT>(oh, principalRS, produced);
884 auto const* product = getProduct<BT>(oh, rs, pd->wrappedName());
885 setProductRangeSetID<BT>(
887 val.product_ = product;
890 vpp->assign(keptProvenance.begin(), keptProvenance.end());
900 auto const& eventRangesIDs = getExistingRangeSetIDs(
rootFileDB_, ranges);
909 auto const& eventRangesIDs = getExistingRangeSetIDs(
rootFileDB_, ranges);
913 template <BranchType BT>
917 std::string
const& wrappedName)
922 template <BranchType BT>
926 std::string
const& wrappedName)
SubRunAuxiliary const * pSubRunAux_
EventAuxiliary const * pEventAux_
std::set< ProductID > branchesWithStoredHistory_
int const compressionLevel_
static constexpr EventID invalidEvent()
cet::sqlite::Connection rootFileDB_
bool fastCloningEnabledAtConstruction_
ProductProvenance const * productProvenance() const
RootOutputFile(OutputModule *, std::string const &fileName, ClosingCriteria const &fileSwitchCriteria, int const compressionLevel, int64_t const saveMemoryObjectThreshold, int64_t const treeMaxVirtualSize, int const splitLevel, int const basketSize, DropMetaData dropMetaData, bool dropMetaDataForDroppedData, bool fastCloningRequested)
std::enable_if_t<!detail::RangeSetsSupported< BT >::value, art::EDProduct const * > getProduct(OutputHandle const &, RangeSet const &productRS, std::string const &wrappedName)
ProductProvenances * pRunProductProvenanceVector_
int64_t const treeMaxVirtualSize_
BranchChildren const & branchChildren() const
std::chrono::steady_clock::time_point beginTime_
std::unique_ptr< TFile > filePtr_
std::string const & parentageTreeName()
void beginInputFile(RootFileBlock const *, bool fastClone)
SubRunID const & id() const
std::string const & eventHistoryTreeName()
EventAuxiliary const & aux() const
DropMetaData dropMetaData_
void writeOne(EventPrincipal const &)
void writeProductDependencies()
virtual RangeSet seenRanges() const =0
void setRunAuxiliaryRangeSetID(RangeSet const &)
RangeSet const & rangeOfValidity() const
void writeFileFormatVersion()
void writeParentageRegistry()
std::map< ProcessHistoryID const, ProcessHistory > ProcessHistoryMap
std::vector< std::string > parents(bool want_basename=true) const
static void writeTTree(TTree *) noexcept(false)
ProductProvenances * pResultsProductProvenanceVector_
void incrementInputFileNumber()
void writeFileCatalogMetadata(FileStatsCollector const &stats, FileCatalogMetadata::collection_type const &, FileCatalogMetadata::collection_type const &)
bool dropMetaDataForDroppedData_
void setSubRunAuxiliaryRangeSetID(RangeSet const &)
void addEventSelectionEntry(EventSelectionID const &eventSelection)
std::vector< EventRange > const & ranges() const
std::string const & metaDataTreeName()
OutputHandle getForOutput(ProductID const, bool resolveProd) const
bool should_close(FileProperties const &) const
static collection_type const & get()
void updateAge(std::chrono::seconds const age)
auto eventEntryNumber() const
char const * metaBranchRootName()
void writeSubRun(SubRunPrincipal const &)
std::string const & parentageBranchName()
void sortBy_Run_SubRun_Event()
detail::DummyProductCache dummyProductCache_
boost::posix_time::ptime outputFileOpenTime() const
int getFileFormatVersion()
void writeResults(ResultsPrincipal &resp)
IDNumber_t< Level::SubRun > SubRunNumber_t
RootOutputTreePtrArray treePointers_
auto runEntryNumber() const
std::string const & parentageIDBranchName()
SelectionsArray const & keptProducts() const
void respondToCloseInputFile(FileBlock const &)
EDProduct const * product(std::string const &wrappedName)
void addEntry(EventID const &eID, EntryNumber_t entry)
ProductProvenances * pEventProductProvenanceVector_
SubRunAuxiliary const & aux() const
EventID const & highestEventID() const
void setRangeSetID(unsigned const id) const
void writeProductDescriptionRegistry()
std::string const & getFileFormatEra()
std::string const & BranchTypeToString(BranchType const bt)
void writeParameterSetRegistry()
EDProduct const * wrapper() const
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
bool is_full_subRun() const
TTree * eventHistoryTree_
static TTree * makeTTree(TFile *, std::string const &name, int splitLevel)
std::string const & fileIndexTreeName()
std::enable_if_t< B!=Granularity::InputFile > update(OutputFileStatus const status)
fhicl::ParameterSetID selectorConfig() const
std::string const & eventHistoryBranchName()
std::string value(boost::any const &)
void checkDictionaries(BranchDescription const &productDesc)
std::string to_string(Flag_t< Storage > const flag)
Convert a flag into a stream (shows its index).
void writeProcessConfigurationRegistry()
void createDatabaseTables()
bool contains(RunNumber_t, SubRunNumber_t, EventNumber_t) const
auto subRunEntryNumber() const
int64_t const saveMemoryObjectThreshold_
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
static RangeSet invalid()
RunAuxiliary const & aux() const
void writeRun(RunPrincipal const &)
IDNumber_t< Level::Event > EventNumber_t
void writeProcessHistoryRegistry()
ResultsAuxiliary const & aux() const
ResultsAuxiliary const * pResultsAux_
std::vector< evd::details::RawDigitInfo_t >::const_iterator end(RawDigitCacheDataClass const &cache)
std::set< SubRunID > const & seenSubRuns() const
ProductProvenances * pSubRunProductProvenanceVector_
std::string to_string(ModuleType mt)
RunAuxiliary const * pRunAux_
OutputItemListArray selectedOutputItemList_
void setRangeSetID(unsigned const id)
History const * pHistory_
bool shouldFastClone(bool fastCloningSet, bool fastCloning, bool wantAllEvents, ClosingCriteria const &fileProperties)
std::size_t eventsThisFile() const
ClosingCriteria fileSwitchCriteria_
void updateSize(unsigned const size)
unsigned checksum() const
ProductStatus neverCreated()
EventID const & lowestEventID() const
void fillBranches(Principal const &, std::vector< ProductProvenance > *)
static void exportTo(sqlite3 *db)
EventID const & id() const
void setRangeSetID(unsigned const id) const
bool requestsToCloseFile()
IDNumber_t< Level::Run > RunNumber_t