24 #include "boost/json.hpp" 33 #include "cetlib/BasicPluginFactory.h" 34 #include "cetlib/canonical_string.h" 36 #include "range/v3/view.hpp" 55 OutputModule::~OutputModule() =
default;
58 :
Observer{config().eoFragment().selectEvents(),
59 config().eoFragment().rejectEvents()}
67 std::vector<ParameterSet> fcmdPluginPSets;
68 if (config().fcmdPlugins.get_if_present(fcmdPluginPSets)) {
81 ,
dataTier_{pset.get<
string>(
"dataTier",
"")}
88 std::unique_ptr<Worker>
91 return std::make_unique<OutputWorker>(
this, wp);
129 auto selectProductForBranchType = [
this, &tables](
BranchType const bt) {
137 if (pd.transient() || pd.dropped()) {
146 auto& found_pd = it->second;
188 cet::for_all(
plugins_, [](
auto& p) { p->doBeginJob(); });
194 FDEBUG(2) <<
"beginRun called\n";
197 cet::for_all(
plugins_, [&
r](
auto& p) { p->doBeginRun(
r); });
205 FDEBUG(2) <<
"beginSubRun called\n";
208 cet::for_all(
plugins_, [&sr](
auto& p) { p->doBeginSubRun(sr); });
215 std::atomic<std::size_t>& counts_run,
216 std::atomic<std::size_t>& counts_passed,
217 std::atomic<std::size_t>& )
219 FDEBUG(2) <<
"doEvent called\n";
231 FDEBUG(2) <<
"writeEvent called\n";
232 auto const e = std::as_const(ep).makeEvent(mc);
237 auto const& trRef(trHandle.isValid() ?
243 cet::for_all(
plugins_, [&
e](
auto& p) { p->doCollectMetadata(
e); });
257 FDEBUG(2) <<
"endSubRun called\n";
260 cet::for_all(
plugins_, [&sr](
auto& p) { p->doEndSubRun(sr); });
267 FDEBUG(2) <<
"writeSubRun called\n";
274 FDEBUG(2) <<
"writeAuxiliaryRangeSets(rp) called\n";
281 FDEBUG(2) <<
"endRun called\n";
284 cet::for_all(
plugins_, [&
r](
auto& p) { p->doEndRun(
r); });
291 FDEBUG(2) <<
"writeRun called\n";
299 cet::for_all(
plugins_, [](
auto& p) { p->doEndJob(); });
316 unique_ptr<ResultsPrincipal> respHolder;
318 if (respPtr ==
nullptr) {
319 respHolder = make_unique<ResultsPrincipal>(
323 respPtr = respHolder.get();
394 for (
auto const& [
pid, group] : ep) {
395 if (
auto provenance = group->productProvenance()) {
400 iter->second.insert(provenance->parentageID());
412 for (
auto const& eId : eIds) {
417 for (
auto const& p : par.
parents()) {
532 collectStreamSpecificMetadata(
533 vector<unique_ptr<FileCatalogMetadataPlugin>>
const& plugins,
534 vector<string>
const& pluginNames,
537 size_t pluginCounter = 0;
538 ostringstream errors;
539 for (
auto&
plugin : plugins) {
541 ssmd.reserve(tmp.size() + ssmd.size());
542 for (
auto&& entry : tmp) {
544 -> wantCheckSyntax()) {
545 string checkString(
"{ ");
547 cet::canonical_string(entry.first) +
" : " + entry.second +
" }";
548 boost::json::error_code ec;
550 auto const n_parsed_chars = p.write_some(checkString, ec);
552 errors <<
"OutputModule::writeCatalogMetadata():" << ec.message()
553 <<
" in metadata produced by plugin " 554 << pluginNames[pluginCounter] <<
":\n" 555 <<
" Faulty key/value clause:\n" 556 << checkString <<
'\n' 557 << (n_parsed_chars ? string(n_parsed_chars,
'-') :
"")
561 ssmd.emplace_back(std::move(entry));
565 auto const errMsg = errors.str();
566 if (!errMsg.empty()) {
579 } -> getMetadata(md);
581 md.emplace_back(
"data_tier", cet::canonical_string(
dataTier_));
584 md.emplace_back(
"data_stream", cet::canonical_string(
streamName_));
613 result.reserve(psets.size());
616 for (
auto const& pset : psets) {
617 auto const& libspec =
618 pluginNames_.emplace_back(pset.get<
string>(
"plugin_type"));
623 <<
"unrecognized plugin type " << pluginType <<
".\n";
633 <<
"Exception caught while processing FCMDPlugins[" << count
653 std::array<bool, NumBranchTypes>
const&
virtual void writeFileIdentifier()
bool doOpenFile(FileBlock const &fb)
std::array< bool, NumBranchTypes > const & hasNewlyDroppedBranch() const
void doWriteEvent(EventPrincipal &ep, ModuleContext const &mc)
virtual void writeProcessConfigurationRegistry()
void updateBranchParents(EventPrincipal &ep)
virtual void readResults(ResultsPrincipal const &resp)
virtual void doBeginJob(detail::SharedResources const &resources)
virtual void finishEndFile()
Handle< TriggerResults > getTriggerResults(Event const &e) const
virtual void incrementInputFileNumber()
virtual void event(EventPrincipal const &)
Event makeEvent(ModuleContext const &mc)
virtual void respondToCloseOutputFiles(FileBlock const &)
std::string const & moduleLabel() const
virtual void writeRun(RunPrincipal &r)=0
virtual void writeSubRun(SubRunPrincipal &sr)=0
virtual void doRegisterProducts(ProductDescriptions &, ModuleDescription const &)
std::vector< std::string > pluginNames_
bool doEndSubRun(SubRunPrincipal const &srp, ModuleContext const &mc)
std::vector< ProductID > const & parents() const
virtual void respondToOpenInputFile(FileBlock const &)
void writeFileCatalogMetadata()
void doWriteSubRun(SubRunPrincipal &srp)
std::vector< BranchDescription > ProductDescriptions
void doSelectProducts(ProductTables const &)
bool doBeginSubRun(SubRunPrincipal const &srp, ModuleContext const &mc)
virtual void writeFileFormatVersion()
virtual void beginRun(RunPrincipal const &)
bool doEvent(EventPrincipal const &ep, ModuleContext const &mc, std::atomic< std::size_t > &counts_run, std::atomic< std::size_t > &counts_passed, std::atomic< std::size_t > &counts_failed)
auto const & descriptions(BranchType const bt) const
BranchChildren branchChildren_
virtual void endRun(RunPrincipal const &)
virtual bool isFileOpen() const
virtual std::string const & lastClosedFileName() const
std::vector< std::unique_ptr< FileCatalogMetadataPlugin >> PluginCollection_t
void insertEmpty(ProductID parent)
void doRespondToCloseInputFile(FileBlock const &fb)
decltype(auto) constexpr end(T &&obj)
ADL-aware version of std::end.
void selectProducts(ProductTables const &)
SharedResource_t const LegacyResource
virtual void writeFileIndex()
auto vector(Vector const &v)
Returns a manipulator which will print the specified array.
virtual void respondToOpenOutputFiles(FileBlock const &)
BranchType branchType() const noexcept
GroupSelectorRules groupSelectorRules_
PluginCollection_t plugins_
virtual void setSubRunAuxiliaryRangeSetID(RangeSet const &)
static collection_type const & get()
virtual void writeParameterSetRegistry()
std::array< bool, NumBranchTypes > hasNewlyDroppedBranch_
virtual void endSubRun(SubRunPrincipal const &)
bool doEndRun(RunPrincipal const &rp, ModuleContext const &mc)
Run makeRun(ModuleContext const &mc, RangeSet const &rs=RangeSet::invalid())
std::array< std::unique_ptr< GroupSelector const >, NumBranchTypes > groupSelector_
virtual void setRunAuxiliaryRangeSetID(RangeSet const &)
decltype(auto) values(Coll &&coll)
Range-for loop helper iterating across the values of the specified collection.
virtual void writeParentageRegistry()
EventID const & eventID() const
bool wantEvent(ScheduleID id, Event const &e) const
virtual void write(EventPrincipal &e)=0
void doSetSubRunAuxiliaryRangeSetID(RangeSet const &)
SelectionsArray keptProducts_
ServiceHandle< CatalogInterface > ci_
virtual void writeProductDescriptionRegistry()
CommandLineParser * parser(0)
std::array< Selections, NumBranchTypes > SelectionsArray
virtual Granularity fileGranularity() const
OutputModule(fhicl::ParameterSet const &pset)
std::map< ProductID, std::set< ParentageID > > branchParents_
bool selected(BranchDescription const &) const
ProcessConfiguration const & processConfiguration() const
std::string configuredFileName_
ResultsPrincipal const * resultsPrincipal() const
bool combinable(BranchDescription const &a, BranchDescription const &b)
virtual bool requestsToCloseFile() const
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
void fillDependencyGraph()
void insertChild(ProductID parent, ProductID child)
cet::BasicPluginFactory pluginFactory_
void createQueues(SharedResources const &resources)
void doRespondToOpenOutputFiles(FileBlock const &fb)
virtual void postSelectProducts()
PluginCollection_t makePlugins_(std::vector< fhicl::ParameterSet > const &psets)
virtual void openFile(FileBlock const &)
void doWriteRun(RunPrincipal &rp)
void registerProducts(ProductDescriptions &)
SelectionsArray const & keptProducts() const
virtual void writeProcessHistoryRegistry()
void serialize(T const &...)
SubRun makeSubRun(ModuleContext const &mc, RangeSet const &rs=RangeSet::invalid())
virtual void writeProductDependencies()
virtual void setFileStatus(OutputFileStatus)
void for_each_branch_type(F f)
virtual void doWriteFileCatalogMetadata(FileCatalogMetadata::collection_type const &md, FileCatalogMetadata::collection_type const &ssmd)
virtual void respondToCloseInputFile(FileBlock const &)
void doRespondToCloseOutputFiles(FileBlock const &fb)
std::unique_ptr< Worker > doMakeWorker(WorkerParams const &wp) final
BranchChildren const & branchChildren() const
virtual void beginSubRun(SubRunPrincipal const &)
void doRespondToOpenInputFile(FileBlock const &fb)
virtual void startEndFile()
cet::coded_exception< error, detail::translate > exception
void doSetRunAuxiliaryRangeSetID(RangeSet const &)
ModuleDescription const & moduleDescription() const
bool doBeginRun(RunPrincipal const &rp, ModuleContext const &)