LArSoft  v07_13_02
Liquid Argon Software toolkit - http://larsoft.org/
EndPathExecutor.cc
Go to the documentation of this file.
2 
5 #include "cetlib/container_algorithms.h"
6 
7 #include <memory>
8 #include <type_traits>
9 #include <utility>
10 
11 namespace {
12  auto
13  getOutputWorkers(art::WorkerMap const& workers)
14  {
15  std::vector<art::OutputWorker*> ows;
16  for (auto const& val : workers) {
17  auto w = val.second.get();
18  if (auto ow = dynamic_cast<art::OutputWorker*>(w)) {
19  ows.emplace_back(ow);
20  }
21  }
22  return ows;
23  }
24 }
25 
27  ActionTable& actions,
28  ActivityRegistry& areg,
30  : endPathInfo_{pm.endPathInfo()}
31  , act_table_{&actions}
32  , actReg_{areg}
33  , outputWorkers_{getOutputWorkers(endPathInfo_.workers())}
36  std::end(outputWorkers_)) // seed with all output workers
37  , workersEnabled_(endPathInfo_.workers().size(), true)
39 {
40  mpr.registerProductListUpdatedCallback(
41  [this](auto const& productList) { this->selectProducts(productList); });
42 }
43 
44 bool
46 {
47  bool const rc = !outputWorkers_.empty() && // Necessary because std::all_of()
48  // returns true if range is empty.
49  std::all_of(outputWorkers_.cbegin(),
50  outputWorkers_.cend(),
51  [](auto& w) { return w->limitReached(); });
52  if (rc) {
53  mf::LogInfo("SuccessfulTermination")
54  << "The job is terminating successfully because each output module\n"
55  << "has reached its configured limit.\n";
56  }
57  return rc;
58 }
59 
60 void
62 {
63  bool failure{false};
65  doForAllEnabledWorkers_([&failure, &error](Worker* w) {
66  try {
67  w->endJob();
68  }
69  catch (cet::exception& e) {
70  error << "cet::exception caught in Schedule::endJob\n"
71  << e.explain_self();
72  failure = true;
73  }
74  catch (std::exception& e) {
75  error << "Standard library exception caught in Schedule::endJob\n"
76  << e.what();
77  failure = true;
78  }
79  catch (...) {
80  error << "Unknown exception caught in Schedule::endJob\n";
81  failure = true;
82  }
83  });
84  if (failure) {
85  throw error;
86  }
87 }
88 
89 void
91 {
93  actReg_.sPreCloseOutputFile.invoke(ow->label());
94  ow->closeFile();
97  });
98 }
99 
100 void
102 {
104  ow->openFile(fb);
105  actReg_.sPostOpenOutputFile.invoke(ow->label());
106  });
107 }
108 
109 void
111 {
112  doForAllEnabledOutputWorkers_([&rp](auto w) { w->writeRun(rp); });
114  runRangeSetHandler_->rebase();
115  }
116 }
117 
118 void
120 {
121  doForAllEnabledOutputWorkers_([&srp](auto w) { w->writeSubRun(srp); });
123  subRunRangeSetHandler_->rebase();
124  }
125 }
126 
127 void
129 {
130  doForAllEnabledOutputWorkers_([this, &ep](auto w) {
131  auto const& md = w->description();
132  actReg_.sPreWriteEvent.invoke(md);
133  w->writeEvent(ep);
134  actReg_.sPostWriteEvent.invoke(md);
135  });
136  auto const& eid = ep.id();
137  bool const lastInSubRun{ep.isLastInSubRun()};
138  runRangeSetHandler_->update(eid, lastInSubRun);
139  subRunRangeSetHandler_->update(eid, lastInSubRun);
140 }
141 
142 void
143 art::EndPathExecutor::seedRunRangeSet(std::unique_ptr<RangeSetHandler> rsh)
144 {
145  runRangeSetHandler_ = std::move(rsh);
146 }
147 
148 void
149 art::EndPathExecutor::seedSubRunRangeSet(std::unique_ptr<RangeSetHandler> rsh)
150 {
151  subRunRangeSetHandler_ = std::move(rsh);
152 }
153 
154 void
156 {
157  assert(subRunRangeSetHandler_);
158  assert(runRangeSetHandler_);
159  // Ranges are split/flushed only for a RangeSetHandler whose dynamic
160  // type is 'ClosedRangeSetHandler'. The implementations for the
161  // 'OpenRangeSetHandler' are nops.
162  //
163  // Consider the following range-sets
164  // SubRun RangeSet: { Run 1 : SubRun 1 : Events [1,7) } <-- Current iterator
165  // of handler Run RangeSet: { Run 1 : SubRun 0 : Events [5,11)
166  // SubRun 1 : Events [1,7) <-- Current iterator
167  // of handler SubRun 1 : Events [9,15) }
169  // For a range split just before SubRun 1, Event 6, the range sets
170  // should become:
171  //
172  // SubRun RangeSet: { Run 1 : SubRun 1 : Events [1,6)
173  // SubRun 1 : Events [6,7) } <-- Updated
174  // iterator of handler
175  // Run RangeSet: { Run 1 : SubRun 0 : Events [5,11)
176  // SubRun 1 : Events [1,6)
177  // SubRun 1 : Events [6,7) <-- Updated
178  // iterator of handler SubRun 1 : Events [9,15)
179  // }
180  subRunRangeSetHandler_->maybeSplitRange();
181  runRangeSetHandler_->maybeSplitRange();
182  } else {
183  subRunRangeSetHandler_->flushRanges();
184  }
185  auto const& ranges = subRunRangeSetHandler_->seenRanges();
186  srp.updateSeenRanges(ranges);
188  [&ranges](auto w) { w->setSubRunAuxiliaryRangeSetID(ranges); });
189 }
190 
191 void
193 {
194  assert(runRangeSetHandler_);
196  runRangeSetHandler_->flushRanges();
197  }
198  auto const& ranges = runRangeSetHandler_->seenRanges();
199  rp.updateSeenRanges(ranges);
201  [&ranges](auto w) { w->setRunAuxiliaryRangeSetID(ranges); });
202 }
203 
204 void
206 {
208  [&productList](auto w) { w->selectProducts(productList); });
209 }
210 
211 void
213 {
214  doForAllEnabledOutputWorkers_([this, b](auto ow) {
215  // We need to support the following case:
216  //
217  // fileProperties: {
218  // maxEvents: 10
219  // maxRuns: 1
220  // granularity: Event
221  // }
222  //
223  // If a request to close is made on a run boundary, but the
224  // granularity is still Event, then the file should close. For
225  // that reason, the comparison is 'granularity > b' instead of
226  // 'granularity != b'.
227 
228  auto const granularity = ow->fileGranularity();
229  if (granularity > b || !ow->requestsToCloseFile())
230  return;
231 
232  // Technical note: although the outputWorkersToClose_ container
233  // is "moved from" in closeSomeOutputFiles, it is safe to call
234  // 'insert' vis-a-vis the [lib.types.movedfrom] section of the
235  // standard. There are no preconditions for std::set::insert,
236  // so no state-checking is required.
237  outputWorkersToClose_.insert(ow);
238  });
239 }
240 
241 void
243 {
245  [](auto ow) { ow->incrementInputFileNumber(); });
246 }
247 
248 bool
250 {
251  return !outputWorkersToOpen_.empty();
252 }
253 
254 bool
256 {
257  return !outputWorkersToClose_.empty();
258 }
259 
260 bool
262 {
263  return std::any_of(outputWorkers_.cbegin(),
264  outputWorkers_.cend(),
265  [](auto ow) { return ow->fileIsOpen(); });
266 }
267 
268 void
270 {
271  auto invoke_sPreCloseOutputFile = [this](auto ow) {
272  actReg_.sPreCloseOutputFile.invoke(ow->label());
273  };
274  auto closeFile = [](auto ow) { ow->closeFile(); };
275  auto invoke_sPostCloseOutputFile = [this](auto ow) {
277  OutputFileInfo{ow->label(), ow->lastClosedFileName()});
278  };
279 
281  cet::for_all(outputWorkersToClose_, invoke_sPreCloseOutputFile);
282  cet::for_all(outputWorkersToClose_, closeFile);
283  cet::for_all(outputWorkersToClose_, invoke_sPostCloseOutputFile);
285 }
286 
287 void
289 {
290  doForAllEnabledOutputWorkers_([ofs](auto ow) { ow->setFileStatus(ofs); });
291  fileStatus_ = ofs;
292 }
293 
294 void
296 {
297  auto openFile = [&fb](auto ow) { ow->openFile(fb); };
298  auto invoke_sPostOpenOutputFile = [this](auto ow) {
299  actReg_.sPostOpenOutputFile.invoke(ow->label());
300  };
301 
302  cet::for_all(outputWorkersToOpen_, openFile);
303  cet::for_all(outputWorkersToOpen_, invoke_sPostOpenOutputFile);
305 
306  outputWorkersToOpen_.clear();
307 }
308 
309 void
311 {
312  doForAllEnabledWorkers_([&fb](auto w) { w->respondToOpenInputFile(fb); });
313 }
314 
315 void
317 {
318  doForAllEnabledWorkers_([&fb](auto w) { w->respondToCloseInputFile(fb); });
319 }
320 
321 void
323 {
324  doForAllEnabledWorkers_([&fb](auto w) { w->respondToOpenOutputFiles(fb); });
325 }
326 
327 void
329 {
330  doForAllEnabledWorkers_([&fb](auto w) { w->respondToCloseOutputFiles(fb); });
331 }
332 
333 void
335 {
336  doForAllEnabledWorkers_([](auto w) { w->beginJob(); });
337 }
338 
339 void
341 {
342  doForAllEnabledWorkers_([](auto w) { w->reset(); });
343 }
std::vector< unsigned char > workersEnabled_
std::map< std::string, std::unique_ptr< Worker >> WorkerMap
Definition: WorkerMap.h:10
void endJob()
Definition: Worker.cc:68
bool outputsToClose() const
void writeEvent(EventPrincipal &ep)
ActionTable * act_table_
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreWriteEvent
bool terminate() const
GlobalSignal< detail::SignalResponseType::LIFO, void(std::string const &)> sPostOpenOutputFile
MaybeLogger_< ELseverityLevel::ELsev_info, false > LogInfo
EndPathExecutor(PathManager &pm, ActionTable &actions, ActivityRegistry &areg, MasterProductRegistry &mpr)
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostWriteEvent
void openFile(FileBlock const &fb)
Definition: OutputWorker.cc:48
void doForAllEnabledWorkers_(F f)
std::map< BranchKey, BranchDescription > ProductList
Definition: ProductList.h:15
std::string const & lastClosedFileName() const
Definition: OutputWorker.cc:22
void writeSubRun(SubRunPrincipal &srp)
void respondToOpenInputFile(FileBlock const &fb)
void respondToOpenOutputFiles(FileBlock const &fb)
void setAuxiliaryRangeSetID(SubRunPrincipal &srp)
void seedSubRunRangeSet(std::unique_ptr< RangeSetHandler >)
void recordOutputClosureRequests(Granularity)
void openAllOutputFiles(FileBlock &fb)
void respondToCloseOutputFiles(FileBlock const &fb)
void openSomeOutputFiles(FileBlock const &fb)
OutputFileStatus
WorkerMap const & workers() const
Definition: PathsInfo.h:81
void updateSeenRanges(RangeSet const &rs)
void setOutputFileStatus(OutputFileStatus)
std::vector< evd::details::RawDigitInfo_t >::const_iterator begin(RawDigitCacheDataClass const &cache)
TFile fb("Li6.root")
bool someOutputsOpen() const
bool isLastInSubRun() const
OutputWorkers outputWorkers_
std::unique_ptr< RangeSetHandler > subRunRangeSetHandler_
PathsInfo & endPathInfo()
Definition: PathManager.cc:132
void writeRun(RunPrincipal &rp)
bool outputsToOpen() const
OutputWorkerSet outputWorkersToOpen_
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
void selectProducts(ProductList const &)
void seedRunRangeSet(std::unique_ptr< RangeSetHandler >)
void respondToCloseInputFile(FileBlock const &fb)
GlobalSignal< detail::SignalResponseType::FIFO, void(std::string const &)> sPreCloseOutputFile
GlobalSignal< detail::SignalResponseType::LIFO, void(OutputFileInfo const &)> sPostCloseOutputFile
EventID const & id() const
ActivityRegistry & actReg_
std::vector< unsigned char > outputWorkersEnabled_
std::vector< evd::details::RawDigitInfo_t >::const_iterator end(RawDigitCacheDataClass const &cache)
void doForAllEnabledOutputWorkers_(F f)
OutputFileStatus fileStatus_
Float_t e
Definition: plot.C:34
std::string const & label() const
Definition: Worker.h:127
OutputWorkerSet outputWorkersToClose_
void updateSeenRanges(RangeSet const &rs)
Definition: RunPrincipal.h:92
Float_t w
Definition: plot.C:23
std::unique_ptr< RangeSetHandler > runRangeSetHandler_
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33