LArSoft  v09_90_00
Liquid Argon Software toolkit - https://larsoft.org/
TriggerPathsExecutor.cc
Go to the documentation of this file.
2 // vim: set sw=2 expandtab :
3 
17 #include "cetlib/trim.h"
18 #include "hep_concurrency/WaitingTask.h"
20 #include "range/v3/view.hpp"
21 
22 #include <cassert>
23 #include <utility>
24 
25 using namespace hep::concurrency;
26 using namespace std;
27 
28 namespace {
29  auto
30  unique_workers(art::PathsInfo const& pinfo)
31  {
32  using namespace ::ranges;
33  return pinfo.workers() | views::values | views::indirect |
34  views::filter([](auto const& worker) { return worker.isUnique(); });
35  }
36 }
37 
38 namespace art {
39 
40  TriggerPathsExecutor::TriggerPathsExecutor(
41  ScheduleID const scheduleID,
42  PathManager& pm,
43  ActionTable const& actions,
44  ActivityRegistry const& activityRegistry,
45  GlobalTaskGroup& group)
46  : sc_{scheduleID}
47  , actionTable_{actions}
48  , actReg_{activityRegistry}
49  , triggerPathsInfo_{pm.triggerPathsInfo(scheduleID)}
50  , results_inserter_{pm.releaseTriggerResultsInserter(scheduleID)}
51  , taskGroup_{group}
52  {
53  TDEBUG_FUNC_SI(5, scheduleID) << hex << this << dec;
54  }
55 
56  void
58  {
59  for (auto& worker : unique_workers(triggerPathsInfo_)) {
60  worker.beginJob(resources);
61  }
62  if (results_inserter_) {
63  results_inserter_->beginJob(resources);
64  }
65  }
66 
67  void
69  {
71  for (auto& worker : unique_workers(triggerPathsInfo_)) {
72  // FIXME: The catch and rethrow here seems to have little value added.
73  try {
74  worker.endJob();
75  }
76  catch (cet::exception& e) {
77  error << "cet::exception caught in TriggerPathsExecutor::endJob\n"
78  << e.explain_self();
79  throw error;
80  }
81  catch (exception& e) {
82  error << "Standard library exception caught in "
83  "TriggerPathsExecutor::endJob\n"
84  << e.what();
85  throw error;
86  }
87  catch (...) {
88  error << "Unknown exception caught in TriggerPathsExecutor::endJob\n";
89  throw error;
90  }
91  }
92  if (results_inserter_) {
93  // FIXME: The catch and rethrow here seems to have little value added.
94  try {
95  results_inserter_->endJob();
96  }
97  catch (cet::exception& e) {
98  error << "cet::exception caught in TriggerPathsExecutor::endJob\n"
99  << e.explain_self();
100  throw error;
101  }
102  catch (exception& e) {
103  error << "Standard library exception caught in "
104  "TriggerPathsExecutor::endJob\n"
105  << e.what();
106  throw error;
107  }
108  catch (...) {
109  error << "Unknown exception caught in TriggerPathsExecutor::endJob\n";
110  throw error;
111  }
112  }
113  }
114 
115  void
117  {
118  for (auto& worker : unique_workers(triggerPathsInfo_)) {
119  worker.respondToOpenInputFile(fb);
120  }
121  if (results_inserter_) {
122  results_inserter_->respondToOpenInputFile(fb);
123  }
124  }
125 
126  void
128  {
129  for (auto& worker : unique_workers(triggerPathsInfo_)) {
130  worker.respondToCloseInputFile(fb);
131  }
132  if (results_inserter_) {
133  results_inserter_->respondToCloseInputFile(fb);
134  }
135  }
136 
137  void
139  {
140  for (auto& worker : unique_workers(triggerPathsInfo_)) {
141  worker.respondToOpenOutputFiles(fb);
142  }
143  if (results_inserter_) {
144  results_inserter_->respondToOpenOutputFiles(fb);
145  }
146  }
147 
148  void
150  {
151  for (auto& worker : unique_workers(triggerPathsInfo_)) {
152  worker.respondToCloseOutputFiles(fb);
153  }
154  if (results_inserter_) {
155  results_inserter_->respondToCloseOutputFiles(fb);
156  }
157  }
158 
159  void
161  {
163  for (auto& path : triggerPathsInfo_.paths()) {
164  path.process(trans, principal);
165  }
166  }
167 
169  public:
171  WaitingTaskPtr const endPathTask,
172  EventPrincipal& principal,
173  GlobalTaskGroup& group)
174  : schedule_{schedule}
175  , endPathTask_{endPathTask}
176  , principal_{principal}
177  , taskGroup_{group}
178  {}
179 
180  void
181  operator()(exception_ptr const ex)
182  {
183  auto const scheduleID = schedule_->sc_.id();
184 
185  TDEBUG_BEGIN_TASK_SI(4, scheduleID);
186  if (ex) {
187  taskGroup_.may_run(endPathTask_, ex);
188  TDEBUG_END_TASK_SI(4, scheduleID)
189  << "trigger path processing terminate because of EXCEPTION";
190  return;
191  }
192 
193  try {
194  schedule_->process_event_paths_done(principal_);
195  taskGroup_.may_run(endPathTask_);
196  }
197  catch (...) {
198  taskGroup_.may_run(endPathTask_, current_exception());
199  };
200 
201  // Start the endPathTask going.
202  TDEBUG_END_TASK_SI(4, scheduleID);
203  }
204 
205  private:
207  WaitingTaskPtr const endPathTask_;
210  };
211 
212  void
213  TriggerPathsExecutor::process_event(WaitingTaskPtr endPathTask,
214  EventPrincipal& event_principal)
215  {
216  // We get here as part of the readAndProcessEventTask (schedule
217  // head task).
218  actReg_.sPreProcessEvent.invoke(
219  event_principal.makeEvent(ModuleContext::invalid()), sc_);
220  auto const scheduleID = sc_.id();
221  TDEBUG_BEGIN_FUNC_SI(4, scheduleID);
222  if (results_inserter_) {
223  results_inserter_->reset();
224  }
227  try {
228  if (triggerPathsInfo_.paths().empty()) {
229  auto pathsDoneTask = make_waiting_task<PathsDoneTask>(
230  this, endPathTask, event_principal, taskGroup_);
231  taskGroup_.may_run(pathsDoneTask);
232  TDEBUG_END_FUNC_SI(4, scheduleID);
233  return;
234  }
235  auto pathsDoneTask = std::make_shared<WaitingTask>(
236  PathsDoneTask{this, endPathTask, event_principal, taskGroup_},
237  triggerPathsInfo_.paths().size());
238  for (auto& path : triggerPathsInfo_.paths()) {
239  // Start each path running. The path will start a spawn chain
240  // going to run each worker in the order specified on the
241  // path, and when they have all been run, it will call
242  // doneWaiting() on the pathsDoneTask, which decrements its
243  // reference count, which will eventually cause it to run when
244  // every path has finished.
245  path.process(pathsDoneTask, event_principal);
246  }
247  TDEBUG_END_FUNC_SI(4, scheduleID);
248  }
249  catch (...) {
250  taskGroup_.may_run(endPathTask, current_exception());
251  TDEBUG_END_FUNC_SI(4, scheduleID) << "because of EXCEPTION";
252  }
253  }
254 
255  void
257  {
258  // We come here as part of the pathsDoneTask.
259  auto const scheduleID = sc_.id();
260  TDEBUG_BEGIN_FUNC_SI(4, scheduleID);
261  try {
264  }
265  if (results_inserter_) {
266  // FIXME: not sure what the trigger bit should be
267  auto const& resultsInserterDesc = results_inserter_->description();
268  PathContext const pc{sc_,
270  {resultsInserterDesc.moduleLabel()}};
271  ModuleContext const mc{pc, resultsInserterDesc};
272  results_inserter_->doWork_event(principal, mc);
273  }
274  }
275  catch (cet::exception& e) {
276  auto action = actionTable_.find(e.root_cause());
277  assert(action != actions::IgnoreCompletely);
278  assert(action != actions::FailPath);
279  assert(action != actions::FailModule);
280  if (action != actions::SkipEvent) {
281  TDEBUG_END_FUNC_SI(4, scheduleID);
282  throw;
283  }
284  mf::LogWarning(e.category())
285  << "An exception occurred inserting the TriggerResults object:\n"
286  << cet::trim_right_copy(e.what(), " \n");
287  }
288  TDEBUG_END_FUNC_SI(4, scheduleID);
289  }
290 } // namespace art
#define TDEBUG_BEGIN_TASK_SI(LEVEL, SI)
ScheduleContext const sc_
std::unique_ptr< Worker > results_inserter_
HLTGlobalStatus & pathResults()
Definition: PathsInfo.cc:92
std::vector< Path > & paths()
Definition: PathsInfo.cc:54
Event makeEvent(ModuleContext const &mc)
actions::ActionCodes find(std::string const &category) const
Definition: Actions.cc:58
#define TDEBUG_END_TASK_SI(LEVEL, SI)
PathsDoneTask(TriggerPathsExecutor *const schedule, WaitingTaskPtr const endPathTask, EventPrincipal &principal, GlobalTaskGroup &group)
void process(Transition, Principal &)
STL namespace.
void respondToOpenOutputFiles(FileBlock const &)
ActivityRegistry const & actReg_
std::map< std::string, std::shared_ptr< Worker > > & workers()
Definition: PathsInfo.cc:22
ActionTable const & actionTable_
Transition
Definition: Transition.h:7
decltype(auto) values(Coll &&coll)
Range-for loop helper iterating across the values of the specified collection.
void respondToOpenInputFile(FileBlock const &)
#define TDEBUG_FUNC_SI(LEVEL, SI)
void process_event_paths_done(EventPrincipal &)
GlobalSignal< detail::SignalResponseType::FIFO, void(Event const &, ScheduleContext)> sPreProcessEvent
void reset_for_event()
Definition: PathsInfo.cc:85
TFile fb("Li6.root")
void respondToCloseInputFile(FileBlock const &)
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
void may_run(hep::concurrency::WaitingTaskPtr task, std::exception_ptr ex_ptr={})
void incrementPassedEventCount()
Definition: PathsInfo.cc:104
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
Definition: MVAAlg.h:12
void process_event(hep::concurrency::WaitingTaskPtr endPathTask, EventPrincipal &)
static auto art_path_spec()
Definition: PathContext.h:32
Float_t e
Definition: plot.C:35
void reset()
Definition: PathsInfo.cc:77
void respondToCloseOutputFiles(FileBlock const &)
static ModuleContext invalid()
Definition: ModuleContext.h:22
void beginJob(detail::SharedResources const &resources)
void incrementTotalEventCount()
Definition: PathsInfo.cc:98
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33