LArSoft  v09_90_00
Liquid Argon Software toolkit - https://larsoft.org/
art::EndPathExecutor Class Reference

#include "EndPathExecutor.h"

Classes

class  PathsDoneTask
 

Public Member Functions

 EndPathExecutor (ScheduleID sid, PathManager &pm, ActionTable const &actions, UpdateOutputCallbacks &callbacks, GlobalTaskGroup &task_group)
 
 EndPathExecutor (EndPathExecutor &&)=delete
 
EndPathExecutor & operator= (EndPathExecutor &&)=delete
 
 EndPathExecutor (EndPathExecutor const &)=delete
 
EndPathExecutor & operator= (EndPathExecutor const &)=delete
 
void beginJob (detail::SharedResources const &resources)
 
void endJob ()
 
void selectProducts (ProductTables const &)
 
void respondToOpenInputFile (FileBlock const &fb)
 
void respondToCloseInputFile (FileBlock const &fb)
 
void respondToOpenOutputFiles (FileBlock const &fb)
 
void respondToCloseOutputFiles (FileBlock const &fb)
 
bool someOutputsOpen () const
 
void closeAllOutputFiles ()
 
void seedRunRangeSet (RangeSetHandler const &)
 
void setRunAuxiliaryRangeSetID (RangeSet const &rs)
 
void writeRun (RunPrincipal &rp)
 
void seedSubRunRangeSet (RangeSetHandler const &)
 
void setSubRunAuxiliaryRangeSetID (RangeSet const &rs)
 
void writeSubRun (SubRunPrincipal &srp)
 
void process (Transition, Principal &)
 
void process_event (hep::concurrency::WaitingTaskPtr finalizeEventTask, EventPrincipal &)
 
void writeEvent (EventPrincipal &)
 
bool outputsToClose () const
 
void closeSomeOutputFiles ()
 
bool outputsToOpen () const
 
void openSomeOutputFiles (FileBlock const &fb)
 
void setOutputFileStatus (OutputFileStatus)
 
void recordOutputClosureRequests (Granularity)
 
void incrementInputFileNumber ()
 

Private Attributes

ScheduleContext const sc_
 
ActionTable const & actionTable_
 
PathsInfo & endPathInfo_
 
GlobalTaskGroup & taskGroup_
 
std::vector< OutputWorker * > outputWorkers_ {}
 
std::unique_ptr< RangeSetHandler > runRangeSetHandler_ {nullptr}
 
std::unique_ptr< RangeSetHandler > subRunRangeSetHandler_ {nullptr}
 
std::atomic< OutputFileStatus > fileStatus_ {OutputFileStatus::Closed}
 
std::set< OutputWorker * > outputWorkersToOpen_ {}
 
std::set< OutputWorker * > outputWorkersToClose_ {}
 

Friends

class Schedule
 

Detailed Description

Definition at line 41 of file EndPathExecutor.h.

Constructor & Destructor Documentation

art::EndPathExecutor::EndPathExecutor ( ScheduleID  sid,
PathManager &  pm,
ActionTable const &  actions,
UpdateOutputCallbacks &  callbacks,
GlobalTaskGroup &  task_group 
)

Definition at line 44 of file EndPathExecutor.cc.

References actionTable_, endPathInfo_, outputWorkers_, outputWorkersToOpen_, selectProducts(), taskGroup_, util::values(), and art::PathsInfo::workers().

49  : sc_{sid}
50  , actionTable_{actionTable}
51  , endPathInfo_{pm.endPathInfo(sid)}
52  , taskGroup_{group}
53  {
54  for (auto const& worker :
56  assert(sid == worker->scheduleID());
57  if (auto owp = std::dynamic_pointer_cast<OutputWorker>(worker)) {
58  outputWorkers_.emplace_back(owp.get());
59  }
60  }
61  outputWorkersToOpen_.insert(outputWorkers_.cbegin(), outputWorkers_.cend());
62  outputCallbacks.registerCallback(
63  [this](auto const& tables) { selectProducts(tables); });
64  }
std::map< std::string, std::shared_ptr< Worker > > & workers()
Definition: PathsInfo.cc:22
void selectProducts(ProductTables const &)
decltype(auto) values(Coll &&coll)
Range-for loop helper iterating across the values of the specified collection.
ScheduleContext const sc_
std::set< OutputWorker * > outputWorkersToOpen_
ActionTable const & actionTable_
GlobalTaskGroup & taskGroup_
std::vector< OutputWorker * > outputWorkers_
art::EndPathExecutor::EndPathExecutor ( EndPathExecutor &&  )
delete
art::EndPathExecutor::EndPathExecutor ( EndPathExecutor const &  )
delete

Member Function Documentation

void art::EndPathExecutor::beginJob ( detail::SharedResources const &  resources)

Definition at line 67 of file EndPathExecutor.cc.

References endPathInfo_.

Referenced by art::Schedule::beginJob().

68  {
69  for (auto& worker : unique_workers(endPathInfo_)) {
70  worker.beginJob(resources);
71  }
72  }
void art::EndPathExecutor::closeAllOutputFiles ( )

Definition at line 150 of file EndPathExecutor.cc.

References outputWorkers_.

151  {
152  for (auto ow : outputWorkers_) {
153  ow->closeFile();
154  }
155  }
std::vector< OutputWorker * > outputWorkers_
void art::EndPathExecutor::closeSomeOutputFiles ( )

Definition at line 345 of file EndPathExecutor.cc.

References outputWorkersToClose_, outputWorkersToOpen_, setOutputFileStatus(), and art::Switching.

346  {
348  for (auto ow : outputWorkersToClose_) {
349  // Skip files that are already closed due to other end-path
350  // executors already closing them.
351  if (!ow->fileIsOpen()) {
352  continue;
353  }
354  ow->closeFile();
355  }
356  outputWorkersToOpen_ = std::move(outputWorkersToClose_);
357  }
void setOutputFileStatus(OutputFileStatus)
std::set< OutputWorker * > outputWorkersToClose_
std::set< OutputWorker * > outputWorkersToOpen_
void art::EndPathExecutor::endJob ( )

Definition at line 75 of file EndPathExecutor.cc.

References e, art::errors::EndJobFailure, and endPathInfo_.

Referenced by art::Schedule::endJob().

76  {
78  // FIXME: There seems to be little value-added by the catch and rethrow
79  // here.
80  for (auto& worker : unique_workers(endPathInfo_)) {
81  try {
82  worker.endJob();
83  }
84  catch (cet::exception& e) {
85  error << "cet::exception caught in Schedule::endJob\n"
86  << e.explain_self();
87  throw error;
88  }
89  catch (exception& e) {
90  error << "Standard library exception caught in Schedule::endJob\n"
91  << e.what();
92  throw error;
93  }
94  catch (...) {
95  error << "Unknown exception caught in Schedule::endJob\n";
96  throw error;
97  }
98  }
99  }
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
Float_t e
Definition: plot.C:35
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
void art::EndPathExecutor::incrementInputFileNumber ( )

Definition at line 410 of file EndPathExecutor.cc.

References outputWorkers_.

411  {
412  for (auto ow : outputWorkers_) {
413  ow->incrementInputFileNumber();
414  }
415  }
std::vector< OutputWorker * > outputWorkers_
void art::EndPathExecutor::openSomeOutputFiles ( FileBlock const &  fb)

Definition at line 366 of file EndPathExecutor.cc.

References art::Open, outputWorkersToOpen_, and setOutputFileStatus().

367  {
368  for (auto ow : outputWorkersToOpen_) {
369  ow->openFile(fb);
370  }
372  outputWorkersToOpen_.clear();
373  }
void setOutputFileStatus(OutputFileStatus)
TFile fb("Li6.root")
std::set< OutputWorker * > outputWorkersToOpen_
EndPathExecutor& art::EndPathExecutor::operator= ( EndPathExecutor &&  )
delete
EndPathExecutor& art::EndPathExecutor::operator= ( EndPathExecutor const &  )
delete
bool art::EndPathExecutor::outputsToClose ( ) const

Definition at line 331 of file EndPathExecutor.cc.

References outputWorkersToClose_.

332  {
333  return !outputWorkersToClose_.empty();
334  }
std::set< OutputWorker * > outputWorkersToClose_
bool art::EndPathExecutor::outputsToOpen ( ) const

Definition at line 360 of file EndPathExecutor.cc.

References outputWorkersToOpen_.

361  {
362  return !outputWorkersToOpen_.empty();
363  }
std::set< OutputWorker * > outputWorkersToOpen_
void art::EndPathExecutor::process ( Transition  trans,
Principal &  principal 
)

Definition at line 214 of file EndPathExecutor.cc.

References endPathInfo_, art::errors::EventProcessorFailure, art::PathsInfo::incrementPassedEventCount(), and art::PathsInfo::paths().

Referenced by art::Schedule::process().

215  {
216  for (auto& worker : unique_workers(endPathInfo_)) {
217  worker.reset();
218  }
219  try {
220  if (!endPathInfo_.paths().empty()) {
221  endPathInfo_.paths().front().process(trans, principal);
222  }
223  }
224  catch (cet::exception& ex) {
225  throw Exception(errors::EventProcessorFailure, "EndPathExecutor:")
226  << "an exception occurred during current event processing\n"
227  << ex;
228  }
229  catch (...) {
230  mf::LogError("PassingThrough")
231  << "an exception occurred during current event processing\n";
232  throw;
233  }
235  }
std::vector< Path > & paths()
Definition: PathsInfo.cc:54
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
void incrementPassedEventCount()
Definition: PathsInfo.cc:104
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
void art::EndPathExecutor::process_event ( hep::concurrency::WaitingTaskPtr  finalizeEventTask,
EventPrincipal &  
)

Definition at line 290 of file EndPathExecutor.cc.

References endPathInfo_, art::ScheduleContext::id(), art::PathsInfo::incrementTotalEventCount(), art::GlobalTaskGroup::may_run(), art::PathsInfo::paths(), art::PathsInfo::reset_for_event(), sc_, taskGroup_, TDEBUG_BEGIN_FUNC_SI, and TDEBUG_END_FUNC_SI.

Referenced by art::Schedule::process_event_observers().

292  {
293  auto const sid = sc_.id();
294  TDEBUG_BEGIN_FUNC_SI(4, sid);
297  try {
298  auto pathsDoneTask =
299  make_waiting_task<PathsDoneTask>(this, finalizeEventTask, taskGroup_);
300  if (endPathInfo_.paths().empty()) {
301  taskGroup_.may_run(pathsDoneTask);
302  } else {
303  endPathInfo_.paths().front().process(pathsDoneTask, ep);
304  }
305  }
306  catch (...) {
307  taskGroup_.may_run(finalizeEventTask, current_exception());
308  }
309  TDEBUG_END_FUNC_SI(4, sid);
310  }
std::vector< Path > & paths()
Definition: PathsInfo.cc:54
ScheduleContext const sc_
void reset_for_event()
Definition: PathsInfo.cc:85
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
void may_run(hep::concurrency::WaitingTaskPtr task, std::exception_ptr ex_ptr={})
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
GlobalTaskGroup & taskGroup_
void incrementTotalEventCount()
Definition: PathsInfo.cc:98
void art::EndPathExecutor::recordOutputClosureRequests ( Granularity  atBoundary)

Definition at line 395 of file EndPathExecutor.cc.

References outputWorkers_, and outputWorkersToClose_.

396  {
397  for (auto ow : outputWorkers_) {
398  if (atBoundary < ow->fileGranularity()) {
399  // The boundary we are checking at is finer than the checks
400  // the output worker needs, nothing to do.
401  continue;
402  }
403  if (ow->requestsToCloseFile()) {
404  outputWorkersToClose_.insert(ow);
405  }
406  }
407  }
std::set< OutputWorker * > outputWorkersToClose_
std::vector< OutputWorker * > outputWorkers_
void art::EndPathExecutor::respondToCloseInputFile ( FileBlock const &  fb)

Definition at line 118 of file EndPathExecutor.cc.

References endPathInfo_.

Referenced by art::Schedule::respondToCloseInputFile().

119  {
120  for (auto& worker : unique_workers(endPathInfo_)) {
121  worker.respondToCloseInputFile(fb);
122  }
123  }
TFile fb("Li6.root")
void art::EndPathExecutor::respondToCloseOutputFiles ( FileBlock const &  fb)

Definition at line 134 of file EndPathExecutor.cc.

References endPathInfo_.

Referenced by art::Schedule::respondToCloseOutputFiles().

135  {
136  for (auto& worker : unique_workers(endPathInfo_)) {
137  worker.respondToCloseOutputFiles(fb);
138  }
139  }
TFile fb("Li6.root")
void art::EndPathExecutor::respondToOpenInputFile ( FileBlock const &  fb)

Definition at line 110 of file EndPathExecutor.cc.

References endPathInfo_.

Referenced by art::Schedule::respondToOpenInputFile().

111  {
112  for (auto& worker : unique_workers(endPathInfo_)) {
113  worker.respondToOpenInputFile(fb);
114  }
115  }
TFile fb("Li6.root")
void art::EndPathExecutor::respondToOpenOutputFiles ( FileBlock const &  fb)

Definition at line 126 of file EndPathExecutor.cc.

References endPathInfo_.

Referenced by art::Schedule::respondToOpenOutputFiles().

127  {
128  for (auto& worker : unique_workers(endPathInfo_)) {
129  worker.respondToOpenOutputFiles(fb);
130  }
131  }
TFile fb("Li6.root")
void art::EndPathExecutor::seedRunRangeSet ( RangeSetHandler const &  rsh)

Definition at line 158 of file EndPathExecutor.cc.

References art::RangeSetHandler::clone(), and runRangeSetHandler_.

159  {
160  runRangeSetHandler_.reset(rsh.clone());
161  }
std::unique_ptr< RangeSetHandler > runRangeSetHandler_
void art::EndPathExecutor::seedSubRunRangeSet ( RangeSetHandler const &  rsh)

Definition at line 183 of file EndPathExecutor.cc.

References art::RangeSetHandler::clone(), and subRunRangeSetHandler_.

184  {
185  subRunRangeSetHandler_.reset(rsh.clone());
186  }
std::unique_ptr< RangeSetHandler > subRunRangeSetHandler_
void art::EndPathExecutor::selectProducts ( ProductTables const &  tables)

Definition at line 102 of file EndPathExecutor.cc.

References outputWorkers_.

Referenced by EndPathExecutor().

103  {
104  for (auto ow : outputWorkers_) {
105  ow->selectProducts(tables);
106  }
107  }
std::vector< OutputWorker * > outputWorkers_
void art::EndPathExecutor::setOutputFileStatus ( OutputFileStatus  ofs)

Definition at line 386 of file EndPathExecutor.cc.

References fileStatus_, and outputWorkers_.

Referenced by closeSomeOutputFiles(), and openSomeOutputFiles().

387  {
388  for (auto ow : outputWorkers_) {
389  ow->setFileStatus(ofs);
390  }
391  fileStatus_ = ofs;
392  }
std::atomic< OutputFileStatus > fileStatus_
std::vector< OutputWorker * > outputWorkers_
void art::EndPathExecutor::setRunAuxiliaryRangeSetID ( RangeSet const &  rs)

Definition at line 164 of file EndPathExecutor.cc.

References outputWorkers_.

165  {
166  for (auto ow : outputWorkers_) {
167  ow->setRunAuxiliaryRangeSetID(rangeSet);
168  }
169  }
std::vector< OutputWorker * > outputWorkers_
void art::EndPathExecutor::setSubRunAuxiliaryRangeSetID ( RangeSet const &  rs)

Definition at line 189 of file EndPathExecutor.cc.

References outputWorkers_.

190  {
191  for (auto ow : outputWorkers_) {
192  // For RootOutput this enters the possibly split range set into
193  // the range set db.
194  ow->setSubRunAuxiliaryRangeSetID(rs);
195  }
196  }
std::vector< OutputWorker * > outputWorkers_
bool art::EndPathExecutor::someOutputsOpen ( ) const

Definition at line 142 of file EndPathExecutor.cc.

References outputWorkers_.

143  {
144  return any_of(outputWorkers_.cbegin(), outputWorkers_.cend(), [](auto ow) {
145  return ow->fileIsOpen();
146  });
147  }
std::vector< OutputWorker * > outputWorkers_
void art::EndPathExecutor::writeEvent ( EventPrincipal &  ep)

Definition at line 313 of file EndPathExecutor.cc.

References art::PathContext::end_path_spec(), art::EventPrincipal::eventID(), art::ScheduleContext::id(), art::EventPrincipal::isLastInSubRun(), outputWorkers_, runRangeSetHandler_, sc_, subRunRangeSetHandler_, and TDEBUG_FUNC_SI.

314  {
315  // We don't worry about providing the sorted list of module names
316  // for the end_path right now. If users decide it is necessary to
317  // know what they are, then we can provide them.
318  PathContext const pc{sc_, PathContext::end_path_spec(), {}};
319  for (auto ow : outputWorkers_) {
320  ow->writeEvent(ep, pc);
321  }
322  auto const& eid = ep.eventID();
323  bool const lastInSubRun{ep.isLastInSubRun()};
324  TDEBUG_FUNC_SI(5, sc_.id())
325  << "eid: " << eid.run() << ", " << eid.subRun() << ", " << eid.event();
326  runRangeSetHandler_->update(eid, lastInSubRun);
327  subRunRangeSetHandler_->update(eid, lastInSubRun);
328  }
ScheduleContext const sc_
#define TDEBUG_FUNC_SI(LEVEL, SI)
std::unique_ptr< RangeSetHandler > subRunRangeSetHandler_
static auto end_path_spec()
Definition: PathContext.h:20
std::vector< OutputWorker * > outputWorkers_
std::unique_ptr< RangeSetHandler > runRangeSetHandler_
void art::EndPathExecutor::writeRun ( RunPrincipal &  rp)

Definition at line 172 of file EndPathExecutor.cc.

References fileStatus_, outputWorkers_, runRangeSetHandler_, and art::Switching.

173  {
174  for (auto ow : outputWorkers_) {
175  ow->writeRun(rp);
176  }
178  runRangeSetHandler_->rebase();
179  }
180  }
std::atomic< OutputFileStatus > fileStatus_
std::vector< OutputWorker * > outputWorkers_
std::unique_ptr< RangeSetHandler > runRangeSetHandler_
void art::EndPathExecutor::writeSubRun ( SubRunPrincipal &  srp)

Definition at line 199 of file EndPathExecutor.cc.

References fileStatus_, outputWorkers_, subRunRangeSetHandler_, and art::Switching.

200  {
201  for (auto ow : outputWorkers_) {
202  ow->writeSubRun(srp);
203  }
205  subRunRangeSetHandler_->rebase();
206  }
207  }
std::atomic< OutputFileStatus > fileStatus_
std::unique_ptr< RangeSetHandler > subRunRangeSetHandler_
std::vector< OutputWorker * > outputWorkers_

Friends And Related Function Documentation

friend class Schedule
friend

Definition at line 42 of file EndPathExecutor.h.

Member Data Documentation

ActionTable const& art::EndPathExecutor::actionTable_
private

Definition at line 124 of file EndPathExecutor.h.

Referenced by EndPathExecutor().

std::atomic<OutputFileStatus> art::EndPathExecutor::fileStatus_ {OutputFileStatus::Closed}
private

Definition at line 135 of file EndPathExecutor.h.

Referenced by setOutputFileStatus(), writeRun(), and writeSubRun().

std::set<OutputWorker*> art::EndPathExecutor::outputWorkersToClose_ {}
private
std::set<OutputWorker*> art::EndPathExecutor::outputWorkersToOpen_ {}
private
std::unique_ptr<RangeSetHandler> art::EndPathExecutor::runRangeSetHandler_ {nullptr}
private

Definition at line 130 of file EndPathExecutor.h.

Referenced by seedRunRangeSet(), writeEvent(), and writeRun().

ScheduleContext const art::EndPathExecutor::sc_
private

Definition at line 120 of file EndPathExecutor.h.

Referenced by process_event(), and writeEvent().

std::unique_ptr<RangeSetHandler> art::EndPathExecutor::subRunRangeSetHandler_ {nullptr}
private

Definition at line 132 of file EndPathExecutor.h.

Referenced by seedSubRunRangeSet(), writeEvent(), and writeSubRun().


The documentation for this class was generated from the following files: