LArSoft  v09_90_00
Liquid Argon Software toolkit - https://larsoft.org/
art::WorkerT< T > Class Template Reference

#include "WorkerT.h"

Inheritance diagram for art::WorkerT< T >:
art::Worker

Public Types

enum  State {
  Ready, Pass, Fail, Working,
  ExceptionThrown
}
 

Public Member Functions

 WorkerT (T *, WorkerParams const &)
 
void beginJob (detail::SharedResources const &)
 
void endJob ()
 
void respondToOpenInputFile (FileBlock const &fb)
 
void respondToCloseInputFile (FileBlock const &fb)
 
void respondToOpenOutputFiles (FileBlock const &fb)
 
void respondToCloseOutputFiles (FileBlock const &fb)
 
void doWork (Transition, Principal &, ModuleContext const &)
 
void doWork_event (hep::concurrency::WaitingTaskPtr workerInPathDoneTask, EventPrincipal &, ModuleContext const &)
 
void doWork_event (EventPrincipal &, ModuleContext const &)
 
ScheduleID scheduleID () const
 
bool returnCode () const
 
ModuleDescription const & description () const
 
hep::concurrency::SerialTaskQueueChain * serialTaskQueueChain () const
 
void reset ()
 
std::size_t timesVisited () const
 
std::size_t timesRun () const
 
std::size_t timesPassed () const
 
std::size_t timesFailed () const
 
std::size_t timesExcept () const
 
void runWorker (EventPrincipal &, ModuleContext const &)
 
bool isUnique () const
 

Protected Member Functions

std::string const & label () const
 

Protected Attributes

std::atomic< std::size_t > counts_visited_ {}
 
std::atomic< std::size_t > counts_run_ {}
 
std::atomic< std::size_t > counts_passed_ {}
 
std::atomic< std::size_t > counts_failed_ {}
 
std::atomic< std::size_t > counts_thrown_ {}
 

Private Member Functions

hep::concurrency::SerialTaskQueueChain * doSerialTaskQueueChain () const override
 
void doBeginJob (detail::SharedResources const &) override
 
void doEndJob () override
 
void doRespondToOpenInputFile (FileBlock const &) override
 
void doRespondToCloseInputFile (FileBlock const &) override
 
void doRespondToOpenOutputFiles (FileBlock const &) override
 
void doRespondToCloseOutputFiles (FileBlock const &) override
 
void doBegin (RunPrincipal &, ModuleContext const &) override
 
void doEnd (RunPrincipal &, ModuleContext const &) override
 
void doBegin (SubRunPrincipal &, ModuleContext const &) override
 
void doEnd (SubRunPrincipal &, ModuleContext const &) override
 
bool doProcess (EventPrincipal &, ModuleContext const &) override
 

Private Attributes

cet::exempt_ptr< T > module_
 

Detailed Description

template<typename T>
class art::WorkerT< T >

Definition at line 19 of file WorkerT.h.

Member Enumeration Documentation

enum art::Worker::State
inherited
Enumerator
Ready 
Pass 
Fail 
Working 
ExceptionThrown 

Definition at line 49 of file Worker.h.

Constructor & Destructor Documentation

template<typename T >
art::WorkerT< T >::WorkerT ( T *  module,
WorkerParams const &  wp 
)

Definition at line 51 of file WorkerT.h.

References art::ScheduleID::first(), art::module, and art::WorkerT< T >::module_.

52  : Worker{module->moduleDescription(), wp}, module_{module}
53  {
54  if constexpr (std::is_base_of_v<Modifier, T>) {
55  if (wp.scheduleID_ == ScheduleID::first()) {
56  // We only want to register the products once, not once for every
57  // schedule...
58  module_->registerProducts(wp.producedProducts_);
59  } else {
60  // ...but we need to fill product descriptions for each module
61  // copy.
62  module_->fillProductDescriptions();
63  }
64  }
65 
66  // Register shared resources only once
67  if constexpr (std::is_base_of_v<detail::SharedModule, T>) {
68  if (wp.scheduleID_ == ScheduleID::first()) {
69  wp.resources_.registerSharedResources(module_->sharedResources());
70  }
71  }
72  }
static constexpr ScheduleID first()
Definition: ScheduleID.h:50
Worker(ModuleDescription const &, WorkerParams const &)
Definition: Worker.cc:108
cet::exempt_ptr< T > module_
Definition: WorkerT.h:41

Member Function Documentation

void art::Worker::beginJob ( detail::SharedResources const &  resources)
inherited

Definition at line 196 of file Worker.cc.

References art::Worker::actReg_, art::Worker::doBeginJob(), art::Worker::md_, art::ActivityRegistry::sPostModuleBeginJob, and art::ActivityRegistry::sPreModuleBeginJob.

197  {
198  actReg_.sPreModuleBeginJob.invoke(md_);
199  doBeginJob(resources);
200  actReg_.sPostModuleBeginJob.invoke(md_);
201  }
202  catch (...) {
203  rethrow_with_context(std::current_exception(), md_, "beginJob");
204  }
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleBeginJob
virtual void doBeginJob(detail::SharedResources const &resources)=0
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleBeginJob
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
ModuleDescription const & art::Worker::description ( ) const
inherited

Definition at line 121 of file Worker.cc.

References art::Worker::md_.

Referenced by art::OutputWorker::OutputWorker(), and art::OutputWorker::writeEvent().

122  {
123  return md_;
124  }
ModuleDescription const md_
Definition: Worker.h:121
template<typename T >
void art::WorkerT< T >::doBegin ( RunPrincipal rp,
ModuleContext const &  mc 
)
override private virtual

Implements art::Worker.

Definition at line 129 of file WorkerT.h.

References art::WorkerT< T >::module_.

130  {
131  module_->doBeginRun(rp, mc);
132  }
cet::exempt_ptr< T > module_
Definition: WorkerT.h:41
template<typename T >
void art::WorkerT< T >::doBegin ( SubRunPrincipal srp,
ModuleContext const &  mc 
)
override private virtual

Implements art::Worker.

Definition at line 143 of file WorkerT.h.

References art::WorkerT< T >::module_.

144  {
145  module_->doBeginSubRun(srp, mc);
146  }
cet::exempt_ptr< T > module_
Definition: WorkerT.h:41
template<typename T >
void art::WorkerT< T >::doBeginJob ( detail::SharedResources const &  resources)
override private virtual

Implements art::Worker.

Definition at line 87 of file WorkerT.h.

References art::WorkerT< T >::module_.

88  {
89  module_->doBeginJob(resources);
90  }
cet::exempt_ptr< T > module_
Definition: WorkerT.h:41
template<typename T >
void art::WorkerT< T >::doEnd ( RunPrincipal rp,
ModuleContext const &  mc 
)
override private virtual

Implements art::Worker.

Definition at line 136 of file WorkerT.h.

References art::WorkerT< T >::module_.

137  {
138  module_->doEndRun(rp, mc);
139  }
cet::exempt_ptr< T > module_
Definition: WorkerT.h:41
template<typename T >
void art::WorkerT< T >::doEnd ( SubRunPrincipal srp,
ModuleContext const &  mc 
)
override private virtual

Implements art::Worker.

Definition at line 150 of file WorkerT.h.

References art::WorkerT< T >::module_.

151  {
152  module_->doEndSubRun(srp, mc);
153  }
cet::exempt_ptr< T > module_
Definition: WorkerT.h:41
template<typename T >
void art::WorkerT< T >::doEndJob ( )
override private virtual

Implements art::Worker.

Definition at line 94 of file WorkerT.h.

References art::WorkerT< T >::module_.

95  {
96  module_->doEndJob();
97  }
cet::exempt_ptr< T > module_
Definition: WorkerT.h:41
template<typename T >
bool art::WorkerT< T >::doProcess ( EventPrincipal ep,
ModuleContext const &  mc 
)
override private virtual

Implements art::Worker.

Definition at line 157 of file WorkerT.h.

References art::Worker::counts_failed_, art::Worker::counts_passed_, art::Worker::counts_run_, and art::WorkerT< T >::module_.

158  {
159  // Note, only filters ever return false, and when they do it means
160  // they have rejected.
161  return module_->doEvent(
162  ep, mc, counts_run_, counts_passed_, counts_failed_);
163  }
std::atomic< std::size_t > counts_run_
Definition: Worker.h:99
cet::exempt_ptr< T > module_
Definition: WorkerT.h:41
std::atomic< std::size_t > counts_passed_
Definition: Worker.h:100
std::atomic< std::size_t > counts_failed_
Definition: Worker.h:101
template<typename T >
void art::WorkerT< T >::doRespondToCloseInputFile ( FileBlock const &  fb)
override private virtual

Implements art::Worker.

Definition at line 108 of file WorkerT.h.

References art::WorkerT< T >::module_.

109  {
110  module_->doRespondToCloseInputFile(fb);
111  }
TFile fb("Li6.root")
cet::exempt_ptr< T > module_
Definition: WorkerT.h:41
template<typename T >
void art::WorkerT< T >::doRespondToCloseOutputFiles ( FileBlock const &  fb)
override private virtual

Implements art::Worker.

Definition at line 122 of file WorkerT.h.

References art::WorkerT< T >::module_.

123  {
124  module_->doRespondToCloseOutputFiles(fb);
125  }
TFile fb("Li6.root")
cet::exempt_ptr< T > module_
Definition: WorkerT.h:41
template<typename T >
void art::WorkerT< T >::doRespondToOpenInputFile ( FileBlock const &  fb)
override private virtual

Implements art::Worker.

Definition at line 101 of file WorkerT.h.

References art::WorkerT< T >::module_.

102  {
103  module_->doRespondToOpenInputFile(fb);
104  }
TFile fb("Li6.root")
cet::exempt_ptr< T > module_
Definition: WorkerT.h:41
template<typename T >
void art::WorkerT< T >::doRespondToOpenOutputFiles ( FileBlock const &  fb)
override private virtual

Implements art::Worker.

Definition at line 115 of file WorkerT.h.

References art::WorkerT< T >::module_.

116  {
117  module_->doRespondToOpenOutputFiles(fb);
118  }
TFile fb("Li6.root")
cet::exempt_ptr< T > module_
Definition: WorkerT.h:41
template<typename T >
hep::concurrency::SerialTaskQueueChain * art::WorkerT< T >::doSerialTaskQueueChain ( ) const
override private virtual

Implements art::Worker.

Definition at line 76 of file WorkerT.h.

References art::WorkerT< T >::module_.

77  {
78  if constexpr (std::is_base_of_v<detail::SharedModule, T>) {
79  return module_->serialTaskQueueChain();
80  } else {
81  return nullptr;
82  }
83  }
cet::exempt_ptr< T > module_
Definition: WorkerT.h:41
void art::Worker::doWork ( Transition  trans,
Principal principal,
ModuleContext const &  mc 
)
inherited

Definition at line 250 of file Worker.cc.

References art::Worker::actReg_, art::errors::BadAlloc, art::errors::BadExceptionType, art::BeginRun, art::BeginSubRun, art::Worker::cached_exception_, art::Worker::doBegin(), art::Worker::doEnd(), art::EndRun, art::EndSubRun, art::Worker::ExceptionThrown, art::Worker::Fail, art::Worker::md_, art::errors::OtherArt, art::Worker::Pass, art::Worker::Ready, art::errors::ScheduleExecutionFailure, art::ActivityRegistry::sPostModuleBeginRun, art::ActivityRegistry::sPostModuleBeginSubRun, art::ActivityRegistry::sPostModuleEndRun, art::ActivityRegistry::sPostModuleEndSubRun, art::ActivityRegistry::sPreModuleBeginRun, art::ActivityRegistry::sPreModuleBeginSubRun, art::ActivityRegistry::sPreModuleEndRun, art::ActivityRegistry::sPreModuleEndSubRun, art::Worker::state_, art::errors::StdException, art::errors::Unknown, and art::Worker::Working.

253  {
254  switch (state_.load()) {
255  case Ready:
256  break;
257  case Pass:
258  case Fail:
259  return;
260  case ExceptionThrown: {
261  // Rethrow the cached exception again. It seems impossible to
262  // get here a second time unless a cet::exception has been
263  // thrown previously.
264  mf::LogWarning("repeat") << "A module has been invoked a second time "
265  "even though it caught an exception during "
266  "the previous invocation.\nThis may be an "
267  "indication of a configuration problem.";
268  rethrow_exception(cached_exception_);
269  }
270  case Working:
271  break; // See below.
272  }
273 
274  try {
275  if (state_.load() == Working) {
276  // Not part of the switch statement above because we want the
277  // exception to be caught by our handling mechanism.
278  throw Exception{errors::ScheduleExecutionFailure}
279  << "A Module has been invoked while it is still being executed.\n"
280  << "Product dependencies have invoked a module execution cycle.\n";
281  }
282  state_ = Working;
283  if (trans == Transition::BeginRun) {
284  actReg_.sPreModuleBeginRun.invoke(mc);
285  doBegin(dynamic_cast<RunPrincipal&>(principal), mc);
286  actReg_.sPostModuleBeginRun.invoke(mc);
287  } else if (trans == Transition::EndRun) {
288  actReg_.sPreModuleEndRun.invoke(mc);
289  doEnd(dynamic_cast<RunPrincipal&>(principal), mc);
290  actReg_.sPostModuleEndRun.invoke(mc);
291  } else if (trans == Transition::BeginSubRun) {
292  actReg_.sPreModuleBeginSubRun.invoke(mc);
293  doBegin(dynamic_cast<SubRunPrincipal&>(principal), mc);
294  actReg_.sPostModuleBeginSubRun.invoke(mc);
295  } else if (trans == Transition::EndSubRun) {
296  actReg_.sPreModuleEndSubRun.invoke(mc);
297  doEnd(dynamic_cast<SubRunPrincipal&>(principal), mc);
298  actReg_.sPostModuleEndSubRun.invoke(mc);
299  }
300  state_ = Pass;
301  }
302  catch (cet::exception& e) {
303  state_ = ExceptionThrown;
304  e << "The above exception was thrown while processing module "
305  << brief_context(md_, principal) << '\n';
306  if (auto edmEx = dynamic_cast<Exception*>(&e)) {
307  cached_exception_ = std::make_exception_ptr(*edmEx);
308  } else {
309  auto art_ex = Exception{errors::OtherArt, std::string(), e};
310  cached_exception_ = std::make_exception_ptr(art_ex);
311  }
312  throw;
313  }
314  catch (std::bad_alloc const& bda) {
315  state_ = ExceptionThrown;
316  auto art_ex =
317  Exception{errors::BadAlloc}
318  << "A bad_alloc exception occurred during a call to the module "
319  << brief_context(md_, principal) << '\n'
320  << "The job has probably exhausted the virtual memory available to the "
321  "process.\n";
322  cached_exception_ = make_exception_ptr(art_ex);
323  rethrow_exception(cached_exception_);
324  }
325  catch (std::exception const& e) {
326  state_ = ExceptionThrown;
327  auto art_ex = Exception{errors::StdException}
328  << "An exception occurred during a call to the module "
329  << brief_context(md_, principal) << e.what();
330  cached_exception_ = make_exception_ptr(art_ex);
331  rethrow_exception(cached_exception_);
332  }
333  catch (std::string const& s) {
334  state_ = ExceptionThrown;
335  auto art_ex = Exception{errors::BadExceptionType, "string"}
336  << "A string thrown as an exception occurred during a call "
337  "to the module "
338  << brief_context(md_, principal) << '\n'
339  << s << '\n';
340  cached_exception_ = make_exception_ptr(art_ex);
341  rethrow_exception(cached_exception_);
342  }
343  catch (char const* c) {
344  state_ = ExceptionThrown;
345  auto art_ex = Exception{errors::BadExceptionType, "char const*"}
346  << "A char const* thrown as an exception occurred during a "
347  "call to the module "
348  << brief_context(md_, principal) << '\n'
349  << c << '\n';
350  cached_exception_ = make_exception_ptr(art_ex);
351  rethrow_exception(cached_exception_);
352  }
353  catch (...) {
354  state_ = ExceptionThrown;
355  auto art_ex =
356  Exception{errors::Unknown, "repeated"}
357  << "An unknown occurred during a previous call to the module "
358  << brief_context(md_, principal) << '\n';
359  cached_exception_ = make_exception_ptr(art_ex);
360  rethrow_exception(cached_exception_);
361  }
362  }
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreModuleBeginRun
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreModuleEndRun
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostModuleEndRun
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreModuleEndSubRun
std::atomic< int > state_
Definition: Worker.h:124
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostModuleBeginRun
virtual void doBegin(RunPrincipal &rp, ModuleContext const &mc)=0
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
virtual void doEnd(RunPrincipal &rp, ModuleContext const &mc)=0
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostModuleEndSubRun
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostModuleBeginSubRun
Float_t e
Definition: plot.C:35
std::exception_ptr cached_exception_
Definition: Worker.h:134
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreModuleBeginSubRun
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
void art::Worker::doWork_event ( hep::concurrency::WaitingTaskPtr  workerInPathDoneTask,
EventPrincipal ,
ModuleContext const &   
)
inherited

Referenced by art::Worker::isUnique().

void art::Worker::doWork_event ( EventPrincipal p,
ModuleContext const &  mc 
)
inherited

Definition at line 366 of file Worker.cc.

References art::Worker::actions_, art::Worker::actReg_, art::errors::BadAlloc, art::errors::BadExceptionType, art::Worker::cached_exception_, art::Worker::counts_failed_, art::Worker::counts_passed_, art::Worker::counts_thrown_, art::Worker::counts_visited_, art::Worker::doProcess(), art::Worker::ExceptionThrown, art::Worker::Fail, art::actions::FailModule, art::ActionTable::find(), art::actions::IgnoreCompletely, art::Worker::md_, art::errors::OtherArt, art::Worker::Pass, art::Worker::returnCode_, art::ActivityRegistry::sPostModule, art::ActivityRegistry::sPreModule, art::Worker::state_, art::errors::StdException, art::errors::Unknown, and art::Worker::Working.

367  {
368  ++counts_visited_;
369  returnCode_ = false;
370  // Transition from Ready state to Working state.
371  state_ = Working;
372  actReg_.sPreModule.invoke(mc);
373  // Note: Only the TriggerResults inserter--a producer--calls this
374  // function. The return code is thus always true.
375  returnCode_ = doProcess(p, mc);
376  actReg_.sPostModule.invoke(mc);
377  assert(returnCode_.load());
378  state_ = Pass;
379  }
380  catch (cet::exception& e) {
381  auto action = actions_.find(e.root_cause());
382  assert(not mc.onEndPath());
383  if (action == actions::IgnoreCompletely) {
384  state_ = Pass;
385  returnCode_ = true;
386  ++counts_passed_;
387  mf::LogWarning("IgnoreCompletely") << "Module ignored an exception\n"
388  << e.what();
389  // WARNING: We will continue execution below!!!
390  } else if (action == actions::FailModule) {
391  state_ = Fail;
392  returnCode_ = true;
393  ++counts_failed_;
394  mf::LogWarning("FailModule") << "Module failed due to an exception\n"
395  << e.what();
396  // WARNING: We will continue execution below!!!
397  } else {
398  state_ = ExceptionThrown;
399  ++counts_thrown_;
400  e << "The above exception was thrown while processing module "
401  << brief_context(md_, p) << '\n';
402  if (auto edmEx = dynamic_cast<Exception*>(&e)) {
403  cached_exception_ = make_exception_ptr(*edmEx);
404  } else {
405  cached_exception_ =
406  make_exception_ptr(Exception{errors::OtherArt, string(), e});
407  }
408  rethrow_exception(cached_exception_);
409  }
410  }
411  catch (bad_alloc const& bda) {
412  state_ = ExceptionThrown;
413  ++counts_thrown_;
414  auto art_ex =
415  Exception{errors::BadAlloc}
416  << "A bad_alloc exception occurred during a call to the module "
417  << brief_context(md_, p) << '\n'
418  << "The job has probably exhausted the virtual memory available to the "
419  "process.\n";
420  cached_exception_ = make_exception_ptr(art_ex);
421  rethrow_exception(cached_exception_);
422  }
423  catch (exception const& e) {
424  state_ = ExceptionThrown;
425  ++counts_thrown_;
426  auto art_ex = Exception{errors::StdException}
427  << "An exception occurred during a call to the module "
428  << brief_context(md_, p) << '\n'
429  << e.what();
430  cached_exception_ = make_exception_ptr(art_ex);
431  rethrow_exception(cached_exception_);
432  }
433  catch (string const& s) {
434  state_ = ExceptionThrown;
435  ++counts_thrown_;
436  auto art_ex = Exception{errors::BadExceptionType, "string"}
437  << "A string thrown as an exception occurred during a call "
438  "to the module "
439  << brief_context(md_, p) << '\n'
440  << s << '\n';
441  cached_exception_ = make_exception_ptr(art_ex);
442  rethrow_exception(cached_exception_);
443  }
444  catch (char const* c) {
445  state_ = ExceptionThrown;
446  ++counts_thrown_;
447  auto art_ex = Exception{errors::BadExceptionType, "char const*"}
448  << "A char const* thrown as an exception occurred during a "
449  "call to the module "
450  << brief_context(md_, p) << '\n'
451  << c << '\n';
452  cached_exception_ = make_exception_ptr(art_ex);
453  rethrow_exception(cached_exception_);
454  }
455  catch (...) {
456  ++counts_thrown_;
457  state_ = ExceptionThrown;
458  auto art_ex = Exception{errors::Unknown, "repeated"}
459  << "An unknown occurred during a previous call to the module "
460  << brief_context(md_, p) << '\n';
461  cached_exception_ = make_exception_ptr(art_ex);
462  rethrow_exception(cached_exception_);
463  }
actions::ActionCodes find(std::string const &category) const
Definition: Actions.cc:58
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreModule
ActionTable const & actions_
Definition: Worker.h:122
std::atomic< std::size_t > counts_visited_
Definition: Worker.h:98
std::atomic< int > state_
Definition: Worker.h:124
virtual bool doProcess(EventPrincipal &, ModuleContext const &)=0
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
std::atomic< bool > returnCode_
Definition: Worker.h:137
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostModule
std::atomic< std::size_t > counts_thrown_
Definition: Worker.h:102
std::atomic< std::size_t > counts_passed_
Definition: Worker.h:100
std::atomic< std::size_t > counts_failed_
Definition: Worker.h:101
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
Float_t e
Definition: plot.C:35
std::exception_ptr cached_exception_
Definition: Worker.h:134
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
void art::Worker::endJob ( )
inherited

Definition at line 207 of file Worker.cc.

References art::Worker::actReg_, art::Worker::doEndJob(), art::Worker::md_, art::ActivityRegistry::sPostModuleEndJob, and art::ActivityRegistry::sPreModuleEndJob.

208  {
209  actReg_.sPreModuleEndJob.invoke(md_);
210  doEndJob();
211  actReg_.sPostModuleEndJob.invoke(md_);
212  }
213  catch (...) {
214  rethrow_with_context(std::current_exception(), md_, "endJob");
215  }
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
virtual void doEndJob()=0
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleEndJob
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleEndJob
string const & art::Worker::label ( ) const
protected inherited

Definition at line 127 of file Worker.cc.

References art::Worker::md_, and art::ModuleDescription::moduleLabel().

Referenced by art::OutputWorker::closeFile(), art::OutputWorker::openFile(), and art::OutputWorker::OutputWorker().

128  {
129  return md_.moduleLabel();
130  }
std::string const & moduleLabel() const
ModuleDescription const md_
Definition: Worker.h:121
void art::Worker::reset ( )
inherited

Definition at line 149 of file Worker.cc.

References art::Worker::cached_exception_, art::Worker::Ready, art::Worker::returnCode_, art::Worker::scheduleID_, art::Worker::state_, TDEBUG_FUNC_SI, art::Worker::waitingTasks_, and art::Worker::workStarted_.

150  {
151  state_ = Ready;
152  cached_exception_ = std::exception_ptr{};
154  << hex << this << dec << " Resetting waitingTasks_";
155  waitingTasks_.reset();
156  workStarted_ = false;
157  returnCode_ = false;
158  }
std::atomic< int > state_
Definition: Worker.h:124
#define TDEBUG_FUNC_SI(LEVEL, SI)
ScheduleID const scheduleID_
Definition: Worker.h:120
std::atomic< bool > returnCode_
Definition: Worker.h:137
std::exception_ptr cached_exception_
Definition: Worker.h:134
hep::concurrency::WaitingTaskList waitingTasks_
Definition: Worker.h:143
std::atomic< bool > workStarted_
Definition: Worker.h:136
void art::Worker::respondToCloseInputFile ( FileBlock const &  fb)
inherited

Definition at line 226 of file Worker.cc.

References art::Worker::actReg_, art::Worker::doRespondToCloseInputFile(), art::Worker::md_, art::ActivityRegistry::sPostModuleRespondToCloseInputFile, and art::ActivityRegistry::sPreModuleRespondToCloseInputFile.

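227  {
228  actReg_.sPreModuleRespondToCloseInputFile.invoke(md_);
229  doRespondToCloseInputFile(fb);
230  actReg_.sPostModuleRespondToCloseInputFile.invoke(md_);
231  }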
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleRespondToCloseInputFile
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
virtual void doRespondToCloseInputFile(FileBlock const &fb)=0
TFile fb("Li6.root")
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleRespondToCloseInputFile
void art::Worker::respondToCloseOutputFiles ( FileBlock const &  fb)
inherited

Definition at line 242 of file Worker.cc.

References art::Worker::actReg_, art::Worker::doRespondToCloseOutputFiles(), art::Worker::md_, art::ActivityRegistry::sPostModuleRespondToCloseOutputFiles, and art::ActivityRegistry::sPreModuleRespondToCloseOutputFiles.

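243  {
244  actReg_.sPreModuleRespondToCloseOutputFiles.invoke(md_);
245  doRespondToCloseOutputFiles(fb);
246  actReg_.sPostModuleRespondToCloseOutputFiles.invoke(md_);
247  }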
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleRespondToCloseOutputFiles
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
TFile fb("Li6.root")
virtual void doRespondToCloseOutputFiles(FileBlock const &fb)=0
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleRespondToCloseOutputFiles
void art::Worker::respondToOpenInputFile ( FileBlock const &  fb)
inherited

Definition at line 218 of file Worker.cc.

References art::Worker::actReg_, art::Worker::doRespondToOpenInputFile(), art::Worker::md_, art::ActivityRegistry::sPostModuleRespondToOpenInputFile, and art::ActivityRegistry::sPreModuleRespondToOpenInputFile.

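219  {
220  actReg_.sPreModuleRespondToOpenInputFile.invoke(md_);
221  doRespondToOpenInputFile(fb);
222  actReg_.sPostModuleRespondToOpenInputFile.invoke(md_);
223  }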
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleRespondToOpenInputFile
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleRespondToOpenInputFile
TFile fb("Li6.root")
virtual void doRespondToOpenInputFile(FileBlock const &fb)=0
void art::Worker::respondToOpenOutputFiles ( FileBlock const &  fb)
inherited

Definition at line 234 of file Worker.cc.

References art::Worker::actReg_, art::Worker::doRespondToOpenOutputFiles(), art::Worker::md_, art::ActivityRegistry::sPostModuleRespondToOpenOutputFiles, and art::ActivityRegistry::sPreModuleRespondToOpenOutputFiles.

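235  {
236  actReg_.sPreModuleRespondToOpenOutputFiles.invoke(md_);
237  doRespondToOpenOutputFiles(fb);
238  actReg_.sPostModuleRespondToOpenOutputFiles.invoke(md_);
239  }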
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
TFile fb("Li6.root")
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleRespondToOpenOutputFiles
virtual void doRespondToOpenOutputFiles(FileBlock const &fb)=0
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleRespondToOpenOutputFiles
bool art::Worker::returnCode ( ) const
inherited

Definition at line 134 of file Worker.cc.

References art::Worker::returnCode_.

135  {
136  return returnCode_.load();
137  }
std::atomic< bool > returnCode_
Definition: Worker.h:137
void art::Worker::runWorker ( EventPrincipal p,
ModuleContext const &  mc 
)
inherited

Definition at line 466 of file Worker.cc.

References art::Worker::actions_, art::Worker::actReg_, art::errors::BadAlloc, art::errors::BadExceptionType, art::Worker::cached_exception_, art::Worker::counts_failed_, art::Worker::counts_passed_, art::Worker::counts_thrown_, art::Worker::doProcess(), art::Worker::ExceptionThrown, art::Worker::Fail, art::actions::FailModule, art::actions::FailPath, art::ActionTable::find(), art::actions::IgnoreCompletely, art::Worker::md_, art::ModuleContext::onEndPath(), art::errors::OtherArt, art::Worker::Pass, art::Worker::returnCode_, art::ModuleContext::scheduleID(), art::actions::SkipEvent, art::ActivityRegistry::sPostModule, art::ActivityRegistry::sPreModule, art::Worker::state_, art::errors::StdException, TDEBUG_BEGIN_TASK_SI, TDEBUG_END_TASK_SI, art::errors::Unknown, art::Worker::waitingTasks_, and art::Worker::Working.

Referenced by art::Worker::isUnique().

467  {
468  auto const sid = mc.scheduleID();
469  TDEBUG_BEGIN_TASK_SI(4, sid);
470  returnCode_ = false;
471  try {
472  // Transition from Ready state to Working state.
473  state_ = Working;
474  actReg_.sPreModule.invoke(mc);
475  // Note: Only filters ever return false, and when they do it
476  // means they have rejected.
477  returnCode_ = doProcess(p, mc);
478  actReg_.sPostModule.invoke(mc);
479  state_ = Fail;
480  if (returnCode_.load()) {
481  state_ = Pass;
482  }
483  }
484  catch (cet::exception& e) {
485  auto action = actions_.find(e.root_cause());
486  // If we are processing an endPath, treat SkipEvent or FailPath
487  // as FailModule, so any subsequent OutputModules are still run.
488  if (mc.onEndPath()) {
489  if ((action == actions::SkipEvent) || (action == actions::FailPath)) {
490  action = actions::FailModule;
491  }
492  }
493  if (action == actions::IgnoreCompletely) {
494  state_ = Pass;
495  returnCode_ = true;
496  ++counts_passed_;
497  mf::LogWarning("IgnoreCompletely") << "Module ignored an exception\n"
498  << e.what();
499  // WARNING: We will continue execution below!!!
500  } else if (action == actions::FailModule) {
501  state_ = Fail;
502  returnCode_ = true;
503  ++counts_failed_;
504  mf::LogWarning("FailModule") << "Module failed due to an exception\n"
505  << e.what();
506  // WARNING: We will continue execution below!!!
507  } else {
508  state_ = ExceptionThrown;
509  ++counts_thrown_;
510  e << "The above exception was thrown while processing module "
511  << brief_context(md_, p);
512  if (auto art_ex = dynamic_cast<Exception*>(&e)) {
513  cached_exception_ = make_exception_ptr(*art_ex);
514  } else {
515  cached_exception_ =
516  make_exception_ptr(Exception{errors::OtherArt, string(), e});
517  }
518  waitingTasks_.doneWaiting(cached_exception_);
519  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
520  return;
521  }
522  }
523  catch (bad_alloc const& bda) {
524  state_ = ExceptionThrown;
525  ++counts_thrown_;
526  auto art_ex =
527  Exception{errors::BadAlloc}
528  << "A bad_alloc exception was thrown while processing module "
529  << brief_context(md_, p) << '\n'
530  << "The job has probably exhausted the virtual memory available to the "
531  "process.\n";
532  cached_exception_ = make_exception_ptr(art_ex);
533  waitingTasks_.doneWaiting(cached_exception_);
534  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
535  return;
536  }
537  catch (exception const& e) {
538  state_ = ExceptionThrown;
539  ++counts_thrown_;
540  auto art_ex = Exception{errors::StdException}
541  << "An exception was thrown while processing module "
542  << brief_context(md_, p) << '\n'
543  << e.what();
544  cached_exception_ = make_exception_ptr(art_ex);
545  waitingTasks_.doneWaiting(cached_exception_);
546  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
547  return;
548  }
549  catch (string const& s) {
550  state_ = ExceptionThrown;
551  ++counts_thrown_;
552  auto art_ex =
553  Exception{errors::BadExceptionType, "string"}
554  << "A string was thrown as an exception while processing module "
555  << brief_context(md_, p) << '\n'
556  << s << '\n';
557  cached_exception_ = make_exception_ptr(art_ex);
558  waitingTasks_.doneWaiting(cached_exception_);
559  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
560  return;
561  }
562  catch (char const* c) {
563  state_ = ExceptionThrown;
564  ++counts_thrown_;
565  auto art_ex =
566  Exception{errors::BadExceptionType, "char const*"}
567  << "A char const* was thrown as an exception while processing module "
568  << brief_context(md_, p) << '\n'
569  << c << '\n';
570  cached_exception_ = make_exception_ptr(art_ex);
571  waitingTasks_.doneWaiting(cached_exception_);
572  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
573  return;
574  }
575  catch (...) {
576  ++counts_thrown_;
577  state_ = ExceptionThrown;
578  auto art_ex =
579  Exception{errors::Unknown, "repeated"}
580  << "An unknown exception was thrown while processing module "
581  << brief_context(md_, p) << '\n';
582  cached_exception_ = make_exception_ptr(art_ex);
583  waitingTasks_.doneWaiting(cached_exception_);
584  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
585  return;
586  }
587  waitingTasks_.doneWaiting(exception_ptr{});
588  TDEBUG_END_TASK_SI(4, sid);
589  }
#define TDEBUG_BEGIN_TASK_SI(LEVEL, SI)
actions::ActionCodes find(std::string const &category) const
Definition: Actions.cc:58
#define TDEBUG_END_TASK_SI(LEVEL, SI)
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreModule
ActionTable const & actions_
Definition: Worker.h:122
std::atomic< int > state_
Definition: Worker.h:124
virtual bool doProcess(EventPrincipal &, ModuleContext const &)=0
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
std::atomic< bool > returnCode_
Definition: Worker.h:137
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostModule
std::atomic< std::size_t > counts_thrown_
Definition: Worker.h:102
std::atomic< std::size_t > counts_passed_
Definition: Worker.h:100
std::atomic< std::size_t > counts_failed_
Definition: Worker.h:101
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
Float_t e
Definition: plot.C:35
std::exception_ptr cached_exception_
Definition: Worker.h:134
hep::concurrency::WaitingTaskList waitingTasks_
Definition: Worker.h:143
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
ScheduleID art::Worker::scheduleID ( ) const
inline inherited

Definition at line 70 of file Worker.h.

71  {
72  return scheduleID_;
73  }
ScheduleID const scheduleID_
Definition: Worker.h:120
SerialTaskQueueChain * art::Worker::serialTaskQueueChain ( ) const
inherited

Definition at line 140 of file Worker.cc.

References art::Worker::doSerialTaskQueueChain().

Referenced by art::Worker::isUnique().

141  {
142  return doSerialTaskQueueChain();
143  }
virtual hep::concurrency::SerialTaskQueueChain * doSerialTaskQueueChain() const =0
size_t art::Worker::timesExcept ( ) const
inherited

Definition at line 190 of file Worker.cc.

References art::Worker::counts_thrown_.

191  {
192  return counts_thrown_.load();
193  }
std::atomic< std::size_t > counts_thrown_
Definition: Worker.h:102
size_t art::Worker::timesFailed ( ) const
inherited

Definition at line 183 of file Worker.cc.

References art::Worker::counts_failed_.

184  {
185  return counts_failed_.load();
186  }
std::atomic< std::size_t > counts_failed_
Definition: Worker.h:101
size_t art::Worker::timesPassed ( ) const
inherited

Definition at line 176 of file Worker.cc.

References art::Worker::counts_passed_.

177  {
178  return counts_passed_.load();
179  }
std::atomic< std::size_t > counts_passed_
Definition: Worker.h:100
size_t art::Worker::timesRun ( ) const
inherited

Definition at line 169 of file Worker.cc.

References art::Worker::counts_run_.

170  {
171  return counts_run_.load();
172  }
std::atomic< std::size_t > counts_run_
Definition: Worker.h:99
size_t art::Worker::timesVisited ( ) const
inherited

Definition at line 162 of file Worker.cc.

References art::Worker::counts_visited_.

163  {
164  return counts_visited_.load();
165  }
std::atomic< std::size_t > counts_visited_
Definition: Worker.h:98

Member Data Documentation

std::atomic<std::size_t> art::Worker::counts_failed_ {}
protected inherited
std::atomic<std::size_t> art::Worker::counts_passed_ {}
protected inherited
std::atomic<std::size_t> art::Worker::counts_run_ {}
protected inherited
std::atomic<std::size_t> art::Worker::counts_thrown_ {}
protected inherited
std::atomic<std::size_t> art::Worker::counts_visited_ {}
protected inherited

The documentation for this class was generated from the following file: