LArSoft  v09_90_00
Liquid Argon Software toolkit - https://larsoft.org/
Worker.cc
Go to the documentation of this file.
2 // vim: set sw=2 expandtab :
3 
15 #include "cetlib_except/exception.h"
16 #include "hep_concurrency/SerialTaskQueueChain.h"
17 #include "hep_concurrency/WaitingTask.h"
18 #include "hep_concurrency/WaitingTaskList.h"
20 
21 #include <atomic>
22 #include <cassert>
23 #include <exception>
24 #include <memory>
25 #include <sstream>
26 
27 using namespace hep::concurrency;
28 using namespace std;
29 
30 using mf::LogError;
31 
32 namespace {
33  std::string
34  brief_context(art::ModuleDescription const& md)
35  {
36  return md.moduleName() + "/" + md.moduleLabel();
37  }
38 
39  std::string
40  brief_context(art::ModuleDescription const& md,
41  art::Principal const& principal)
42  {
43  std::ostringstream result;
44  result << brief_context(md) << ' ';
45  auto const bt = principal.branchType();
46  switch (bt) {
47  case art::InRun:
48  result << static_cast<art::RunPrincipal const&>(principal).runID();
49  break;
50  case art::InSubRun:
51  result << static_cast<art::SubRunPrincipal const&>(principal).subRunID();
52  break;
53  case art::InEvent:
54  result << static_cast<art::EventPrincipal const&>(principal).eventID();
55  break;
56  default: {
57  }
58  }
59  return result.str();
60  }
61 
// Rethrows the exception held by eptr and, in the handler matching its
// type, throws a replacement that carries the text
// "<module name>/<module label> during <transition>".
// eptr must be non-null (asserted).  The function is declared [[noreturn]].
// NOTE(review): the embedded listing numbers skip 75, 84, 91 and 97, so the
// opening of several throw-expressions below is not visible here; confirm
// the error codes used against the real Worker.cc.
62  [[noreturn]] void
63  rethrow_with_context(std::exception_ptr eptr,
64  art::ModuleDescription const& md,
65  std::string const& transition)
66  {
67  using namespace art;
68  assert(eptr);
69  std::string const brief_module_context =
70  brief_context(md) + " during " + transition;
71  try {
72  rethrow_exception(eptr);
73  }
// Handlers are ordered from most to least specific exception type.
74  catch (cet::exception& e) {
76  "An exception was thrown while processing module " +
77  brief_module_context,
78  e};
79  }
80  catch (exception& e) {
81  throw Exception{errors::StdException, brief_module_context} << e.what();
82  }
83  catch (string& s) {
85  "A string exception was thrown while processing module " +
86  brief_module_context}
87  << s << '\n';
88  }
89  catch (char const* c) {
90  throw Exception{
92  "A char const* exception was thrown while processing module " +
93  brief_module_context}
94  << c << '\n';
95  }
96  catch (...) {
98  << "An unknown exception was thrown while processing module " +
99  brief_module_context;
100  }
101  }
102 }
103 
104 namespace art {
105 
// Out-of-line defaulted destructor.
106  Worker::~Worker() = default;
107 
108  Worker::Worker(ModuleDescription const& md, WorkerParams const& wp)
109  : scheduleID_{wp.scheduleID_}
110  , md_{md}
111  , actions_{wp.actions_}
112  , actReg_{wp.actReg_}
113  , waitingTasks_{wp.taskGroup_}
114  {
115  TDEBUG_FUNC_SI(5, wp.scheduleID_)
116  << hex << this << dec << " name: " << md.moduleName()
117  << " label: " << md.moduleLabel();
118  }
119 
// Accessor: returns a const reference to md_, this worker's module
// description.
// NOTE(review): embedded line 121 (the declarator) is missing from this
// listing; the page's cross-reference index gives it as
// description() const.
120  ModuleDescription const&
122  {
123  return md_;
124  }
125 
// Accessor: returns the module label stored in md_.
// NOTE(review): embedded line 127 (the declarator) is missing from this
// listing; the page's cross-reference index gives it as label() const.
126  string const&
128  {
129  return md_.moduleLabel();
130  }
131 
// Returns the current value of the atomic returnCode_ flag.
// NOTE(review): embedded line 134 (the declarator) is missing from this
// listing; the page's cross-reference index gives it as returnCode() const.
132  // Used only by WorkerInPath.
133  bool
135  {
136  return returnCode_.load();
137  }
138 
// Forwards to the pure-virtual doSerialTaskQueueChain() and returns its
// result.  doWork_event treats a null result as "no serialization needed".
// NOTE(review): embedded line 140 (the declarator) is missing from this
// listing; the page's cross-reference index gives it as
// serialTaskQueueChain() const.
139  SerialTaskQueueChain*
141  {
142  return doSerialTaskQueueChain();
143  }
144 
// Returns the worker to its initial per-event condition: state Ready, no
// cached exception, waiting-task list reset, and both the work-started and
// return-code flags cleared.
// NOTE(review): embedded lines 149 (the declarator; reset() per the page's
// cross-reference index) and 153 (the start of the debug-stream statement
// continued on line 154) are missing from this listing.
145  // Used by EventProcessor
146  // Used by Schedule
147  // Used by EndPathExecutor
148  void
150  {
151  state_ = Ready;
152  cached_exception_ = std::exception_ptr{};
154  << hex << this << dec << " Resetting waitingTasks_";
155  waitingTasks_.reset();
156  workStarted_ = false;
157  returnCode_ = false;
158  }
159 
// Returns the current value of the counts_visited_ counter.
// NOTE(review): embedded line 162 (the declarator) is missing from this
// listing; the page's cross-reference index gives it as
// timesVisited() const.
160  // Used only by writeSummary
161  size_t
163  {
164  return counts_visited_.load();
165  }
166 
// Returns the current value of the counts_run_ counter.
// NOTE(review): embedded line 169 (the declarator) is missing from this
// listing; the page's cross-reference index gives it as timesRun() const.
167  // Used only by writeSummary
168  size_t
170  {
171  return counts_run_.load();
172  }
173 
// Returns the current value of the counts_passed_ counter.
// NOTE(review): embedded line 176 (the declarator) is missing from this
// listing; the page's cross-reference index gives it as
// timesPassed() const.
174  // Used only by writeSummary
175  size_t
177  {
178  return counts_passed_.load();
179  }
180 
// Returns the current value of the counts_failed_ counter.
// NOTE(review): embedded line 183 (the declarator) is missing from this
// listing; the page's cross-reference index gives it as
// timesFailed() const.
181  // Used only by writeSummary
182  size_t
184  {
185  return counts_failed_.load();
186  }
187 
// Returns the current value of the counts_thrown_ counter.
// NOTE(review): embedded line 190 (the declarator) is missing from this
// listing; the page's cross-reference index gives it as
// timesExcept() const.
188  // Used only by writeSummary
189  size_t
191  {
192  return counts_thrown_.load();
193  }
194 
// Calls the module's doBeginJob(resources) inside a function-try-block;
// anything thrown is passed to rethrow_with_context with the transition
// name "beginJob".
// NOTE(review): embedded lines 196 (the declarator; beginJob(
// detail::SharedResources const&) per the page's cross-reference index),
// 198 and 200 are missing from this listing.  Presumably 198/200 invoke
// the pre/post begin-job signals the way endJob does -- TODO confirm.
195  void
197  try {
199  doBeginJob(resources);
201  }
202  catch (...) {
203  rethrow_with_context(std::current_exception(), md_, "beginJob");
204  }
205 
// Invokes the pre-end-job signal, the module's doEndJob(), then the
// post-end-job signal, all inside a function-try-block; anything thrown is
// passed to rethrow_with_context with the transition name "endJob".
// NOTE(review): embedded line 207 (the declarator) is missing from this
// listing; the page's cross-reference index gives it as endJob().
206  void
208  try {
209  actReg_.sPreModuleEndJob.invoke(md_);
210  doEndJob();
211  actReg_.sPostModuleEndJob.invoke(md_);
212  }
213  catch (...) {
214  rethrow_with_context(std::current_exception(), md_, "endJob");
215  }
216 
// NOTE(review): only the return type and the braces survive in this
// listing; embedded lines 218 (declarator) and 220-222 (body) are missing.
// The page's cross-reference index places
// respondToOpenInputFile(FileBlock const& fb) at line 218.  The body
// cannot be described from what is visible here.
217  void
219  {
223  }
224 
// NOTE(review): only the return type and the braces survive in this
// listing; embedded lines 226 (declarator) and 228-230 (body) are missing.
// The page's cross-reference index places
// respondToCloseInputFile(FileBlock const& fb) at line 226.  The body
// cannot be described from what is visible here.
225  void
227  {
231  }
232 
// NOTE(review): only the return type and the braces survive in this
// listing; embedded lines 234 (declarator) and 236-238 (body) are missing.
// The page's cross-reference index places
// respondToOpenOutputFiles(FileBlock const& fb) at line 234.  The body
// cannot be described from what is visible here.
233  void
235  {
239  }
240 
// NOTE(review): only the return type and the braces survive in this
// listing; embedded lines 242 (declarator) and 244-246 (body) are missing.
// The page's cross-reference index places
// respondToCloseOutputFiles(FileBlock const& fb) at line 242.  The body
// cannot be described from what is visible here.
241  void
243  {
247  }
248 
// Runs this worker for a run- or subrun-level transition (begin/end run,
// begin/end subrun), emitting the matching pre/post signals around the
// module's doBegin/doEnd call.  Any exception is converted to (or kept as)
// an art Exception, stored in cached_exception_, and propagated to the
// caller.
// NOTE(review): embedded line 250 (the start of the declarator) is missing
// from this listing; the page's cross-reference index gives it as
// doWork(Transition, Principal&, ModuleContext const&), and the body uses
// the first parameter as `trans`.  Lines 278, 303, 315, 317, 326, 334, 344
// and 354 are also missing; several of them sit at the top of a catch
// handler and may update state_ -- TODO confirm against the real file.
249  void
251  Principal& principal,
252  ModuleContext const& mc)
253  {
// Decide from the current state whether there is anything to do.
254  switch (state_.load()) {
255  case Ready:
256  break;
257  case Pass:
258  case Fail:
259  return;
260  case ExceptionThrown: {
261  // Rethrow the cached exception again. It seems impossible to
262  // get here a second time unless a cet::exception has been
263  // thrown previously.
264  mf::LogWarning("repeat") << "A module has been invoked a second time "
265  "even though it caught an exception during "
266  "the previous invocation.\nThis may be an "
267  "indication of a configuration problem.";
268  rethrow_exception(cached_exception_);
269  }
270  case Working:
271  break; // See below.
272  }
273 
274  try {
275  if (state_.load() == Working) {
276  // Not part of the switch statement above because we want the
277  // exception to be caught by our handling mechanism.
279  << "A Module has been invoked while it is still being executed.\n"
280  << "Product dependencies have invoked a module execution cycle.\n";
281  }
282  state_ = Working;
// Dispatch on the transition; the principal is downcast with dynamic_cast
// to the run or subrun principal type the callee expects.
283  if (trans == Transition::BeginRun) {
284  actReg_.sPreModuleBeginRun.invoke(mc);
285  doBegin(dynamic_cast<RunPrincipal&>(principal), mc);
286  actReg_.sPostModuleBeginRun.invoke(mc);
287  } else if (trans == Transition::EndRun) {
288  actReg_.sPreModuleEndRun.invoke(mc);
289  doEnd(dynamic_cast<RunPrincipal&>(principal), mc);
290  actReg_.sPostModuleEndRun.invoke(mc);
291  } else if (trans == Transition::BeginSubRun) {
292  actReg_.sPreModuleBeginSubRun.invoke(mc);
293  doBegin(dynamic_cast<SubRunPrincipal&>(principal), mc);
294  actReg_.sPostModuleBeginSubRun.invoke(mc);
295  } else if (trans == Transition::EndSubRun) {
296  actReg_.sPreModuleEndSubRun.invoke(mc);
297  doEnd(dynamic_cast<SubRunPrincipal&>(principal), mc);
298  actReg_.sPostModuleEndSubRun.invoke(mc);
299  }
300  state_ = Pass;
301  }
// cet::exception: append module context to the exception, cache a copy as
// an art Exception, then rethrow the original object with `throw;`.
302  catch (cet::exception& e) {
304  e << "The above exception was thrown while processing module "
305  << brief_context(md_, principal) << '\n';
306  if (auto edmEx = dynamic_cast<Exception*>(&e)) {
307  cached_exception_ = std::make_exception_ptr(*edmEx);
308  } else {
309  auto art_ex = Exception{errors::OtherArt, std::string(), e};
310  cached_exception_ = std::make_exception_ptr(art_ex);
311  }
312  throw;
313  }
// All remaining handlers build an art Exception describing what was
// caught, cache it, and rethrow the cached exception.
314  catch (std::bad_alloc const& bda) {
316  auto art_ex =
318  << "A bad_alloc exception occurred during a call to the module "
319  << brief_context(md_, principal) << '\n'
320  << "The job has probably exhausted the virtual memory available to the "
321  "process.\n";
322  cached_exception_ = make_exception_ptr(art_ex);
323  rethrow_exception(cached_exception_);
324  }
325  catch (std::exception const& e) {
327  auto art_ex = Exception{errors::StdException}
328  << "An exception occurred during a call to the module "
329  << brief_context(md_, principal) << e.what();
330  cached_exception_ = make_exception_ptr(art_ex);
331  rethrow_exception(cached_exception_);
332  }
333  catch (std::string const& s) {
335  auto art_ex = Exception{errors::BadExceptionType, "string"}
336  << "A string thrown as an exception occurred during a call "
337  "to the module "
338  << brief_context(md_, principal) << '\n'
339  << s << '\n';
340  cached_exception_ = make_exception_ptr(art_ex);
341  rethrow_exception(cached_exception_);
342  }
343  catch (char const* c) {
345  auto art_ex = Exception{errors::BadExceptionType, "char const*"}
346  << "A char const* thrown as an exception occurred during a "
347  "call to the module "
348  << brief_context(md_, principal) << '\n'
349  << c << '\n';
350  cached_exception_ = make_exception_ptr(art_ex);
351  rethrow_exception(cached_exception_);
352  }
353  catch (...) {
355  auto art_ex =
356  Exception{errors::Unknown, "repeated"}
357  << "An unknown occurred during a previous call to the module "
358  << brief_context(md_, principal) << '\n';
359  cached_exception_ = make_exception_ptr(art_ex);
360  rethrow_exception(cached_exception_);
361  }
362  }
363 
// Synchronous event-processing entry point written as a function-try-block:
// counts the visit, marks the worker Working, emits the pre/post module
// signals around doProcess(p, mc), asserts that the result is true and
// marks the worker Pass.  Exceptions are handled in the catch clauses that
// follow the body.
// NOTE(review): embedded line 366 (the declarator) is missing from this
// listing and this function has no entry in the page's cross-reference
// index; presumably it is the two-argument doWork_event(EventPrincipal& p,
// ModuleContext const& mc) overload -- TODO confirm.  Lines 398, 405, 412,
// 415, 424, 434, 445 and 457 are also missing; several sit at the top of a
// handler and may update state_.
364  // This is used only to do trigger results insertion.
365  void
367  try {
368  ++counts_visited_;
369  returnCode_ = false;
370  // Transition from Ready state to Working state.
371  state_ = Working;
372  actReg_.sPreModule.invoke(mc);
373  // Note: Only the TriggerResults inserter--a producer--calls this
374  // function. The return code is thus always true.
375  returnCode_ = doProcess(p, mc);
376  actReg_.sPostModule.invoke(mc);
377  assert(returnCode_.load());
378  state_ = Pass;
379  }
// cet::exception: the configured action for the exception's root cause
// selects between ignoring it, failing the module, or rethrowing.
380  catch (cet::exception& e) {
381  auto action = actions_.find(e.root_cause());
382  assert(not mc.onEndPath());
383  if (action == actions::IgnoreCompletely) {
384  state_ = Pass;
385  returnCode_ = true;
386  ++counts_passed_;
387  mf::LogWarning("IgnoreCompletely") << "Module ignored an exception\n"
388  << e.what();
389  // WARNING: We will continue execution below!!!
390  } else if (action == actions::FailModule) {
391  state_ = Fail;
392  returnCode_ = true;
393  ++counts_failed_;
394  mf::LogWarning("FailModule") << "Module failed due to an exception\n"
395  << e.what();
396  // WARNING: We will continue execution below!!!
397  } else {
399  ++counts_thrown_;
400  e << "The above exception was thrown while processing module "
401  << brief_context(md_, p) << '\n';
402  if (auto edmEx = dynamic_cast<Exception*>(&e)) {
403  cached_exception_ = make_exception_ptr(*edmEx);
404  } else {
406  make_exception_ptr(Exception{errors::OtherArt, string(), e});
407  }
408  rethrow_exception(cached_exception_);
409  }
410  }
// All remaining handlers count the throw, build an art Exception
// describing what was caught, cache it, and rethrow the cached exception.
411  catch (bad_alloc const& bda) {
413  ++counts_thrown_;
414  auto art_ex =
416  << "A bad_alloc exception occurred during a call to the module "
417  << brief_context(md_, p) << '\n'
418  << "The job has probably exhausted the virtual memory available to the "
419  "process.\n";
420  cached_exception_ = make_exception_ptr(art_ex);
421  rethrow_exception(cached_exception_);
422  }
423  catch (exception const& e) {
425  ++counts_thrown_;
426  auto art_ex = Exception{errors::StdException}
427  << "An exception occurred during a call to the module "
428  << brief_context(md_, p) << '\n'
429  << e.what();
430  cached_exception_ = make_exception_ptr(art_ex);
431  rethrow_exception(cached_exception_);
432  }
433  catch (string const& s) {
435  ++counts_thrown_;
436  auto art_ex = Exception{errors::BadExceptionType, "string"}
437  << "A string thrown as an exception occurred during a call "
438  "to the module "
439  << brief_context(md_, p) << '\n'
440  << s << '\n';
441  cached_exception_ = make_exception_ptr(art_ex);
442  rethrow_exception(cached_exception_);
443  }
444  catch (char const* c) {
446  ++counts_thrown_;
447  auto art_ex = Exception{errors::BadExceptionType, "char const*"}
448  << "A char const* thrown as an exception occurred during a "
449  "call to the module "
450  << brief_context(md_, p) << '\n'
451  << c << '\n';
452  cached_exception_ = make_exception_ptr(art_ex);
453  rethrow_exception(cached_exception_);
454  }
455  catch (...) {
456  ++counts_thrown_;
458  auto art_ex = Exception{errors::Unknown, "repeated"}
459  << "An unknown occurred during a previous call to the module "
460  << brief_context(md_, p) << '\n';
461  cached_exception_ = make_exception_ptr(art_ex);
462  rethrow_exception(cached_exception_);
463  }
464 
// Executes the module for one event and then notifies the waiting tasks.
// On success waitingTasks_.doneWaiting is called with an empty
// exception_ptr; when an exception has to be propagated it is cached in
// cached_exception_ and handed to doneWaiting instead of being thrown out
// of this function.
// NOTE(review): embedded line 466 (the declarator) is missing from this
// listing; the page's cross-reference index gives it as
// runWorker(EventPrincipal&, ModuleContext const&), and the body uses the
// parameters as `p` and `mc`.  Lines 508, 515, 524, 527, 538, 550, 553, 563
// and 577 are also missing; several sit at the top of a handler and may
// update state_ -- TODO confirm against the real file.
465  void
467  {
468  auto const sid = mc.scheduleID();
469  TDEBUG_BEGIN_TASK_SI(4, sid);
470  returnCode_ = false;
471  try {
472  // Transition from Ready state to Working state.
473  state_ = Working;
474  actReg_.sPreModule.invoke(mc);
475  // Note: Only filters ever return false, and when they do it
476  // means they have rejected.
477  returnCode_ = doProcess(p, mc);
478  actReg_.sPostModule.invoke(mc);
// Default to Fail and upgrade to Pass only when doProcess returned true.
479  state_ = Fail;
480  if (returnCode_.load()) {
481  state_ = Pass;
482  }
483  }
484  catch (cet::exception& e) {
485  auto action = actions_.find(e.root_cause());
486  // If we are processing an endPath, treat SkipEvent or FailPath
487  // as FailModule, so any subsequent OutputModules are still run.
488  if (mc.onEndPath()) {
489  if ((action == actions::SkipEvent) || (action == actions::FailPath)) {
490  action = actions::FailModule;
491  }
492  }
493  if (action == actions::IgnoreCompletely) {
494  state_ = Pass;
495  returnCode_ = true;
496  ++counts_passed_;
497  mf::LogWarning("IgnoreCompletely") << "Module ignored an exception\n"
498  << e.what();
499  // WARNING: We will continue execution below!!!
500  } else if (action == actions::FailModule) {
501  state_ = Fail;
502  returnCode_ = true;
503  ++counts_failed_;
504  mf::LogWarning("FailModule") << "Module failed due to an exception\n"
505  << e.what();
506  // WARNING: We will continue execution below!!!
507  } else {
509  ++counts_thrown_;
510  e << "The above exception was thrown while processing module "
511  << brief_context(md_, p);
512  if (auto art_ex = dynamic_cast<Exception*>(&e)) {
513  cached_exception_ = make_exception_ptr(*art_ex);
514  } else {
516  make_exception_ptr(Exception{errors::OtherArt, string(), e});
517  }
518  waitingTasks_.doneWaiting(cached_exception_);
519  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
520  return;
521  }
522  }
// All remaining handlers count the throw, build an art Exception
// describing what was caught, cache it, pass it to the waiting tasks and
// return.
523  catch (bad_alloc const& bda) {
525  ++counts_thrown_;
526  auto art_ex =
528  << "A bad_alloc exception was thrown while processing module "
529  << brief_context(md_, p) << '\n'
530  << "The job has probably exhausted the virtual memory available to the "
531  "process.\n";
532  cached_exception_ = make_exception_ptr(art_ex);
533  waitingTasks_.doneWaiting(cached_exception_);
534  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
535  return;
536  }
537  catch (exception const& e) {
539  ++counts_thrown_;
540  auto art_ex = Exception{errors::StdException}
541  << "An exception was thrown while processing module "
542  << brief_context(md_, p) << '\n'
543  << e.what();
544  cached_exception_ = make_exception_ptr(art_ex);
545  waitingTasks_.doneWaiting(cached_exception_);
546  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
547  return;
548  }
549  catch (string const& s) {
551  ++counts_thrown_;
552  auto art_ex =
554  << "A string was thrown as an exception while processing module "
555  << brief_context(md_, p) << '\n'
556  << s << '\n';
557  cached_exception_ = make_exception_ptr(art_ex);
558  waitingTasks_.doneWaiting(cached_exception_);
559  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
560  return;
561  }
562  catch (char const* c) {
564  ++counts_thrown_;
565  auto art_ex =
566  Exception{errors::BadExceptionType, "char const*"}
567  << "A char const* was thrown as an exception while processing module "
568  << brief_context(md_, p) << '\n'
569  << c << '\n';
570  cached_exception_ = make_exception_ptr(art_ex);
571  waitingTasks_.doneWaiting(cached_exception_);
572  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
573  return;
574  }
575  catch (...) {
576  ++counts_thrown_;
578  auto art_ex =
579  Exception{errors::Unknown, "repeated"}
580  << "An unknown exception was thrown while processing module "
581  << brief_context(md_, p) << '\n';
582  cached_exception_ = make_exception_ptr(art_ex);
583  waitingTasks_.doneWaiting(cached_exception_);
584  TDEBUG_END_TASK_SI(4, sid) << "because of EXCEPTION";
585  return;
586  }
// Reached on success and after the IgnoreCompletely / FailModule branches.
587  waitingTasks_.doneWaiting(exception_ptr{});
588  TDEBUG_END_TASK_SI(4, sid);
589  }
590 
// Returns true when this worker belongs to the first schedule
// (scheduleID_ == ScheduleID::first()).
// NOTE(review): embedded lines 592 (the declarator; isUnique() const per
// the page's cross-reference index) and 597 (the return statement for every
// other schedule) are missing from this listing, so the result in that
// case cannot be stated from what is visible here.
591  bool
593  {
594  if (scheduleID_ == ScheduleID::first()) {
595  return true;
596  }
598  }
599 
600  void
601  Worker::doWork_event(WaitingTaskPtr workerInPathDoneTask,
602  EventPrincipal& p,
603  ModuleContext const& mc)
604  {
605  auto const sid = mc.scheduleID();
606  TDEBUG_BEGIN_FUNC_SI(4, sid);
607  // Note: We actually can have more than one entry in this list
608  // because a worker may be one more than one path, and if both
609  // paths are running in parallel, then it is possible that they
610  // both attempt to run the same worker at nearly the same time.
611  // We arrange so that the worker itself only runs once, but we do
612  // have to notify all the paths that the worker has finished,
613  // hence the waiting list of notification tasks.
614  //
615  // Note: threading: More than one task can enter here in the case
616  // that paths running in parallel share the same worker.
617  waitingTasks_.add(workerInPathDoneTask);
618  ++counts_visited_;
619  bool expected = false;
620  if (workStarted_.compare_exchange_strong(expected, true)) {
621  if (auto chain = serialTaskQueueChain()) {
622  // Must be a serialized shared module (including legacy).
623  TDEBUG_FUNC_SI(4, sid) << "pushing onto chain " << hex << chain << dec;
624  chain->push([&p, &mc, this] { runWorker(p, mc); });
625  TDEBUG_END_FUNC_SI(4, sid);
626  return;
627  }
628  // Must be a replicated or shared module with no serialization.
629  TDEBUG_FUNC_SI(4, sid) << "calling worker functor";
630  runWorker(p, mc);
631  TDEBUG_END_FUNC_SI(4, sid);
632  return;
633  }
634  // Worker is running on another path, exit without running the waiting
635  // worker done tasks.
636  TDEBUG_END_FUNC_SI(4, sid) << "work already in progress on another path";
637  }
638 
639 } // namespace art
#define TDEBUG_BEGIN_TASK_SI(LEVEL, SI)
void endJob()
Definition: Worker.cc:207
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreModuleBeginRun
std::size_t timesExcept() const
Definition: Worker.cc:190
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreModuleEndRun
void respondToOpenOutputFiles(FileBlock const &fb)
Definition: Worker.cc:234
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleBeginJob
bool onEndPath() const
Definition: ModuleContext.h:53
void respondToCloseOutputFiles(FileBlock const &fb)
Definition: Worker.cc:242
actions::ActionCodes find(std::string const &category) const
Definition: Actions.cc:58
bool isUnique() const
Definition: Worker.cc:592
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleRespondToCloseOutputFiles
#define TDEBUG_END_TASK_SI(LEVEL, SI)
std::size_t timesPassed() const
Definition: Worker.cc:176
std::string const & moduleLabel() const
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleRespondToCloseInputFile
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreModule
auto scheduleID() const
Definition: ModuleContext.h:28
std::string const & label() const
Definition: Worker.cc:127
static constexpr ScheduleID first()
Definition: ScheduleID.h:50
STL namespace.
virtual void doBeginJob(detail::SharedResources const &resources)=0
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
void reset()
Definition: Worker.cc:149
ActionTable const & actions_
Definition: Worker.h:122
std::size_t timesRun() const
Definition: Worker.cc:169
std::atomic< std::size_t > counts_visited_
Definition: Worker.h:98
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostModuleEndRun
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreModuleEndSubRun
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleBeginJob
std::atomic< int > state_
Definition: Worker.h:124
std::size_t timesVisited() const
Definition: Worker.cc:162
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleRespondToOpenInputFile
std::string const & moduleName() const
Transition
Definition: Transition.h:7
virtual bool doProcess(EventPrincipal &, ModuleContext const &)=0
ActivityRegistry const & actReg_
Definition: Worker.h:123
void runWorker(EventPrincipal &, ModuleContext const &)
Definition: Worker.cc:466
ModuleDescription const md_
Definition: Worker.h:121
ScheduleID scheduleID_
Definition: WorkerParams.h:29
virtual void doRespondToCloseInputFile(FileBlock const &fb)=0
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostModuleBeginRun
virtual void doEndJob()=0
#define TDEBUG_FUNC_SI(LEVEL, SI)
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleRespondToOpenInputFile
ScheduleID const scheduleID_
Definition: Worker.h:120
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleEndJob
TFile fb("Li6.root")
void respondToOpenInputFile(FileBlock const &fb)
Definition: Worker.cc:218
virtual void doBegin(RunPrincipal &rp, ModuleContext const &mc)=0
std::atomic< bool > returnCode_
Definition: Worker.h:137
std::atomic< std::size_t > counts_run_
Definition: Worker.h:99
ModuleDescription const & description() const
Definition: Worker.cc:121
ModuleThreadingType moduleThreadingType() const
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostModule
std::atomic< std::size_t > counts_thrown_
Definition: Worker.h:102
std::atomic< std::size_t > counts_passed_
Definition: Worker.h:100
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleRespondToOpenOutputFiles
virtual void doRespondToCloseOutputFiles(FileBlock const &fb)=0
virtual void doEnd(RunPrincipal &rp, ModuleContext const &mc)=0
std::atomic< std::size_t > counts_failed_
Definition: Worker.h:101
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
virtual void doRespondToOpenOutputFiles(FileBlock const &fb)=0
virtual hep::concurrency::SerialTaskQueueChain * doSerialTaskQueueChain() const =0
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
Definition: MVAAlg.h:12
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleEndJob
void doWork(Transition, Principal &, ModuleContext const &)
Definition: Worker.cc:250
void respondToCloseInputFile(FileBlock const &fb)
Definition: Worker.cc:226
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostModuleEndSubRun
void beginJob(detail::SharedResources const &)
Definition: Worker.cc:196
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleContext const &)> sPostModuleBeginSubRun
Float_t e
Definition: plot.C:35
virtual void doRespondToOpenInputFile(FileBlock const &fb)=0
hep::concurrency::SerialTaskQueueChain * serialTaskQueueChain() const
Definition: Worker.cc:140
std::exception_ptr cached_exception_
Definition: Worker.h:134
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleContext const &)> sPreModuleBeginSubRun
hep::concurrency::WaitingTaskList waitingTasks_
Definition: Worker.h:143
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleRespondToCloseOutputFiles
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleRespondToOpenOutputFiles
void doWork_event(hep::concurrency::WaitingTaskPtr workerInPathDoneTask, EventPrincipal &, ModuleContext const &)
bool returnCode() const
Definition: Worker.cc:134
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleRespondToCloseInputFile
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
BranchType branchType() const
Definition: Principal.cc:807
std::atomic< bool > workStarted_
Definition: Worker.h:136
std::size_t timesFailed() const
Definition: Worker.cc:183