LArSoft  v09_90_00
Liquid Argon Software toolkit - https://larsoft.org/
EndPathExecutor.cc
Go to the documentation of this file.
2 // vim: set sw=2 expandtab :
3 
20 #include "hep_concurrency/WaitingTask.h"
22 #include "range/v3/view.hpp"
23 
24 #include <memory>
25 #include <type_traits>
26 #include <utility>
27 #include <vector>
28 
29 using namespace hep::concurrency;
30 using namespace std;
31 
32 namespace {
33  auto
34  unique_workers(art::PathsInfo const& pinfo)
35  {
36  using namespace ::ranges;
37  return pinfo.workers() | views::values | views::indirect |
38  views::filter([](auto const& worker) { return worker.isUnique(); });
39  }
40 }
41 
42 namespace art {
43 
44  EndPathExecutor::EndPathExecutor(ScheduleID const sid,
45  PathManager& pm,
46  ActionTable const& actionTable,
47  UpdateOutputCallbacks& outputCallbacks,
48  GlobalTaskGroup& group)
49  : sc_{sid}
50  , actionTable_{actionTable}
51  , endPathInfo_{pm.endPathInfo(sid)}
52  , taskGroup_{group}
53  {
54  for (auto const& worker :
56  assert(sid == worker->scheduleID());
57  if (auto owp = std::dynamic_pointer_cast<OutputWorker>(worker)) {
58  outputWorkers_.emplace_back(owp.get());
59  }
60  }
61  outputWorkersToOpen_.insert(outputWorkers_.cbegin(), outputWorkers_.cend());
62  outputCallbacks.registerCallback(
63  [this](auto const& tables) { selectProducts(tables); });
64  }
65 
66  void
68  {
69  for (auto& worker : unique_workers(endPathInfo_)) {
70  worker.beginJob(resources);
71  }
72  }
73 
74  void
76  {
78  // FIXME: There seems to be little value-added by the catch and rethrow
79  // here.
80  for (auto& worker : unique_workers(endPathInfo_)) {
81  try {
82  worker.endJob();
83  }
84  catch (cet::exception& e) {
85  error << "cet::exception caught in Schedule::endJob\n"
86  << e.explain_self();
87  throw error;
88  }
89  catch (exception& e) {
90  error << "Standard library exception caught in Schedule::endJob\n"
91  << e.what();
92  throw error;
93  }
94  catch (...) {
95  error << "Unknown exception caught in Schedule::endJob\n";
96  throw error;
97  }
98  }
99  }
100 
101  void
103  {
104  for (auto ow : outputWorkers_) {
105  ow->selectProducts(tables);
106  }
107  }
108 
109  void
111  {
112  for (auto& worker : unique_workers(endPathInfo_)) {
113  worker.respondToOpenInputFile(fb);
114  }
115  }
116 
117  void
119  {
120  for (auto& worker : unique_workers(endPathInfo_)) {
121  worker.respondToCloseInputFile(fb);
122  }
123  }
124 
125  void
127  {
128  for (auto& worker : unique_workers(endPathInfo_)) {
129  worker.respondToOpenOutputFiles(fb);
130  }
131  }
132 
133  void
135  {
136  for (auto& worker : unique_workers(endPathInfo_)) {
137  worker.respondToCloseOutputFiles(fb);
138  }
139  }
140 
141  bool
143  {
144  return any_of(outputWorkers_.cbegin(), outputWorkers_.cend(), [](auto ow) {
145  return ow->fileIsOpen();
146  });
147  }
148 
149  void
151  {
152  for (auto ow : outputWorkers_) {
153  ow->closeFile();
154  }
155  }
156 
157  void
159  {
160  runRangeSetHandler_.reset(rsh.clone());
161  }
162 
163  void
165  {
166  for (auto ow : outputWorkers_) {
167  ow->setRunAuxiliaryRangeSetID(rangeSet);
168  }
169  }
170 
171  void
173  {
174  for (auto ow : outputWorkers_) {
175  ow->writeRun(rp);
176  }
178  runRangeSetHandler_->rebase();
179  }
180  }
181 
182  void
184  {
185  subRunRangeSetHandler_.reset(rsh.clone());
186  }
187 
188  void
190  {
191  for (auto ow : outputWorkers_) {
192  // For RootOutput this enters the possibly split range set into
193  // the range set db.
194  ow->setSubRunAuxiliaryRangeSetID(rs);
195  }
196  }
197 
198  void
200  {
201  for (auto ow : outputWorkers_) {
202  ow->writeSubRun(srp);
203  }
205  subRunRangeSetHandler_->rebase();
206  }
207  }
208 
209  //
210  // MEMBER FUNCTIONS -- Process Non-Event
211  //
212 
213  void
215  {
216  for (auto& worker : unique_workers(endPathInfo_)) {
217  worker.reset();
218  }
219  try {
220  if (!endPathInfo_.paths().empty()) {
221  endPathInfo_.paths().front().process(trans, principal);
222  }
223  }
224  catch (cet::exception& ex) {
225  throw Exception(errors::EventProcessorFailure, "EndPathExecutor:")
226  << "an exception occurred during current event processing\n"
227  << ex;
228  }
229  catch (...) {
230  mf::LogError("PassingThrough")
231  << "an exception occurred during current event processing\n";
232  throw;
233  }
235  }
236 
238  public:
239  PathsDoneTask(EndPathExecutor* const endPathExec,
240  WaitingTaskPtr const finalizeEventTask,
241  GlobalTaskGroup& taskGroup)
242  : endPathExec_{endPathExec}
243  , finalizeEventTask_{finalizeEventTask}
244  , taskGroup_{taskGroup}
245  {}
246 
247  void
248  operator()(exception_ptr const ex)
249  {
250  auto const scheduleID = endPathExec_->sc_.id();
251 
252  // Note: When we start our parent task is the eventLoop task.
253  TDEBUG_BEGIN_TASK_SI(4, scheduleID);
254 
255  if (ex) {
256  try {
257  rethrow_exception(ex);
258  }
259  catch (cet::exception& e) {
260  Exception tmp(errors::EventProcessorFailure, "EndPathExecutor:");
261  tmp << "an exception occurred during current event processing\n" << e;
262  taskGroup_.may_run(finalizeEventTask_, make_exception_ptr(tmp));
263  TDEBUG_END_TASK_SI(4, scheduleID)
264  << "end path processing terminate because of EXCEPTION";
265  return;
266  }
267  catch (...) {
268  taskGroup_.may_run(finalizeEventTask_, current_exception());
269  TDEBUG_END_TASK_SI(4, scheduleID)
270  << "end path processing terminate because of EXCEPTION";
271  return;
272  }
273  }
274 
275  endPathExec_->endPathInfo_.incrementPassedEventCount();
276 
277  taskGroup_.may_run(finalizeEventTask_);
278  TDEBUG_END_TASK_SI(4, scheduleID);
279  }
280 
281  private:
283  WaitingTaskPtr const finalizeEventTask_;
285  };
286 
287  // Note: We come here as part of the endPath task, our
288  // parent task is the eventLoop task.
289  void
290  EndPathExecutor::process_event(WaitingTaskPtr finalizeEventTask,
291  EventPrincipal& ep)
292  {
293  auto const sid = sc_.id();
294  TDEBUG_BEGIN_FUNC_SI(4, sid);
297  try {
298  auto pathsDoneTask =
299  make_waiting_task<PathsDoneTask>(this, finalizeEventTask, taskGroup_);
300  if (endPathInfo_.paths().empty()) {
301  taskGroup_.may_run(pathsDoneTask);
302  } else {
303  endPathInfo_.paths().front().process(pathsDoneTask, ep);
304  }
305  }
306  catch (...) {
307  taskGroup_.may_run(finalizeEventTask, current_exception());
308  }
309  TDEBUG_END_FUNC_SI(4, sid);
310  }
311 
312  void
314  {
315  // We don't worry about providing the sorted list of module names
316  // for the end_path right now. If users decide it is necessary to
317  // know what they are, then we can provide them.
318  PathContext const pc{sc_, PathContext::end_path_spec(), {}};
319  for (auto ow : outputWorkers_) {
320  ow->writeEvent(ep, pc);
321  }
322  auto const& eid = ep.eventID();
323  bool const lastInSubRun{ep.isLastInSubRun()};
324  TDEBUG_FUNC_SI(5, sc_.id())
325  << "eid: " << eid.run() << ", " << eid.subRun() << ", " << eid.event();
326  runRangeSetHandler_->update(eid, lastInSubRun);
327  subRunRangeSetHandler_->update(eid, lastInSubRun);
328  }
329 
330  bool
332  {
333  return !outputWorkersToClose_.empty();
334  }
335 
336  // MT note: This is where we need to get all the schedules
337  // synchronized, and then have all schedules do the file
338  // close, and then the file open, then the schedules can
339  // proceed. A nasty complication is that a great deal of
340  // time can go by between the file close and the file open
341  // because artdaq may pause the run inbetween, and wants to
342  // have all output files closed while the run is paused.
343  // They probably want the input file closed too.
344  void
346  {
348  for (auto ow : outputWorkersToClose_) {
349  // Skip files that are already closed due to other end-path
350  // executors already closing them.
351  if (!ow->fileIsOpen()) {
352  continue;
353  }
354  ow->closeFile();
355  }
356  outputWorkersToOpen_ = std::move(outputWorkersToClose_);
357  }
358 
359  bool
361  {
362  return !outputWorkersToOpen_.empty();
363  }
364 
365  void
367  {
368  for (auto ow : outputWorkersToOpen_) {
369  ow->openFile(fb);
370  }
372  outputWorkersToOpen_.clear();
373  }
374 
375  // Note: When we are passed OutputFileStatus::Switching, we must close
376  // the file and call openSomeOutputFiles which changes it back
377  // to OutputFileStatus::Open.
378  // A side effect of switching status is the run/subrun writes
379  // are not counted in the overall counting by RootOutputClosingCriteria
380  // while the switch is active (this avoids counting the extra subRun and
381  // Run that we are forced to write to finish out the file we are
382  // closing, which keeps the ongoing count for closing based on SubRun
383  // and Run counts meaningful). However, the extra ones are still
384  // counted by the tree entry counters.
385  void
387  {
388  for (auto ow : outputWorkers_) {
389  ow->setFileStatus(ofs);
390  }
391  fileStatus_ = ofs;
392  }
393 
394  void
396  {
397  for (auto ow : outputWorkers_) {
398  if (atBoundary < ow->fileGranularity()) {
399  // The boundary we are checking at is finer than the checks
400  // the output worker needs, nothing to do.
401  continue;
402  }
403  if (ow->requestsToCloseFile()) {
404  outputWorkersToClose_.insert(ow);
405  }
406  }
407  }
408 
409  void
411  {
412  for (auto ow : outputWorkers_) {
413  ow->incrementInputFileNumber();
414  }
415  }
416 
417 } // namespace art
#define TDEBUG_BEGIN_TASK_SI(LEVEL, SI)
std::vector< Path > & paths()
Definition: PathsInfo.cc:54
bool outputsToClose() const
void seedRunRangeSet(RangeSetHandler const &)
#define TDEBUG_END_TASK_SI(LEVEL, SI)
PathsDoneTask(EndPathExecutor *const endPathExec, WaitingTaskPtr const finalizeEventTask, GlobalTaskGroup &taskGroup)
STL namespace.
void writeSubRun(SubRunPrincipal &srp)
Float_t tmp
Definition: plot.C:35
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
void respondToOpenInputFile(FileBlock const &fb)
void respondToOpenOutputFiles(FileBlock const &fb)
std::map< std::string, std::shared_ptr< Worker > > & workers()
Definition: PathsInfo.cc:22
void selectProducts(ProductTables const &)
void operator()(exception_ptr const ex)
void recordOutputClosureRequests(Granularity)
void respondToCloseOutputFiles(FileBlock const &fb)
void openSomeOutputFiles(FileBlock const &fb)
OutputFileStatus
Transition
Definition: Transition.h:7
void setOutputFileStatus(OutputFileStatus)
decltype(auto) values(Coll &&coll)
Range-for loop helper iterating across the values of the specified collection.
std::atomic< OutputFileStatus > fileStatus_
std::set< OutputWorker * > outputWorkersToClose_
EventID const & eventID() const
void seedSubRunRangeSet(RangeSetHandler const &)
bool someOutputsOpen() const
bool isLastInSubRun() const
ScheduleContext const sc_
#define TDEBUG_FUNC_SI(LEVEL, SI)
std::unique_ptr< RangeSetHandler > subRunRangeSetHandler_
static auto end_path_spec()
Definition: PathContext.h:20
void writeRun(RunPrincipal &rp)
void beginJob(detail::SharedResources const &resources)
void reset_for_event()
Definition: PathsInfo.cc:85
bool outputsToOpen() const
void setRunAuxiliaryRangeSetID(RangeSet const &rs)
void setSubRunAuxiliaryRangeSetID(RangeSet const &rs)
TFile fb("Li6.root")
std::set< OutputWorker * > outputWorkersToOpen_
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
void respondToCloseInputFile(FileBlock const &fb)
void may_run(hep::concurrency::WaitingTaskPtr task, std::exception_ptr ex_ptr={})
void writeEvent(EventPrincipal &)
void incrementPassedEventCount()
Definition: PathsInfo.cc:104
ActionTable const & actionTable_
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
Definition: MVAAlg.h:12
RangeSetHandler * clone() const
void process(Transition, Principal &)
GlobalTaskGroup & taskGroup_
Float_t e
Definition: plot.C:35
void process_event(hep::concurrency::WaitingTaskPtr finalizeEventTask, EventPrincipal &)
std::vector< OutputWorker * > outputWorkers_
std::unique_ptr< RangeSetHandler > runRangeSetHandler_
void incrementTotalEventCount()
Definition: PathsInfo.cc:98
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33