LArSoft  v09_90_00
Liquid Argon Software toolkit - https://larsoft.org/
art::Worker Class Reference (abstract)

#include "Worker.h"

Inheritance diagram for art::Worker:
art::OutputWorker art::WorkerT< T >

Public Types

enum  State {
  Ready, Pass, Fail, Working,
  ExceptionThrown
}
 

Public Member Functions

virtual ~Worker ()
 
 Worker (ModuleDescription const &, WorkerParams const &)
 
void beginJob (detail::SharedResources const &)
 
void endJob ()
 
void respondToOpenInputFile (FileBlock const &fb)
 
void respondToCloseInputFile (FileBlock const &fb)
 
void respondToOpenOutputFiles (FileBlock const &fb)
 
void respondToCloseOutputFiles (FileBlock const &fb)
 
void doWork (Transition, Principal &, ModuleContext const &)
 
void doWork_event (hep::concurrency::WaitingTaskPtr workerInPathDoneTask, EventPrincipal &, ModuleContext const &)
 
void doWork_event (EventPrincipal &, ModuleContext const &)
 
ScheduleID scheduleID () const
 
bool returnCode () const
 
ModuleDescription const & description () const
 
hep::concurrency::SerialTaskQueueChain * serialTaskQueueChain () const
 
void reset ()
 
std::size_t timesVisited () const
 
std::size_t timesRun () const
 
std::size_t timesPassed () const
 
std::size_t timesFailed () const
 
std::size_t timesExcept () const
 
void runWorker (EventPrincipal &, ModuleContext const &)
 
bool isUnique () const
 

Protected Member Functions

std::string const & label () const
 

Protected Attributes

std::atomic< std::size_t > counts_visited_ {}
 
std::atomic< std::size_t > counts_run_ {}
 
std::atomic< std::size_t > counts_passed_ {}
 
std::atomic< std::size_t > counts_failed_ {}
 
std::atomic< std::size_t > counts_thrown_ {}
 

Private Member Functions

virtual hep::concurrency::SerialTaskQueueChain * doSerialTaskQueueChain () const =0
 
virtual void doBeginJob (detail::SharedResources const &resources)=0
 
virtual void doEndJob ()=0
 
virtual void doBegin (RunPrincipal &rp, ModuleContext const &mc)=0
 
virtual void doEnd (RunPrincipal &rp, ModuleContext const &mc)=0
 
virtual void doBegin (SubRunPrincipal &srp, ModuleContext const &mc)=0
 
virtual void doEnd (SubRunPrincipal &srp, ModuleContext const &mc)=0
 
virtual bool doProcess (EventPrincipal &, ModuleContext const &)=0
 
virtual void doRespondToOpenInputFile (FileBlock const &fb)=0
 
virtual void doRespondToCloseInputFile (FileBlock const &fb)=0
 
virtual void doRespondToOpenOutputFiles (FileBlock const &fb)=0
 
virtual void doRespondToCloseOutputFiles (FileBlock const &fb)=0
 

Private Attributes

ScheduleID const scheduleID_
 
ModuleDescription const md_
 
ActionTable const & actions_
 
ActivityRegistry const & actReg_
 
std::atomic< int > state_ {Ready}
 
std::exception_ptr cached_exception_ {}
 
std::atomic< bool > workStarted_ {false}
 
std::atomic< bool > returnCode_ {false}
 
hep::concurrency::WaitingTaskList waitingTasks_
 

Detailed Description

Definition at line 47 of file Worker.h.

Member Enumeration Documentation

Enumerator
Ready 
Pass 
Fail 
Working 
ExceptionThrown 

Definition at line 49 of file Worker.h.

Constructor & Destructor Documentation

art::Worker::~Worker ( )
virtual default
art::Worker::Worker ( ModuleDescription const &  md,
WorkerParams const &  wp 
)

Definition at line 108 of file Worker.cc.

References actions_, actReg_, md_, art::ModuleDescription::moduleLabel(), art::ModuleDescription::moduleName(), art::WorkerParams::scheduleID_, TDEBUG_FUNC_SI, and waitingTasks_.

109  : scheduleID_{wp.scheduleID_}
110  , md_{md}
111  , actions_{wp.actions_}
112  , actReg_{wp.actReg_}
113  , waitingTasks_{wp.taskGroup_}
114  {
115  TDEBUG_FUNC_SI(5, wp.scheduleID_)
116  << hex << this << dec << " name: " << md.moduleName()
117  << " label: " << md.moduleLabel();
118  }
ActionTable const & actions_
Definition: Worker.h:122
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
#define TDEBUG_FUNC_SI(LEVEL, SI)
ScheduleID const scheduleID_
Definition: Worker.h:120
hep::concurrency::WaitingTaskList waitingTasks_
Definition: Worker.h:143

Member Function Documentation

void art::Worker::beginJob ( detail::SharedResources const &  resources)

Definition at line 196 of file Worker.cc.

References actReg_, doBeginJob(), md_, art::ActivityRegistry::sPostModuleBeginJob, and art::ActivityRegistry::sPreModuleBeginJob.

197  {
198  actReg_.sPreModuleBeginJob.invoke(md_);
199  doBeginJob(resources);
200  actReg_.sPostModuleBeginJob.invoke(md_);
201  }
202  catch (...) {
203  rethrow_with_context(std::current_exception(), md_, "beginJob");
204  }
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleBeginJob
virtual void doBeginJob(detail::SharedResources const &resources)=0
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleBeginJob
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
ModuleDescription const & art::Worker::description ( ) const

Definition at line 121 of file Worker.cc.

References md_.

Referenced by art::OutputWorker::OutputWorker(), and art::OutputWorker::writeEvent().

122  {
123  return md_;
124  }
ModuleDescription const md_
Definition: Worker.h:121
virtual void art::Worker::doBegin ( RunPrincipal &  rp,
ModuleContext const &  mc 
)
private pure virtual

Implemented in art::OutputWorker, and art::WorkerT< T >.

Referenced by doWork().

virtual void art::Worker::doBegin ( SubRunPrincipal &  srp,
ModuleContext const &  mc 
)
private pure virtual

Implemented in art::OutputWorker, and art::WorkerT< T >.

virtual void art::Worker::doBeginJob ( detail::SharedResources const &  resources)
private pure virtual

Implemented in art::OutputWorker, and art::WorkerT< T >.

Referenced by beginJob().

virtual void art::Worker::doEnd ( RunPrincipal &  rp,
ModuleContext const &  mc 
)
private pure virtual

Implemented in art::OutputWorker, and art::WorkerT< T >.

Referenced by doWork().

virtual void art::Worker::doEnd ( SubRunPrincipal &  srp,
ModuleContext const &  mc 
)
private pure virtual

Implemented in art::OutputWorker, and art::WorkerT< T >.

virtual void art::Worker::doEndJob ( )
private pure virtual

Implemented in art::OutputWorker, and art::WorkerT< T >.

Referenced by endJob().

virtual bool art::Worker::doProcess ( EventPrincipal & ,
ModuleContext const &   
)
private pure virtual

Implemented in art::OutputWorker, and art::WorkerT< T >.

Referenced by doWork_event(), and runWorker().

virtual void art::Worker::doRespondToCloseInputFile ( FileBlock const &  fb)
private pure virtual

Implemented in art::OutputWorker, and art::WorkerT< T >.

Referenced by respondToCloseInputFile().

virtual void art::Worker::doRespondToCloseOutputFiles ( FileBlock const &  fb)
private pure virtual
virtual void art::Worker::doRespondToOpenInputFile ( FileBlock const &  fb)
private pure virtual

Implemented in art::OutputWorker, and art::WorkerT< T >.

Referenced by respondToOpenInputFile().

virtual void art::Worker::doRespondToOpenOutputFiles ( FileBlock const &  fb)
private pure virtual

Implemented in art::OutputWorker, and art::WorkerT< T >.

Referenced by respondToOpenOutputFiles().

virtual hep::concurrency::SerialTaskQueueChain* art::Worker::doSerialTaskQueueChain ( ) const
private pure virtual

Implemented in art::OutputWorker, and art::WorkerT< T >.

Referenced by serialTaskQueueChain().

void art::Worker::doWork ( Transition  trans,
Principal &  principal,
ModuleContext const &  mc 
)

Definition at line 250 of file Worker.cc.

References actReg_, art::errors::BadAlloc, art::errors::BadExceptionType, art::BeginRun, art::BeginSubRun, cached_exception_, doBegin(), doEnd(), art::EndRun, art::EndSubRun, ExceptionThrown, Fail, md_, art::errors::OtherArt, Pass, Ready, art::errors::ScheduleExecutionFailure, art::ActivityRegistry::sPostModuleBeginRun, art::ActivityRegistry::sPostModuleBeginSubRun, art::ActivityRegistry::sPostModuleEndRun, art::ActivityRegistry::sPostModuleEndSubRun, art::ActivityRegistry::sPreModuleBeginRun, art::ActivityRegistry::sPreModuleBeginSubRun, art::ActivityRegistry::sPreModuleEndRun, art::ActivityRegistry::sPreModuleEndSubRun, state_, art::errors::StdException, art::errors::Unknown, and Working.

253  {
254  switch (state_.load()) {
255  case Ready:
256  break;
257  case Pass:
258  case Fail:
259  return;
260  case ExceptionThrown: {
261  // Rethrow the cached exception again. It seems impossible to
262  // get here a second time unless a cet::exception has been
263  // thrown previously.
264  mf::LogWarning("repeat") << "A module has been invoked a second time "
265  "even though it caught an exception during "
266  "the previous invocation.\nThis may be an "
267  "indication of a configuration problem.";
268  rethrow_exception(cached_exception_);
269  }
270  case Working:
271  break; // See below.
272  }
273 
274  try {
275  if (state_.load() == Working) {
276  // Not part of the switch statement above because we want the
277  // exception to be caught by our handling mechanism.
279  << "A Module has been invoked while it is still being executed.\n"
280  << "Product dependencies have invoked a module execution cycle.\n";
281  }
282  state_ = Working;
283  if (trans == Transition::BeginRun) {
284  actReg_.sPreModuleBeginRun.invoke(mc);
285  doBegin(dynamic_cast<RunPrincipal&>(principal), mc);
286  actReg_.sPostModuleBeginRun.invoke(mc);
287  } else if (trans == Transition::EndRun) {
288  actReg_.sPreModuleEndRun.invoke(mc);
289  doEnd(dynamic_cast<RunPrincipal&>(principal), mc);
290  actReg_.sPostModuleEndRun.invoke(mc);
291  } else if (trans == Transition::BeginSubRun) {
292  actReg_.sPreModuleBeginSubRun.invoke(mc);
293  doBegin(dynamic_cast<SubRunPrincipal&>(principal), mc);
294  actReg_.sPostModuleBeginSubRun.invoke(mc);
295  } else if (trans == Transition::EndSubRun) {
296  actReg_.sPreModuleEndSubRun.invoke(mc);
297  doEnd(dynamic_cast<SubRunPrincipal&>(principal), mc);
298  actReg_.sPostModuleEndSubRun.invoke(mc);
299  }
300  state_ = Pass;
301  }
302  catch (cet::exception& e) {
304  e << "The above exception was thrown while processing module "
305  << brief_context(md_, principal) << '\n';
306  if (auto edmEx = dynamic_cast<Exception*>(&e)) {
307  cached_exception_ = std::make_exception_ptr(*edmEx);
308  } else {
309  auto art_ex = Exception{errors::OtherArt, std::string(), e};
310  cached_exception_ = std::make_exception_ptr(art_ex);
311  }
312  throw;
313  }
314  catch (std::bad_alloc const& bda) {
316  auto art_ex =
318  << "A bad_alloc exception occurred during a call to the module "
319  << brief_context(md_, principal) << '\n'
320  << "The job has probably exhausted the virtual memory available to the "
321  "process.\n";
322  cached_exception_ = make_exception_ptr(art_ex);
323  rethrow_exception(cached_exception_);
324  }
325  catch (std::exception const& e) {
327  auto art_ex = Exception{errors::StdException}
328  << "An exception occurred during a call to the module "
329  << brief_context(md_, principal) << e.what();
330  cached_exception_ = make_exception_ptr(art_ex);
331  rethrow_exception(cached_exception_);
332  }
333  catch (std::string const& s) {
335  auto art_ex = Exception{errors::BadExceptionType, "string"}
336  << "A string thrown as an exception occurred during a call "
337  "to the module "
338  << brief_context(md_, principal) << '\n'
339  << s << '\n';
340  cached_exception_ = make_exception_ptr(art_ex);
341  rethrow_exception(cached_exception_);
342  }
343  catch (char const* c) {
345  auto art_ex = Exception{errors::BadExceptionType, "char const*"}
346  << "A char const* thrown as an exception occurred during a "
347  "call to the module "
348  << brief_context(md_, principal) << '\n'
349  << c << '\n';
350  cached_exception_ = make_exception_ptr(art_ex);
351  rethrow_exception(cached_exception_);
352  }
353  catch (...) {
355  auto art_ex =
356  Exception{errors::Unknown, "repeated"}
357  << "An unknown occurred during a previous call to the module "
358  << brief_context(md_, principal) << '\n';
359  cached_exception_ = make_exception_ptr(art_ex);
360  rethrow_exception(cached_exception_);
361  }
362  }
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreModuleBeginRun
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreModuleEndRun
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostModuleEndRun
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreModuleEndSubRun
std::atomic< int > state_
Definition: Worker.h:124
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostModuleBeginRun
virtual void doBegin(RunPrincipal &rp, ModuleContext const &mc)=0
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
virtual void doEnd(RunPrincipal &rp, ModuleContext const &mc)=0
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostModuleEndSubRun
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostModuleBeginSubRun
Float_t e
Definition: plot.C:35
std::exception_ptr cached_exception_
Definition: Worker.h:134
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreModuleBeginSubRun
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
void art::Worker::doWork_event ( hep::concurrency::WaitingTaskPtr  workerInPathDoneTask,
EventPrincipal & ,
ModuleContext const &   
)

Referenced by isUnique().

void art::Worker::doWork_event ( EventPrincipal &  p,
ModuleContext const &  mc 
)

Definition at line 366 of file Worker.cc.

References actions_, actReg_, art::errors::BadAlloc, art::errors::BadExceptionType, cached_exception_, counts_failed_, counts_passed_, counts_thrown_, counts_visited_, doProcess(), ExceptionThrown, Fail, art::actions::FailModule, art::ActionTable::find(), art::actions::IgnoreCompletely, md_, art::errors::OtherArt, Pass, returnCode_, art::ActivityRegistry::sPostModule, art::ActivityRegistry::sPreModule, state_, art::errors::StdException, art::errors::Unknown, and Working.

367  {
368  ++counts_visited_;
369  returnCode_ = false;
370  // Transition from Ready state to Working state.
371  state_ = Working;
372  actReg_.sPreModule.invoke(mc);
373  // Note: Only the TriggerResults inserter--a producer--calls this
374  // function. The return code is thus always true.
375  returnCode_ = doProcess(p, mc);
376  actReg_.sPostModule.invoke(mc);
377  assert(returnCode_.load());
378  state_ = Pass;
379  }
380  catch (cet::exception& e) {
381  auto action = actions_.find(e.root_cause());
382  assert(not mc.onEndPath());
383  if (action == actions::IgnoreCompletely) {
384  state_ = Pass;
385  returnCode_ = true;
386  ++counts_passed_;
387  mf::LogWarning("IgnoreCompletely") << "Module ignored an exception\n"
388  << e.what();
389  // WARNING: We will continue execution below!!!
390  } else if (action == actions::FailModule) {
391  state_ = Fail;
392  returnCode_ = true;
393  ++counts_failed_;
394  mf::LogWarning("FailModule") << "Module failed due to an exception\n"
395  << e.what();
396  // WARNING: We will continue execution below!!!
397  } else {
399  ++counts_thrown_;
400  e << "The above exception was thrown while processing module "
401  << brief_context(md_, p) << '\n';
402  if (auto edmEx = dynamic_cast<Exception*>(&e)) {
403  cached_exception_ = make_exception_ptr(*edmEx);
404  } else {
406  make_exception_ptr(Exception{errors::OtherArt, string(), e});
407  }
408  rethrow_exception(cached_exception_);
409  }
410  }
411  catch (bad_alloc const& bda) {
413  ++counts_thrown_;
414  auto art_ex =
416  << "A bad_alloc exception occurred during a call to the module "
417  << brief_context(md_, p) << '\n'
418  << "The job has probably exhausted the virtual memory available to the "
419  "process.\n";
420  cached_exception_ = make_exception_ptr(art_ex);
421  rethrow_exception(cached_exception_);
422  }
423  catch (exception const& e) {
425  ++counts_thrown_;
426  auto art_ex = Exception{errors::StdException}
427  << "An exception occurred during a call to the module "
428  << brief_context(md_, p) << '\n'
429  << e.what();
430  cached_exception_ = make_exception_ptr(art_ex);
431  rethrow_exception(cached_exception_);
432  }
433  catch (string const& s) {
435  ++counts_thrown_;
436  auto art_ex = Exception{errors::BadExceptionType, "string"}
437  << "A string thrown as an exception occurred during a call "
438  "to the module "
439  << brief_context(md_, p) << '\n'
440  << s << '\n';
441  cached_exception_ = make_exception_ptr(art_ex);
442  rethrow_exception(cached_exception_);
443  }
444  catch (char const* c) {
446  ++counts_thrown_;
447  auto art_ex = Exception{errors::BadExceptionType, "char const*"}
448  << "A char const* thrown as an exception occurred during a "
449  "call to the module "
450  << brief_context(md_, p) << '\n'
451  << c << '\n';
452  cached_exception_ = make_exception_ptr(art_ex);
453  rethrow_exception(cached_exception_);
454  }
455  catch (...) {
456  ++counts_thrown_;
458  auto art_ex = Exception{errors::Unknown, "repeated"}
459  << "An unknown occurred during a previous call to the module "
460  << brief_context(md_, p) << '\n';
461  cached_exception_ = make_exception_ptr(art_ex);
462  rethrow_exception(cached_exception_);
463  }
actions::ActionCodes find(std::string const &category) const
Definition: Actions.cc:58
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreModule
ActionTable const & actions_
Definition: Worker.h:122
std::atomic< std::size_t > counts_visited_
Definition: Worker.h:98
std::atomic< int > state_
Definition: Worker.h:124
virtual bool doProcess(EventPrincipal &, ModuleContext const &)=0
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
std::atomic< bool > returnCode_
Definition: Worker.h:137
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostModule
std::atomic< std::size_t > counts_thrown_
Definition: Worker.h:102
std::atomic< std::size_t > counts_passed_
Definition: Worker.h:100
std::atomic< std::size_t > counts_failed_
Definition: Worker.h:101
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
Float_t e
Definition: plot.C:35
std::exception_ptr cached_exception_
Definition: Worker.h:134
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
void art::Worker::endJob ( )

Definition at line 207 of file Worker.cc.

References actReg_, doEndJob(), md_, art::ActivityRegistry::sPostModuleEndJob, and art::ActivityRegistry::sPreModuleEndJob.

208  {
209  actReg_.sPreModuleEndJob.invoke(md_);
210  doEndJob();
211  actReg_.sPostModuleEndJob.invoke(md_);
212  }
213  catch (...) {
214  rethrow_with_context(std::current_exception(), md_, "endJob");
215  }
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
virtual void doEndJob()=0
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleEndJob
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleEndJob
bool art::Worker::isUnique ( ) const
string const & art::Worker::label ( ) const
protected

Definition at line 127 of file Worker.cc.

References md_, and art::ModuleDescription::moduleLabel().

Referenced by art::OutputWorker::closeFile(), art::OutputWorker::openFile(), and art::OutputWorker::OutputWorker().

128  {
129  return md_.moduleLabel();
130  }
std::string const & moduleLabel() const
ModuleDescription const md_
Definition: Worker.h:121
void art::Worker::reset ( )

Definition at line 149 of file Worker.cc.

References cached_exception_, Ready, returnCode_, scheduleID_, state_, TDEBUG_FUNC_SI, waitingTasks_, and workStarted_.

150  {
151  state_ = Ready;
152  cached_exception_ = std::exception_ptr{};
154  << hex << this << dec << " Resetting waitingTasks_";
155  waitingTasks_.reset();
156  workStarted_ = false;
157  returnCode_ = false;
158  }
std::atomic< int > state_
Definition: Worker.h:124
#define TDEBUG_FUNC_SI(LEVEL, SI)
ScheduleID const scheduleID_
Definition: Worker.h:120
std::atomic< bool > returnCode_
Definition: Worker.h:137
std::exception_ptr cached_exception_
Definition: Worker.h:134
hep::concurrency::WaitingTaskList waitingTasks_
Definition: Worker.h:143
std::atomic< bool > workStarted_
Definition: Worker.h:136
void art::Worker::respondToCloseInputFile ( FileBlock const &  fb)

Definition at line 226 of file Worker.cc.

References actReg_, doRespondToCloseInputFile(), md_, art::ActivityRegistry::sPostModuleRespondToCloseInputFile, and art::ActivityRegistry::sPreModuleRespondToCloseInputFile.

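227  {
228  actReg_.sPreModuleRespondToCloseInputFile.invoke(md_);
229  doRespondToCloseInputFile(fb);
230  actReg_.sPostModuleRespondToCloseInputFile.invoke(md_);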
231  }
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleRespondToCloseInputFile
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
virtual void doRespondToCloseInputFile(FileBlock const &fb)=0
TFile fb("Li6.root")
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleRespondToCloseInputFile
void art::Worker::respondToCloseOutputFiles ( FileBlock const &  fb)

Definition at line 242 of file Worker.cc.

References actReg_, doRespondToCloseOutputFiles(), md_, art::ActivityRegistry::sPostModuleRespondToCloseOutputFiles, and art::ActivityRegistry::sPreModuleRespondToCloseOutputFiles.

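243  {
244  actReg_.sPreModuleRespondToCloseOutputFiles.invoke(md_);
245  doRespondToCloseOutputFiles(fb);
246  actReg_.sPostModuleRespondToCloseOutputFiles.invoke(md_);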
247  }
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleRespondToCloseOutputFiles
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
TFile fb("Li6.root")
virtual void doRespondToCloseOutputFiles(FileBlock const &fb)=0
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleRespondToCloseOutputFiles
void art::Worker::respondToOpenInputFile ( FileBlock const &  fb)

Definition at line 218 of file Worker.cc.

References actReg_, doRespondToOpenInputFile(), md_, art::ActivityRegistry::sPostModuleRespondToOpenInputFile, and art::ActivityRegistry::sPreModuleRespondToOpenInputFile.

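219  {
220  actReg_.sPreModuleRespondToOpenInputFile.invoke(md_);
221  doRespondToOpenInputFile(fb);
222  actReg_.sPostModuleRespondToOpenInputFile.invoke(md_);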
223  }
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleRespondToOpenInputFile
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleRespondToOpenInputFile
TFile fb("Li6.root")
virtual void doRespondToOpenInputFile(FileBlock const &fb)=0
void art::Worker::respondToOpenOutputFiles ( FileBlock const &  fb)

Definition at line 234 of file Worker.cc.

References actReg_, doRespondToOpenOutputFiles(), md_, art::ActivityRegistry::sPostModuleRespondToOpenOutputFiles, and art::ActivityRegistry::sPreModuleRespondToOpenOutputFiles.

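235  {
236  actReg_.sPreModuleRespondToOpenOutputFiles.invoke(md_);
237  doRespondToOpenOutputFiles(fb);
238  actReg_.sPostModuleRespondToOpenOutputFiles.invoke(md_);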
239  }
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
TFile fb("Li6.root")
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleRespondToOpenOutputFiles
virtual void doRespondToOpenOutputFiles(FileBlock const &fb)=0
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleRespondToOpenOutputFiles
bool art::Worker::returnCode ( ) const

Definition at line 134 of file Worker.cc.

References returnCode_.

135  {
136  return returnCode_.load();
137  }
std::atomic< bool > returnCode_
Definition: Worker.h:137
void art::Worker::runWorker ( EventPrincipal &  p,
ModuleContext const &  mc 
)

Definition at line 466 of file Worker.cc.

References actions_, actReg_, art::errors::BadAlloc, art::errors::BadExceptionType, cached_exception_, counts_failed_, counts_passed_, counts_thrown_, doProcess(), ExceptionThrown, Fail, art::actions::FailModule, art::actions::FailPath, art::ActionTable::find(), art::actions::IgnoreCompletely, md_, art::ModuleContext::onEndPath(), art::errors::OtherArt, Pass, returnCode_, art::ModuleContext::scheduleID(), art::actions::SkipEvent, art::ActivityRegistry::sPostModule, art::ActivityRegistry::sPreModule, state_, art::errors::StdException, TDEBUG_BEGIN_TASK_SI, TDEBUG_END_TASK_SI, art::errors::Unknown, waitingTasks_, and Working.

Referenced by isUnique().

467  {
468  auto const sid = mc.scheduleID();
469  TDEBUG_BEGIN_TASK_SI(4, sid);
470  returnCode_ = false;
471  try {
472  // Transition from Ready state to Working state.
473  state_ = Working;
474  actReg_.sPreModule.invoke(mc);
475  // Note: Only filters ever return false, and when they do it
476  // means they have rejected.
477  returnCode_ = doProcess(p, mc);
478  actReg_.sPostModule.invoke(mc);
479  state_ = Fail;
480  if (returnCode_.load()) {
481  state_ = Pass;
482  }
483  }
484  catch (cet::exception& e) {
485  auto action = actions_.find(e.root_cause());
486  // If we are processing an endPath, treat SkipEvent or FailPath
487  // as FailModule, so any subsequent OutputModules are still run.
488  if (mc.onEndPath()) {
489  if ((action == actions::SkipEvent) || (action == actions::FailPath)) {
490  action = actions::FailModule;
491  }
492  }
493  if (action == actions::IgnoreCompletely) {
494  state_ = Pass;
495  returnCode_ = true;
496  ++counts_passed_;
497  mf::LogWarning("IgnoreCompletely") << "Module ignored an exception\n"
498  << e.what();
499  // WARNING: We will continue execution below!!!
500  } else if (action == actions::FailModule) {
501  state_ = Fail;
502  returnCode_ = true;
503  ++counts_failed_;
504  mf::LogWarning("FailModule") << "Module failed due to an exception\n"
505  << e.what();
506  // WARNING: We will continue execution below!!!
507  } else {
509  ++counts_thrown_;
510  e << "The above exception was thrown while processing module "
511  << brief_context(md_, p);
512  if (auto art_ex = dynamic_cast<Exception*>(&e)) {
513  cached_exception_ = make_exception_ptr(*art_ex);
514  } else {
516  make_exception_ptr(Exception{errors::OtherArt, string(), e});
517  }
518  waitingTasks_.doneWaiting(cached_exception_);
519  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
520  return;
521  }
522  }
523  catch (bad_alloc const& bda) {
525  ++counts_thrown_;
526  auto art_ex =
528  << "A bad_alloc exception was thrown while processing module "
529  << brief_context(md_, p) << '\n'
530  << "The job has probably exhausted the virtual memory available to the "
531  "process.\n";
532  cached_exception_ = make_exception_ptr(art_ex);
533  waitingTasks_.doneWaiting(cached_exception_);
534  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
535  return;
536  }
537  catch (exception const& e) {
539  ++counts_thrown_;
540  auto art_ex = Exception{errors::StdException}
541  << "An exception was thrown while processing module "
542  << brief_context(md_, p) << '\n'
543  << e.what();
544  cached_exception_ = make_exception_ptr(art_ex);
545  waitingTasks_.doneWaiting(cached_exception_);
546  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
547  return;
548  }
549  catch (string const& s) {
551  ++counts_thrown_;
552  auto art_ex =
554  << "A string was thrown as an exception while processing module "
555  << brief_context(md_, p) << '\n'
556  << s << '\n';
557  cached_exception_ = make_exception_ptr(art_ex);
558  waitingTasks_.doneWaiting(cached_exception_);
559  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
560  return;
561  }
562  catch (char const* c) {
564  ++counts_thrown_;
565  auto art_ex =
566  Exception{errors::BadExceptionType, "char const*"}
567  << "A char const* was thrown as an exception while processing module "
568  << brief_context(md_, p) << '\n'
569  << c << '\n';
570  cached_exception_ = make_exception_ptr(art_ex);
571  waitingTasks_.doneWaiting(cached_exception_);
572  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
573  return;
574  }
575  catch (...) {
576  ++counts_thrown_;
578  auto art_ex =
579  Exception{errors::Unknown, "repeated"}
580  << "An unknown exception was thrown while processing module "
581  << brief_context(md_, p) << '\n';
582  cached_exception_ = make_exception_ptr(art_ex);
583  waitingTasks_.doneWaiting(cached_exception_);
584  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
585  return;
586  }
587  waitingTasks_.doneWaiting(exception_ptr{});
588  TDEBUG_END_TASK_SI(4, sid);
589  }
#define TDEBUG_BEGIN_TASK_SI(LEVEL, SI)
actions::ActionCodes find(std::string const &category) const
Definition: Actions.cc:58
#define TDEBUG_END_TASK_SI(LEVEL, SI)
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreModule
ActionTable const & actions_
Definition: Worker.h:122
std::atomic< int > state_
Definition: Worker.h:124
virtual bool doProcess(EventPrincipal &, ModuleContext const &)=0
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
std::atomic< bool > returnCode_
Definition: Worker.h:137
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostModule
std::atomic< std::size_t > counts_thrown_
Definition: Worker.h:102
std::atomic< std::size_t > counts_passed_
Definition: Worker.h:100
std::atomic< std::size_t > counts_failed_
Definition: Worker.h:101
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
Float_t e
Definition: plot.C:35
std::exception_ptr cached_exception_
Definition: Worker.h:134
hep::concurrency::WaitingTaskList waitingTasks_
Definition: Worker.h:143
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
ScheduleID art::Worker::scheduleID ( ) const
inline

Definition at line 70 of file Worker.h.

71  {
72  return scheduleID_;
73  }
ScheduleID const scheduleID_
Definition: Worker.h:120
SerialTaskQueueChain * art::Worker::serialTaskQueueChain ( ) const

Definition at line 140 of file Worker.cc.

References doSerialTaskQueueChain().

Referenced by isUnique().

141  {
142  return doSerialTaskQueueChain();
143  }
virtual hep::concurrency::SerialTaskQueueChain * doSerialTaskQueueChain() const =0
size_t art::Worker::timesExcept ( ) const

Definition at line 190 of file Worker.cc.

References counts_thrown_.

191  {
192  return counts_thrown_.load();
193  }
std::atomic< std::size_t > counts_thrown_
Definition: Worker.h:102
size_t art::Worker::timesFailed ( ) const

Definition at line 183 of file Worker.cc.

References counts_failed_.

184  {
185  return counts_failed_.load();
186  }
std::atomic< std::size_t > counts_failed_
Definition: Worker.h:101
size_t art::Worker::timesPassed ( ) const

Definition at line 176 of file Worker.cc.

References counts_passed_.

177  {
178  return counts_passed_.load();
179  }
std::atomic< std::size_t > counts_passed_
Definition: Worker.h:100
size_t art::Worker::timesRun ( ) const

Definition at line 169 of file Worker.cc.

References counts_run_.

170  {
171  return counts_run_.load();
172  }
std::atomic< std::size_t > counts_run_
Definition: Worker.h:99
size_t art::Worker::timesVisited ( ) const

Definition at line 162 of file Worker.cc.

References counts_visited_.

163  {
164  return counts_visited_.load();
165  }
std::atomic< std::size_t > counts_visited_
Definition: Worker.h:98

Member Data Documentation

ActionTable const& art::Worker::actions_
private

Definition at line 122 of file Worker.h.

Referenced by doWork_event(), runWorker(), and Worker().

std::exception_ptr art::Worker::cached_exception_ {}
private

Definition at line 134 of file Worker.h.

Referenced by doWork(), doWork_event(), reset(), and runWorker().

std::atomic<std::size_t> art::Worker::counts_failed_ {}
protected
std::atomic<std::size_t> art::Worker::counts_passed_ {}
protected
std::atomic<std::size_t> art::Worker::counts_run_ {}
protected

Definition at line 99 of file Worker.h.

Referenced by art::WorkerT< T >::doProcess(), art::OutputWorker::doProcess(), and timesRun().

std::atomic<std::size_t> art::Worker::counts_thrown_ {}
protected

Definition at line 102 of file Worker.h.

Referenced by doWork_event(), runWorker(), and timesExcept().

std::atomic<std::size_t> art::Worker::counts_visited_ {}
protected

Definition at line 98 of file Worker.h.

Referenced by doWork_event(), isUnique(), and timesVisited().

std::atomic<bool> art::Worker::returnCode_ {false}
private

Definition at line 137 of file Worker.h.

Referenced by doWork_event(), reset(), returnCode(), and runWorker().

ScheduleID const art::Worker::scheduleID_
private

Definition at line 120 of file Worker.h.

Referenced by isUnique(), and reset().

std::atomic<int> art::Worker::state_ {Ready}
private

Definition at line 124 of file Worker.h.

Referenced by doWork(), doWork_event(), reset(), and runWorker().

hep::concurrency::WaitingTaskList art::Worker::waitingTasks_
private

Definition at line 143 of file Worker.h.

Referenced by isUnique(), reset(), runWorker(), and Worker().

std::atomic<bool> art::Worker::workStarted_ {false}
private

Definition at line 136 of file Worker.h.

Referenced by isUnique(), and reset().


The documentation for this class was generated from the following files: