LArSoft  v09_90_00
Liquid Argon Software toolkit - https://larsoft.org/
art::Path Class Reference

#include "Path.h"

Classes

class  WorkerDoneTask
 

Public Member Functions

 Path (ActionTable const &, ActivityRegistry const &, PathContext const &, std::vector< WorkerInPath > &&, HLTGlobalStatus *, GlobalTaskGroup &) noexcept
 
ScheduleID scheduleID () const
 
PathSpec const & pathSpec () const
 
PathID pathID () const
 
std::string const & name () const
 
std::vector< WorkerInPath > const & workersInPath () const
 
hlt::HLTState state () const
 
std::size_t timesRun () const
 
std::size_t timesPassed () const
 
std::size_t timesFailed () const
 
std::size_t timesExcept () const
 
void clearCounters ()
 
void process (Transition, Principal &)
 
void process (hep::concurrency::WaitingTaskPtr pathsDoneTask, EventPrincipal &)
 

Private Member Functions

void runWorkerTask (size_t idx, size_t max_idx, EventPrincipal &, hep::concurrency::WaitingTaskPtr pathsDone)
 
void process_event_idx_asynch (size_t idx, size_t max_idx, EventPrincipal &, hep::concurrency::WaitingTaskPtr pathsDone)
 
void process_event_idx (size_t const idx, size_t const max_idx, EventPrincipal &, hep::concurrency::WaitingTaskPtr pathsDone)
 
void process_event_workerFinished (size_t const idx, size_t const max_idx, EventPrincipal &ep, bool should_continue, hep::concurrency::WaitingTaskPtr pathsDone)
 
void process_event_pathFinished (size_t const idx, bool should_continue, hep::concurrency::WaitingTaskPtr pathsDone)
 

Private Attributes

ActionTable const & actionTable_
 
ActivityRegistry const & actReg_
 
PathContext const pc_
 
size_t const pathPosition_
 
std::vector< WorkerInPath > workers_
 
cet::exempt_ptr< HLTGlobalStatus > trptr_
 
GlobalTaskGroup & taskGroup_
 
hlt::HLTState state_ {hlt::Ready}
 
std::size_t timesRun_ {}
 
std::size_t timesPassed_ {}
 
std::size_t timesFailed_ {}
 
std::size_t timesExcept_ {}
 

Detailed Description

Definition at line 31 of file Path.h.

Constructor & Destructor Documentation

art::Path::Path ( ActionTable const &  actions,
ActivityRegistry const &  actReg,
PathContext const &  pc,
std::vector< WorkerInPath > &&  workers,
HLTGlobalStatus *  pathResults,
GlobalTaskGroup &  taskGroup
)
noexcept

Definition at line 32 of file Path.cc.

References TDEBUG_FUNC_SI.

38  : actionTable_{actions}
39  , actReg_{actReg}
40  , pc_{pc}
41  , pathPosition_{ServiceHandle<TriggerNamesService>()->index_for(
42  pc_.pathID())}
43  , workers_{std::move(workers)}
44  , trptr_{pathResults}
45  , taskGroup_{taskGroup}
46  {
47  TDEBUG_FUNC_SI(4, pc_.scheduleID()) << hex << this << dec;
48  }
GlobalTaskGroup & taskGroup_
Definition: Path.h:92
ActionTable const & actionTable_
Definition: Path.h:82
auto scheduleID() const
Definition: PathContext.h:52
#define TDEBUG_FUNC_SI(LEVEL, SI)
size_t const pathPosition_
Definition: Path.h:85
ActivityRegistry const & actReg_
Definition: Path.h:83
cet::exempt_ptr< HLTGlobalStatus > trptr_
Definition: Path.h:90
std::vector< WorkerInPath > workers_
Definition: Path.h:87
PathID pathID() const noexcept
Definition: PathContext.h:68
PathContext const pc_
Definition: Path.h:84

Member Function Documentation

void art::Path::clearCounters ( )
string const & art::Path::name ( ) const

Definition at line 69 of file Path.cc.

70  {
71  return pc_.pathName();
72  }
auto const & pathName() const
Definition: PathContext.h:63
PathContext const pc_
Definition: Path.h:84
PathID art::Path::pathID ( ) const

Definition at line 63 of file Path.cc.

64  {
65  return pc_.pathID();
66  }
PathID pathID() const noexcept
Definition: PathContext.h:68
PathContext const pc_
Definition: Path.h:84
PathSpec const & art::Path::pathSpec ( ) const

Definition at line 57 of file Path.cc.

58  {
59  return pc_.pathSpec();
60  }
auto const & pathSpec() const
Definition: PathContext.h:57
PathContext const pc_
Definition: Path.h:84
void art::Path::process ( Transition  trans,
Principal &  principal
)

Definition at line 111 of file Path.cc.

References e, art::hlt::Fail, instance, art::hlt::Pass, art::hlt::Ready, art::errors::ScheduleExecutionFailure, TDEBUG_BEGIN_FUNC_SI, TDEBUG_END_FUNC_SI, and TDEBUG_FUNC_SI.

112  {
113  // Invoke pre-path signals only for the first schedule.
114  if (pc_.scheduleID() == ScheduleID::first()) {
115  switch (trans) {
116  case Transition::BeginRun:
117  actReg_.sPrePathBeginRun.invoke(name());
118  break;
119  case Transition::EndRun:
120  actReg_.sPrePathEndRun.invoke(name());
121  break;
122  case Transition::BeginSubRun:
123  actReg_.sPrePathBeginSubRun.invoke(name());
124  break;
125  case Transition::EndSubRun:
126  actReg_.sPrePathEndSubRun.invoke(name());
127  break;
128  default: {
129  } // No other pre-path signals supported.
130  }
131  }
132  state_ = hlt::Ready;
133  std::size_t idx = 0;
134  bool all_passed{false};
135  for (WorkerInPath& wip : workers_) {
136  // We do not want to call (e.g.) beginRun once per schedule for
137  // non-replicated modules.
138  if (not wip.getWorker()->isUnique()) {
139  continue;
140  }
141  try {
142  all_passed = wip.run(trans, principal);
143  if (!all_passed)
144  break;
145  }
146  catch (cet::exception& e) {
148  throw Exception{
149  errors::ScheduleExecutionFailure, "Path: ProcessingStopped.", e}
150  << "Exception going through path " << name() << '\n';
151  }
152  catch (...) {
153  mf::LogError("PassingThrough")
154  << "Exception passing through path " << name();
156  throw;
157  }
158  ++idx;
159  }
160  if (all_passed) {
161  state_ = hlt::Pass;
162  } else {
163  state_ = hlt::Fail;
164  }
165  // Invoke post-path signals only for the last schedule.
166  if (pc_.scheduleID().id() == Globals::instance()->nschedules() - 1) {
167  HLTPathStatus const status(state_, idx);
168  switch (trans) {
169  case Transition::BeginRun:
170  actReg_.sPostPathBeginRun.invoke(name(), status);
171  break;
172  case Transition::EndRun:
173  actReg_.sPostPathEndRun.invoke(name(), status);
174  break;
175  case Transition::BeginSubRun:
176  actReg_.sPostPathBeginSubRun.invoke(name(), status);
177  break;
178  case Transition::EndSubRun:
179  actReg_.sPostPathEndSubRun.invoke(name(), status);
180  break;
181  default: {
182  } // No other post-path signals supported.
183  }
184  }
185  }
hlt::HLTState state_
Definition: Path.h:95
GlobalSignal< detail::SignalResponseType::FIFO, void(std::string const &)> sPrePathEndSubRun
GlobalSignal< detail::SignalResponseType::LIFO, void(std::string const &, HLTPathStatus const &)> sPostPathEndRun
std::string const & name() const
Definition: Path.cc:69
static constexpr ScheduleID first()
Definition: ScheduleID.h:50
GlobalSignal< detail::SignalResponseType::LIFO, void(std::string const &, HLTPathStatus const &)> sPostPathEndSubRun
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
GlobalSignal< detail::SignalResponseType::LIFO, void(std::string const &, HLTPathStatus const &)> sPostPathBeginSubRun
auto scheduleID() const
Definition: PathContext.h:52
ScheduleID::size_type nschedules() const
Definition: Globals.cc:24
GlobalSignal< detail::SignalResponseType::FIFO, void(std::string const &)> sPrePathEndRun
GlobalSignal< detail::SignalResponseType::FIFO, void(std::string const &)> sPrePathBeginRun
ActivityRegistry const & actReg_
Definition: Path.h:83
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
std::vector< WorkerInPath > workers_
Definition: Path.h:87
static Globals * instance()
Definition: Globals.cc:17
PathContext const pc_
Definition: Path.h:84
Float_t e
Definition: plot.C:35
GlobalSignal< detail::SignalResponseType::LIFO, void(std::string const &, HLTPathStatus const &)> sPostPathBeginRun
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
GlobalSignal< detail::SignalResponseType::FIFO, void(std::string const &)> sPrePathBeginSubRun
void art::Path::process ( hep::concurrency::WaitingTaskPtr  pathsDoneTask,
EventPrincipal &
)
void art::Path::process_event_idx ( size_t const  idx,
size_t const  max_idx,
EventPrincipal & ,
hep::concurrency::WaitingTaskPtr  pathsDone 
)
private

Definition at line 332 of file Path.cc.

References TDEBUG_FUNC_SI.

336  {
337  auto const sid = pc_.scheduleID();
338  TDEBUG_FUNC_SI(4, sid) << "idx: " << idx << " max_idx: " << max_idx;
339  auto workerDoneTask = make_waiting_task<WorkerDoneTask>(
340  this, idx, max_idx, ep, pathsDone, taskGroup_);
341  auto& workerInPath = workers_[idx];
342  workerInPath.run(workerDoneTask, ep);
343  TDEBUG_FUNC_SI(4, sid) << "idx: " << idx << " max_idx: " << max_idx;
344  }
GlobalTaskGroup & taskGroup_
Definition: Path.h:92
auto scheduleID() const
Definition: PathContext.h:52
#define TDEBUG_FUNC_SI(LEVEL, SI)
std::vector< WorkerInPath > workers_
Definition: Path.h:87
PathContext const pc_
Definition: Path.h:84
void art::Path::process_event_idx_asynch ( size_t  idx,
size_t  max_idx,
EventPrincipal & ,
hep::concurrency::WaitingTaskPtr  pathsDone 
)
private

Definition at line 232 of file Path.cc.

References TDEBUG_BEGIN_FUNC_SI, and TDEBUG_END_FUNC_SI.

236  {
237  auto const sid = pc_.scheduleID();
238  TDEBUG_BEGIN_FUNC_SI(4, sid) << "idx: " << idx << " max_idx: " << max_idx;
239  taskGroup_.run([this, idx, max_idx, &ep, pathsDone] {
240  runWorkerTask(idx, max_idx, ep, pathsDone);
241  });
242  TDEBUG_END_FUNC_SI(4, sid) << "idx: " << idx << " max_idx: " << max_idx;
243  }
GlobalTaskGroup & taskGroup_
Definition: Path.h:92
auto scheduleID() const
Definition: PathContext.h:52
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
PathContext const pc_
Definition: Path.h:84
void runWorkerTask(size_t idx, size_t max_idx, EventPrincipal &, hep::concurrency::WaitingTaskPtr pathsDone)
Definition: Path.cc:210
void art::Path::process_event_pathFinished ( size_t const  idx,
bool  should_continue,
hep::concurrency::WaitingTaskPtr  pathsDone 
)
private

Definition at line 374 of file Path.cc.

References art::hlt::Fail, art::hlt::Pass, and TDEBUG_FUNC_SI.

377  {
378  // We come here as part of a runWorker task.
379  auto const sid = pc_.scheduleID();
380  TDEBUG_FUNC_SI(4, sid) << "idx: " << idx
381  << " should_continue: " << should_continue;
382  if (should_continue) {
383  ++timesPassed_;
384  state_ = hlt::Pass;
385  } else {
386  ++timesFailed_;
387  state_ = hlt::Fail;
388  }
389 
390  auto ex_ptr = std::exception_ptr{};
391  try {
392  HLTPathStatus const status{state_, idx};
393  if (trptr_) {
394  // Not the end path.
395  trptr_->at(pathPosition_) = status;
396  }
397  actReg_.sPostProcessPath.invoke(pc_, status);
398  }
399  catch (...) {
400  ex_ptr = std::current_exception();
401  }
402  taskGroup_.may_run(pathsDone, ex_ptr);
403  TDEBUG_FUNC_SI(4, sid) << "idx: " << idx
404  << " should_continue: " << should_continue
405  << (ex_ptr ? " EXCEPTION" : "");
406  }
GlobalTaskGroup & taskGroup_
Definition: Path.h:92
hlt::HLTState state_
Definition: Path.h:95
auto scheduleID() const
Definition: PathContext.h:52
GlobalSignal< detail::SignalResponseType::LIFO, void(PathContext const &, HLTPathStatus const &)> sPostProcessPath
std::size_t timesFailed_
Definition: Path.h:98
#define TDEBUG_FUNC_SI(LEVEL, SI)
size_t const pathPosition_
Definition: Path.h:85
ActivityRegistry const & actReg_
Definition: Path.h:83
cet::exempt_ptr< HLTGlobalStatus > trptr_
Definition: Path.h:90
void may_run(hep::concurrency::WaitingTaskPtr task, std::exception_ptr ex_ptr={})
std::size_t timesPassed_
Definition: Path.h:97
PathContext const pc_
Definition: Path.h:84
void art::Path::process_event_workerFinished ( size_t const  idx,
size_t const  max_idx,
EventPrincipal &  ep,
bool  should_continue,
hep::concurrency::WaitingTaskPtr  pathsDone 
)
private

Definition at line 347 of file Path.cc.

References TDEBUG_BEGIN_FUNC_SI, and TDEBUG_END_FUNC_SI.

352  {
353  auto const sid = pc_.scheduleID();
354  TDEBUG_BEGIN_FUNC_SI(4, sid) << "idx: " << idx << " max_idx: " << max_idx
355  << " should_continue: " << should_continue;
356  auto new_idx = idx + 1;
357  // Move on to the next worker.
358  if (should_continue && (new_idx < max_idx)) {
359  // Spawn the next worker.
360  process_event_idx_asynch(new_idx, max_idx, ep, pathsDone);
361  // And end this one.
362  TDEBUG_END_FUNC_SI(4, sid)
363  << "new_idx: " << new_idx << " max_idx: " << max_idx;
364  return;
365  }
366 
367  // All done, or filter rejected, or error.
368  process_event_pathFinished(new_idx, should_continue, pathsDone);
369  // And end the path here.
370  TDEBUG_END_FUNC_SI(4, sid) << "idx: " << idx << " max_idx: " << max_idx;
371  }
auto scheduleID() const
Definition: PathContext.h:52
void process_event_idx_asynch(size_t idx, size_t max_idx, EventPrincipal &, hep::concurrency::WaitingTaskPtr pathsDone)
Definition: Path.cc:232
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
void process_event_pathFinished(size_t const idx, bool should_continue, hep::concurrency::WaitingTaskPtr pathsDone)
Definition: Path.cc:374
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
PathContext const pc_
Definition: Path.h:84
void art::Path::runWorkerTask ( size_t  idx,
size_t  max_idx,
EventPrincipal & ,
hep::concurrency::WaitingTaskPtr  pathsDone 
)
private

Definition at line 210 of file Path.cc.

References TDEBUG_BEGIN_TASK_SI, and TDEBUG_END_TASK_SI.

214  {
215  auto const sid = pc_.scheduleID();
216  TDEBUG_BEGIN_TASK_SI(4, sid);
217  try {
218  process_event_idx(idx, max_idx, ep, pathsDone);
219  TDEBUG_END_TASK_SI(4, sid);
220  }
221  catch (...) {
222  taskGroup_.may_run(pathsDone, current_exception());
223  TDEBUG_END_TASK_SI(4, sid) << "path terminate because of EXCEPTION";
224  }
225  }
GlobalTaskGroup & taskGroup_
Definition: Path.h:92
#define TDEBUG_BEGIN_TASK_SI(LEVEL, SI)
#define TDEBUG_END_TASK_SI(LEVEL, SI)
auto scheduleID() const
Definition: PathContext.h:52
void process_event_idx(size_t const idx, size_t const max_idx, EventPrincipal &, hep::concurrency::WaitingTaskPtr pathsDone)
Definition: Path.cc:332
void may_run(hep::concurrency::WaitingTaskPtr task, std::exception_ptr ex_ptr={})
PathContext const pc_
Definition: Path.h:84
ScheduleID art::Path::scheduleID ( ) const

Definition at line 51 of file Path.cc.

52  {
53  return pc_.scheduleID();
54  }
auto scheduleID() const
Definition: PathContext.h:52
PathContext const pc_
Definition: Path.h:84
hlt::HLTState art::Path::state ( ) const

Definition at line 99 of file Path.cc.

100  {
101  return state_;
102  }
hlt::HLTState state_
Definition: Path.h:95
size_t art::Path::timesExcept ( ) const

Definition at line 93 of file Path.cc.

94  {
95  return timesExcept_;
96  }
std::size_t timesExcept_
Definition: Path.h:99
size_t art::Path::timesFailed ( ) const

Definition at line 87 of file Path.cc.

88  {
89  return timesFailed_;
90  }
std::size_t timesFailed_
Definition: Path.h:98
size_t art::Path::timesPassed ( ) const

Definition at line 81 of file Path.cc.

82  {
83  return timesPassed_;
84  }
std::size_t timesPassed_
Definition: Path.h:97
size_t art::Path::timesRun ( ) const

Definition at line 75 of file Path.cc.

76  {
77  return timesRun_;
78  }
std::size_t timesRun_
Definition: Path.h:96
vector< WorkerInPath > const & art::Path::workersInPath ( ) const

Definition at line 105 of file Path.cc.

106  {
107  return workers_;
108  }
std::vector< WorkerInPath > workers_
Definition: Path.h:87

Member Data Documentation

ActionTable const& art::Path::actionTable_
private

Definition at line 82 of file Path.h.

ActivityRegistry const& art::Path::actReg_
private

Definition at line 83 of file Path.h.

size_t const art::Path::pathPosition_
private

Definition at line 85 of file Path.h.

PathContext const art::Path::pc_
private

Definition at line 84 of file Path.h.

hlt::HLTState art::Path::state_ {hlt::Ready}
private

Definition at line 95 of file Path.h.

GlobalTaskGroup& art::Path::taskGroup_
private

Definition at line 92 of file Path.h.

std::size_t art::Path::timesExcept_ {}
private

Definition at line 99 of file Path.h.

std::size_t art::Path::timesFailed_ {}
private

Definition at line 98 of file Path.h.

std::size_t art::Path::timesPassed_ {}
private

Definition at line 97 of file Path.h.

std::size_t art::Path::timesRun_ {}
private

Definition at line 96 of file Path.h.

cet::exempt_ptr<HLTGlobalStatus> art::Path::trptr_
private

Definition at line 90 of file Path.h.

std::vector<WorkerInPath> art::Path::workers_
private

Definition at line 87 of file Path.h.


The documentation for this class was generated from the following files: