LArSoft  v09_90_00
Liquid Argon Software toolkit - https://larsoft.org/
art::OutputWorker Class Reference

#include "OutputWorker.h"

Inheritance diagram for art::OutputWorker:
art::Worker

Public Types

enum  State {
  Ready, Pass, Fail, Working,
  ExceptionThrown
}
 

Public Member Functions

virtual ~OutputWorker ()
 
 OutputWorker (OutputModule *mod, WorkerParams const &)
 
std::string const & lastClosedFileName () const
 
void closeFile ()
 
bool fileIsOpen () const
 
void incrementInputFileNumber ()
 
bool requestsToCloseFile () const
 
void openFile (FileBlock const &fb)
 
void writeRun (RunPrincipal &rp)
 
void writeSubRun (SubRunPrincipal &srp)
 
void writeEvent (EventPrincipal &ep, PathContext const &pc)
 
void setRunAuxiliaryRangeSetID (RangeSet const &)
 
void setSubRunAuxiliaryRangeSetID (RangeSet const &)
 
void setFileStatus (OutputFileStatus)
 
Granularity fileGranularity () const
 
void selectProducts (ProductTables const &)
 
void beginJob (detail::SharedResources const &)
 
void endJob ()
 
void respondToOpenInputFile (FileBlock const &fb)
 
void respondToCloseInputFile (FileBlock const &fb)
 
void respondToOpenOutputFiles (FileBlock const &fb)
 
void respondToCloseOutputFiles (FileBlock const &fb)
 
void doWork (Transition, Principal &, ModuleContext const &)
 
void doWork_event (hep::concurrency::WaitingTaskPtr workerInPathDoneTask, EventPrincipal &, ModuleContext const &)
 
void doWork_event (EventPrincipal &, ModuleContext const &)
 
ScheduleID scheduleID () const
 
bool returnCode () const
 
ModuleDescription const & description () const
 
hep::concurrency::SerialTaskQueueChain * serialTaskQueueChain () const
 
void reset ()
 
std::size_t timesVisited () const
 
std::size_t timesRun () const
 
std::size_t timesPassed () const
 
std::size_t timesFailed () const
 
std::size_t timesExcept () const
 
void runWorker (EventPrincipal &, ModuleContext const &)
 
bool isUnique () const
 

Protected Member Functions

std::string const & label () const
 

Protected Attributes

std::atomic< std::size_t > counts_visited_ {}
 
std::atomic< std::size_t > counts_run_ {}
 
std::atomic< std::size_t > counts_passed_ {}
 
std::atomic< std::size_t > counts_failed_ {}
 
std::atomic< std::size_t > counts_thrown_ {}
 

Private Member Functions

hep::concurrency::SerialTaskQueueChain * doSerialTaskQueueChain () const override
 
void doBeginJob (detail::SharedResources const &) override
 
void doEndJob () override
 
void doRespondToOpenInputFile (FileBlock const &) override
 
void doRespondToCloseInputFile (FileBlock const &) override
 
void doRespondToOpenOutputFiles (FileBlock const &) override
 
void doRespondToCloseOutputFiles (FileBlock const &) override
 
void doBegin (RunPrincipal &, ModuleContext const &) override
 
void doEnd (RunPrincipal &, ModuleContext const &) override
 
void doBegin (SubRunPrincipal &, ModuleContext const &) override
 
void doEnd (SubRunPrincipal &, ModuleContext const &) override
 
bool doProcess (EventPrincipal &, ModuleContext const &) override
 

Private Attributes

cet::exempt_ptr< OutputModulemodule_
 
ServiceHandle< CatalogInterfaceci_ {}
 
ActivityRegistry const & actReg_
 
Granularity fileGranularity_ {Granularity::Unset}
 

Detailed Description

Definition at line 25 of file OutputWorker.h.

Member Enumeration Documentation

enum art::Worker::State
inherited
Enumerator
Ready 
Pass 
Fail 
Working 
ExceptionThrown 

Definition at line 49 of file Worker.h.

Constructor & Destructor Documentation

art::OutputWorker::~OutputWorker ( )
virtualdefault
art::OutputWorker::OutputWorker ( OutputModule mod,
WorkerParams const &  wp 
)

Definition at line 18 of file OutputWorker.cc.

References actReg_, ci_, art::Worker::description(), art::ScheduleID::first(), fhicl::ParameterSetRegistry::get(), art::Worker::label(), art::module, module_, and art::ModuleBase::moduleDescription().

19  : Worker{module->moduleDescription(), wp}
20  , module_{module}
21  , actReg_{wp.actReg_}
22  {
23  if (wp.scheduleID_ == ScheduleID::first()) {
24  // We only want to register the products (and any shared
25  // resources) once, not once for every schedule)
26  module_->registerProducts(wp.producedProducts_);
27  wp.resources_.registerSharedResources(module_->sharedResources());
28  }
29  ci_->outputModuleInitiated(
30  label(),
31  fhicl::ParameterSetRegistry::get(description().parameterSetID()));
32  }
static collection_type const & get() noexcept
std::string const & label() const
Definition: Worker.cc:127
static constexpr ScheduleID first()
Definition: ScheduleID.h:50
cet::exempt_ptr< OutputModule > module_
Definition: OutputWorker.h:63
ActivityRegistry const & actReg_
Definition: OutputWorker.h:65
Worker(ModuleDescription const &, WorkerParams const &)
Definition: Worker.cc:108
ModuleDescription const & description() const
Definition: Worker.cc:121
ServiceHandle< CatalogInterface > ci_
Definition: OutputWorker.h:64

Member Function Documentation

void art::Worker::beginJob ( detail::SharedResources const &  resources)
inherited

Definition at line 196 of file Worker.cc.

References art::Worker::actReg_, art::Worker::doBeginJob(), art::Worker::md_, art::ActivityRegistry::sPostModuleBeginJob, and art::ActivityRegistry::sPreModuleBeginJob.

197  {
199  doBeginJob(resources);
201  }
202  catch (...) {
203  rethrow_with_context(std::current_exception(), md_, "beginJob");
204  }
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleBeginJob
virtual void doBeginJob(detail::SharedResources const &resources)=0
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleBeginJob
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
void art::OutputWorker::closeFile ( )

Definition at line 116 of file OutputWorker.cc.

References actReg_, ci_, art::Worker::label(), lastClosedFileName(), module_, art::ActivityRegistry::sPostCloseOutputFile, and art::ActivityRegistry::sPreCloseOutputFile.

117  {
119  if (module_->doCloseFile()) {
120  ci_->outputFileClosed(label(), lastClosedFileName());
121  }
123  OutputFileInfo{label(), lastClosedFileName()});
124  }
std::string const & label() const
Definition: Worker.cc:127
std::string const & lastClosedFileName() const
cet::exempt_ptr< OutputModule > module_
Definition: OutputWorker.h:63
ActivityRegistry const & actReg_
Definition: OutputWorker.h:65
GlobalSignal< detail::SignalResponseType::FIFO, void(std::string const &)> sPreCloseOutputFile
GlobalSignal< detail::SignalResponseType::LIFO, void(OutputFileInfo const &)> sPostCloseOutputFile
ServiceHandle< CatalogInterface > ci_
Definition: OutputWorker.h:64
ModuleDescription const & art::Worker::description ( ) const
inherited

Definition at line 121 of file Worker.cc.

References art::Worker::md_.

Referenced by OutputWorker(), and writeEvent().

122  {
123  return md_;
124  }
ModuleDescription const md_
Definition: Worker.h:121
void art::OutputWorker::doBegin ( RunPrincipal rp,
ModuleContext const &  mc 
)
overrideprivatevirtual

Implements art::Worker.

Definition at line 77 of file OutputWorker.cc.

References module_.

78  {
79  module_->doBeginRun(rp, mc);
80  }
cet::exempt_ptr< OutputModule > module_
Definition: OutputWorker.h:63
void art::OutputWorker::doBegin ( SubRunPrincipal srp,
ModuleContext const &  mc 
)
overrideprivatevirtual

Implements art::Worker.

Definition at line 89 of file OutputWorker.cc.

References module_.

90  {
91  module_->doBeginSubRun(srp, mc);
92  }
cet::exempt_ptr< OutputModule > module_
Definition: OutputWorker.h:63
void art::OutputWorker::doBeginJob ( detail::SharedResources const &  resources)
overrideprivatevirtual

Implements art::Worker.

Definition at line 41 of file OutputWorker.cc.

References module_.

42  {
43  module_->doBeginJob(resources);
44  }
cet::exempt_ptr< OutputModule > module_
Definition: OutputWorker.h:63
void art::OutputWorker::doEnd ( RunPrincipal rp,
ModuleContext const &  mc 
)
overrideprivatevirtual

Implements art::Worker.

Definition at line 83 of file OutputWorker.cc.

References module_.

84  {
85  module_->doEndRun(rp, mc);
86  }
cet::exempt_ptr< OutputModule > module_
Definition: OutputWorker.h:63
void art::OutputWorker::doEnd ( SubRunPrincipal srp,
ModuleContext const &  mc 
)
overrideprivatevirtual

Implements art::Worker.

Definition at line 95 of file OutputWorker.cc.

References module_.

96  {
97  module_->doEndSubRun(srp, mc);
98  }
cet::exempt_ptr< OutputModule > module_
Definition: OutputWorker.h:63
void art::OutputWorker::doEndJob ( )
overrideprivatevirtual

Implements art::Worker.

Definition at line 47 of file OutputWorker.cc.

References module_.

48  {
49  module_->doEndJob();
50  }
cet::exempt_ptr< OutputModule > module_
Definition: OutputWorker.h:63
bool art::OutputWorker::doProcess ( EventPrincipal ep,
ModuleContext const &  mc 
)
overrideprivatevirtual

Implements art::Worker.

Definition at line 101 of file OutputWorker.cc.

References art::Worker::counts_failed_, art::Worker::counts_passed_, art::Worker::counts_run_, and module_.

102  {
103  // Note, only filters ever return false, and when they do it means
104  // they have rejected.
105  return module_->doEvent(
107  }
cet::exempt_ptr< OutputModule > module_
Definition: OutputWorker.h:63
std::atomic< std::size_t > counts_run_
Definition: Worker.h:99
std::atomic< std::size_t > counts_passed_
Definition: Worker.h:100
std::atomic< std::size_t > counts_failed_
Definition: Worker.h:101
void art::OutputWorker::doRespondToCloseInputFile ( FileBlock const &  fb)
overrideprivatevirtual

Implements art::Worker.

Definition at line 59 of file OutputWorker.cc.

References module_.

60  {
61  module_->doRespondToCloseInputFile(fb);
62  }
cet::exempt_ptr< OutputModule > module_
Definition: OutputWorker.h:63
TFile fb("Li6.root")
void art::OutputWorker::doRespondToCloseOutputFiles ( FileBlock const &  fb)
overrideprivatevirtual

Implements art::Worker.

Definition at line 71 of file OutputWorker.cc.

References module_.

72  {
73  module_->doRespondToCloseOutputFiles(fb);
74  }
cet::exempt_ptr< OutputModule > module_
Definition: OutputWorker.h:63
TFile fb("Li6.root")
void art::OutputWorker::doRespondToOpenInputFile ( FileBlock const &  fb)
overrideprivatevirtual

Implements art::Worker.

Definition at line 53 of file OutputWorker.cc.

References module_.

54  {
55  module_->doRespondToOpenInputFile(fb);
56  }
cet::exempt_ptr< OutputModule > module_
Definition: OutputWorker.h:63
TFile fb("Li6.root")
void art::OutputWorker::doRespondToOpenOutputFiles ( FileBlock const &  fb)
overrideprivatevirtual

Implements art::Worker.

Definition at line 65 of file OutputWorker.cc.

References module_.

66  {
67  module_->doRespondToOpenOutputFiles(fb);
68  }
cet::exempt_ptr< OutputModule > module_
Definition: OutputWorker.h:63
TFile fb("Li6.root")
hep::concurrency::SerialTaskQueueChain * art::OutputWorker::doSerialTaskQueueChain ( ) const
overrideprivatevirtual

Implements art::Worker.

Definition at line 35 of file OutputWorker.cc.

References module_.

36  {
37  return module_->serialTaskQueueChain();
38  }
cet::exempt_ptr< OutputModule > module_
Definition: OutputWorker.h:63
void art::Worker::doWork ( Transition  trans,
Principal principal,
ModuleContext const &  mc 
)
inherited

Definition at line 250 of file Worker.cc.

References art::Worker::actReg_, art::errors::BadAlloc, art::errors::BadExceptionType, art::BeginRun, art::BeginSubRun, art::Worker::cached_exception_, art::Worker::doBegin(), art::Worker::doEnd(), art::EndRun, art::EndSubRun, art::Worker::ExceptionThrown, art::Worker::Fail, art::Worker::md_, art::errors::OtherArt, art::Worker::Pass, art::Worker::Ready, art::errors::ScheduleExecutionFailure, art::ActivityRegistry::sPostModuleBeginRun, art::ActivityRegistry::sPostModuleBeginSubRun, art::ActivityRegistry::sPostModuleEndRun, art::ActivityRegistry::sPostModuleEndSubRun, art::ActivityRegistry::sPreModuleBeginRun, art::ActivityRegistry::sPreModuleBeginSubRun, art::ActivityRegistry::sPreModuleEndRun, art::ActivityRegistry::sPreModuleEndSubRun, art::Worker::state_, art::errors::StdException, art::errors::Unknown, and art::Worker::Working.

253  {
254  switch (state_.load()) {
255  case Ready:
256  break;
257  case Pass:
258  case Fail:
259  return;
260  case ExceptionThrown: {
261  // Rethrow the cached exception again. It seems impossible to
262  // get here a second time unless a cet::exception has been
263  // thrown previously.
264  mf::LogWarning("repeat") << "A module has been invoked a second time "
265  "even though it caught an exception during "
266  "the previous invocation.\nThis may be an "
267  "indication of a configuration problem.";
268  rethrow_exception(cached_exception_);
269  }
270  case Working:
271  break; // See below.
272  }
273 
274  try {
275  if (state_.load() == Working) {
276  // Not part of the switch statement above because we want the
277  // exception to be caught by our handling mechanism.
279  << "A Module has been invoked while it is still being executed.\n"
280  << "Product dependencies have invoked a module execution cycle.\n";
281  }
282  state_ = Working;
283  if (trans == Transition::BeginRun) {
284  actReg_.sPreModuleBeginRun.invoke(mc);
285  doBegin(dynamic_cast<RunPrincipal&>(principal), mc);
286  actReg_.sPostModuleBeginRun.invoke(mc);
287  } else if (trans == Transition::EndRun) {
288  actReg_.sPreModuleEndRun.invoke(mc);
289  doEnd(dynamic_cast<RunPrincipal&>(principal), mc);
290  actReg_.sPostModuleEndRun.invoke(mc);
291  } else if (trans == Transition::BeginSubRun) {
292  actReg_.sPreModuleBeginSubRun.invoke(mc);
293  doBegin(dynamic_cast<SubRunPrincipal&>(principal), mc);
294  actReg_.sPostModuleBeginSubRun.invoke(mc);
295  } else if (trans == Transition::EndSubRun) {
296  actReg_.sPreModuleEndSubRun.invoke(mc);
297  doEnd(dynamic_cast<SubRunPrincipal&>(principal), mc);
298  actReg_.sPostModuleEndSubRun.invoke(mc);
299  }
300  state_ = Pass;
301  }
302  catch (cet::exception& e) {
304  e << "The above exception was thrown while processing module "
305  << brief_context(md_, principal) << '\n';
306  if (auto edmEx = dynamic_cast<Exception*>(&e)) {
307  cached_exception_ = std::make_exception_ptr(*edmEx);
308  } else {
309  auto art_ex = Exception{errors::OtherArt, std::string(), e};
310  cached_exception_ = std::make_exception_ptr(art_ex);
311  }
312  throw;
313  }
314  catch (std::bad_alloc const& bda) {
316  auto art_ex =
318  << "A bad_alloc exception occurred during a call to the module "
319  << brief_context(md_, principal) << '\n'
320  << "The job has probably exhausted the virtual memory available to the "
321  "process.\n";
322  cached_exception_ = make_exception_ptr(art_ex);
323  rethrow_exception(cached_exception_);
324  }
325  catch (std::exception const& e) {
327  auto art_ex = Exception{errors::StdException}
328  << "An exception occurred during a call to the module "
329  << brief_context(md_, principal) << e.what();
330  cached_exception_ = make_exception_ptr(art_ex);
331  rethrow_exception(cached_exception_);
332  }
333  catch (std::string const& s) {
335  auto art_ex = Exception{errors::BadExceptionType, "string"}
336  << "A string thrown as an exception occurred during a call "
337  "to the module "
338  << brief_context(md_, principal) << '\n'
339  << s << '\n';
340  cached_exception_ = make_exception_ptr(art_ex);
341  rethrow_exception(cached_exception_);
342  }
343  catch (char const* c) {
345  auto art_ex = Exception{errors::BadExceptionType, "char const*"}
346  << "A char const* thrown as an exception occurred during a "
347  "call to the module "
348  << brief_context(md_, principal) << '\n'
349  << c << '\n';
350  cached_exception_ = make_exception_ptr(art_ex);
351  rethrow_exception(cached_exception_);
352  }
353  catch (...) {
355  auto art_ex =
356  Exception{errors::Unknown, "repeated"}
357  << "An unknown occurred during a previous call to the module "
358  << brief_context(md_, principal) << '\n';
359  cached_exception_ = make_exception_ptr(art_ex);
360  rethrow_exception(cached_exception_);
361  }
362  }
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreModuleBeginRun
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreModuleEndRun
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostModuleEndRun
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreModuleEndSubRun
std::atomic< int > state_
Definition: Worker.h:124
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostModuleBeginRun
virtual void doBegin(RunPrincipal &rp, ModuleContext const &mc)=0
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
virtual void doEnd(RunPrincipal &rp, ModuleContext const &mc)=0
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostModuleEndSubRun
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostModuleBeginSubRun
Float_t e
Definition: plot.C:35
std::exception_ptr cached_exception_
Definition: Worker.h:134
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreModuleBeginSubRun
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
void art::Worker::doWork_event ( hep::concurrency::WaitingTaskPtr  workerInPathDoneTask,
EventPrincipal ,
ModuleContext const &   
)
inherited

Referenced by art::Worker::isUnique().

void art::Worker::doWork_event ( EventPrincipal p,
ModuleContext const &  mc 
)
inherited

Definition at line 366 of file Worker.cc.

References art::Worker::actions_, art::Worker::actReg_, art::errors::BadAlloc, art::errors::BadExceptionType, art::Worker::cached_exception_, art::Worker::counts_failed_, art::Worker::counts_passed_, art::Worker::counts_thrown_, art::Worker::counts_visited_, art::Worker::doProcess(), art::Worker::ExceptionThrown, art::Worker::Fail, art::actions::FailModule, art::ActionTable::find(), art::actions::IgnoreCompletely, art::Worker::md_, art::errors::OtherArt, art::Worker::Pass, art::Worker::returnCode_, art::ActivityRegistry::sPostModule, art::ActivityRegistry::sPreModule, art::Worker::state_, art::errors::StdException, art::errors::Unknown, and art::Worker::Working.

367  {
368  ++counts_visited_;
369  returnCode_ = false;
370  // Transition from Ready state to Working state.
371  state_ = Working;
372  actReg_.sPreModule.invoke(mc);
373  // Note: Only the TriggerResults inserter--a producer--calls this
374  // function. The return code is thus always true.
375  returnCode_ = doProcess(p, mc);
376  actReg_.sPostModule.invoke(mc);
377  assert(returnCode_.load());
378  state_ = Pass;
379  }
380  catch (cet::exception& e) {
381  auto action = actions_.find(e.root_cause());
382  assert(not mc.onEndPath());
383  if (action == actions::IgnoreCompletely) {
384  state_ = Pass;
385  returnCode_ = true;
386  ++counts_passed_;
387  mf::LogWarning("IgnoreCompletely") << "Module ignored an exception\n"
388  << e.what();
389  // WARNING: We will continue execution below!!!
390  } else if (action == actions::FailModule) {
391  state_ = Fail;
392  returnCode_ = true;
393  ++counts_failed_;
394  mf::LogWarning("FailModule") << "Module failed due to an exception\n"
395  << e.what();
396  // WARNING: We will continue execution below!!!
397  } else {
399  ++counts_thrown_;
400  e << "The above exception was thrown while processing module "
401  << brief_context(md_, p) << '\n';
402  if (auto edmEx = dynamic_cast<Exception*>(&e)) {
403  cached_exception_ = make_exception_ptr(*edmEx);
404  } else {
406  make_exception_ptr(Exception{errors::OtherArt, string(), e});
407  }
408  rethrow_exception(cached_exception_);
409  }
410  }
411  catch (bad_alloc const& bda) {
413  ++counts_thrown_;
414  auto art_ex =
416  << "A bad_alloc exception occurred during a call to the module "
417  << brief_context(md_, p) << '\n'
418  << "The job has probably exhausted the virtual memory available to the "
419  "process.\n";
420  cached_exception_ = make_exception_ptr(art_ex);
421  rethrow_exception(cached_exception_);
422  }
423  catch (exception const& e) {
425  ++counts_thrown_;
426  auto art_ex = Exception{errors::StdException}
427  << "An exception occurred during a call to the module "
428  << brief_context(md_, p) << '\n'
429  << e.what();
430  cached_exception_ = make_exception_ptr(art_ex);
431  rethrow_exception(cached_exception_);
432  }
433  catch (string const& s) {
435  ++counts_thrown_;
436  auto art_ex = Exception{errors::BadExceptionType, "string"}
437  << "A string thrown as an exception occurred during a call "
438  "to the module "
439  << brief_context(md_, p) << '\n'
440  << s << '\n';
441  cached_exception_ = make_exception_ptr(art_ex);
442  rethrow_exception(cached_exception_);
443  }
444  catch (char const* c) {
446  ++counts_thrown_;
447  auto art_ex = Exception{errors::BadExceptionType, "char const*"}
448  << "A char const* thrown as an exception occurred during a "
449  "call to the module "
450  << brief_context(md_, p) << '\n'
451  << c << '\n';
452  cached_exception_ = make_exception_ptr(art_ex);
453  rethrow_exception(cached_exception_);
454  }
455  catch (...) {
456  ++counts_thrown_;
458  auto art_ex = Exception{errors::Unknown, "repeated"}
459  << "An unknown occurred during a previous call to the module "
460  << brief_context(md_, p) << '\n';
461  cached_exception_ = make_exception_ptr(art_ex);
462  rethrow_exception(cached_exception_);
463  }
actions::ActionCodes find(std::string const &category) const
Definition: Actions.cc:58
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreModule
ActionTable const & actions_
Definition: Worker.h:122
std::atomic< std::size_t > counts_visited_
Definition: Worker.h:98
std::atomic< int > state_
Definition: Worker.h:124
virtual bool doProcess(EventPrincipal &, ModuleContext const &)=0
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
std::atomic< bool > returnCode_
Definition: Worker.h:137
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostModule
std::atomic< std::size_t > counts_thrown_
Definition: Worker.h:102
std::atomic< std::size_t > counts_passed_
Definition: Worker.h:100
std::atomic< std::size_t > counts_failed_
Definition: Worker.h:101
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
Float_t e
Definition: plot.C:35
std::exception_ptr cached_exception_
Definition: Worker.h:134
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
void art::Worker::endJob ( )
inherited

Definition at line 207 of file Worker.cc.

References art::Worker::actReg_, art::Worker::doEndJob(), art::Worker::md_, art::ActivityRegistry::sPostModuleEndJob, and art::ActivityRegistry::sPreModuleEndJob.

208  {
209  actReg_.sPreModuleEndJob.invoke(md_);
210  doEndJob();
211  actReg_.sPostModuleEndJob.invoke(md_);
212  }
213  catch (...) {
214  rethrow_with_context(std::current_exception(), md_, "endJob");
215  }
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
virtual void doEndJob()=0
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleEndJob
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleEndJob
Granularity art::OutputWorker::fileGranularity ( ) const

Definition at line 199 of file OutputWorker.cc.

References module_.

200  {
201  return module_->fileGranularity();
202  }
cet::exempt_ptr< OutputModule > module_
Definition: OutputWorker.h:63
bool art::OutputWorker::fileIsOpen ( ) const

Definition at line 181 of file OutputWorker.cc.

References module_.

182  {
183  return module_->fileIsOpen();
184  }
cet::exempt_ptr< OutputModule > module_
Definition: OutputWorker.h:63
void art::OutputWorker::incrementInputFileNumber ( )

Definition at line 127 of file OutputWorker.cc.

References module_.

128  {
129  return module_->incrementInputFileNumber();
130  }
cet::exempt_ptr< OutputModule > module_
Definition: OutputWorker.h:63
string const & art::Worker::label ( ) const
protectedinherited

Definition at line 127 of file Worker.cc.

References art::Worker::md_, and art::ModuleDescription::moduleLabel().

Referenced by closeFile(), openFile(), and OutputWorker().

128  {
129  return md_.moduleLabel();
130  }
std::string const & moduleLabel() const
ModuleDescription const md_
Definition: Worker.h:121
std::string const & art::OutputWorker::lastClosedFileName ( ) const

Definition at line 110 of file OutputWorker.cc.

References module_.

Referenced by closeFile().

111  {
112  return module_->lastClosedFileName();
113  }
cet::exempt_ptr< OutputModule > module_
Definition: OutputWorker.h:63
void art::OutputWorker::openFile ( FileBlock const &  fb)

Definition at line 139 of file OutputWorker.cc.

References actReg_, ci_, art::Worker::label(), module_, and art::ActivityRegistry::sPostOpenOutputFile.

140  {
141  if (module_->doOpenFile(fb)) {
142  ci_->outputFileOpened(label());
144  }
145  }
GlobalSignal< detail::SignalResponseType::LIFO, void(std::string const &)> sPostOpenOutputFile
std::string const & label() const
Definition: Worker.cc:127
cet::exempt_ptr< OutputModule > module_
Definition: OutputWorker.h:63
ActivityRegistry const & actReg_
Definition: OutputWorker.h:65
TFile fb("Li6.root")
ServiceHandle< CatalogInterface > ci_
Definition: OutputWorker.h:64
bool art::OutputWorker::requestsToCloseFile ( ) const

Definition at line 133 of file OutputWorker.cc.

References module_.

134  {
135  return module_->requestsToCloseFile();
136  }
cet::exempt_ptr< OutputModule > module_
Definition: OutputWorker.h:63
void art::Worker::reset ( )
inherited

Definition at line 149 of file Worker.cc.

References art::Worker::cached_exception_, art::Worker::Ready, art::Worker::returnCode_, art::Worker::scheduleID_, art::Worker::state_, TDEBUG_FUNC_SI, art::Worker::waitingTasks_, and art::Worker::workStarted_.

150  {
151  state_ = Ready;
152  cached_exception_ = std::exception_ptr{};
154  << hex << this << dec << " Resetting waitingTasks_";
155  waitingTasks_.reset();
156  workStarted_ = false;
157  returnCode_ = false;
158  }
std::atomic< int > state_
Definition: Worker.h:124
#define TDEBUG_FUNC_SI(LEVEL, SI)
ScheduleID const scheduleID_
Definition: Worker.h:120
std::atomic< bool > returnCode_
Definition: Worker.h:137
std::exception_ptr cached_exception_
Definition: Worker.h:134
hep::concurrency::WaitingTaskList waitingTasks_
Definition: Worker.h:143
std::atomic< bool > workStarted_
Definition: Worker.h:136
void art::Worker::respondToCloseInputFile ( FileBlock const &  fb)
inherited

Definition at line 226 of file Worker.cc.

References art::Worker::actReg_, art::Worker::doRespondToCloseInputFile(), art::Worker::md_, art::ActivityRegistry::sPostModuleRespondToCloseInputFile, and art::ActivityRegistry::sPreModuleRespondToCloseInputFile.

227  {
231  }
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleRespondToCloseInputFile
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
virtual void doRespondToCloseInputFile(FileBlock const &fb)=0
TFile fb("Li6.root")
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleRespondToCloseInputFile
void art::Worker::respondToCloseOutputFiles ( FileBlock const &  fb)
inherited

Definition at line 242 of file Worker.cc.

References art::Worker::actReg_, art::Worker::doRespondToCloseOutputFiles(), art::Worker::md_, art::ActivityRegistry::sPostModuleRespondToCloseOutputFiles, and art::ActivityRegistry::sPreModuleRespondToCloseOutputFiles.

243  {
247  }
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleRespondToCloseOutputFiles
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
TFile fb("Li6.root")
virtual void doRespondToCloseOutputFiles(FileBlock const &fb)=0
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleRespondToCloseOutputFiles
void art::Worker::respondToOpenInputFile ( FileBlock const &  fb)
inherited

Definition at line 218 of file Worker.cc.

References art::Worker::actReg_, art::Worker::doRespondToOpenInputFile(), art::Worker::md_, art::ActivityRegistry::sPostModuleRespondToOpenInputFile, and art::ActivityRegistry::sPreModuleRespondToOpenInputFile.

219  {
223  }
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleRespondToOpenInputFile
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleRespondToOpenInputFile
TFile fb("Li6.root")
virtual void doRespondToOpenInputFile(FileBlock const &fb)=0
void art::Worker::respondToOpenOutputFiles ( FileBlock const &  fb)
inherited

Definition at line 234 of file Worker.cc.

References art::Worker::actReg_, art::Worker::doRespondToOpenOutputFiles(), art::Worker::md_, art::ActivityRegistry::sPostModuleRespondToOpenOutputFiles, and art::ActivityRegistry::sPreModuleRespondToOpenOutputFiles.

235  {
239  }
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
TFile fb("Li6.root")
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleRespondToOpenOutputFiles
virtual void doRespondToOpenOutputFiles(FileBlock const &fb)=0
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleRespondToOpenOutputFiles
bool art::Worker::returnCode ( ) const
inherited

Definition at line 134 of file Worker.cc.

References art::Worker::returnCode_.

135  {
136  return returnCode_.load();
137  }
std::atomic< bool > returnCode_
Definition: Worker.h:137
void art::Worker::runWorker ( EventPrincipal p,
ModuleContext const &  mc 
)
inherited

Definition at line 466 of file Worker.cc.

References art::Worker::actions_, art::Worker::actReg_, art::errors::BadAlloc, art::errors::BadExceptionType, art::Worker::cached_exception_, art::Worker::counts_failed_, art::Worker::counts_passed_, art::Worker::counts_thrown_, art::Worker::doProcess(), art::Worker::ExceptionThrown, art::Worker::Fail, art::actions::FailModule, art::actions::FailPath, art::ActionTable::find(), art::actions::IgnoreCompletely, art::Worker::md_, art::ModuleContext::onEndPath(), art::errors::OtherArt, art::Worker::Pass, art::Worker::returnCode_, art::ModuleContext::scheduleID(), art::actions::SkipEvent, art::ActivityRegistry::sPostModule, art::ActivityRegistry::sPreModule, art::Worker::state_, art::errors::StdException, TDEBUG_BEGIN_TASK_SI, TDEBUG_END_TASK_SI, art::errors::Unknown, art::Worker::waitingTasks_, and art::Worker::Working.

Referenced by art::Worker::isUnique().

467  {
468  auto const sid = mc.scheduleID();
469  TDEBUG_BEGIN_TASK_SI(4, sid);
470  returnCode_ = false;
471  try {
472  // Transition from Ready state to Working state.
473  state_ = Working;
474  actReg_.sPreModule.invoke(mc);
475  // Note: Only filters ever return false, and when they do it
476  // means they have rejected.
477  returnCode_ = doProcess(p, mc);
478  actReg_.sPostModule.invoke(mc);
479  state_ = Fail;
480  if (returnCode_.load()) {
481  state_ = Pass;
482  }
483  }
484  catch (cet::exception& e) {
485  auto action = actions_.find(e.root_cause());
486  // If we are processing an endPath, treat SkipEvent or FailPath
487  // as FailModule, so any subsequent OutputModules are still run.
488  if (mc.onEndPath()) {
489  if ((action == actions::SkipEvent) || (action == actions::FailPath)) {
490  action = actions::FailModule;
491  }
492  }
493  if (action == actions::IgnoreCompletely) {
494  state_ = Pass;
495  returnCode_ = true;
496  ++counts_passed_;
497  mf::LogWarning("IgnoreCompletely") << "Module ignored an exception\n"
498  << e.what();
499  // WARNING: We will continue execution below!!!
500  } else if (action == actions::FailModule) {
501  state_ = Fail;
502  returnCode_ = true;
503  ++counts_failed_;
504  mf::LogWarning("FailModule") << "Module failed due to an exception\n"
505  << e.what();
506  // WARNING: We will continue execution below!!!
507  } else {
509  ++counts_thrown_;
510  e << "The above exception was thrown while processing module "
511  << brief_context(md_, p);
512  if (auto art_ex = dynamic_cast<Exception*>(&e)) {
513  cached_exception_ = make_exception_ptr(*art_ex);
514  } else {
516  make_exception_ptr(Exception{errors::OtherArt, string(), e});
517  }
518  waitingTasks_.doneWaiting(cached_exception_);
519  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
520  return;
521  }
522  }
523  catch (bad_alloc const& bda) {
525  ++counts_thrown_;
526  auto art_ex =
528  << "A bad_alloc exception was thrown while processing module "
529  << brief_context(md_, p) << '\n'
530  << "The job has probably exhausted the virtual memory available to the "
531  "process.\n";
532  cached_exception_ = make_exception_ptr(art_ex);
533  waitingTasks_.doneWaiting(cached_exception_);
534  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
535  return;
536  }
537  catch (exception const& e) {
539  ++counts_thrown_;
540  auto art_ex = Exception{errors::StdException}
541  << "An exception was thrown while processing module "
542  << brief_context(md_, p) << '\n'
543  << e.what();
544  cached_exception_ = make_exception_ptr(art_ex);
545  waitingTasks_.doneWaiting(cached_exception_);
546  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
547  return;
548  }
549  catch (string const& s) {
551  ++counts_thrown_;
552  auto art_ex =
554  << "A string was thrown as an exception while processing module "
555  << brief_context(md_, p) << '\n'
556  << s << '\n';
557  cached_exception_ = make_exception_ptr(art_ex);
558  waitingTasks_.doneWaiting(cached_exception_);
559  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
560  return;
561  }
562  catch (char const* c) {
564  ++counts_thrown_;
565  auto art_ex =
566  Exception{errors::BadExceptionType, "char const*"}
567  << "A char const* was thrown as an exception while processing module "
568  << brief_context(md_, p) << '\n'
569  << c << '\n';
570  cached_exception_ = make_exception_ptr(art_ex);
571  waitingTasks_.doneWaiting(cached_exception_);
572  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
573  return;
574  }
575  catch (...) {
576  ++counts_thrown_;
578  auto art_ex =
579  Exception{errors::Unknown, "repeated"}
580  << "An unknown exception was thrown while processing module "
581  << brief_context(md_, p) << '\n';
582  cached_exception_ = make_exception_ptr(art_ex);
583  waitingTasks_.doneWaiting(cached_exception_);
584  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
585  return;
586  }
587  waitingTasks_.doneWaiting(exception_ptr{});
588  TDEBUG_END_TASK_SI(4, sid);
589  }
#define TDEBUG_BEGIN_TASK_SI(LEVEL, SI)
actions::ActionCodes find(std::string const &category) const
Definition: Actions.cc:58
#define TDEBUG_END_TASK_SI(LEVEL, SI)
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreModule
ActionTable const & actions_
Definition: Worker.h:122
std::atomic< int > state_
Definition: Worker.h:124
virtual bool doProcess(EventPrincipal &, ModuleContext const &)=0
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
std::atomic< bool > returnCode_
Definition: Worker.h:137
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostModule
std::atomic< std::size_t > counts_thrown_
Definition: Worker.h:102
std::atomic< std::size_t > counts_passed_
Definition: Worker.h:100
std::atomic< std::size_t > counts_failed_
Definition: Worker.h:101
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
Float_t e
Definition: plot.C:35
std::exception_ptr cached_exception_
Definition: Worker.h:134
hep::concurrency::WaitingTaskList waitingTasks_
Definition: Worker.h:143
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
ScheduleID art::Worker::scheduleID ( ) const
inlineinherited

Definition at line 70 of file Worker.h.

71  {
72  return scheduleID_;
73  }
ScheduleID const scheduleID_
Definition: Worker.h:120
void art::OutputWorker::selectProducts ( ProductTables const &  tables)

Definition at line 193 of file OutputWorker.cc.

References module_.

194  {
195  module_->selectProducts(tables);
196  }
cet::exempt_ptr< OutputModule > module_
Definition: OutputWorker.h:63
SerialTaskQueueChain * art::Worker::serialTaskQueueChain ( ) const
inherited

Definition at line 140 of file Worker.cc.

References art::Worker::doSerialTaskQueueChain().

Referenced by art::Worker::isUnique().

141  {
142  return doSerialTaskQueueChain();
143  }
virtual hep::concurrency::SerialTaskQueueChain * doSerialTaskQueueChain() const =0
void art::OutputWorker::setFileStatus ( OutputFileStatus  ofs)

Definition at line 187 of file OutputWorker.cc.

References module_.

188  {
189  return module_->setFileStatus(ofs);
190  }
cet::exempt_ptr< OutputModule > module_
Definition: OutputWorker.h:63
void art::OutputWorker::setRunAuxiliaryRangeSetID ( RangeSet const &  rs)

Definition at line 169 of file OutputWorker.cc.

References module_.

170  {
171  module_->doSetRunAuxiliaryRangeSetID(rs);
172  }
cet::exempt_ptr< OutputModule > module_
Definition: OutputWorker.h:63
void art::OutputWorker::setSubRunAuxiliaryRangeSetID ( RangeSet const &  rs)

Definition at line 175 of file OutputWorker.cc.

References module_.

176  {
177  module_->doSetSubRunAuxiliaryRangeSetID(rs);
178  }
cet::exempt_ptr< OutputModule > module_
Definition: OutputWorker.h:63
size_t art::Worker::timesExcept ( ) const
inherited

Definition at line 190 of file Worker.cc.

References art::Worker::counts_thrown_.

191  {
192  return counts_thrown_.load();
193  }
std::atomic< std::size_t > counts_thrown_
Definition: Worker.h:102
size_t art::Worker::timesFailed ( ) const
inherited

Definition at line 183 of file Worker.cc.

References art::Worker::counts_failed_.

184  {
185  return counts_failed_.load();
186  }
std::atomic< std::size_t > counts_failed_
Definition: Worker.h:101
size_t art::Worker::timesPassed ( ) const
inherited

Definition at line 176 of file Worker.cc.

References art::Worker::counts_passed_.

177  {
178  return counts_passed_.load();
179  }
std::atomic< std::size_t > counts_passed_
Definition: Worker.h:100
size_t art::Worker::timesRun ( ) const
inherited

Definition at line 169 of file Worker.cc.

References art::Worker::counts_run_.

170  {
171  return counts_run_.load();
172  }
std::atomic< std::size_t > counts_run_
Definition: Worker.h:99
size_t art::Worker::timesVisited ( ) const
inherited

Definition at line 162 of file Worker.cc.

References art::Worker::counts_visited_.

163  {
164  return counts_visited_.load();
165  }
std::atomic< std::size_t > counts_visited_
Definition: Worker.h:98
void art::OutputWorker::writeEvent ( EventPrincipal ep,
PathContext const &  pc 
)

Definition at line 160 of file OutputWorker.cc.

References actReg_, art::Worker::description(), module_, art::ActivityRegistry::sPostWriteEvent, and art::ActivityRegistry::sPreWriteEvent.

161  {
162  ModuleContext const mc{pc, description()};
163  actReg_.sPreWriteEvent.invoke(mc);
164  module_->doWriteEvent(ep, mc);
165  actReg_.sPostWriteEvent.invoke(mc);
166  }
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreWriteEvent
cet::exempt_ptr< OutputModule > module_
Definition: OutputWorker.h:63
ActivityRegistry const & actReg_
Definition: OutputWorker.h:65
ModuleDescription const & description() const
Definition: Worker.cc:121
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostWriteEvent
void art::OutputWorker::writeRun ( RunPrincipal rp)

Definition at line 148 of file OutputWorker.cc.

References module_.

149  {
150  module_->doWriteRun(rp);
151  }
cet::exempt_ptr< OutputModule > module_
Definition: OutputWorker.h:63
void art::OutputWorker::writeSubRun ( SubRunPrincipal srp)

Definition at line 154 of file OutputWorker.cc.

References module_.

155  {
156  module_->doWriteSubRun(srp);
157  }
cet::exempt_ptr< OutputModule > module_
Definition: OutputWorker.h:63

Member Data Documentation

ActivityRegistry const& art::OutputWorker::actReg_
private

Definition at line 65 of file OutputWorker.h.

Referenced by closeFile(), openFile(), OutputWorker(), and writeEvent().

ServiceHandle<CatalogInterface> art::OutputWorker::ci_ {}
private

Definition at line 64 of file OutputWorker.h.

Referenced by closeFile(), openFile(), and OutputWorker().

std::atomic<std::size_t> art::Worker::counts_failed_ {}
protectedinherited
std::atomic<std::size_t> art::Worker::counts_passed_ {}
protectedinherited
std::atomic<std::size_t> art::Worker::counts_run_ {}
protectedinherited

Definition at line 99 of file Worker.h.

Referenced by art::WorkerT< T >::doProcess(), doProcess(), and art::Worker::timesRun().

std::atomic<std::size_t> art::Worker::counts_thrown_ {}
protectedinherited
std::atomic<std::size_t> art::Worker::counts_visited_ {}
protectedinherited
Granularity art::OutputWorker::fileGranularity_ {Granularity::Unset}
private

Definition at line 66 of file OutputWorker.h.


The documentation for this class was generated from the following files: