19 #include "hep_concurrency/WaitingTask.h" 35 vector<WorkerInPath>&& workers,
38 : actionTable_{actions}
43 , workers_{std::move(workers)}
45 , taskGroup_{taskGroup}
51 Path::scheduleID()
const 53 return pc_.scheduleID();
57 Path::pathSpec()
const 59 return pc_.pathSpec();
71 return pc_.pathName();
75 Path::timesRun()
const 81 Path::timesPassed()
const 87 Path::timesFailed()
const 93 Path::timesExcept()
const 104 vector<WorkerInPath>
const&
105 Path::workersInPath()
const 114 if (pc_.scheduleID() == ScheduleID::first()) {
116 case Transition::BeginRun:
117 actReg_.sPrePathBeginRun.invoke(name());
119 case Transition::EndRun:
120 actReg_.sPrePathEndRun.invoke(name());
122 case Transition::BeginSubRun:
123 actReg_.sPrePathBeginSubRun.invoke(name());
125 case Transition::EndSubRun:
126 actReg_.sPrePathEndSubRun.invoke(name());
134 bool all_passed{
false};
138 if (not wip.getWorker()->isUnique()) {
142 all_passed = wip.run(trans, principal);
150 <<
"Exception going through path " << name() <<
'\n';
154 <<
"Exception passing through path " << name();
169 case Transition::BeginRun:
170 actReg_.sPostPathBeginRun.invoke(name(), status);
172 case Transition::EndRun:
173 actReg_.sPostPathEndRun.invoke(name(), status);
175 case Transition::BeginSubRun:
176 actReg_.sPostPathBeginSubRun.invoke(name(), status);
178 case Transition::EndSubRun:
179 actReg_.sPostPathEndSubRun.invoke(name(), status);
192 auto const sid = pc_.scheduleID();
194 TDEBUG_FUNC_SI(6, sid) << hex <<
this << dec <<
" Resetting waitingTasks_";
197 actReg_.sPreProcessPath.invoke(pc_);
201 auto max_idx = workers_.size();
205 process_event_idx_asynch(idx, max_idx, ep, pathsDoneTask);
210 Path::runWorkerTask(
size_t const idx,
211 size_t const max_idx,
213 WaitingTaskPtr pathsDone)
215 auto const sid = pc_.scheduleID();
218 process_event_idx(idx, max_idx, ep, pathsDone);
222 taskGroup_.may_run(pathsDone, current_exception());
232 Path::process_event_idx_asynch(
size_t const idx,
233 size_t const max_idx,
235 WaitingTaskPtr pathsDone)
237 auto const sid = pc_.scheduleID();
239 taskGroup_.run([
this, idx, max_idx, &ep, pathsDone] {
240 runWorkerTask(idx, max_idx, ep, pathsDone);
249 size_t const max_idx,
251 WaitingTaskPtr pathsDone,
257 , pathsDone_{pathsDone}
263 auto const sid = path_->pc_.scheduleID();
265 auto& workerInPath = path_->workers_[idx_];
267 bool new_should_continue = workerInPath.returnCode();
268 TDEBUG_TASK_SI(4, sid) <<
"new_should_continue: " << new_should_continue;
271 rethrow_exception(ex);
274 auto action = path_->actionTable_.find(e.root_cause());
278 ++path_->timesExcept_;
282 path_->trptr_->at(path_->pathPosition_) =
288 <<
"Exception going through path " << path_->name() <<
'\n';
289 auto ex_ptr = make_exception_ptr(art_ex);
290 group_.may_run(pathsDone_, ex_ptr);
294 new_should_continue =
false;
296 <<
", due to exception, message:\n" 302 <<
"Exception passing through path " << path_->name();
303 ++path_->timesExcept_;
307 path_->trptr_->at(path_->pathPosition_) =
310 group_.may_run(pathsDone_, current_exception());
316 path_->process_event_workerFinished(
317 idx_, max_idx_, ep_, new_should_continue, pathsDone_);
332 Path::process_event_idx(
size_t const idx,
333 size_t const max_idx,
335 WaitingTaskPtr pathsDone)
337 auto const sid = pc_.scheduleID();
338 TDEBUG_FUNC_SI(4, sid) <<
"idx: " << idx <<
" max_idx: " << max_idx;
339 auto workerDoneTask = make_waiting_task<WorkerDoneTask>(
340 this, idx, max_idx, ep, pathsDone, taskGroup_);
341 auto& workerInPath = workers_[idx];
342 workerInPath.run(workerDoneTask, ep);
343 TDEBUG_FUNC_SI(4, sid) <<
"idx: " << idx <<
" max_idx: " << max_idx;
347 Path::process_event_workerFinished(
size_t const idx,
348 size_t const max_idx,
350 bool const should_continue,
351 WaitingTaskPtr pathsDone)
353 auto const sid = pc_.scheduleID();
355 <<
" should_continue: " << should_continue;
356 auto new_idx = idx + 1;
358 if (should_continue && (new_idx < max_idx)) {
360 process_event_idx_asynch(new_idx, max_idx, ep, pathsDone);
363 <<
"new_idx: " << new_idx <<
" max_idx: " << max_idx;
368 process_event_pathFinished(new_idx, should_continue, pathsDone);
374 Path::process_event_pathFinished(
size_t const idx,
375 bool const should_continue,
376 WaitingTaskPtr pathsDone)
379 auto const sid = pc_.scheduleID();
381 <<
" should_continue: " << should_continue;
382 if (should_continue) {
390 auto ex_ptr = std::exception_ptr{};
395 trptr_->at(pathPosition_) = status;
397 actReg_.sPostProcessPath.invoke(pc_, status);
400 ex_ptr = std::current_exception();
402 taskGroup_.may_run(pathsDone, ex_ptr);
404 <<
" should_continue: " << should_continue
405 << (ex_ptr ?
" EXCEPTION" :
"");
#define TDEBUG_BEGIN_TASK_SI(LEVEL, SI)
#define TDEBUG_END_TASK_SI(LEVEL, SI)
void operator()(exception_ptr ex)
const std::string instance
WorkerDoneTask(Path *path, size_t const idx, size_t const max_idx, EventPrincipal &ep, WaitingTaskPtr pathsDone, GlobalTaskGroup &group)
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
#define TDEBUG_TASK_SI(LEVEL, SI)
WaitingTaskPtr pathsDone_
#define TDEBUG_FUNC_SI(LEVEL, SI)
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
cet::coded_exception< error, detail::translate > exception