LArSoft  v09_90_00
Liquid Argon Software toolkit - https://larsoft.org/
OutputModule.cc
Go to the documentation of this file.
2 // vim: set sw=2 expandtab :
3 
24 #include "boost/json.hpp"
33 #include "cetlib/BasicPluginFactory.h"
34 #include "cetlib/canonical_string.h"
35 #include "fhiclcpp/ParameterSet.h"
36 #include "range/v3/view.hpp"
37 
38 #include <array>
39 #include <atomic>
40 #include <cassert>
41 #include <cstddef>
42 #include <iostream>
43 #include <memory>
44 #include <set>
45 #include <string>
46 #include <utility>
47 #include <vector>
48 
49 using namespace std;
50 
52 
53 namespace art {
54 
55  OutputModule::~OutputModule() = default;
56 
57  OutputModule::OutputModule(fhicl::TableFragment<Config> const& config)
58  : Observer{config().eoFragment().selectEvents(),
59  config().eoFragment().rejectEvents()}
60  , groupSelectorRules_{config().outputCommands(),
61  "outputCommands",
62  "OutputModule"}
63  , configuredFileName_{config().fileName()}
64  , dataTier_{config().dataTier()}
65  , streamName_{config().streamName()}
66  {
67  std::vector<ParameterSet> fcmdPluginPSets;
68  if (config().fcmdPlugins.get_if_present(fcmdPluginPSets)) {
69  plugins_ = makePlugins_(fcmdPluginPSets);
70  }
72  }
73 
75  : Observer{pset}
76  , groupSelectorRules_{pset.get<vector<string>>("outputCommands",
77  {"keep *"}),
78  "outputCommands",
79  "OutputModule"}
80  , configuredFileName_{pset.get<string>("fileName", "")}
81  , dataTier_{pset.get<string>("dataTier", "")}
82  , streamName_{pset.get<string>("streamName", "")}
83  , plugins_{makePlugins_(pset.get<vector<ParameterSet>>("FCMDPlugins", {}))}
84  {
86  }
87 
88  std::unique_ptr<Worker>
90  {
91  return std::make_unique<OutputWorker>(this, wp);
92  }
93 
94  bool
96  {
97  return isFileOpen();
98  }
99 
100  void
102  {}
103 
104  bool
106  {
107  return false;
108  }
109 
112  {
113  return Granularity::Unset;
114  }
115 
116  string const&
118  {
119  return configuredFileName_;
120  }
121 
122  void
124  {
125  // Note: The keptProducts_ data member records all of the
126  // BranchDescription objects that may be persisted to disk. Since
127  // we do not reset it, the list never shrinks. This behavior should
128  // be reconsidered for future use cases of art.
129  auto selectProductForBranchType = [this, &tables](BranchType const bt) {
130  auto const& productList = tables.descriptions(bt);
131  groupSelector_[bt] =
132  std::make_unique<GroupSelector const>(groupSelectorRules_, productList);
133  // TODO: See if we can collapse keptProducts_ and groupSelector into
134  // a single object. See the notes in the header for GroupSelector
135  // for more information.
136  for (auto const& pd : productList | ::ranges::views::values) {
137  if (pd.transient() || pd.dropped()) {
138  continue;
139  }
140  if (selected(pd)) {
141  // Here, we take care to merge the BranchDescription objects
142  // if one was already present in the keptProducts list.
143  auto& keptProducts = keptProducts_[bt];
144  if (auto it = keptProducts.find(pd.productID());
145  it != end(keptProducts)) {
146  auto& found_pd = it->second;
147  assert(combinable(found_pd, pd));
148  found_pd.merge(pd);
149  } else {
150  // New product
151  keptProducts.emplace(pd.productID(), pd);
152  }
153  continue;
154  }
155  hasNewlyDroppedBranch_[bt] = true;
156  }
157  };
158  for_each_branch_type(selectProductForBranchType);
159  }
160 
161  void
163  {
164  doSelectProducts(tables);
166  }
167 
168  void
170  {}
171 
172  void
174  {
175  doRegisterProducts(producedProducts, moduleDescription());
176  }
177 
178  void
180  ModuleDescription const&)
181  {}
182 
183  void
185  {
186  createQueues(resources);
187  beginJob();
188  cet::for_all(plugins_, [](auto& p) { p->doBeginJob(); });
189  }
190 
191  bool
193  {
194  FDEBUG(2) << "beginRun called\n";
195  beginRun(rp);
196  auto const r = rp.makeRun(mc);
197  cet::for_all(plugins_, [&r](auto& p) { p->doBeginRun(r); });
198  return true;
199  }
200 
201  bool
203  ModuleContext const& mc)
204  {
205  FDEBUG(2) << "beginSubRun called\n";
206  beginSubRun(srp);
207  auto const sr = srp.makeSubRun(mc);
208  cet::for_all(plugins_, [&sr](auto& p) { p->doBeginSubRun(sr); });
209  return true;
210  }
211 
212  bool
214  ModuleContext const& mc,
215  std::atomic<std::size_t>& counts_run,
216  std::atomic<std::size_t>& counts_passed,
217  std::atomic<std::size_t>& /*counts_failed*/)
218  {
219  FDEBUG(2) << "doEvent called\n";
220  if (wantEvent(mc.scheduleID(), ep.makeEvent(mc))) {
221  ++counts_run;
222  event(ep);
223  ++counts_passed;
224  }
225  return true;
226  }
227 
228  void
230  {
231  FDEBUG(2) << "writeEvent called\n";
232  auto const e = std::as_const(ep).makeEvent(mc);
233  if (wantEvent(mc.scheduleID(), e)) {
234  write(ep);
235  // Declare that the event was selected for write to the catalog interface.
237  auto const& trRef(trHandle.isValid() ?
238  static_cast<HLTGlobalStatus>(*trHandle) :
239  HLTGlobalStatus{});
240  ci_->eventSelected(
241  moduleDescription().moduleLabel(), ep.eventID(), trRef);
242  // ... and invoke the plugins:
243  cet::for_all(plugins_, [&e](auto& p) { p->doCollectMetadata(e); });
245  }
246  }
247 
248  void
250  {
252  }
253 
254  bool
256  {
257  FDEBUG(2) << "endSubRun called\n";
258  endSubRun(srp);
259  auto const sr = srp.makeSubRun(mc);
260  cet::for_all(plugins_, [&sr](auto& p) { p->doEndSubRun(sr); });
261  return true;
262  }
263 
264  void
266  {
267  FDEBUG(2) << "writeSubRun called\n";
268  writeSubRun(srp);
269  }
270 
271  void
273  {
274  FDEBUG(2) << "writeAuxiliaryRangeSets(rp) called\n";
276  }
277 
278  bool
280  {
281  FDEBUG(2) << "endRun called\n";
282  endRun(rp);
283  auto const r = rp.makeRun(mc);
284  cet::for_all(plugins_, [&r](auto& p) { p->doEndRun(r); });
285  return true;
286  }
287 
288  void
290  {
291  FDEBUG(2) << "writeRun called\n";
292  writeRun(rp);
293  }
294 
295  void
297  {
298  endJob();
299  cet::for_all(plugins_, [](auto& p) { p->doEndJob(); });
300  }
301 
302  bool
304  {
305  if (isFileOpen()) {
306  return false;
307  }
308  openFile(fb);
309  return true;
310  }
311 
312  void
314  {
316  unique_ptr<ResultsPrincipal> respHolder;
317  ResultsPrincipal const* respPtr = fb.resultsPrincipal();
318  if (respPtr == nullptr) {
319  respHolder = make_unique<ResultsPrincipal>(
322  nullptr);
323  respPtr = respHolder.get();
324  }
325  readResults(*respPtr);
326  }
327 
328  void
330  {
332  }
333 
334  void
336  {
338  }
339 
340  void
342  {
344  }
345 
346  bool
348  {
349  if (isFileOpen()) {
350  reallyCloseFile();
351  return true;
352  }
353  return false;
354  }
355 
356  void
358  {
360  startEndFile();
363  writeFileIndex();
371  finishEndFile();
372  branchParents_.clear();
374  }
375 
376  // Called every event (by doWriteEvent) to update branchParents_
377  // and branchChildren_.
378  void
380  {
381  // Note: threading: We are implicitly using the Principal
382  // iterators here, which iterate over the groups held by the
383  // principal; those groups may be updated by a producer task in
384  // another stream while we are iterating! That applies only to Run,
385  // SubRun, and Results principals, however: for Event
386  // principals we arrange that no producer or filter tasks
387  // are running when we run. Since we are only called for
388  // event principals, we are safe.
389  //
390  // Note: threading: We update branchParents_ and branchChildren_
391  // here which must be protected if we become a stream or
392  // global module.
393  //
394  for (auto const& [pid, group] : ep) {
395  if (auto provenance = group->productProvenance()) {
396  auto iter = branchParents_.find(pid);
397  if (iter == branchParents_.end()) {
398  iter = branchParents_.emplace(pid, set<ParentageID>{}).first;
399  }
400  iter->second.insert(provenance->parentageID());
402  }
403  }
404  }
405 
406  // Called at file close to update branchChildren_ from the accumulated
407  // branchParents_.
408  void
410  {
411  for (auto const& [child, eIds] : branchParents_) {
412  for (auto const& eId : eIds) {
413  Parentage par;
414  if (!ParentageRegistry::get(eId, par)) {
415  continue;
416  }
417  for (auto const& p : par.parents()) {
418  branchChildren_.insertChild(p, child);
419  }
420  }
421  }
422  }
423 
424  void
426  {}
427 
428  void
430  {}
431 
432  void
434  {}
435 
436  void
438  {}
439 
440  void
442  {}
443 
444  void
446  {}
447 
448  void
450  {}
451 
452  void
454  {}
455 
456  void
458  {}
459 
460  void
462  {}
463 
464  void
466  {}
467 
468  void
470  {}
471 
472  void
474  {}
475 
476  void
478  {}
479 
480  void
482  {}
483 
484  bool
486  {
487  return true;
488  }
489 
490  void
492  {}
493 
494  void
496  {}
497 
498  void
500  {}
501 
502  void
504  {}
505 
506  void
508  {}
509 
510  void
512  {}
513 
514  void
516  {}
517 
518  void
520  {}
521 
522  void
524  {}
525 
526  void
528  {}
529 
530  namespace {
531  void
532  collectStreamSpecificMetadata(
533  vector<unique_ptr<FileCatalogMetadataPlugin>> const& plugins,
534  vector<string> const& pluginNames,
536  {
537  size_t pluginCounter = 0;
538  ostringstream errors;
539  for (auto& plugin : plugins) {
540  FileCatalogMetadata::collection_type tmp = plugin->doProduceMetadata();
541  ssmd.reserve(tmp.size() + ssmd.size());
542  for (auto&& entry : tmp) {
544  -> wantCheckSyntax()) {
545  string checkString("{ ");
546  checkString +=
547  cet::canonical_string(entry.first) + " : " + entry.second + " }";
548  boost::json::error_code ec;
550  auto const n_parsed_chars = p.write_some(checkString, ec);
551  if (ec) {
552  errors << "OutputModule::writeCatalogMetadata():" << ec.message()
553  << " in metadata produced by plugin "
554  << pluginNames[pluginCounter] << ":\n"
555  << " Faulty key/value clause:\n"
556  << checkString << '\n'
557  << (n_parsed_chars ? string(n_parsed_chars, '-') : "")
558  << "^\n";
559  }
560  }
561  ssmd.emplace_back(std::move(entry));
562  }
563  ++pluginCounter;
564  }
565  auto const errMsg = errors.str();
566  if (!errMsg.empty()) {
567  throw Exception(errors::DataCorruption) << errMsg;
568  }
569  }
570  } // namespace
571 
572  void
574  {
575  // Obtain metadata from service for output.
578  {
579  } -> getMetadata(md);
580  if (!dataTier_.empty()) {
581  md.emplace_back("data_tier", cet::canonical_string(dataTier_));
582  }
583  if (!streamName_.empty()) {
584  md.emplace_back("data_stream", cet::canonical_string(streamName_));
585  }
586  // Ask any plugins for their list of metadata, and put it in a
587  // separate list for the output module. The user stream-specific
588  // metadata should override stream-specific metadata generated by the
589  // output module itself.
591  collectStreamSpecificMetadata(plugins_, pluginNames_, ssmd);
592  doWriteFileCatalogMetadata(md, ssmd);
593  }
594 
595  void
599  {}
600 
601  void
603  {}
604 
605  void
607  {}
608 
610  OutputModule::makePlugins_(vector<ParameterSet> const& psets)
611  {
612  PluginCollection_t result;
613  result.reserve(psets.size());
614  size_t count{0};
615  try {
616  for (auto const& pset : psets) {
617  auto const& libspec =
618  pluginNames_.emplace_back(pset.get<string>("plugin_type"));
619  auto const pluginType = pluginFactory_.pluginType(libspec);
620  if (pluginType !=
622  throw Exception(errors::Configuration, "OutputModule: ")
623  << "unrecognized plugin type " << pluginType << ".\n";
624  }
625  result.emplace_back(
626  pluginFactory_.makePlugin<unique_ptr<FileCatalogMetadataPlugin>>(
627  libspec, pset));
628  ++count;
629  }
630  }
631  catch (cet::exception& e) {
632  throw Exception(errors::Configuration, "OutputModule: ", e)
633  << "Exception caught while processing FCMDPlugins[" << count
634  << "] in module " << moduleDescription().moduleLabel() << ".\n";
635  }
636  return result;
637  }
638 
639  SelectionsArray const&
641  {
642  return keptProducts_;
643  }
644 
645  bool
647  {
648  auto const bt = pd.branchType();
649  assert(groupSelector_[bt]);
650  return groupSelector_[bt]->selected(pd);
651  }
652 
653  std::array<bool, NumBranchTypes> const&
655  {
656  return hasNewlyDroppedBranch_;
657  }
658 
659  BranchChildren const&
661  {
662  return branchChildren_;
663  }
664 
665 } // namespace art
virtual void writeFileIdentifier()
TRandom r
Definition: spectrum.C:23
bool doOpenFile(FileBlock const &fb)
std::array< bool, NumBranchTypes > const & hasNewlyDroppedBranch() const
void doWriteEvent(EventPrincipal &ep, ModuleContext const &mc)
std::vector< std::pair< std::string, std::string >> collection_type
virtual void writeProcessConfigurationRegistry()
void updateBranchParents(EventPrincipal &ep)
virtual void readResults(ResultsPrincipal const &resp)
virtual void doBeginJob(detail::SharedResources const &resources)
virtual void finishEndFile()
Handle< TriggerResults > getTriggerResults(Event const &e) const
Definition: Observer.cc:75
virtual void incrementInputFileNumber()
std::string streamName_
Definition: OutputModule.h:236
virtual void event(EventPrincipal const &)
Event makeEvent(ModuleContext const &mc)
virtual void respondToCloseOutputFiles(FileBlock const &)
std::string const & moduleLabel() const
virtual void writeRun(RunPrincipal &r)=0
virtual void writeSubRun(SubRunPrincipal &sr)=0
virtual void doRegisterProducts(ProductDescriptions &, ModuleDescription const &)
std::vector< std::string > pluginNames_
Definition: OutputModule.h:241
bool doEndSubRun(SubRunPrincipal const &srp, ModuleContext const &mc)
std::vector< ProductID > const & parents() const
Definition: Parentage.cc:37
virtual void respondToOpenInputFile(FileBlock const &)
void writeFileCatalogMetadata()
auto scheduleID() const
Definition: ModuleContext.h:28
STL namespace.
void doWriteSubRun(SubRunPrincipal &srp)
Float_t tmp
Definition: plot.C:35
std::vector< BranchDescription > ProductDescriptions
void doSelectProducts(ProductTables const &)
bool doBeginSubRun(SubRunPrincipal const &srp, ModuleContext const &mc)
virtual void writeFileFormatVersion()
virtual void beginRun(RunPrincipal const &)
bool doEvent(EventPrincipal const &ep, ModuleContext const &mc, std::atomic< std::size_t > &counts_run, std::atomic< std::size_t > &counts_passed, std::atomic< std::size_t > &counts_failed)
auto const & descriptions(BranchType const bt) const
Definition: ProductTables.h:43
BranchChildren branchChildren_
Definition: OutputModule.h:233
virtual void endRun(RunPrincipal const &)
virtual bool isFileOpen() const
virtual std::string const & lastClosedFileName() const
std::vector< std::unique_ptr< FileCatalogMetadataPlugin >> PluginCollection_t
Definition: OutputModule.h:57
void insertEmpty(ProductID parent)
void doRespondToCloseInputFile(FileBlock const &fb)
decltype(auto) constexpr end(T &&obj)
ADL-aware version of std::end.
Definition: StdUtils.h:77
void selectProducts(ProductTables const &)
SharedResource_t const LegacyResource
virtual void writeFileIndex()
auto vector(Vector const &v)
Returns a manipulator which will print the specified array.
Definition: DumpUtils.h:289
virtual void respondToOpenOutputFiles(FileBlock const &)
BranchType branchType() const noexcept
GroupSelectorRules groupSelectorRules_
Definition: OutputModule.h:230
PluginCollection_t plugins_
Definition: OutputModule.h:242
virtual void setSubRunAuxiliaryRangeSetID(RangeSet const &)
OutputFileStatus
static collection_type const & get()
virtual void writeParameterSetRegistry()
std::array< bool, NumBranchTypes > hasNewlyDroppedBranch_
Definition: OutputModule.h:229
virtual void endSubRun(SubRunPrincipal const &)
bool doEndRun(RunPrincipal const &rp, ModuleContext const &mc)
Run makeRun(ModuleContext const &mc, RangeSet const &rs=RangeSet::invalid())
Definition: RunPrincipal.cc:24
std::array< std::unique_ptr< GroupSelector const >, NumBranchTypes > groupSelector_
Definition: OutputModule.h:228
virtual void setRunAuxiliaryRangeSetID(RangeSet const &)
decltype(auto) values(Coll &&coll)
Range-for loop helper iterating across the values of the specified collection.
virtual void writeParentageRegistry()
EventID const & eventID() const
virtual void endJob()
bool wantEvent(ScheduleID id, Event const &e) const
Definition: Observer.cc:63
virtual void write(EventPrincipal &e)=0
void doSetSubRunAuxiliaryRangeSetID(RangeSet const &)
SelectionsArray keptProducts_
Definition: OutputModule.h:226
ServiceHandle< CatalogInterface > ci_
Definition: OutputModule.h:237
virtual void writeProductDescriptionRegistry()
CommandLineParser * parser(0)
std::array< Selections, NumBranchTypes > SelectionsArray
Definition: Selections.h:12
virtual Granularity fileGranularity() const
OutputModule(fhicl::ParameterSet const &pset)
Definition: OutputModule.cc:74
std::map< ProductID, std::set< ParentageID > > branchParents_
Definition: OutputModule.h:232
bool selected(BranchDescription const &) const
bool fileIsOpen() const
Definition: OutputModule.cc:95
ProcessConfiguration const & processConfiguration() const
std::string configuredFileName_
Definition: OutputModule.h:234
double value
Definition: spectrum.C:18
TFile fb("Li6.root")
ResultsPrincipal const * resultsPrincipal() const
Definition: FileBlock.cc:38
bool combinable(BranchDescription const &a, BranchDescription const &b)
virtual bool requestsToCloseFile() const
std::string dataTier_
Definition: OutputModule.h:235
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
void fillDependencyGraph()
void insertChild(ProductID parent, ProductID child)
cet::BasicPluginFactory pluginFactory_
Definition: OutputModule.h:238
void createQueues(SharedResources const &resources)
Definition: SharedModule.cc:34
void doRespondToOpenOutputFiles(FileBlock const &fb)
virtual void postSelectProducts()
PluginCollection_t makePlugins_(std::vector< fhicl::ParameterSet > const &psets)
virtual void openFile(FileBlock const &)
void doWriteRun(RunPrincipal &rp)
void registerProducts(ProductDescriptions &)
SelectionsArray const & keptProducts() const
BranchType
Definition: BranchType.h:20
virtual void writeProcessHistoryRegistry()
Definition: MVAAlg.h:12
void serialize(T const &...)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
SubRun makeSubRun(ModuleContext const &mc, RangeSet const &rs=RangeSet::invalid())
virtual void writeProductDependencies()
virtual void setFileStatus(OutputFileStatus)
void for_each_branch_type(F f)
Definition: BranchType.h:38
virtual void doWriteFileCatalogMetadata(FileCatalogMetadata::collection_type const &md, FileCatalogMetadata::collection_type const &ssmd)
virtual void respondToCloseInputFile(FileBlock const &)
void doRespondToCloseOutputFiles(FileBlock const &fb)
Float_t e
Definition: plot.C:35
std::unique_ptr< Worker > doMakeWorker(WorkerParams const &wp) final
Definition: OutputModule.cc:89
BranchChildren const & branchChildren() const
virtual void beginSubRun(SubRunPrincipal const &)
virtual void beginJob()
void doRespondToOpenInputFile(FileBlock const &fb)
virtual void startEndFile()
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
void doSetRunAuxiliaryRangeSetID(RangeSet const &)
ModuleDescription const & moduleDescription() const
Definition: ModuleBase.cc:13
bool doBeginRun(RunPrincipal const &rp, ModuleContext const &)