LArSoft  v09_90_00
Liquid Argon Software toolkit - https://larsoft.org/
Path.cc
Go to the documentation of this file.
2 // vim: set sw=2 expandtab :
3 
12 #include "art/Utilities/Globals.h"
19 #include "hep_concurrency/WaitingTask.h"
21 
22 #include <cstddef>
23 #include <exception>
24 #include <string>
25 #include <vector>
26 
27 using namespace hep::concurrency;
28 using namespace std;
29 
30 namespace art {
31 
32  Path::Path(ActionTable const& actions,
33  ActivityRegistry const& actReg,
34  PathContext const& pc,
35  vector<WorkerInPath>&& workers,
36  HLTGlobalStatus* pathResults,
37  GlobalTaskGroup& taskGroup) noexcept
38  : actionTable_{actions}
39  , actReg_{actReg}
40  , pc_{pc}
41  , pathPosition_{ServiceHandle<TriggerNamesService>()->index_for(
42  pc_.pathID())}
43  , workers_{std::move(workers)}
44  , trptr_{pathResults}
45  , taskGroup_{taskGroup}
46  {
47  TDEBUG_FUNC_SI(4, pc_.scheduleID()) << hex << this << dec;
48  }
49 
51  Path::scheduleID() const
52  {
53  return pc_.scheduleID();
54  }
55 
56  PathSpec const&
57  Path::pathSpec() const
58  {
59  return pc_.pathSpec();
60  }
61 
62  PathID
63  Path::pathID() const
64  {
65  return pc_.pathID();
66  }
67 
68  string const&
69  Path::name() const
70  {
71  return pc_.pathName();
72  }
73 
74  size_t
75  Path::timesRun() const
76  {
77  return timesRun_;
78  }
79 
80  size_t
81  Path::timesPassed() const
82  {
83  return timesPassed_;
84  }
85 
86  size_t
87  Path::timesFailed() const
88  {
89  return timesFailed_;
90  }
91 
92  size_t
93  Path::timesExcept() const
94  {
95  return timesExcept_;
96  }
97 
99  Path::state() const
100  {
101  return state_;
102  }
103 
104  vector<WorkerInPath> const&
105  Path::workersInPath() const
106  {
107  return workers_;
108  }
109 
110  void
111  Path::process(Transition const trans, Principal& principal)
112  {
113  // Invoke pre-path signals only for the first schedule.
114  if (pc_.scheduleID() == ScheduleID::first()) {
115  switch (trans) {
116  case Transition::BeginRun:
117  actReg_.sPrePathBeginRun.invoke(name());
118  break;
119  case Transition::EndRun:
120  actReg_.sPrePathEndRun.invoke(name());
121  break;
122  case Transition::BeginSubRun:
123  actReg_.sPrePathBeginSubRun.invoke(name());
124  break;
125  case Transition::EndSubRun:
126  actReg_.sPrePathEndSubRun.invoke(name());
127  break;
128  default: {
129  } // No other pre-path signals supported.
130  }
131  }
132  state_ = hlt::Ready;
133  std::size_t idx = 0;
134  bool all_passed{false};
135  for (WorkerInPath& wip : workers_) {
136  // We do not want to call (e.g.) beginRun once per schedule for
137  // non-replicated modules.
138  if (not wip.getWorker()->isUnique()) {
139  continue;
140  }
141  try {
142  all_passed = wip.run(trans, principal);
143  if (!all_passed)
144  break;
145  }
146  catch (cet::exception& e) {
147  state_ = hlt::Exception;
148  throw Exception{
149  errors::ScheduleExecutionFailure, "Path: ProcessingStopped.", e}
150  << "Exception going through path " << name() << '\n';
151  }
152  catch (...) {
153  mf::LogError("PassingThrough")
154  << "Exception passing through path " << name();
155  state_ = hlt::Exception;
156  throw;
157  }
158  ++idx;
159  }
160  if (all_passed) {
161  state_ = hlt::Pass;
162  } else {
163  state_ = hlt::Fail;
164  }
165  // Invoke post-path signals only for the last schedule.
166  if (pc_.scheduleID().id() == Globals::instance()->nschedules() - 1) {
167  HLTPathStatus const status(state_, idx);
168  switch (trans) {
169  case Transition::BeginRun:
170  actReg_.sPostPathBeginRun.invoke(name(), status);
171  break;
172  case Transition::EndRun:
173  actReg_.sPostPathEndRun.invoke(name(), status);
174  break;
175  case Transition::BeginSubRun:
176  actReg_.sPostPathBeginSubRun.invoke(name(), status);
177  break;
178  case Transition::EndSubRun:
179  actReg_.sPostPathEndSubRun.invoke(name(), status);
180  break;
181  default: {
182  } // No other post-path signals supported.
183  }
184  }
185  }
186 
187  void
188  Path::process(WaitingTaskPtr pathsDoneTask, EventPrincipal& ep)
189  {
190  // We come here as part of the readAndProcessEvent task (schedule
191  // head task), or as part of the endPath task.
192  auto const sid = pc_.scheduleID();
193  TDEBUG_BEGIN_FUNC_SI(4, sid);
194  TDEBUG_FUNC_SI(6, sid) << hex << this << dec << " Resetting waitingTasks_";
195 
196  // Make sure the list is not auto-spawning tasks.
197  actReg_.sPreProcessPath.invoke(pc_);
198  ++timesRun_;
199  state_ = hlt::Ready;
200  size_t idx = 0;
201  auto max_idx = workers_.size();
202  // Start the task spawn chain going with the first worker on the
203  // path. Each worker will spawn the next worker in order, until
204  // all the workers have run.
205  process_event_idx_asynch(idx, max_idx, ep, pathsDoneTask);
206  TDEBUG_END_FUNC_SI(4, sid);
207  }
208 
209  void
210  Path::runWorkerTask(size_t const idx,
211  size_t const max_idx,
212  EventPrincipal& ep,
213  WaitingTaskPtr pathsDone)
214  {
215  auto const sid = pc_.scheduleID();
216  TDEBUG_BEGIN_TASK_SI(4, sid);
217  try {
218  process_event_idx(idx, max_idx, ep, pathsDone);
219  TDEBUG_END_TASK_SI(4, sid);
220  }
221  catch (...) {
222  taskGroup_.may_run(pathsDone, current_exception());
223  TDEBUG_END_TASK_SI(4, sid) << "path terminate because of EXCEPTION";
224  }
225  }
226 
227  // This function is a spawn chain system to run workers one at a time,
228  // in the order specified on the path, and then decrement the ref count
229  // on the endPathsTask when finished (which causes it to run if we are
230  // the last path to finish running its workers).
231  void
232  Path::process_event_idx_asynch(size_t const idx,
233  size_t const max_idx,
234  EventPrincipal& ep,
235  WaitingTaskPtr pathsDone)
236  {
237  auto const sid = pc_.scheduleID();
238  TDEBUG_BEGIN_FUNC_SI(4, sid) << "idx: " << idx << " max_idx: " << max_idx;
239  taskGroup_.run([this, idx, max_idx, &ep, pathsDone] {
240  runWorkerTask(idx, max_idx, ep, pathsDone);
241  });
242  TDEBUG_END_FUNC_SI(4, sid) << "idx: " << idx << " max_idx: " << max_idx;
243  }
244 
246  public:
248  size_t const idx,
249  size_t const max_idx,
250  EventPrincipal& ep,
251  WaitingTaskPtr pathsDone,
252  GlobalTaskGroup& group)
253  : path_{path}
254  , idx_{idx}
255  , max_idx_{max_idx}
256  , ep_{ep}
257  , pathsDone_{pathsDone}
258  , group_{group}
259  {}
260  void
261  operator()(exception_ptr ex)
262  {
263  auto const sid = path_->pc_.scheduleID();
264  TDEBUG_BEGIN_TASK_SI(4, sid);
265  auto& workerInPath = path_->workers_[idx_];
266  // Note: This will only be set false by a filter which has rejected.
267  bool new_should_continue = workerInPath.returnCode();
268  TDEBUG_TASK_SI(4, sid) << "new_should_continue: " << new_should_continue;
269  if (ex) {
270  try {
271  rethrow_exception(ex);
272  }
273  catch (cet::exception& e) {
274  auto action = path_->actionTable_.find(e.root_cause());
275  assert(action != actions::FailModule);
276  if (action != actions::FailPath) {
277  // Possible actions: IgnoreCompletely, Rethrow, SkipEvent
278  ++path_->timesExcept_;
279  path_->state_ = hlt::Exception;
280  if (path_->trptr_) {
281  // Not the end path.
282  path_->trptr_->at(path_->pathPosition_) =
283  HLTPathStatus(path_->state_, idx_);
284  }
285  auto art_ex =
286  Exception{
287  errors::ScheduleExecutionFailure, "Path: ProcessingStopped.", e}
288  << "Exception going through path " << path_->name() << '\n';
289  auto ex_ptr = make_exception_ptr(art_ex);
290  group_.may_run(pathsDone_, ex_ptr);
291  TDEBUG_END_TASK_SI(4, sid) << "terminate path because of EXCEPTION";
292  return;
293  }
294  new_should_continue = false;
295  mf::LogWarning(e.category()) << "Failing path " << path_->name()
296  << ", due to exception, message:\n"
297  << e.what();
298  // WARNING: We continue processing below!!!
299  }
300  catch (...) {
301  mf::LogError("PassingThrough")
302  << "Exception passing through path " << path_->name();
303  ++path_->timesExcept_;
304  path_->state_ = hlt::Exception;
305  if (path_->trptr_) {
306  // Not the end path.
307  path_->trptr_->at(path_->pathPosition_) =
308  HLTPathStatus(path_->state_, idx_);
309  }
310  group_.may_run(pathsDone_, current_exception());
311  TDEBUG_END_TASK_SI(4, sid) << "terminate path because of EXCEPTION";
312  return;
313  }
314  }
315 
316  path_->process_event_workerFinished(
317  idx_, max_idx_, ep_, new_should_continue, pathsDone_);
318  TDEBUG_END_TASK_SI(4, sid);
319  }
320 
321  private:
323  size_t const idx_;
324  size_t const max_idx_;
326  WaitingTaskPtr pathsDone_;
328  };
329 
330  // This function is the main body of the Run Worker task.
331  void
332  Path::process_event_idx(size_t const idx,
333  size_t const max_idx,
334  EventPrincipal& ep,
335  WaitingTaskPtr pathsDone)
336  {
337  auto const sid = pc_.scheduleID();
338  TDEBUG_FUNC_SI(4, sid) << "idx: " << idx << " max_idx: " << max_idx;
339  auto workerDoneTask = make_waiting_task<WorkerDoneTask>(
340  this, idx, max_idx, ep, pathsDone, taskGroup_);
341  auto& workerInPath = workers_[idx];
342  workerInPath.run(workerDoneTask, ep);
343  TDEBUG_FUNC_SI(4, sid) << "idx: " << idx << " max_idx: " << max_idx;
344  }
345 
346  void
347  Path::process_event_workerFinished(size_t const idx,
348  size_t const max_idx,
349  EventPrincipal& ep,
350  bool const should_continue,
351  WaitingTaskPtr pathsDone)
352  {
353  auto const sid = pc_.scheduleID();
354  TDEBUG_BEGIN_FUNC_SI(4, sid) << "idx: " << idx << " max_idx: " << max_idx
355  << " should_continue: " << should_continue;
356  auto new_idx = idx + 1;
357  // Move on to the next worker.
358  if (should_continue && (new_idx < max_idx)) {
359  // Spawn the next worker.
360  process_event_idx_asynch(new_idx, max_idx, ep, pathsDone);
361  // And end this one.
362  TDEBUG_END_FUNC_SI(4, sid)
363  << "new_idx: " << new_idx << " max_idx: " << max_idx;
364  return;
365  }
366 
367  // All done, or filter rejected, or error.
368  process_event_pathFinished(new_idx, should_continue, pathsDone);
369  // And end the path here.
370  TDEBUG_END_FUNC_SI(4, sid) << "idx: " << idx << " max_idx: " << max_idx;
371  }
372 
373  void
374  Path::process_event_pathFinished(size_t const idx,
375  bool const should_continue,
376  WaitingTaskPtr pathsDone)
377  {
378  // We come here as as part of a runWorker task.
379  auto const sid = pc_.scheduleID();
380  TDEBUG_FUNC_SI(4, sid) << "idx: " << idx
381  << " should_continue: " << should_continue;
382  if (should_continue) {
383  ++timesPassed_;
384  state_ = hlt::Pass;
385  } else {
386  ++timesFailed_;
387  state_ = hlt::Fail;
388  }
389 
390  auto ex_ptr = std::exception_ptr{};
391  try {
392  HLTPathStatus const status{state_, idx};
393  if (trptr_) {
394  // Not the end path.
395  trptr_->at(pathPosition_) = status;
396  }
397  actReg_.sPostProcessPath.invoke(pc_, status);
398  }
399  catch (...) {
400  ex_ptr = std::current_exception();
401  }
402  taskGroup_.may_run(pathsDone, ex_ptr);
403  TDEBUG_FUNC_SI(4, sid) << "idx: " << idx
404  << " should_continue: " << should_continue
405  << (ex_ptr ? " EXCEPTION" : "");
406  }
407 
408 } // namespace art
#define TDEBUG_BEGIN_TASK_SI(LEVEL, SI)
#define TDEBUG_END_TASK_SI(LEVEL, SI)
void operator()(exception_ptr ex)
Definition: Path.cc:261
const std::string instance
STL namespace.
WorkerDoneTask(Path *path, size_t const idx, size_t const max_idx, EventPrincipal &ep, WaitingTaskPtr pathsDone, GlobalTaskGroup &group)
Definition: Path.cc:247
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
HLTState
Definition: HLTenums.h:6
#define TDEBUG_TASK_SI(LEVEL, SI)
WaitingTaskPtr pathsDone_
Definition: Path.cc:326
Transition
Definition: Transition.h:7
size_t const idx_
Definition: Path.cc:323
EventPrincipal & ep_
Definition: Path.cc:325
#define TDEBUG_FUNC_SI(LEVEL, SI)
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
GlobalTaskGroup & group_
Definition: Path.cc:327
Definition: Path.h:31
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
Definition: MVAAlg.h:12
Float_t e
Definition: plot.C:35
size_t const max_idx_
Definition: Path.cc:324
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33