LArSoft  v07_13_02
Liquid Argon Software toolkit - http://larsoft.org/
OutputModule.cc
Go to the documentation of this file.
2 
20 #include "cetlib/canonical_string.h"
21 #include "cetlib/exempt_ptr.h"
22 #include "cetlib_except/demangle.h"
23 IGNORE_FALLTHROUGH_START
24 #include "rapidjson/document.h"
25 #include "rapidjson/error/en.h"
27 
28 #include <utility>
29 
31 using std::string;
32 using std::vector;
33 
35  ParameterSet const& containing_pset)
36  : EventObserverBase{config().eoFragment().selectEvents(), containing_pset}
37  , groupSelectorRules_{config().outputCommands(),
38  "outputCommands",
39  "OutputModule"}
40  , configuredFileName_{config().fileName()}
41  , dataTier_{config().dataTier()}
42  , streamName_{config().streamName()}
43  , plugins_{makePlugins_(containing_pset)}
44 {}
45 
47  : EventObserverBase{pset}
48  , groupSelectorRules_{pset.get<vector<string>>("outputCommands", {"keep *"}),
49  "outputCommands",
50  "OutputModule"}
51  , configuredFileName_{pset.get<string>("fileName", "")}
52  , dataTier_{pset.get<string>("dataTier", "")}
53  , streamName_{pset.get<string>("streamName", "")}
54  , plugins_{makePlugins_(pset)}
55 {}
56 
57 string const&
59 {
60  return configuredFileName_;
61 }
62 
63 void
65 {
67 }
68 
69 void
71 {
72  // These variables cause problems for MT. This function must be
73  // called in a single-threaded context.
75  std::make_unique<GroupSelector const>(groupSelectorRules_, productList);
76  keptProducts_ = {{}};
77 
78  // TODO: See if we can collapse keptProducts_ and groupSelector into
79  // a single object. See the notes in the header for GroupSelector
80  // for more information.
81 
82  for (auto const& val : productList) {
83  BranchDescription const& pd = val.second;
84  auto const bt = pd.branchType();
85  if (pd.transient() || pd.dropped()) {
86  continue;
87  }
88  if (selected(pd)) {
89  keptProducts_[bt].push_back(&pd);
90  continue;
91  }
92  // Newly dropped, skip it.
93  hasNewlyDroppedBranch_[bt] = true;
94  }
95 }
96 
97 void
99 {
100  doSelectProducts(productList);
102 }
103 
104 void
106  ProductDescriptions& producedProducts,
107  ModuleDescription const& md)
108 {
109  doRegisterProducts(mpr, producedProducts, md);
110 }
111 
112 void
114 {}
115 
116 void
119  ModuleDescription const&)
120 {}
121 
122 void
124 {
125  beginJob();
126  cet::for_all(plugins_, [](auto& p) { p->doBeginJob(); });
127 }
128 
129 bool
131  CurrentProcessingContext const* cpc)
132 {
133  detail::CPCSentry sentry{current_context_, cpc};
134  FDEBUG(2) << "beginRun called\n";
135  beginRun(rp);
137  cet::for_all(plugins_, [&r](auto& p) { p->doBeginRun(r); });
138  return true;
139 }
140 
141 bool
143  CurrentProcessingContext const* cpc)
144 {
145  detail::CPCSentry sentry{current_context_, cpc};
146  FDEBUG(2) << "beginSubRun called\n";
147  beginSubRun(srp);
149  cet::for_all(plugins_, [&sr](auto& p) { p->doBeginSubRun(sr); });
150  return true;
151 }
152 
153 bool
155  CurrentProcessingContext const* cpc,
156  CountingStatistics& counts)
157 {
158  detail::CPCSentry sentry{current_context_, cpc};
159  FDEBUG(2) << "doEvent called\n";
160  Event const e{ep, moduleDescription_, this};
161  if (wantAllEvents() || wantEvent(e)) {
162  // Run is incremented before event(ep); to properly count whenever
163  // an exception is thrown in the user's module.
164  counts.increment<stats::Run>();
165  event(ep);
166  counts.increment<stats::Passed>();
167  }
168  return true;
169 }
170 
171 void
173 {
174  detail::PVSentry clearTriggerResults{cachedProducts()};
175  FDEBUG(2) << "writeEvent called\n";
176  Event const e{ep, moduleDescription_, this};
177  if (wantAllEvents() || wantEvent(e)) {
178  write(ep); // Write the event.
179  // Declare that the event was selected for write to the catalog
180  // interface
182  auto const& trRef(trHandle.isValid() ?
183  static_cast<HLTGlobalStatus>(*trHandle) :
184  HLTGlobalStatus{});
185  ci_->eventSelected(moduleDescription_.moduleLabel(), ep.id(), trRef);
186  // ... and invoke the plugins:
187  // ... The transactional object presented to the plugins is
188  // different since the relevant context information is not the
189  // same for the consumes functionality.
191  cet::for_all(plugins_, [&we](auto& p) { p->doCollectMetadata(we); });
192  // Finish.
194  if (remainingEvents_ > 0) {
196  }
197  }
198 }
199 
200 void
202 {
204 }
205 
206 bool
208  CurrentProcessingContext const* cpc)
209 {
210  detail::CPCSentry sentry{current_context_, cpc};
211  FDEBUG(2) << "endSubRun called\n";
212  endSubRun(srp);
214  cet::for_all(plugins_, [&sr](auto& p) { p->doEndSubRun(sr); });
215  return true;
216 }
217 
218 void
220 {
221  FDEBUG(2) << "writeSubRun called\n";
222  writeSubRun(srp);
223 }
224 
225 void
227 {
228  FDEBUG(2) << "writeAuxiliaryRangeSets(rp) called\n";
230 }
231 
232 bool
234  CurrentProcessingContext const* cpc)
235 {
236  detail::CPCSentry sentry{current_context_, cpc};
237  FDEBUG(2) << "endRun called\n";
238  endRun(rp);
240  cet::for_all(plugins_, [&r](auto& p) { p->doEndRun(r); });
241  return true;
242 }
243 
244 void
246 {
247  FDEBUG(2) << "writeRun called\n";
248  writeRun(rp);
249 }
250 
251 void
253 {
254  endJob();
255  cet::for_all(plugins_, [](auto& p) { p->doEndJob(); });
256 }
257 
258 void
260 {
261  openFile(fb);
262 }
263 
264 void
266 {
268  std::unique_ptr<ResultsPrincipal> respHolder;
269  art::ResultsPrincipal const* respPtr = fb.resultsPrincipal();
270  if (respPtr == nullptr) {
271  respHolder = std::make_unique<ResultsPrincipal>(
273  respPtr = respHolder.get();
274  }
275  readResults(*respPtr);
276 }
277 
278 void
280 {
282 }
283 
284 void
286 {
288 }
289 
290 void
292 {
294 }
295 
296 void
298 {
299  if (isFileOpen()) {
300  reallyCloseFile();
301  }
302 }
303 
304 void
306 {
308  startEndFile();
311  writeFileIndex();
321  finishEndFile();
322  branchParents_.clear();
324 }
325 
326 void
328 {
329  for (auto const& groupPr : ep) {
330  auto const& group = *groupPr.second;
331  if (group.productProvenancePtr()) {
332  ProductID const pid = groupPr.first;
333  auto it = branchParents_.find(pid);
334  if (it == branchParents_.end()) {
335  it = branchParents_.emplace(pid, std::set<ParentageID>{}).first;
336  }
337  it->second.insert(group.productProvenancePtr()->parentageID());
339  }
340  }
341 }
342 
343 void
345 {
346  for (auto const& bp : branchParents_) {
347  ProductID const child = bp.first;
348  std::set<ParentageID> const& eIds = bp.second;
349  for (auto const& eId : eIds) {
350  Parentage par;
351  if (!ParentageRegistry::get(eId, par)) {
352  continue;
353  }
354  for (auto const& p : par.parents())
355  branchChildren_.insertChild(p, child);
356  }
357  }
358 }
359 
360 void
362 {}
363 
364 void
366 {}
367 
368 void
370 {}
371 
372 void
374 {}
375 
376 void
378 {}
379 
380 void
382 {}
383 
384 void
386 {}
387 
388 void
390 {}
391 
392 void
394 {}
395 
396 void
398 {}
399 
400 void
402 {}
403 
404 void
406 {}
407 
408 void
410 {}
411 
412 void
414 {}
415 
416 void
418 {}
419 
420 bool
422 {
423  return true;
424 }
425 
426 void
428 {}
429 
430 void
432 {}
433 
434 void
436 {}
437 
438 void
440 {}
441 
442 void
444 {}
445 
446 void
448 {}
449 
450 void
452 {}
453 
454 void
456 {}
457 
458 void
460 {}
461 
462 void
464 {}
465 
466 void
468 {}
469 
470 namespace {
471  void
472  collectStreamSpecificMetadata(
473  vector<std::unique_ptr<art::FileCatalogMetadataPlugin>> const& plugins,
474  vector<string> const& pluginNames,
476  {
477  std::size_t pluginCounter{0};
478  std::ostringstream errors; // Collect errors from all plugins.
479  for (auto& plugin : plugins) {
481  plugin->doProduceMetadata();
482  ssmd.reserve(tmp.size() + ssmd.size());
483  for (auto&& entry : tmp) {
485  ->wantCheckSyntax()) {
486  rapidjson::Document d;
487  string checkString("{ ");
488  checkString +=
489  cet::canonical_string(entry.first) + " : " + entry.second + " }";
490  if (d.Parse(checkString.c_str()).HasParseError()) {
491  auto const nSpaces = d.GetErrorOffset();
492  std::cerr << "nSpaces = " << nSpaces << ".\n";
493  errors << "art::OutputModule::writeCatalogMetadata():"
494  << "syntax error in metadata produced by plugin "
495  << pluginNames[pluginCounter] << ":\n"
496  << rapidjson::GetParseError_En(d.GetParseError())
497  << " Faulty key/value clause:\n"
498  << checkString << "\n"
499  << (nSpaces ? string(nSpaces, '-') : "") << "^\n";
500  }
501  }
502  ssmd.emplace_back(std::move(entry));
503  }
504  ++pluginCounter;
505  }
506  auto const errMsg = errors.str();
507  if (!errMsg.empty()) {
509  }
510  }
511 }
512 
513 void
515 {
516  // Obtain metadata from service for output.
519  ->getMetadata(md);
520  if (!dataTier_.empty()) {
521  md.emplace_back("data_tier", cet::canonical_string(dataTier_));
522  }
523  if (!streamName_.empty()) {
524  md.emplace_back("data_stream", cet::canonical_string(streamName_));
525  }
526  // Ask any plugins for their list of metadata, and put it in a
527  // separate list for the output module. The user stream-specific
528  // metadata should override stream-specific metadata generated by the
529  // output module iself.
530  collectStreamSpecificMetadata(plugins_, pluginNames_, ssmd);
531  doWriteFileCatalogMetadata(md, ssmd);
532 }
533 
534 void
538 {}
539 
540 void
542 {}
543 
544 void
546 {}
547 
548 void
550 {}
551 
552 auto
555 {
556  auto const psets = top_pset.get<vector<ParameterSet>>("FCMDPlugins", {});
557  PluginCollection_t result;
558  result.reserve(psets.size());
559  size_t count{0};
560  try {
561  for (auto const& pset : psets) {
562  pluginNames_.emplace_back(pset.get<string>("plugin_type"));
563  auto const& libspec = pluginNames_.back();
564  auto const pluginType = pluginFactory_.pluginType(libspec);
565  if (pluginType ==
567  result.emplace_back(
568  pluginFactory_.makePlugin<std::unique_ptr<FileCatalogMetadataPlugin>>(
569  libspec, pset));
570  } else {
571  throw Exception(errors::Configuration, "OutputModule: ")
572  << "unrecognized plugin type " << pluginType << ".\n";
573  }
574  ++count;
575  }
576  }
577  catch (cet::exception& e) {
578  throw Exception(errors::Configuration, "OutputModule: ", e)
579  << "Exception caught while processing FCMDPlugins[" << count
580  << "] in module " << description().moduleLabel() << ".\n";
581  }
582  return result;
583 }
virtual void doRegisterProducts(MasterProductRegistry &, ProductDescriptions &, ModuleDescription const &)
virtual void writeFileIdentifier()
bool doBeginSubRun(SubRunPrincipal const &srp, CurrentProcessingContext const *cpc)
std::unique_ptr< GroupSelector const > groupSelector_
Definition: OutputModule.h:161
std::vector< std::pair< std::string, std::string >> collection_type
virtual void writeProcessConfigurationRegistry()
virtual void readResults(ResultsPrincipal const &resp)
virtual void finishEndFile()
std::string streamName_
Definition: OutputModule.h:175
virtual void event(EventPrincipal const &)
virtual void respondToCloseOutputFiles(FileBlock const &)
virtual void writeRun(RunPrincipal &r)=0
virtual void writeSubRun(SubRunPrincipal &sr)=0
std::vector< std::string > pluginNames_
Definition: OutputModule.h:179
bool wantEvent(Event const &e)
static cet::exempt_ptr< Consumer > non_module_context()
Definition: Consumer.cc:76
virtual void respondToOpenInputFile(FileBlock const &)
void writeFileCatalogMetadata()
std::map< BranchKey, BranchDescription > ProductList
Definition: ProductList.h:15
void doWriteSubRun(SubRunPrincipal &srp)
Float_t tmp
Definition: plot.C:37
std::vector< BranchDescription > ProductDescriptions
virtual void writeFileFormatVersion()
virtual void beginRun(RunPrincipal const &)
BranchChildren branchChildren_
Definition: OutputModule.h:171
virtual void endRun(RunPrincipal const &)
virtual bool isFileOpen() const
Definition: Run.h:30
virtual std::string const & lastClosedFileName() const
Definition: OutputModule.cc:58
std::vector< std::unique_ptr< FileCatalogMetadataPlugin >> PluginCollection_t
Definition: OutputModule.h:182
void insertEmpty(ProductID parent)
void doRespondToCloseInputFile(FileBlock const &fb)
virtual void writeFileIndex()
auto vector(Vector const &v)
Returns a manipulator which will print the specified array.
Definition: DumpUtils.h:265
virtual void respondToOpenOutputFiles(FileBlock const &)
std::vector< ProductID > const & parents() const
Definition: Parentage.h:30
GroupSelectorRules groupSelectorRules_
Definition: OutputModule.h:160
PluginCollection_t plugins_
Definition: OutputModule.h:183
virtual void setSubRunAuxiliaryRangeSetID(RangeSet const &)
OutputFileStatus
static collection_type const & get()
ModuleDescription moduleDescription_
Definition: OutputModule.h:165
ResultsPrincipal const * resultsPrincipal() const
Definition: FileBlock.h:55
virtual void writeBranchMapper()
virtual void writeParameterSetRegistry()
std::array< bool, NumBranchTypes > hasNewlyDroppedBranch_
Definition: OutputModule.h:158
virtual void endSubRun(SubRunPrincipal const &)
art::Handle< art::TriggerResults > getTriggerResults(Event const &e) const
virtual void setRunAuxiliaryRangeSetID(RangeSet const &)
BranchType branchType() const
virtual void writeParentageRegistry()
virtual void endJob()
cet::exempt_ptr< CurrentProcessingContext const > current_context_
Definition: OutputModule.h:166
TFile fb("Li6.root")
PluginCollection_t makePlugins_(fhicl::ParameterSet const &top_pset)
virtual void write(EventPrincipal &e)=0
void doSetSubRunAuxiliaryRangeSetID(RangeSet const &)
Float_t d
Definition: plot.C:237
SelectionsArray keptProducts_
Definition: OutputModule.h:157
ServiceHandle< CatalogInterface > ci_
Definition: OutputModule.h:176
void doWriteEvent(EventPrincipal &ep)
virtual void writeProductDescriptionRegistry()
void registerProducts(MasterProductRegistry &, ProductDescriptions &, ModuleDescription const &)
bool doEndRun(RunPrincipal const &rp, CurrentProcessingContext const *cpc)
bool doBeginRun(RunPrincipal const &rp, CurrentProcessingContext const *cpc)
bool selected(BranchDescription const &) const
Definition: OutputModule.h:329
ModuleDescription const & description() const
Definition: OutputModule.h:311
std::string configuredFileName_
Definition: OutputModule.h:173
std::string const & moduleLabel() const
std::string dataTier_
Definition: OutputModule.h:174
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
void fillDependencyGraph()
void insertChild(ProductID parent, ProductID child)
detail::CachedProducts & cachedProducts()
cet::BasicPluginFactory pluginFactory_
Definition: OutputModule.h:178
std::string value(boost::any const &)
void doRespondToOpenOutputFiles(FileBlock const &fb)
virtual void postSelectProducts()
virtual void openFile(FileBlock const &)
void selectProducts(ProductList const &)
Definition: OutputModule.cc:98
void doWriteRun(RunPrincipal &rp)
virtual void writeEventHistory()
void doSelectProducts(ProductList const &)
Definition: OutputModule.cc:70
virtual void writeProcessHistoryRegistry()
EventID const & id() const
void doOpenFile(FileBlock const &fb)
#define FDEBUG(lev)
Definition: DebugMacros.h:26
virtual void writeProductDependencies()
virtual void setFileStatus(OutputFileStatus)
virtual void doWriteFileCatalogMetadata(FileCatalogMetadata::collection_type const &md, FileCatalogMetadata::collection_type const &ssmd)
virtual void respondToCloseInputFile(FileBlock const &)
void doRespondToCloseOutputFiles(FileBlock const &fb)
Float_t e
Definition: plot.C:34
bool doEndSubRun(SubRunPrincipal const &srp, CurrentProcessingContext const *cpc)
void updateBranchParents(EventPrincipal const &ep)
bool doEvent(EventPrincipal const &ep, CurrentProcessingContext const *cpc, CountingStatistics &)
virtual void beginSubRun(SubRunPrincipal const &)
virtual void beginJob()
void doRespondToOpenInputFile(FileBlock const &fb)
virtual void startEndFile()
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
OutputModule(OutputModule const &)=delete
BranchParents branchParents_
Definition: OutputModule.h:169
ProcessConfiguration const & processConfiguration() const
void doSetRunAuxiliaryRangeSetID(RangeSet const &)
void configure(OutputModuleDescription const &desc)
Definition: OutputModule.cc:64