LArSoft  v09_90_00
Liquid Argon Software toolkit - https://larsoft.org/
art::EventProcessor Class Reference

#include "EventProcessor.h"

Classes

class  EndPathRunnerTask
 
class  EndPathTask
 

Public Types

enum  StatusCode { epSuccess = 0, epSignal = 3 }
 

Public Member Functions

 EventProcessor (fhicl::ParameterSet pset, detail::EnabledModules enabled_modules)
 
 ~EventProcessor ()
 
 EventProcessor (EventProcessor const &)=delete
 
 EventProcessor (EventProcessor &&)=delete
 
EventProcessoroperator= (EventProcessor const &)=delete
 
EventProcessoroperator= (EventProcessor &&)=delete
 
StatusCode runToCompletion ()
 

Private Types

template<typename T >
using tsan = hep::concurrency::thread_sanitize< T >
 
template<typename T >
using tsan_unique_ptr = hep::concurrency::thread_sanitize_unique_ptr< T >
 

Private Member Functions

void processAllEventsAsync (ScheduleID sid)
 
void readAndProcessAsync (ScheduleID sid)
 
void processEventAsync (ScheduleID sid)
 
void finishEventAsync (ScheduleID sid)
 
template<Level L>
bool levelsToProcess ()
 
template<Level L>
std::enable_if_t< is_above_most_deeply_nested_level(L)> begin ()
 
template<Level L>
void process ()
 
template<Level L>
void finalize ()
 
template<Level L>
void finalizeContainingLevels ()
 
template<Level L>
void recordOutputModuleClosureRequests ()
 
Level advanceItemType ()
 
void beginJob ()
 
void endJob ()
 
void endJobAllSchedules ()
 
void openInputFile ()
 
bool outputsToOpen ()
 
void openSomeOutputFiles ()
 
void closeInputFile ()
 
void closeSomeOutputFiles ()
 
void closeAllOutputFiles ()
 
void closeAllFiles ()
 
void respondToOpenInputFile ()
 
void respondToCloseInputFile ()
 
void respondToOpenOutputFiles ()
 
void respondToCloseOutputFiles ()
 
void readRun ()
 
void beginRun ()
 
void beginRunIfNotDoneAlready ()
 
void setRunAuxiliaryRangeSetID ()
 
void endRun ()
 
void writeRun ()
 
void readSubRun ()
 
void beginSubRun ()
 
void beginSubRunIfNotDoneAlready ()
 
void setSubRunAuxiliaryRangeSetID ()
 
void endSubRun ()
 
void writeSubRun ()
 
void readEvent ()
 
void processEvent ()
 
void writeEvent ()
 
void setOutputFileStatus (OutputFileStatus)
 
void invokePostBeginJobWorkers_ ()
 
void terminateAbnormally_ ()
 
Scheduleschedule (ScheduleID const id)
 
Schedulemain_schedule ()
 
actions::ActionCodes error_action (cet::exception &e) const
 
template<>
void process ()
 

Private Attributes

std::atomic< LevelnextLevel_ {Level::ReadyToAdvance}
 
tsan< detail::ExceptionCollectorec_ {}
 
tsan< cet::cpu_timer > timer_ {}
 
std::atomic< bool > beginRunCalled_ {false}
 
std::atomic< bool > beginSubRunCalled_ {false}
 
std::atomic< bool > finalizeRunEnabled_ {false}
 
std::atomic< bool > finalizeSubRunEnabled_ {false}
 
ActivityRegistry actReg_ {}
 
tsan< MFStatusUpdatermfStatusUpdater_ {actReg_}
 
tsan< UpdateOutputCallbacksoutputCallbacks_ {}
 
tsan< ProductDescriptionsproducedProductDescriptions_ {}
 
tsan< ProductTablesproducedProductLookupTables_ {ProductTables::invalid()}
 
tsan< ProducingServiceSignalspsSignals_ {}
 
tsan< Schedulerscheduler_
 
std::unique_ptr< GlobalTaskGrouptaskGroup_ {nullptr}
 
detail::SharedResources sharedResources_ {}
 
ScheduleIteration scheduleIteration_
 
tsan_unique_ptr< ServicesManagerservicesManager_
 
tsan< PathManagerpathManager_
 
tsan_unique_ptr< InputSourceinput_ {nullptr}
 
tsan< std::map< ScheduleID, Schedule > > schedules_ {}
 
tsan_unique_ptr< FileBlockfb_ {nullptr}
 
tsan_unique_ptr< RunPrincipalrunPrincipal_ {nullptr}
 
tsan_unique_ptr< SubRunPrincipalsubRunPrincipal_ {nullptr}
 
PerScheduleContainer< std::unique_ptr< EventPrincipal > > eventPrincipals_ {}
 
bool const handleEmptyRuns_
 
bool const handleEmptySubRuns_
 
SharedException sharedException_
 
std::atomic< bool > firstEvent_ {true}
 
std::atomic< bool > fileSwitchInProgress_ {false}
 

Detailed Description

Definition at line 43 of file EventProcessor.h.

Member Typedef Documentation

template<typename T >
using art::EventProcessor::tsan = hep::concurrency::thread_sanitize<T>
private

Definition at line 140 of file EventProcessor.h.

template<typename T >
using art::EventProcessor::tsan_unique_ptr = hep::concurrency::thread_sanitize_unique_ptr<T>
private

Definition at line 143 of file EventProcessor.h.

Member Enumeration Documentation

Enumerator
epSuccess 
epSignal 

Definition at line 49 of file EventProcessor.h.

Constructor & Destructor Documentation

art::EventProcessor::EventProcessor ( fhicl::ParameterSet  pset,
detail::EnabledModules  enabled_modules 
)
explicit

Definition at line 87 of file EventProcessor.cc.

References actReg_, art::errors::Configuration, e, util::end(), FDEBUG, art::detail::SharedResources::freeze(), fhicl::ParameterSet::get(), art::getReleaseVersion(), handleEmptyRuns_, handleEmptySubRuns_, art::ProcessConfiguration::id(), fhicl::ParameterSet::id(), input_, art::Globals::instance(), art::thread_safe_registry_via_id< K, M >::instance(), art::ConsumesInfo::instance(), art::legacy, art::InputSourceFactory::make(), art::Globals::nschedules(), outputCallbacks_, pathManager_, art::Globals::processName(), producedProductDescriptions_, producedProductLookupTables_, psSignals_, fhicl::ParameterSet::put(), scheduleIteration_, scheduler_, schedules_, servicesManager_, art::ConsumesInfo::setRequireConsumes(), art::setupSignals(), sharedResources_, art::ActivityRegistry::sPostSourceConstruction, taskGroup_, TDEBUG_FUNC, art::Globals::triggerPSet(), fhicl::detail::validationException::what(), and x.

89  : scheduler_{pset.get<ParameterSet>("services.scheduler")}
90  , scheduleIteration_{scheduler_->num_schedules()}
91  , servicesManager_{create_services_manager(
92  pset.get<ParameterSet>("services"),
93  actReg_,
95  , pathManager_{pset,
98  scheduler_->actionTable(),
99  actReg_,
100  std::move(enabled_modules)}
101  , handleEmptyRuns_{scheduler_->handleEmptyRuns()}
102  , handleEmptySubRuns_{scheduler_->handleEmptySubRuns()}
103  {
104  auto services_pset = pset.get<ParameterSet>("services");
105  auto const scheduler_pset = services_pset.get<ParameterSet>("scheduler");
106  {
107  // FIXME: Signals and threads require more effort than this! A
108  // signal is delivered to only one thread, and which
109  // thread is left up to the implementation to decide. To
110  // get control we must block all signals in the main
111  // thread, create a new thread which will handle the
112  // signals we want to handle, unblock the signals in that
113  // thread only, and have it use sigwaitinfo() to suspend
114  // itselt and wait for those signals.
115  setupSignals(scheduler_pset.get<bool>("enableSigInt", true));
116  }
120  // We do this late because the floating point control word, signal
121  // masks, etc., are per-thread and inherited from the master
122  // thread, so we want to allow system services, user services, and
123  // modules to configure these things in their constructors before
124  // we let tbb create any threads. This means they cannot use tbb
125  // in their constructors, instead they must use the beginJob
126  // callout.
127  taskGroup_ = scheduler_->global_task_group();
128  // Whenever we are ready to enable ROOT's implicit MT, which is
129  // equivalent to its use of TBB, the call should be made after our
130  // own TBB task manager has been initialized.
131  // ROOT::EnableImplicitMT();
132  TDEBUG_FUNC(5) << "nschedules: " << scheduler_->num_schedules()
133  << " nthreads: " << scheduler_->num_threads();
134 
135  auto const errorOnMissingConsumes = scheduler_->errorOnMissingConsumes();
136  ConsumesInfo::instance()->setRequireConsumes(errorOnMissingConsumes);
137 
138  auto const& processName = Globals::instance()->processName();
139 
140  // Trigger-names
141  servicesManager_->addSystemService<TriggerNamesService>(
143  pset.get<ParameterSet>("physics", {}));
144 
145  // We have delayed creating the service instances, now actually
146  // create them.
147  servicesManager_->forceCreation();
148  ServiceHandle<FileCatalogMetadata>()->addMetadataString("art.process_name",
149  processName);
150 
151  // Now that the service module instances have been created we can
152  // set the callbacks, set the module description, and register the
153  // products for each service module instance.
154  ProcessConfiguration const pc{processName, pset.id(), getReleaseVersion()};
155  auto const producing_services = servicesManager_->registerProducts(
156  producedProductDescriptions_, psSignals_, pc);
157  pathManager_->createModulesAndWorkers(
158  *taskGroup_, sharedResources_, producing_services);
159 
160  ServiceHandle<TriggerNamesService> trigger_names [[maybe_unused]];
161  auto const end = Globals::instance()->nschedules();
162  for (ScheduleID::size_type i = 0; i != end; ++i) {
163  ScheduleID const sid{i};
164 
165  // The ordering of the path results in the TriggerPathsInfo (which is used
166  // for the TriggerResults object), must be the same as that provided by
167  // the TriggerNamesService.
168  auto& trigger_paths_info [[maybe_unused]] =
169  pathManager_->triggerPathsInfo(sid);
170  assert(trigger_names->getTrigPaths() == trigger_paths_info.pathNames());
171 
172  schedules_->emplace(std::piecewise_construct,
173  std::forward_as_tuple(sid),
174  std::forward_as_tuple(sid,
175  pathManager_,
176  scheduler_->actionTable(),
177  actReg_,
179  *taskGroup_));
180  }
181  sharedResources_.freeze(taskGroup_->native_group());
182 
183  FDEBUG(2) << pset.to_string() << endl;
184  // The input source must be created after the end path executor
185  // because the end path executor registers a callback that must
186  // be invoked after the first input file is opened.
187  {
188  ParameterSet main_input;
189  main_input.put("module_type", "EmptyEvent");
190  main_input.put("module_label", "source");
191  main_input.put("maxEvents", -1);
192  if (!pset.get_if_present("source", main_input)) {
193  mf::LogInfo("EventProcessorSourceConfig")
194  << "Could not find a source configuration: using default.";
195  }
196  ModuleDescription const md{
197  main_input.id(),
198  main_input.get<string>("module_type"),
199  main_input.get<string>("module_label"),
201  ProcessConfiguration{processName, pset.id(), getReleaseVersion()}};
202  InputSourceDescription isd{md, outputCallbacks_, actReg_};
203  try {
204  input_.reset(InputSourceFactory::make(main_input, isd).release());
205  }
206  catch (fhicl::detail::validationException const& e) {
208  << "\n\nModule label: " << cet::bold_fontify(md.moduleLabel())
209  << "\nmodule_type : " << cet::bold_fontify(md.moduleName()) << "\n\n"
210  << e.what();
211  }
212  catch (Exception const& x) {
213  if (x.categoryCode() == errors::Configuration) {
214  throw Exception(errors::Configuration, "FailedInputSource")
215  << "Configuration of main input source has failed\n"
216  << x;
217  }
218  throw;
219  }
220  catch (cet::exception const& x) {
221  throw Exception(errors::Configuration, "FailedInputSource")
222  << "Configuration of main input source has failed\n"
223  << x;
224  }
225  catch (...) {
226  throw;
227  }
228  }
229  actReg_.sPostSourceConstruction.invoke(input_->moduleDescription());
230  // Create product tables used for product retrieval within modules.
231  producedProductLookupTables_ = ProductTables{producedProductDescriptions_};
232  outputCallbacks_->invoke(producedProductLookupTables_);
233  }
Float_t x
Definition: compare.C:6
tsan_unique_ptr< InputSource > input_
tsan< ProductDescriptions > producedProductDescriptions_
static ConsumesInfo * instance()
Definition: ConsumesInfo.cc:27
tsan< UpdateOutputCallbacks > outputCallbacks_
std::unique_ptr< InputSource > make(fhicl::ParameterSet const &conf, InputSourceDescription &desc)
tsan< Scheduler > scheduler_
MaybeLogger_< ELseverityLevel::ELsev_info, false > LogInfo
bool const handleEmptyRuns_
bool const handleEmptySubRuns_
tsan< PathManager > pathManager_
ScheduleIteration scheduleIteration_
fhicl::ParameterSet const & triggerPSet() const
Definition: Globals.cc:60
std::unique_ptr< GlobalTaskGroup > taskGroup_
tsan< ProducingServiceSignals > psSignals_
decltype(auto) constexpr end(T &&obj)
ADL-aware version of std::end.
Definition: StdUtils.h:77
void freeze(tbb::task_group &group)
static auto instance(bool cleanup=false)
tsan< std::map< ScheduleID, Schedule > > schedules_
ScheduleID::size_type nschedules() const
Definition: Globals.cc:24
detail::SharedResources sharedResources_
tsan< ProductTables > producedProductLookupTables_
#define TDEBUG_FUNC(LEVEL)
T get(std::string const &key) const
Definition: ParameterSet.h:314
std::string const & getReleaseVersion()
tsan_unique_ptr< ServicesManager > servicesManager_
void setupSignals(bool want_sigint_enabled)
ParameterSetID id() const
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
char const * what() const noexcept override
std::string const & processName() const
Definition: Globals.cc:48
id_type size_type
Definition: ScheduleID.h:25
std::optional< T > get_if_present(std::string const &key) const
Definition: ParameterSet.h:267
void setRequireConsumes(bool const)
Definition: ConsumesInfo.cc:96
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
ActivityRegistry actReg_
static Globals * instance()
Definition: Globals.cc:17
Float_t e
Definition: plot.C:35
void put(std::string const &key)
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
std::string to_string() const
Definition: ParameterSet.h:196
art::EventProcessor::~EventProcessor ( )
default
art::EventProcessor::EventProcessor ( EventProcessor const &  )
delete
art::EventProcessor::EventProcessor ( EventProcessor &&  )
delete

Member Function Documentation

Level art::EventProcessor::advanceItemType ( )
private

Definition at line 1516 of file EventProcessor.cc.

References art::Event, FDEBUG, art::highest_level(), input_, art::InputFile, art::input::IsEvent, art::input::IsFile, art::input::IsInvalid, art::input::IsRun, art::input::IsStop, art::input::IsSubRun, art::errors::LogicError, art::Run, and art::SubRun.

Referenced by levelsToProcess(), readAndProcessAsync(), and recordOutputModuleClosureRequests().

1517  {
1518  auto const itemType = input_->nextItemType();
1519  FDEBUG(1) << string(4, ' ') << "*** nextItemType: " << itemType << " ***\n";
1520  switch (itemType) {
1521  case input::IsStop:
1522  return highest_level();
1523  case input::IsFile:
1524  return Level::InputFile;
1525  case input::IsRun:
1526  return Level::Run;
1527  case input::IsSubRun:
1528  return Level::SubRun;
1529  case input::IsEvent:
1530  return Level::Event;
1531  case input::IsInvalid:
1533  << "Invalid next item type presented to the event processor.\n"
1534  << "Please contact artists@fnal.gov.";
1535  }
1537  << "Unrecognized next item type presented to the event processor.\n"
1538  << "Please contact artists@fnal.gov.";
1539  }
tsan_unique_ptr< InputSource > input_
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
constexpr auto highest_level() noexcept
Definition: Level.h:32
template<Level L>
std::enable_if_t<is_above_most_deeply_nested_level(L)> art::EventProcessor::begin ( )
private
void art::EventProcessor::beginJob ( )
private

Definition at line 433 of file EventProcessor.cc.

References actReg_, breakpoints::beginJob(), art::Schedule::beginJob(), e, FDEBUG, art::ScheduleIteration::for_each_schedule(), input_, invokePostBeginJobWorkers_(), schedule(), scheduleIteration_, sharedResources_, and art::ActivityRegistry::sPostBeginJob.

Referenced by art::EventProcessor::begin< Level::Job >(), and recordOutputModuleClosureRequests().

434  {
435  FDEBUG(1) << string(8, ' ') << "beginJob\n";
437  // NOTE: This implementation assumes 'Job' means one call the
438  // EventProcessor::run. If it really means once per 'application'
439  // then this code will have to be changed. Also have to deal with
440  // case where have 'run' then new Module added and do 'run' again.
441  // In that case the newly added Module needs its 'beginJob' to be
442  // called.
443  try {
444  input_->doBeginJob();
445  }
446  catch (cet::exception& e) {
447  mf::LogError("BeginJob") << "A cet::exception happened while processing"
448  " the beginJob of the 'source'";
449  e << "A cet::exception happened while processing"
450  " the beginJob of the 'source'\n";
451  throw;
452  }
453  catch (exception const&) {
454  mf::LogError("BeginJob") << "A exception happened while processing"
455  " the beginJob of the 'source'";
456  throw;
457  }
458  catch (...) {
459  mf::LogError("BeginJob") << "An unknown exception happened while"
460  " processing the beginJob of the 'source'";
461  throw;
462  }
463  scheduleIteration_.for_each_schedule([this](ScheduleID const sid) {
465  });
466  actReg_.sPostBeginJob.invoke();
468  }
tsan_unique_ptr< InputSource > input_
GlobalSignal< detail::SignalResponseType::FIFO, void()> sPostBeginJob
void beginJob(detail::SharedResources const &resources)
Definition: Schedule.cc:33
ScheduleIteration scheduleIteration_
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
void beginJob()
Definition: Breakpoints.cc:14
detail::SharedResources sharedResources_
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
ActivityRegistry actReg_
void for_each_schedule(F f) const
Float_t e
Definition: plot.C:35
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
void art::EventProcessor::beginRun ( )
private

Definition at line 667 of file EventProcessor.cc.

References actReg_, art::BeginRun, beginRunCalled_, art::errors::EventProcessorFailure, FDEBUG, finalizeRunEnabled_, art::ScheduleIteration::for_each_schedule(), art::Schedule::process(), r, runPrincipal_, schedule(), scheduleIteration_, art::ActivityRegistry::sPostBeginRun, and art::ActivityRegistry::sPreBeginRun.

Referenced by beginRunIfNotDoneAlready(), art::EventProcessor::begin< Level::Run >(), and recordOutputModuleClosureRequests().

668  {
669  assert(runPrincipal_);
670  RunID const r{runPrincipal_->runID()};
671  if (r.isFlush()) {
672  return;
673  }
674  finalizeRunEnabled_ = true;
675  try {
676  {
677  auto const run =
678  std::as_const(*runPrincipal_).makeRun(invalid_module_context);
679  actReg_.sPreBeginRun.invoke(run);
680  }
681  scheduleIteration_.for_each_schedule([this](ScheduleID const sid) {
683  });
684  {
685  auto const run =
686  std::as_const(*runPrincipal_).makeRun(invalid_module_context);
687  actReg_.sPostBeginRun.invoke(run);
688  }
689  }
690  catch (cet::exception& ex) {
691  throw Exception{
693  "EventProcessor: an exception occurred during current event processing",
694  ex};
695  }
696  catch (...) {
697  mf::LogError("PassingThrough")
698  << "an exception occurred during current event processing";
699  throw;
700  }
701  FDEBUG(1) << string(8, ' ') << "beginRun....................(" << r
702  << ")\n";
703  beginRunCalled_ = true;
704  }
TRandom r
Definition: spectrum.C:23
void process(Transition, Principal &)
Definition: Schedule.cc:75
GlobalSignal< detail::SignalResponseType::LIFO, void(Run const &)> sPostBeginRun
ScheduleIteration scheduleIteration_
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
tsan_unique_ptr< RunPrincipal > runPrincipal_
std::atomic< bool > beginRunCalled_
std::atomic< bool > finalizeRunEnabled_
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
GlobalSignal< detail::SignalResponseType::FIFO, void(Run const &)> sPreBeginRun
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
ActivityRegistry actReg_
void for_each_schedule(F f) const
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
void art::EventProcessor::beginRunIfNotDoneAlready ( )
private

Definition at line 707 of file EventProcessor.cc.

References beginRun(), and beginRunCalled_.

Referenced by art::EventProcessor::begin< Level::SubRun >(), process(), and recordOutputModuleClosureRequests().

708  {
709  if (!beginRunCalled_) {
710  beginRun();
711  }
712  }
std::atomic< bool > beginRunCalled_
void art::EventProcessor::beginSubRun ( )
private

Definition at line 839 of file EventProcessor.cc.

References actReg_, art::BeginSubRun, beginSubRunCalled_, art::errors::EventProcessorFailure, FDEBUG, finalizeSubRunEnabled_, art::ScheduleIteration::for_each_schedule(), art::Schedule::process(), schedule(), scheduleIteration_, art::ActivityRegistry::sPostBeginSubRun, art::ActivityRegistry::sPreBeginSubRun, and subRunPrincipal_.

Referenced by beginSubRunIfNotDoneAlready(), art::EventProcessor::begin< Level::SubRun >(), and recordOutputModuleClosureRequests().

840  {
841  assert(subRunPrincipal_);
842  SubRunID const sr{subRunPrincipal_->subRunID()};
843  if (sr.isFlush()) {
844  return;
845  }
846  finalizeSubRunEnabled_ = true;
847  try {
848  {
849  auto const srun =
850  std::as_const(*subRunPrincipal_).makeSubRun(invalid_module_context);
851  actReg_.sPreBeginSubRun.invoke(srun);
852  }
853  scheduleIteration_.for_each_schedule([this](ScheduleID const sid) {
855  });
856  {
857  auto const srun =
858  std::as_const(*subRunPrincipal_).makeSubRun(invalid_module_context);
859  actReg_.sPostBeginSubRun.invoke(srun);
860  }
861  }
862  catch (cet::exception& ex) {
863  throw Exception{
865  "EventProcessor: an exception occurred during current event processing",
866  ex};
867  }
868  catch (...) {
869  mf::LogError("PassingThrough")
870  << "an exception occurred during current event processing";
871  throw;
872  }
873  FDEBUG(1) << string(8, ' ') << "beginSubRun.................(" << sr
874  << ")\n";
875  beginSubRunCalled_ = true;
876  }
void process(Transition, Principal &)
Definition: Schedule.cc:75
ScheduleIteration scheduleIteration_
GlobalSignal< detail::SignalResponseType::LIFO, void(SubRun const &)> sPostBeginSubRun
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
std::atomic< bool > finalizeSubRunEnabled_
tsan_unique_ptr< SubRunPrincipal > subRunPrincipal_
std::atomic< bool > beginSubRunCalled_
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
GlobalSignal< detail::SignalResponseType::FIFO, void(SubRun const &)> sPreBeginSubRun
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
ActivityRegistry actReg_
void for_each_schedule(F f) const
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
void art::EventProcessor::beginSubRunIfNotDoneAlready ( )
private

Definition at line 879 of file EventProcessor.cc.

References beginSubRun(), and beginSubRunCalled_.

Referenced by process(), and recordOutputModuleClosureRequests().

880  {
881  if (!beginSubRunCalled_) {
882  beginSubRun();
883  }
884  }
std::atomic< bool > beginSubRunCalled_
void art::EventProcessor::closeAllFiles ( )
private
void art::EventProcessor::closeAllOutputFiles ( )
private

Definition at line 537 of file EventProcessor.cc.

References art::Schedule::closeAllOutputFiles(), FDEBUG, main_schedule(), and respondToCloseOutputFiles().

Referenced by closeAllFiles(), and recordOutputModuleClosureRequests().

538  {
539  if (!main_schedule().someOutputsOpen()) {
540  return;
541  }
544  FDEBUG(1) << string(8, ' ') << "closeAllOutputFiles\n";
545  }
void closeAllOutputFiles()
Definition: Schedule.h:85
Schedule & main_schedule()
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
void art::EventProcessor::closeInputFile ( )
private

Definition at line 517 of file EventProcessor.cc.

References actReg_, closeSomeOutputFiles(), FDEBUG, art::Schedule::incrementInputFileNumber(), input_, art::Granularity::InputFile, main_schedule(), art::Schedule::recordOutputClosureRequests(), respondToCloseInputFile(), art::ActivityRegistry::sPostCloseFile, and art::ActivityRegistry::sPreCloseFile.

Referenced by closeAllFiles(), art::EventProcessor::finalize< Level::InputFile >(), and recordOutputModuleClosureRequests().

518  {
520  // Output-file closing on input-file boundaries are tricky since
521  // input files must outlive the output files, which often have
522  // data copied forward from the input files. That's why the
523  // recordOutputClosureRequests call is made here instead of in a
524  // specialization of recordOutputModuleClosureRequests<>.
526  if (main_schedule().outputsToClose()) {
528  }
530  actReg_.sPreCloseFile.invoke();
531  input_->closeFile();
532  actReg_.sPostCloseFile.invoke();
533  FDEBUG(1) << string(8, ' ') << "closeInputFile\n";
534  }
tsan_unique_ptr< InputSource > input_
GlobalSignal< detail::SignalResponseType::FIFO, void()> sPreCloseFile
Schedule & main_schedule()
void recordOutputClosureRequests(Granularity const granularity)
Definition: Schedule.h:73
void incrementInputFileNumber()
Definition: Schedule.h:112
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
ActivityRegistry actReg_
GlobalSignal< detail::SignalResponseType::LIFO, void()> sPostCloseFile
void art::EventProcessor::closeSomeOutputFiles ( )
private

Definition at line 585 of file EventProcessor.cc.

References art::Schedule::closeSomeOutputFiles(), FDEBUG, main_schedule(), and respondToCloseOutputFiles().

Referenced by closeInputFile(), levelsToProcess(), and recordOutputModuleClosureRequests().

586  {
587  // Precondition: there are SOME output files that have been
588  // flagged as needing to close. Otherwise,
589  // 'respondtoCloseOutputFiles' will be needlessly
590  // called.
591  assert(main_schedule().outputsToClose());
594  FDEBUG(1) << string(8, ' ') << "closeSomeOutputFiles\n";
595  }
Schedule & main_schedule()
void closeSomeOutputFiles()
Definition: Schedule.h:97
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
void art::EventProcessor::endJob ( )
private

Definition at line 471 of file EventProcessor.cc.

References actReg_, ec_, endJobAllSchedules(), FDEBUG, input_, art::ConsumesInfo::instance(), mf::LogStatistics(), pathManager_, scheduler_, art::ConsumesInfo::showMissingConsumes(), art::ActivityRegistry::sPostEndJob, timer_, and art::detail::writeSummary().

Referenced by art::EventProcessor::finalize< Level::Job >(), and recordOutputModuleClosureRequests().

472  {
473  FDEBUG(1) << string(8, ' ') << "endJob\n";
474  ec_->call([this] { endJobAllSchedules(); });
475  ec_->call([] { ConsumesInfo::instance()->showMissingConsumes(); });
476  ec_->call([this] { input_->doEndJob(); });
477  ec_->call([this] { actReg_.sPostEndJob.invoke(); });
478  ec_->call([] { mf::LogStatistics(); });
479  ec_->call([this] {
481  });
482  }
tsan_unique_ptr< InputSource > input_
static ConsumesInfo * instance()
Definition: ConsumesInfo.cc:27
tsan< Scheduler > scheduler_
GlobalSignal< detail::SignalResponseType::LIFO, void()> sPostEndJob
tsan< PathManager > pathManager_
tsan< detail::ExceptionCollector > ec_
tsan< cet::cpu_timer > timer_
void writeSummary(PathManager &pm, bool wantSummary, cet::cpu_timer const &timer)
Definition: writeSummary.cc:89
void LogStatistics()
void showMissingConsumes() const
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
ActivityRegistry actReg_
void art::EventProcessor::endJobAllSchedules ( )
private

Definition at line 485 of file EventProcessor.cc.

References art::Schedule::endJob(), art::ScheduleIteration::for_each_schedule(), schedule(), and scheduleIteration_.

Referenced by endJob(), and recordOutputModuleClosureRequests().

486  {
488  [this](ScheduleID const sid) { schedule(sid).endJob(); });
489  }
void endJob()
Definition: Schedule.cc:40
ScheduleIteration scheduleIteration_
Schedule & schedule(ScheduleID const id)
void for_each_schedule(F f) const
void art::EventProcessor::endRun ( )
private

Definition at line 758 of file EventProcessor.cc.

References actReg_, beginRunCalled_, art::EndRun, art::errors::EventProcessorFailure, FDEBUG, art::ScheduleIteration::for_each_schedule(), art::Schedule::process(), r, runPrincipal_, schedule(), scheduleIteration_, art::ActivityRegistry::sPostEndRun, and art::ActivityRegistry::sPreEndRun.

Referenced by art::EventProcessor::finalize< Level::Run >(), and recordOutputModuleClosureRequests().

759  {
760  assert(runPrincipal_);
761  // Precondition: The RunID does not correspond to a flush ID. --
762  // N.B. The flush flag is not explicitly checked here since endRun
763  // is only called from finalizeRun, which is where the check
764  // happens.
765  RunID const run{runPrincipal_->runID()};
766  assert(!run.isFlush());
767  try {
768  actReg_.sPreEndRun.invoke(runPrincipal_->runID(),
769  runPrincipal_->endTime());
770  scheduleIteration_.for_each_schedule([this](ScheduleID const sid) {
772  });
773  auto const r =
774  std::as_const(*runPrincipal_).makeRun(invalid_module_context);
775  actReg_.sPostEndRun.invoke(r);
776  }
777  catch (cet::exception& ex) {
778  throw Exception{
780  "EventProcessor: an exception occurred during current event processing",
781  ex};
782  }
783  catch (...) {
784  mf::LogError("PassingThrough")
785  << "an exception occurred during current event processing";
786  throw;
787  }
788  FDEBUG(1) << string(8, ' ') << "endRun......................(" << run
789  << ")\n";
790  beginRunCalled_ = false;
791  }
TRandom r
Definition: spectrum.C:23
void process(Transition, Principal &)
Definition: Schedule.cc:75
ScheduleIteration scheduleIteration_
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
tsan_unique_ptr< RunPrincipal > runPrincipal_
std::atomic< bool > beginRunCalled_
GlobalSignal< detail::SignalResponseType::LIFO, void(Run const &)> sPostEndRun
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
GlobalSignal< detail::SignalResponseType::FIFO, void(RunID const &, Timestamp const &)> sPreEndRun
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
ActivityRegistry actReg_
void for_each_schedule(F f) const
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
void art::EventProcessor::endSubRun ( )
private

Definition at line 987 of file EventProcessor.cc.

References actReg_, beginSubRunCalled_, art::EndSubRun, art::errors::EventProcessorFailure, FDEBUG, art::ScheduleIteration::for_each_schedule(), art::Schedule::process(), schedule(), scheduleIteration_, art::ActivityRegistry::sPostEndSubRun, art::ActivityRegistry::sPreEndSubRun, and subRunPrincipal_.

Referenced by art::EventProcessor::finalize< Level::SubRun >(), and recordOutputModuleClosureRequests().

988  {
989  assert(subRunPrincipal_);
990  // Precondition: The SubRunID does not correspond to a flush ID.
991  // Note: the flush flag is not explicitly checked here since
992  // endSubRun is only called from finalizeSubRun, which is where the
993  // check happens.
994  SubRunID const sr{subRunPrincipal_->subRunID()};
995  assert(!sr.isFlush());
996  try {
997  actReg_.sPreEndSubRun.invoke(subRunPrincipal_->subRunID(),
998  subRunPrincipal_->endTime());
999  scheduleIteration_.for_each_schedule([this](ScheduleID const sid) {
1001  });
1002  auto const srun =
1003  std::as_const(*subRunPrincipal_).makeSubRun(invalid_module_context);
1004  actReg_.sPostEndSubRun.invoke(srun);
1005  }
1006  catch (cet::exception& ex) {
1007  throw Exception{
1009  "EventProcessor: an exception occurred during current event processing",
1010  ex};
1011  }
1012  catch (...) {
1013  mf::LogError("PassingThrough")
1014  << "an exception occurred during current event processing";
1015  throw;
1016  }
1017  FDEBUG(1) << string(8, ' ') << "endSubRun...................(" << sr
1018  << ")\n";
1019  beginSubRunCalled_ = false;
1020  }
void process(Transition, Principal &)
Definition: Schedule.cc:75
ScheduleIteration scheduleIteration_
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
tsan_unique_ptr< SubRunPrincipal > subRunPrincipal_
std::atomic< bool > beginSubRunCalled_
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
GlobalSignal< detail::SignalResponseType::FIFO, void(SubRunID const &, Timestamp const &)> sPreEndSubRun
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
ActivityRegistry actReg_
void for_each_schedule(F f) const
GlobalSignal< detail::SignalResponseType::LIFO, void(SubRun const &)> sPostEndSubRun
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
actions::ActionCodes art::EventProcessor::error_action ( cet::exception &  e) const
inlineprivate

Definition at line 158 of file EventProcessor.h.

References scheduler_.

Referenced by finishEventAsync(), and processEventAsync().

159  {
160  return scheduler_->actionTable().find(e.root_cause());
161  }
tsan< Scheduler > scheduler_
Float_t e
Definition: plot.C:35
template<Level L>
void art::EventProcessor::finalize ( )
private
template<Level L>
void art::EventProcessor::finalizeContainingLevels ( )
inlineprivate

Definition at line 96 of file EventProcessor.h.

97  {}
void art::EventProcessor::finishEventAsync ( ScheduleID  sid)
private

Definition at line 1412 of file EventProcessor.cc.

References actReg_, e, error_action(), art::Granularity::Event, art::Schedule::event_principal(), art::errors::EventProcessorFailure, FDEBUG, art::actions::IgnoreCompletely, openSomeOutputFiles(), processAllEventsAsync(), art::Schedule::recordOutputClosureRequests(), schedule(), sharedException_, art::ActivityRegistry::sPostProcessEvent, art::SharedException::store(), art::SharedException::store_current(), TDEBUG_BEGIN_FUNC_SI, TDEBUG_END_FUNC_SI, TDEBUG_FUNC_SI, and art::Schedule::writeEvent().

1413  {
1414  auto& ep = schedule(sid).event_principal();
1415  actReg_.sPostProcessEvent.invoke(
1416  std::as_const(ep).makeEvent(invalid_module_context),
1417  ScheduleContext{sid});
1418 
1419  // Note: We are part of the endPathTask.
1420  TDEBUG_BEGIN_FUNC_SI(4, sid);
1421  FDEBUG(1) << string(8, ' ') << "processEvent................("
1422  << ep.eventID() << ")\n";
1423  try {
1424  // Ask the output workers if they have reached their limits, and
1425  // if so setup to end the job the next time around the event
1426  // loop.
1427  FDEBUG(1) << string(8, ' ') << "shouldWeStop\n";
1428  static std::mutex m;
1429  std::lock_guard sentry{m};
1430  // Now we can write the results of processing to the outputs,
1431  // and delete the event principal.
1432  if (!ep.eventID().isFlush()) {
1433  // Possibly open new output files. This is safe to do because
1434  // EndPathExecutor functions are called in a serialized
1435  // context.
1436  TDEBUG_FUNC_SI(5, sid) << "Calling openSomeOutputFiles()";
1438  TDEBUG_FUNC_SI(5, sid) << "Calling schedule(sid).writeEvent()";
1439 
1440  auto const id = ep.eventID();
1441  schedule(sid).writeEvent();
1442  FDEBUG(1) << string(8, ' ') << "writeEvent..................(" << id
1443  << ")\n";
1444  }
1445  TDEBUG_FUNC_SI(5, sid)
1446  << "Calling schedules_->"
1447  "recordOutputClosureRequests(Granularity::Event)";
1449  }
1450  catch (cet::exception& e) {
1454  "EventProcessor: an exception occurred "
1455  "during current event processing",
1456  e);
1457  // And then end this task, terminating event processing.
1458  TDEBUG_END_FUNC_SI(4, sid) << "EXCEPTION";
1459  return;
1460  }
1461  mf::LogWarning(e.category())
1462  << "exception being ignored for current event:\n"
1463  << cet::trim_right_copy(e.what(), " \n");
1464  // WARNING: We continue processing after the catch blocks!!!
1465  }
1466  catch (...) {
1467  mf::LogError("PassingThrough")
1468  << "an exception occurred during current event processing";
1470  // And then end this task, terminating event processing.
1471  TDEBUG_END_FUNC_SI(4, sid) << "EXCEPTION";
1472  return;
1473  }
1474 
1475  // The next event processing task is a continuation of this task.
1476  processAllEventsAsync(sid);
1477  TDEBUG_END_FUNC_SI(4, sid);
1478  }
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
void processAllEventsAsync(ScheduleID sid)
void store(std::exception_ptr ex_ptr)
EventPrincipal & event_principal()
Definition: Schedule.h:185
actions::ActionCodes error_action(cet::exception &e) const
void writeEvent()
Definition: Schedule.h:103
#define TDEBUG_FUNC_SI(LEVEL, SI)
void recordOutputClosureRequests(Granularity const granularity)
Definition: Schedule.h:73
GlobalSignal< detail::SignalResponseType::LIFO, void(Event const &, ScheduleContext)> sPostProcessEvent
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
SharedException sharedException_
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
ActivityRegistry actReg_
Float_t e
Definition: plot.C:35
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
void art::EventProcessor::invokePostBeginJobWorkers_ ( )
private

Definition at line 236 of file EventProcessor.cc.

References actReg_, art::ScheduleID::first(), input_, pathManager_, and art::ActivityRegistry::sPostBeginJobWorkers.

Referenced by beginJob(), and recordOutputModuleClosureRequests().

237  {
238  using cet::transform_all;
239  // Need to convert multiple lists of workers into a long list that
240  // the postBeginJobWorkers callbacks can understand.
241  vector<Worker*> allWorkers;
242  transform_all(pathManager_->triggerPathsInfo(ScheduleID::first()).workers(),
243  back_inserter(allWorkers),
244  [](auto const& pr) { return pr.second.get(); });
245  transform_all(pathManager_->endPathInfo(ScheduleID::first()).workers(),
246  back_inserter(allWorkers),
247  [](auto const& pr) { return pr.second.get(); });
248  actReg_.sPostBeginJobWorkers.invoke(input_, allWorkers);
249  }
tsan_unique_ptr< InputSource > input_
static constexpr ScheduleID first()
Definition: ScheduleID.h:50
tsan< PathManager > pathManager_
GlobalSignal< detail::SignalResponseType::LIFO, void(InputSource *, std::vector< Worker * > const &)> sPostBeginJobWorkers
ActivityRegistry actReg_
template<Level L>
bool art::EventProcessor::levelsToProcess ( )
private

Definition at line 256 of file EventProcessor.cc.

References advanceItemType(), closeSomeOutputFiles(), art::highest_level(), art::errors::LogicError, main_schedule(), nextLevel_, art::Schedule::outputsToClose(), art::ReadyToAdvance, setOutputFileStatus(), and art::Switching.

257  {
258  if (nextLevel_.load() == Level::ReadyToAdvance) {
260  // Consider reading right here?
261  }
262  if (nextLevel_.load() == L) {
264  if (main_schedule().outputsToClose()) {
266  finalizeContainingLevels<L>();
268  }
269  return true;
270  }
271  if (nextLevel_.load() < L) {
272  return false;
273  }
274  if (nextLevel_.load() == highest_level()) {
275  return false;
276  }
277  throw Exception{errors::LogicError} << "Incorrect level hierarchy.\n"
278  << " Current level: " << L
279  << " Next level: " << nextLevel_;
280  }
std::atomic< Level > nextLevel_
bool outputsToClose() const
Definition: Schedule.h:67
Schedule & main_schedule()
void setOutputFileStatus(OutputFileStatus)
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
constexpr auto highest_level() noexcept
Definition: Level.h:32
void art::EventProcessor::openInputFile ( )
private

Definition at line 495 of file EventProcessor.cc.

References actReg_, fb_, FDEBUG, input_, art::errors::LogicError, respondToOpenInputFile(), art::ActivityRegistry::sPostOpenFile, and art::ActivityRegistry::sPreOpenFile.

Referenced by art::EventProcessor::begin< Level::InputFile >(), and recordOutputModuleClosureRequests().

496  {
497  actReg_.sPreOpenFile.invoke();
498  FDEBUG(1) << string(8, ' ') << "openInputFile\n";
499  fb_.reset(input_->readFile().release());
500  if (fb_ == nullptr) {
502  << "Source readFile() did not return a valid FileBlock: FileBlock "
503  << "should be valid or readFile() should throw.\n";
504  }
505  actReg_.sPostOpenFile.invoke(fb_->fileName());
507  }
GlobalSignal< detail::SignalResponseType::FIFO, void()> sPreOpenFile
tsan_unique_ptr< InputSource > input_
tsan_unique_ptr< FileBlock > fb_
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
GlobalSignal< detail::SignalResponseType::LIFO, void(std::string const &)> sPostOpenFile
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
ActivityRegistry actReg_
void art::EventProcessor::openSomeOutputFiles ( )
private

Definition at line 562 of file EventProcessor.cc.

References fb_, FDEBUG, art::ScheduleIteration::for_each_schedule(), art::Schedule::openSomeOutputFiles(), outputsToOpen(), respondToOpenOutputFiles(), schedule(), and scheduleIteration_.

Referenced by art::EventProcessor::finalize< Level::Run >(), art::EventProcessor::finalize< Level::SubRun >(), finishEventAsync(), and recordOutputModuleClosureRequests().

563  {
564  if (!outputsToOpen()) {
565  return;
566  }
567 
568  auto open_some_outputs = [this](ScheduleID const sid) {
570  };
571  scheduleIteration_.for_each_schedule(open_some_outputs);
572 
573  FDEBUG(1) << string(8, ' ') << "openSomeOutputFiles\n";
575  }
tsan_unique_ptr< FileBlock > fb_
ScheduleIteration scheduleIteration_
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
void for_each_schedule(F f) const
void openSomeOutputFiles(FileBlock const &fb)
Definition: Schedule.h:91
EventProcessor& art::EventProcessor::operator= ( EventProcessor const &  )
delete
EventProcessor& art::EventProcessor::operator= ( EventProcessor &&  )
delete
bool art::EventProcessor::outputsToOpen ( )
private

Definition at line 548 of file EventProcessor.cc.

References art::ScheduleIteration::for_each_schedule(), art::Schedule::outputsToOpen(), schedule(), and scheduleIteration_.

Referenced by openSomeOutputFiles(), and recordOutputModuleClosureRequests().

549  {
550  bool outputs_to_open{false};
551  auto check_outputs_to_open = [this,
552  &outputs_to_open](ScheduleID const sid) {
553  if (schedule(sid).outputsToOpen()) {
554  outputs_to_open = true;
555  }
556  };
557  scheduleIteration_.for_each_schedule(check_outputs_to_open);
558  return outputs_to_open;
559  }
ScheduleIteration scheduleIteration_
bool outputsToOpen() const
Definition: Schedule.h:61
Schedule & schedule(ScheduleID const id)
void for_each_schedule(F f) const
template<Level L>
void art::EventProcessor::process ( )
private

Definition at line 1482 of file EventProcessor.cc.

References ec_, and art::shutdown_flag.

1483  {
1484  if ((shutdown_flag > 0) || !ec_->empty()) {
1485  return;
1486  }
1487  ec_->call([this] { begin<L>(); });
1488  while ((shutdown_flag == 0) && ec_->empty() &&
1489  levelsToProcess<level_down(L)>()) {
1490  ec_->call([this] { process<level_down(L)>(); });
1491  }
1492  ec_->call([this] {
1493  finalize<L>();
1494  recordOutputModuleClosureRequests<L>();
1495  });
1496  }
tsan< detail::ExceptionCollector > ec_
std::atomic< int > shutdown_flag
template<>
void art::EventProcessor::process ( )
private

Definition at line 1039 of file EventProcessor.cc.

References beginRunIfNotDoneAlready(), beginSubRunIfNotDoneAlready(), art::Schedule::closeSomeOutputFiles(), ec_, FDEBUG, fileSwitchInProgress_, firstEvent_, main_schedule(), processAllEventsAsync(), respondToCloseOutputFiles(), scheduler_, setOutputFileStatus(), sharedException_, art::shutdown_flag, art::Switching, taskGroup_, and art::SharedException::throw_if_stored_exception().

1040  {
1041  if ((shutdown_flag > 0) || !ec_->empty()) {
1042  return;
1043  }
1044  // Note: This loop is to allow output file switching to happen in
1045  // the main thread.
1046  firstEvent_ = true;
1047  bool done = false;
1048  while (!done) {
1051 
1052  auto const last_schedule_index = scheduler_->num_schedules() - 1;
1053  for (ScheduleID::size_type i = 0; i != last_schedule_index; ++i) {
1054  taskGroup_->run([this, i] { processAllEventsAsync(ScheduleID(i)); });
1055  }
1056  taskGroup_->native_group().run_and_wait([this, last_schedule_index] {
1057  processAllEventsAsync(ScheduleID(last_schedule_index));
1058  });
1059 
1060  // If anything bad happened during event processing, let the
1061  // user know.
1063  if (!fileSwitchInProgress_.load()) {
1064  done = true;
1065  continue;
1066  }
1068  finalizeContainingLevels<most_deeply_nested_level()>();
1071  FDEBUG(1) << string(8, ' ') << "closeSomeOutputFiles\n";
1072  // We started the switch after advancing to the next item type;
1073  // we must make sure that we read that event before advancing
1074  // the item type again.
1075  firstEvent_ = true;
1076  fileSwitchInProgress_ = false;
1077  }
1078  }
tsan< Scheduler > scheduler_
Schedule & main_schedule()
std::atomic< bool > firstEvent_
std::unique_ptr< GlobalTaskGroup > taskGroup_
tsan< detail::ExceptionCollector > ec_
std::atomic< int > shutdown_flag
void setOutputFileStatus(OutputFileStatus)
void processAllEventsAsync(ScheduleID sid)
std::atomic< bool > fileSwitchInProgress_
SharedException sharedException_
id_type size_type
Definition: ScheduleID.h:25
void closeSomeOutputFiles()
Definition: Schedule.h:97
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
void beginSubRunIfNotDoneAlready()
void art::EventProcessor::processAllEventsAsync ( ScheduleID  sid)
private

Definition at line 1085 of file EventProcessor.cc.

References readAndProcessAsync(), sharedException_, art::SharedException::store_current(), TDEBUG_BEGIN_FUNC_SI, and TDEBUG_END_FUNC_SI.

Referenced by finishEventAsync(), process(), and readAndProcessAsync().

1086  {
1087  // Note: We are part of the processAllEventsTask (schedule head
1088  // task), and our parent is the eventLoopTask.
1089  TDEBUG_BEGIN_FUNC_SI(4, sid);
1090  try {
1091  readAndProcessAsync(sid);
1092  }
1093  catch (...) {
1095  TDEBUG_END_FUNC_SI(4, sid) << "terminate event loop because of EXCEPTION";
1096  return;
1097  }
1098  // If no exception, then end this task, which does not terminate
1099  // event processing because readAndProcessAsync creates a
1100  // continuation task.
1101  TDEBUG_END_FUNC_SI(4, sid);
1102  }
void readAndProcessAsync(ScheduleID sid)
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
SharedException sharedException_
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
void art::EventProcessor::processEvent ( )
private
void art::EventProcessor::processEventAsync ( ScheduleID  sid)
private

Definition at line 1371 of file EventProcessor.cc.

References e, error_action(), art::errors::EventProcessorFailure, art::actions::FailModule, art::actions::FailPath, art::actions::IgnoreCompletely, art::Schedule::process_event_modifiers(), schedule(), sharedException_, art::SharedException::store(), art::SharedException::store_current(), TDEBUG_BEGIN_FUNC_SI, and TDEBUG_END_FUNC_SI.

Referenced by readAndProcessAsync().

1372  {
1373  // Note: We are part of the readAndProcessEventTask (schedule head
1374  // task), and our parent task is the eventLoopTask.
1375  TDEBUG_BEGIN_FUNC_SI(4, sid);
1376  assert(!schedule(sid).event_principal().eventID().isFlush());
1377  // Continue processing via the creation of a continuation.
1378  auto endPathTask = make_waiting_task<EndPathTask>(this, sid);
1379  // Start the trigger paths running. When they finish they will
1380  // spawn the endPathTask which will run the end path, write the
1381  // event, and start the next event processing task.
1382  schedule(sid).process_event_modifiers(endPathTask);
1383  TDEBUG_END_FUNC_SI(4, sid);
1384  }
1385  catch (cet::exception& e) {
1386  // Upon exiting this scope, end this task, terminating event
1387  // processing.
1388  auto const action = error_action(e);
1389  if (action != actions::IgnoreCompletely) {
1390  assert(action != actions::FailModule);
1391  assert(action != actions::FailPath);
1394  "EventProcessor: an exception occurred during current event processing",
1395  e);
1396  TDEBUG_END_FUNC_SI(4, sid) << "terminate event loop because of EXCEPTION";
1397  return;
1398  }
1399  mf::LogWarning(e.category())
1400  << "exception being ignored for current event:\n"
1401  << cet::trim_right_copy(e.what(), " \n");
1402  TDEBUG_END_FUNC_SI(4, sid) << "Ignoring exception.";
1403  }
1404  catch (...) {
1405  mf::LogError("PassingThrough")
1406  << "an exception occurred during current event processing";
1408  TDEBUG_END_FUNC_SI(4, sid) << "terminate event loop because of EXCEPTION";
1409  }
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
void store(std::exception_ptr ex_ptr)
actions::ActionCodes error_action(cet::exception &e) const
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
SharedException sharedException_
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
Schedule & schedule(ScheduleID const id)
void process_event_modifiers(hep::concurrency::WaitingTaskPtr endPathTask)
Definition: Schedule.cc:82
Float_t e
Definition: plot.C:35
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
void art::EventProcessor::readAndProcessAsync ( ScheduleID  sid)
private

Definition at line 1110 of file EventProcessor.cc.

References art::Schedule::accept_principal(), actReg_, advanceItemType(), FDEBUG, fileSwitchInProgress_, firstEvent_, art::highest_level(), input_, art::errors::LogicError, art::most_deeply_nested_level(), nextLevel_, processAllEventsAsync(), processEventAsync(), producedProductLookupTables_, psSignals_, art::ReadyToAdvance, sc, schedule(), art::shutdown_flag, art::ActivityRegistry::sPostSourceEvent, art::ActivityRegistry::sPreSourceEvent, subRunPrincipal_, TDEBUG_BEGIN_FUNC_SI, TDEBUG_END_FUNC_SI, and TDEBUG_FUNC_SI.

Referenced by processAllEventsAsync().

1111  {
1112  // Note: We are part of the readAndProcessEventTask (schedule head
1113  // task), and our parent task is the eventLoopTask.
1114  TDEBUG_BEGIN_FUNC_SI(4, sid);
1115  // Note: shutdown_flag is a extern global atomic int in
1116  // art/art/Utilities/UnixSignalHandlers.cc
1117  if (shutdown_flag) {
1118  // User called for a clean shutdown using a signal or ctrl-c,
1119  // end event processing and this task.
1120  TDEBUG_END_FUNC_SI(4, sid) << "CLEAN SHUTDOWN";
1121  return;
1122  }
1123 
1124  // The item type advance and the event read must be done with the
1125  // input source lock held; however event-processing must not
1126  // serialized.
1127  {
1128  InputSourceMutexSentry lock_input;
1129  if (fileSwitchInProgress_.load()) {
1130  // We must avoid advancing the iterator after a schedule has
1131  // noticed it is time to switch files. After the switch, we
1132  // will need to set firstEvent_ true so that the first
1133  // schedule that resumes after the switch actually reads the
1134  // event that the first schedule which noticed we needed a
1135  // switch had advanced the iterator to.
1136 
1137  // Note: We still have the problem that because the schedules
1138  // do not read events at the same time the file switch point
1139  // can be up to nschedules-1 ahead of where it would have been
1140  // if there was only one schedule. If we are switching output
1141  // files every event in an attempt to create single event
1142  // files, this really does not work out too well.
1143  TDEBUG_END_FUNC_SI(4, sid) << "FILE SWITCH";
1144  return;
1145  }
1146  // Check the next item type and exit this task if it is not an
1147  // event, or if the user has asynchronously requested a
1148  // shutdown.
1149  auto expected = true;
1150  if (firstEvent_.compare_exchange_strong(expected, false)) {
1151  // Do not advance the item type on the first event.
1152  } else {
1153  // Do the advance item type.
1154  if (nextLevel_.load() == Level::ReadyToAdvance) {
1155  // See what the next item is.
1156  TDEBUG_FUNC_SI(5, sid) << "Calling advanceItemType()";
1158  }
1159  if ((nextLevel_.load() < most_deeply_nested_level()) ||
1160  (nextLevel_.load() == highest_level())) {
1161  // We are popping up, end event processing and this task.
1162  TDEBUG_END_FUNC_SI(4, sid) << "END OF SUBRUN";
1163  return;
1164  }
1165  if (nextLevel_.load() != most_deeply_nested_level()) {
1166  // Error: incorrect level hierarchy
1167  TDEBUG_END_FUNC_SI(4, sid) << "BAD HIERARCHY";
1168  throw Exception{errors::LogicError} << "Incorrect level hierarchy.";
1169  }
1171  // At this point we have determined that we are going to read
1172  // an event and we must do that before dropping the lock on
1173  // the input source which is what is protecting us against a
1174  // double-advance caused by a different schedule.
1175  if (schedule(sid).outputsToClose()) {
1176  fileSwitchInProgress_ = true;
1177  TDEBUG_END_FUNC_SI(4, sid) << "FILE SWITCH INITIATED";
1178  return;
1179  }
1180  }
1181 
1182  // Now we can read the event from the source.
1183  ScheduleContext const sc{sid};
1184  assert(subRunPrincipal_);
1185  assert(subRunPrincipal_->subRunID().isValid());
1186  actReg_.sPreSourceEvent.invoke(sc);
1187  TDEBUG_FUNC_SI(5, sid) << "Calling input_->readEvent(subRunPrincipal_)";
1188  auto ep = input_->readEvent(subRunPrincipal_.get());
1189  assert(ep);
1190  // The intended behavior here is that the producing services
1191  // which are called during the sPostReadEvent cannot see each
1192  // others put products. We enforce this by creating the groups
1193  // for the produced products, but do not allow the lookups to
1194  // find them until after the callbacks have run.
1195  ep->createGroupsForProducedProducts(producedProductLookupTables_);
1196  psSignals_->sPostReadEvent.invoke(*ep);
1197  ep->enableLookupOfProducedProducts();
1198  actReg_.sPostSourceEvent.invoke(
1199  std::as_const(*ep).makeEvent(invalid_module_context), sc);
1200  FDEBUG(1) << string(8, ' ') << "readEvent...................("
1201  << ep->eventID() << ")\n";
1202  schedule(sid).accept_principal(std::move(ep));
1203  // Now we drop the input source lock by exiting the guarded
1204  // scope.
1205  }
1206  if (schedule(sid).event_principal().eventID().isFlush()) {
1207  // No processing to do, start next event handling task.
1208  processAllEventsAsync(sid);
1209  TDEBUG_END_FUNC_SI(4, sid) << "FLUSH EVENT";
1210  return;
1211  }
1212 
1213  // Now process the event.
1214  processEventAsync(sid);
1215  TDEBUG_END_FUNC_SI(4, sid);
1216  }
tsan_unique_ptr< InputSource > input_
GlobalSignal< detail::SignalResponseType::LIFO, void(Event const &, ScheduleContext)> sPostSourceEvent
constexpr auto most_deeply_nested_level() noexcept
Definition: Level.h:44
std::atomic< Level > nextLevel_
std::atomic< bool > firstEvent_
tsan< ProducingServiceSignals > psSignals_
std::atomic< int > shutdown_flag
void processAllEventsAsync(ScheduleID sid)
tsan< ProductTables > producedProductLookupTables_
std::atomic< bool > fileSwitchInProgress_
#define TDEBUG_FUNC_SI(LEVEL, SI)
tsan_unique_ptr< SubRunPrincipal > subRunPrincipal_
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
void processEventAsync(ScheduleID sid)
Float_t sc
Definition: plot.C:23
void accept_principal(std::unique_ptr< EventPrincipal > principal)
Definition: Schedule.h:178
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
ActivityRegistry actReg_
constexpr auto highest_level() noexcept
Definition: Level.h:32
GlobalSignal< detail::SignalResponseType::FIFO, void(ScheduleContext)> sPreSourceEvent
void art::EventProcessor::readEvent ( )
private
void art::EventProcessor::readRun ( )
private

Definition at line 637 of file EventProcessor.cc.

References actReg_, FDEBUG, art::ScheduleIteration::for_each_schedule(), input_, producedProductLookupTables_, psSignals_, r, runPrincipal_, schedule(), scheduleIteration_, art::Schedule::seedRunRangeSet(), art::ActivityRegistry::sPostSourceRun, and art::ActivityRegistry::sPreSourceRun.

Referenced by art::EventProcessor::begin< Level::Run >(), and recordOutputModuleClosureRequests().

638  {
639  actReg_.sPreSourceRun.invoke();
640  runPrincipal_.reset(input_->readRun().release());
641  assert(runPrincipal_);
642  auto rsh = input_->runRangeSetHandler();
643  assert(rsh);
644  auto seed_range_set = [this, &rsh](ScheduleID const sid) {
645  schedule(sid).seedRunRangeSet(*rsh);
646  };
647  scheduleIteration_.for_each_schedule(seed_range_set);
648  // The intended behavior here is that the producing services which
649  // are called during the sPostReadRun cannot see each others put
650  // products. We enforce this by creating the groups for the
651  // produced products, but do not allow the lookups to find them
652  // until after the callbacks have run.
653  runPrincipal_->createGroupsForProducedProducts(
655  psSignals_->sPostReadRun.invoke(*runPrincipal_);
656  runPrincipal_->enableLookupOfProducedProducts();
657  {
658  auto const r =
659  std::as_const(*runPrincipal_).makeRun(invalid_module_context);
660  actReg_.sPostSourceRun.invoke(r);
661  }
662  FDEBUG(1) << string(8, ' ') << "readRun.....................("
663  << runPrincipal_->runID() << ")\n";
664  }
TRandom r
Definition: spectrum.C:23
tsan_unique_ptr< InputSource > input_
GlobalSignal< detail::SignalResponseType::LIFO, void(Run const &)> sPostSourceRun
ScheduleIteration scheduleIteration_
tsan_unique_ptr< RunPrincipal > runPrincipal_
tsan< ProducingServiceSignals > psSignals_
tsan< ProductTables > producedProductLookupTables_
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
ActivityRegistry actReg_
void for_each_schedule(F f) const
void seedRunRangeSet(RangeSetHandler const &rsh)
Definition: Schedule.h:131
GlobalSignal< detail::SignalResponseType::FIFO, void()> sPreSourceRun
void art::EventProcessor::readSubRun ( )
private

Definition at line 809 of file EventProcessor.cc.

References actReg_, FDEBUG, art::ScheduleIteration::for_each_schedule(), input_, producedProductLookupTables_, psSignals_, runPrincipal_, schedule(), scheduleIteration_, art::Schedule::seedSubRunRangeSet(), art::ActivityRegistry::sPostSourceSubRun, art::ActivityRegistry::sPreSourceSubRun, and subRunPrincipal_.

Referenced by art::EventProcessor::begin< Level::SubRun >(), and recordOutputModuleClosureRequests().

810  {
811  actReg_.sPreSourceSubRun.invoke();
812  subRunPrincipal_.reset(input_->readSubRun(runPrincipal_.get()).release());
813  assert(subRunPrincipal_);
814  auto rsh = input_->subRunRangeSetHandler();
815  assert(rsh);
816  auto seed_range_set = [this, &rsh](ScheduleID const sid) {
817  schedule(sid).seedSubRunRangeSet(*rsh);
818  };
819  scheduleIteration_.for_each_schedule(seed_range_set);
820  // The intended behavior here is that the producing services which
821  // are called during the sPostReadSubRun cannot see each others
822  // put products. We enforce this by creating the groups for the
823  // produced products, but do not allow the lookups to find them
824  // until after the callbacks have run.
825  subRunPrincipal_->createGroupsForProducedProducts(
827  psSignals_->sPostReadSubRun.invoke(*subRunPrincipal_);
828  subRunPrincipal_->enableLookupOfProducedProducts();
829  {
830  auto const sr =
831  std::as_const(*subRunPrincipal_).makeSubRun(invalid_module_context);
832  actReg_.sPostSourceSubRun.invoke(sr);
833  }
834  FDEBUG(1) << string(8, ' ') << "readSubRun..................("
835  << subRunPrincipal_->subRunID() << ")\n";
836  }
tsan_unique_ptr< InputSource > input_
void seedSubRunRangeSet(RangeSetHandler const &rsh)
Definition: Schedule.h:156
GlobalSignal< detail::SignalResponseType::FIFO, void()> sPreSourceSubRun
ScheduleIteration scheduleIteration_
tsan_unique_ptr< RunPrincipal > runPrincipal_
tsan< ProducingServiceSignals > psSignals_
tsan< ProductTables > producedProductLookupTables_
tsan_unique_ptr< SubRunPrincipal > subRunPrincipal_
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
ActivityRegistry actReg_
void for_each_schedule(F f) const
GlobalSignal< detail::SignalResponseType::LIFO, void(SubRun const &)> sPostSourceSubRun
void art::EventProcessor::respondToCloseInputFile ( )
private

Definition at line 607 of file EventProcessor.cc.

References fb_, FDEBUG, art::ScheduleIteration::for_each_schedule(), art::Schedule::respondToCloseInputFile(), schedule(), and scheduleIteration_.

Referenced by closeInputFile(), and recordOutputModuleClosureRequests().

608  {
609  scheduleIteration_.for_each_schedule([this](ScheduleID const sid) {
611  });
612  FDEBUG(1) << string(8, ' ') << "respondToCloseInputFile\n";
613  }
void respondToCloseInputFile(FileBlock const &)
Definition: Schedule.cc:54
tsan_unique_ptr< FileBlock > fb_
ScheduleIteration scheduleIteration_
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
void for_each_schedule(F f) const
void art::EventProcessor::respondToCloseOutputFiles ( )
private

Definition at line 625 of file EventProcessor.cc.

References fb_, FDEBUG, art::ScheduleIteration::for_each_schedule(), art::Schedule::respondToCloseOutputFiles(), schedule(), and scheduleIteration_.

Referenced by closeAllOutputFiles(), closeSomeOutputFiles(), process(), and recordOutputModuleClosureRequests().

626  {
627  scheduleIteration_.for_each_schedule([this](ScheduleID const sid) {
629  });
630  FDEBUG(1) << string(8, ' ') << "respondToCloseOutputFiles\n";
631  }
void respondToCloseOutputFiles(FileBlock const &)
Definition: Schedule.cc:68
tsan_unique_ptr< FileBlock > fb_
ScheduleIteration scheduleIteration_
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
void for_each_schedule(F f) const
void art::EventProcessor::respondToOpenInputFile ( )
private

Definition at line 598 of file EventProcessor.cc.

References fb_, FDEBUG, art::ScheduleIteration::for_each_schedule(), art::Schedule::respondToOpenInputFile(), schedule(), and scheduleIteration_.

Referenced by openInputFile(), and recordOutputModuleClosureRequests().

599  {
600  scheduleIteration_.for_each_schedule([this](ScheduleID const sid) {
602  });
603  FDEBUG(1) << string(8, ' ') << "respondToOpenInputFile\n";
604  }
tsan_unique_ptr< FileBlock > fb_
ScheduleIteration scheduleIteration_
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
void for_each_schedule(F f) const
void respondToOpenInputFile(FileBlock const &)
Definition: Schedule.cc:47
void art::EventProcessor::respondToOpenOutputFiles ( )
private

Definition at line 616 of file EventProcessor.cc.

References fb_, FDEBUG, art::ScheduleIteration::for_each_schedule(), art::Schedule::respondToOpenOutputFiles(), schedule(), and scheduleIteration_.

Referenced by openSomeOutputFiles(), and recordOutputModuleClosureRequests().

617  {
618  scheduleIteration_.for_each_schedule([this](ScheduleID const sid) {
620  });
621  FDEBUG(1) << string(8, ' ') << "respondToOpenOutputFiles\n";
622  }
tsan_unique_ptr< FileBlock > fb_
ScheduleIteration scheduleIteration_
void respondToOpenOutputFiles(FileBlock const &)
Definition: Schedule.cc:61
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
void for_each_schedule(F f) const
EventProcessor::StatusCode art::EventProcessor::runToCompletion ( )

Definition at line 1499 of file EventProcessor.cc.

References ec_, epSignal, epSuccess, art::shutdown_flag, and terminateAbnormally_().

1500  {
1501  StatusCode returnCode{epSuccess};
1502  ec_->call([this, &returnCode] {
1503  process<highest_level()>();
1504  if (shutdown_flag > 0) {
1505  returnCode = epSignal;
1506  }
1507  });
1508  if (!ec_->empty()) {
1510  ec_->rethrow();
1511  }
1512  return returnCode;
1513  }
tsan< detail::ExceptionCollector > ec_
std::atomic< int > shutdown_flag
void art::EventProcessor::setOutputFileStatus ( OutputFileStatus  ofs)
private

Definition at line 578 of file EventProcessor.cc.

References FDEBUG, main_schedule(), and art::Schedule::setOutputFileStatus().

Referenced by levelsToProcess(), process(), and recordOutputModuleClosureRequests().

579  {
581  FDEBUG(1) << string(8, ' ') << "setOutputFileStatus\n";
582  }
Schedule & main_schedule()
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
void setOutputFileStatus(OutputFileStatus const ofs)
Definition: Schedule.h:118
void art::EventProcessor::setRunAuxiliaryRangeSetID ( )
private

Definition at line 715 of file EventProcessor.cc.

References art::RangeSetHandler::clone(), FDEBUG, art::RangeSetHandler::flushRanges(), art::ScheduleIteration::for_each_schedule(), art::RangeSet::invalid(), main_schedule(), art::RangeSetHandler::Open, runPrincipal_, art::Schedule::runRangeSetHandler(), schedule(), scheduleIteration_, art::RangeSetHandler::seenRanges(), art::Schedule::setRunAuxiliaryRangeSetID(), art::Switching, and tmp.

Referenced by art::EventProcessor::finalize< Level::Run >(), and recordOutputModuleClosureRequests().

716  {
717  assert(runPrincipal_);
718  FDEBUG(1) << string(8, ' ') << "setRunAuxiliaryRangeSetID...("
719  << runPrincipal_->runID() << ")\n";
720  if (main_schedule().runRangeSetHandler().type() ==
722  // We are using EmptyEvent source, need to merge what the
723  // schedules have seen.
724  auto mergedRS = RangeSet::invalid();
725  auto merge_range_sets = [this, &mergedRS](ScheduleID const sid) {
726  auto const& rs = schedule(sid).runRangeSetHandler().seenRanges();
727  // The following constructor ensures that the range is sorted
728  // before 'merge' is called.
729  RangeSet const tmp{rs.run(), rs.ranges()};
730  mergedRS.merge(tmp);
731  };
732  scheduleIteration_.for_each_schedule(merge_range_sets);
733  runPrincipal_->updateSeenRanges(mergedRS);
734  auto update_executors = [this, &mergedRS](ScheduleID const sid) {
735  schedule(sid).setRunAuxiliaryRangeSetID(mergedRS);
736  };
737  scheduleIteration_.for_each_schedule(update_executors);
738  return;
739  }
740 
741  // Since we are using already existing ranges, all the range set
742  // handlers have the same ranges, use the first one. handler with
743  // the largest event number, that will be the one which we will
744  // use as the file switch boundary. Note that is may not match
745  // the exactly the schedule that triggered the switch. Do we need
746  // to fix this?
747  unique_ptr<RangeSetHandler> rshAtSwitch{
749  if (main_schedule().fileStatus() != OutputFileStatus::Switching) {
750  // We are at the end of the job.
751  rshAtSwitch->flushRanges();
752  }
753  runPrincipal_->updateSeenRanges(rshAtSwitch->seenRanges());
754  main_schedule().setRunAuxiliaryRangeSetID(rshAtSwitch->seenRanges());
755  }
RangeSetHandler const & runRangeSetHandler()
Definition: Schedule.h:149
Schedule & main_schedule()
ScheduleIteration scheduleIteration_
Float_t tmp
Definition: plot.C:35
tsan_unique_ptr< RunPrincipal > runPrincipal_
void setRunAuxiliaryRangeSetID(RangeSet const &rs)
Definition: Schedule.h:137
RangeSet seenRanges() const
static RangeSet invalid()
Definition: RangeSet.cc:45
RangeSetHandler * clone() const
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
void for_each_schedule(F f) const
void art::EventProcessor::setSubRunAuxiliaryRangeSetID ( )
private

Definition at line 887 of file EventProcessor.cc.

References art::RangeSetHandler::clone(), art::EventID::event(), art::ClosedRangeSetHandler::eventInfo(), FDEBUG, art::ScheduleID::first(), art::ScheduleIteration::for_each_schedule(), art::ClosedRangeSetHandler::EventInfo::id(), art::RangeSet::invalid(), main_schedule(), art::RangeSetHandler::maybeSplitRange(), art::RangeSetHandler::Open, art::Schedule::runRangeSetHandler(), schedule(), scheduleIteration_, art::Schedule::seedRunRangeSet(), art::Schedule::seedSubRunRangeSet(), art::RangeSetHandler::seenRanges(), art::Schedule::setSubRunAuxiliaryRangeSetID(), subRunPrincipal_, art::Schedule::subRunRangeSetHandler(), art::Switching, and tmp.

Referenced by art::EventProcessor::finalize< Level::SubRun >(), and recordOutputModuleClosureRequests().

888  {
889  assert(subRunPrincipal_);
890  FDEBUG(1) << string(8, ' ') << "setSubRunAuxiliaryRangeSetID("
891  << subRunPrincipal_->subRunID() << ")\n";
892  if (main_schedule().subRunRangeSetHandler().type() ==
894  // We are using EmptyEvent source, need to merge what the
895  // schedules have seen.
896  auto mergedRS = RangeSet::invalid();
897  auto merge_range_sets = [this, &mergedRS](ScheduleID const sid) {
898  auto const& rs = schedule(sid).subRunRangeSetHandler().seenRanges();
899  // The following constructor ensures that the range is sorted
900  // before 'merge' is called.
901  RangeSet const tmp{rs.run(), rs.ranges()};
902  mergedRS.merge(tmp);
903  };
904  scheduleIteration_.for_each_schedule(merge_range_sets);
905  subRunPrincipal_->updateSeenRanges(mergedRS);
906  auto update_executors = [this, &mergedRS](ScheduleID const sid) {
907  schedule(sid).setSubRunAuxiliaryRangeSetID(mergedRS);
908  };
909  scheduleIteration_.for_each_schedule(update_executors);
910  return;
911  }
912  // Ranges are split/flushed only for a RangeSetHandler whose
913  // dynamic type is 'ClosedRangeSetHandler'.
914  //
915  // Consider the following range-sets
916  //
917  // SubRun RangeSet:
918  //
919  // { Run 1 : SubRun 1 : Events [1,7) } <-- Current
920  //
921  // Run RangeSet:
922  //
923  // { Run 1 : SubRun 0 : Events [5,11)
924  // SubRun 1 : Events [1,7) <-- Current
925  // SubRun 1 : Events [9,15) }
926  //
927  // For a range split just before SubRun 1, Event 6, the
928  // range sets should become:
929  //
930  // SubRun RangeSet:
931  //
932  // { Run 1 : SubRun 1 : Events [1,6)
933  // SubRun 1 : Events [6,7) } <-- Updated
934  //
935  // Run RangeSet:
936  //
937  // { Run 1 : SubRun 0 : Events [5,11)
938  // SubRun 1 : Events [1,6)
939  // SubRun 1 : Events [6,7) <-- Updated
940  // SubRun 1 : Events [9,15) }
941  //
942  // Since we are using already existing ranges, all the range set
943  // handlers have the same ranges. Find the closed range set
944  // handler with the largest event number, that will be the one
945  // which we will use as the file switch boundary. Note that is
946  // may not match the exactly the schedule that triggered the
947  // switch. Do we need to fix this?
948  //
949  // If we do not find any handlers with valid event info then we
950  // use the first one, which is just fine. This happens for
951  // example when we are dropping all events.
952  unsigned largestEvent = 1U;
953  ScheduleID idxOfMax{ScheduleID::first()};
954  ScheduleID idx{ScheduleID::first()};
955  auto& val = main_schedule().subRunRangeSetHandler();
956  auto& rsh = dynamic_cast<ClosedRangeSetHandler const&>(val);
957  // Make sure the event number is a valid event number before using
958  // it. It can be invalid in the handler if we have not yet read an
959  // event, which happens with empty subruns and when we are
960  // dropping all events.
961  if (rsh.eventInfo().id().isValid() && !rsh.eventInfo().id().isFlush()) {
962  if (rsh.eventInfo().id().event() > largestEvent) {
963  largestEvent = rsh.eventInfo().id().event();
964  idxOfMax = idx;
965  }
966  }
967  idx = idx.next();
968 
969  unique_ptr<RangeSetHandler> rshAtSwitch{
971  if (main_schedule().fileStatus() == OutputFileStatus::Switching) {
972  rshAtSwitch->maybeSplitRange();
973  unique_ptr<RangeSetHandler> runRSHAtSwitch{
974  schedule(idxOfMax).runRangeSetHandler().clone()};
975  runRSHAtSwitch->maybeSplitRange();
976  main_schedule().seedRunRangeSet(*runRSHAtSwitch);
977  } else {
978  // We are at the end of the job.
979  rshAtSwitch->flushRanges();
980  }
981  main_schedule().seedSubRunRangeSet(*rshAtSwitch);
982  subRunPrincipal_->updateSeenRanges(rshAtSwitch->seenRanges());
983  main_schedule().setSubRunAuxiliaryRangeSetID(rshAtSwitch->seenRanges());
984  }
void setSubRunAuxiliaryRangeSetID(RangeSet const &rs)
Definition: Schedule.h:161
RangeSetHandler const & runRangeSetHandler()
Definition: Schedule.h:149
void seedSubRunRangeSet(RangeSetHandler const &rsh)
Definition: Schedule.h:156
Schedule & main_schedule()
static constexpr ScheduleID first()
Definition: ScheduleID.h:50
ScheduleIteration scheduleIteration_
Float_t tmp
Definition: plot.C:35
RangeSetHandler const & subRunRangeSetHandler()
Definition: Schedule.h:172
tsan_unique_ptr< SubRunPrincipal > subRunPrincipal_
RangeSet seenRanges() const
static RangeSet invalid()
Definition: RangeSet.cc:45
RangeSetHandler * clone() const
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
void for_each_schedule(F f) const
void seedRunRangeSet(RangeSetHandler const &rsh)
Definition: Schedule.h:131
void art::EventProcessor::terminateAbnormally_ ( )
private

Definition at line 1544 of file EventProcessor.cc.

Referenced by recordOutputModuleClosureRequests(), and runToCompletion().

1545  {
1546  if (ServiceRegistry::isAvailable<RandomNumberGenerator>()) {
1547  ServiceHandle<RandomNumberGenerator>()->saveToFile_();
1548  }
1549  }
1550  catch (...) {
1551  }
void art::EventProcessor::writeEvent ( )
private
void art::EventProcessor::writeRun ( )
private

Definition at line 794 of file EventProcessor.cc.

References FDEBUG, main_schedule(), r, runPrincipal_, and art::Schedule::writeRun().

Referenced by art::EventProcessor::finalize< Level::Run >(), and recordOutputModuleClosureRequests().

795  {
796  assert(runPrincipal_);
797  // Precondition: The RunID does not correspond to a flush ID.
798  RunID const r{runPrincipal_->runID()};
799  assert(!r.isFlush());
801  FDEBUG(1) << string(8, ' ') << "writeRun....................(" << r
802  << ")\n";
803  }
TRandom r
Definition: spectrum.C:23
Schedule & main_schedule()
tsan_unique_ptr< RunPrincipal > runPrincipal_
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
void writeRun(RunPrincipal &rp)
Definition: Schedule.h:143
void art::EventProcessor::writeSubRun ( )
private

Definition at line 1023 of file EventProcessor.cc.

References FDEBUG, main_schedule(), subRunPrincipal_, and art::Schedule::writeSubRun().

Referenced by art::EventProcessor::finalize< Level::SubRun >(), and recordOutputModuleClosureRequests().

1024  {
1025  assert(subRunPrincipal_);
1026  // Precondition: The SubRunID does not correspond to a flush ID.
1027  SubRunID const& sr{subRunPrincipal_->subRunID()};
1028  assert(!sr.isFlush());
1030  FDEBUG(1) << string(8, ' ') << "writeSubRun.................(" << sr
1031  << ")\n";
1032  }
Schedule & main_schedule()
tsan_unique_ptr< SubRunPrincipal > subRunPrincipal_
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
void writeSubRun(SubRunPrincipal &srp)
Definition: Schedule.h:166

Member Data Documentation

std::atomic<bool> art::EventProcessor::beginRunCalled_ {false}
private
std::atomic<bool> art::EventProcessor::beginSubRunCalled_ {false}
private
tsan<detail::ExceptionCollector> art::EventProcessor::ec_ {}
private

Definition at line 167 of file EventProcessor.h.

Referenced by endJob(), process(), and runToCompletion().

PerScheduleContainer<std::unique_ptr<EventPrincipal> > art::EventProcessor::eventPrincipals_ {}
private

Definition at line 246 of file EventProcessor.h.

std::atomic<bool> art::EventProcessor::fileSwitchInProgress_ {false}
private

Definition at line 266 of file EventProcessor.h.

Referenced by process(), and readAndProcessAsync().

std::atomic<bool> art::EventProcessor::finalizeRunEnabled_ {false}
private
std::atomic<bool> art::EventProcessor::finalizeSubRunEnabled_ {false}
private
std::atomic<bool> art::EventProcessor::firstEvent_ {true}
private

Definition at line 263 of file EventProcessor.h.

Referenced by process(), and readAndProcessAsync().

bool const art::EventProcessor::handleEmptyRuns_
private

Definition at line 249 of file EventProcessor.h.

Referenced by EventProcessor(), and art::EventProcessor::begin< Level::Run >().

bool const art::EventProcessor::handleEmptySubRuns_
private
tsan<MFStatusUpdater> art::EventProcessor::mfStatusUpdater_ {actReg_}
private

Definition at line 189 of file EventProcessor.h.

std::atomic<Level> art::EventProcessor::nextLevel_ {Level::ReadyToAdvance}
private
tsan<UpdateOutputCallbacks> art::EventProcessor::outputCallbacks_ {}
private

Definition at line 194 of file EventProcessor.h.

Referenced by EventProcessor().

tsan<PathManager> art::EventProcessor::pathManager_
private

Definition at line 228 of file EventProcessor.h.

Referenced by endJob(), EventProcessor(), and invokePostBeginJobWorkers_().

tsan<ProductDescriptions> art::EventProcessor::producedProductDescriptions_ {}
private

Definition at line 200 of file EventProcessor.h.

Referenced by EventProcessor().

tsan<ProductTables> art::EventProcessor::producedProductLookupTables_ {ProductTables::invalid()}
private

Definition at line 208 of file EventProcessor.h.

Referenced by EventProcessor(), readAndProcessAsync(), readRun(), and readSubRun().

tsan<ProducingServiceSignals> art::EventProcessor::psSignals_ {}
private

Definition at line 210 of file EventProcessor.h.

Referenced by EventProcessor(), readAndProcessAsync(), readRun(), and readSubRun().

tsan<Scheduler> art::EventProcessor::scheduler_
private

Definition at line 215 of file EventProcessor.h.

Referenced by endJob(), error_action(), EventProcessor(), and process().

tsan<std::map<ScheduleID, Schedule> > art::EventProcessor::schedules_ {}
private

Definition at line 234 of file EventProcessor.h.

Referenced by EventProcessor(), and schedule().

tsan_unique_ptr<ServicesManager> art::EventProcessor::servicesManager_
private

Definition at line 224 of file EventProcessor.h.

Referenced by EventProcessor().

SharedException art::EventProcessor::sharedException_
private
detail::SharedResources art::EventProcessor::sharedResources_ {}
private

Definition at line 219 of file EventProcessor.h.

Referenced by beginJob(), and EventProcessor().

std::unique_ptr<GlobalTaskGroup> art::EventProcessor::taskGroup_ {nullptr}
private

Definition at line 217 of file EventProcessor.h.

Referenced by EventProcessor(), and process().

tsan<cet::cpu_timer> art::EventProcessor::timer_ {}
private

The documentation for this class was generated from the following files: