41 #include "cetlib/bold_fontify.h" 42 #include "cetlib/container_algorithms.h" 43 #include "cetlib/trim.h" 46 #include "hep_concurrency/WaitingTask.h" 60 using namespace string_literals;
68 ActivityRegistry& actReg,
69 detail::SharedResources& resources)
73 services_pset.
erase(
"FloatingPointControl");
74 services_pset.
erase(
"message");
75 services_pset.
erase(
"scheduler");
77 new ServicesManager{std::move(services_pset), actReg, resources};
78 mgr->addSystemService<FloatingPointControl>(fpcPSet, actReg);
82 auto const invalid_module_context = ModuleContext::invalid();
85 EventProcessor::~EventProcessor() =
default;
100 std::move(enabled_modules)}
105 auto const scheduler_pset = services_pset.
get<
ParameterSet>(
"scheduler");
133 <<
" nthreads: " <<
scheduler_->num_threads();
135 auto const errorOnMissingConsumes =
scheduler_->errorOnMissingConsumes();
168 auto& trigger_paths_info [[maybe_unused]] =
170 assert(trigger_names->getTrigPaths() == trigger_paths_info.pathNames());
173 std::forward_as_tuple(sid),
174 std::forward_as_tuple(sid,
183 FDEBUG(2) << pset.to_string() << endl;
189 main_input.
put(
"module_type",
"EmptyEvent");
190 main_input.
put(
"module_label",
"source");
191 main_input.
put(
"maxEvents", -1);
192 if (!pset.get_if_present(
"source", main_input)) {
194 <<
"Could not find a source configuration: using default.";
198 main_input.
get<
string>(
"module_type"),
199 main_input.
get<
string>(
"module_label"),
208 <<
"\n\nModule label: " << cet::bold_fontify(md.moduleLabel())
209 <<
"\nmodule_type : " << cet::bold_fontify(md.moduleName()) <<
"\n\n" 215 <<
"Configuration of main input source has failed\n" 222 <<
"Configuration of main input source has failed\n" 238 using cet::transform_all;
241 vector<Worker*> allWorkers;
243 back_inserter(allWorkers),
244 [](
auto const& pr) {
return pr.second.get(); });
246 back_inserter(allWorkers),
247 [](
auto const& pr) {
return pr.second.get(); });
266 finalizeContainingLevels<L>();
278 <<
" Current level: " << L
286 EventProcessor::begin<Level::Job>()
294 EventProcessor::begin<Level::InputFile>()
301 EventProcessor::begin<Level::Run>()
315 EventProcessor::begin<Level::SubRun>()
332 EventProcessor::finalize<Level::SubRun>()
354 EventProcessor::finalize<Level::Run>()
376 EventProcessor::finalize<Level::InputFile>()
387 EventProcessor::finalize<Level::Job>()
395 EventProcessor::finalizeContainingLevels<Level::SubRun>()
397 finalize<Level::Run>();
402 EventProcessor::finalizeContainingLevels<Level::Event>()
404 finalize<Level::SubRun>();
405 finalize<Level::Run>();
410 EventProcessor::recordOutputModuleClosureRequests<Level::Run>()
417 EventProcessor::recordOutputModuleClosureRequests<Level::SubRun>()
424 EventProcessor::recordOutputModuleClosureRequests<Level::Event>()
435 FDEBUG(1) << string(8,
' ') <<
"beginJob\n";
447 mf::LogError(
"BeginJob") <<
"A cet::exception happened while processing" 448 " the beginJob of the 'source'";
449 e <<
"A cet::exception happened while processing" 450 " the beginJob of the 'source'\n";
454 mf::LogError(
"BeginJob") <<
"A exception happened while processing" 455 " the beginJob of the 'source'";
459 mf::LogError(
"BeginJob") <<
"An unknown exception happened while" 460 " processing the beginJob of the 'source'";
473 FDEBUG(1) << string(8,
' ') <<
"endJob\n";
476 ec_->call([
this] {
input_->doEndJob(); });
498 FDEBUG(1) << string(8,
' ') <<
"openInputFile\n";
500 if (
fb_ ==
nullptr) {
502 <<
"Source readFile() did not return a valid FileBlock: FileBlock " 503 <<
"should be valid or readFile() should throw.\n";
533 FDEBUG(1) << string(8,
' ') <<
"closeInputFile\n";
544 FDEBUG(1) << string(8,
' ') <<
"closeAllOutputFiles\n";
550 bool outputs_to_open{
false};
551 auto check_outputs_to_open = [
this,
554 outputs_to_open =
true;
558 return outputs_to_open;
568 auto open_some_outputs = [
this](
ScheduleID const sid) {
573 FDEBUG(1) << string(8,
' ') <<
"openSomeOutputFiles\n";
581 FDEBUG(1) << string(8,
' ') <<
"setOutputFileStatus\n";
594 FDEBUG(1) << string(8,
' ') <<
"closeSomeOutputFiles\n";
603 FDEBUG(1) << string(8,
' ') <<
"respondToOpenInputFile\n";
612 FDEBUG(1) << string(8,
' ') <<
"respondToCloseInputFile\n";
621 FDEBUG(1) << string(8,
' ') <<
"respondToOpenOutputFiles\n";
630 FDEBUG(1) << string(8,
' ') <<
"respondToCloseOutputFiles\n";
642 auto rsh =
input_->runRangeSetHandler();
644 auto seed_range_set = [
this, &rsh](
ScheduleID const sid) {
659 std::as_const(*runPrincipal_).makeRun(invalid_module_context);
662 FDEBUG(1) << string(8,
' ') <<
"readRun.....................(" 678 std::as_const(*runPrincipal_).makeRun(invalid_module_context);
686 std::as_const(*runPrincipal_).makeRun(invalid_module_context);
693 "EventProcessor: an exception occurred during current event processing",
698 <<
"an exception occurred during current event processing";
701 FDEBUG(1) << string(8,
' ') <<
"beginRun....................(" <<
r 718 FDEBUG(1) << string(8,
' ') <<
"setRunAuxiliaryRangeSetID...(" 725 auto merge_range_sets = [
this, &mergedRS](
ScheduleID const sid) {
734 auto update_executors = [
this, &mergedRS](
ScheduleID const sid) {
747 unique_ptr<RangeSetHandler> rshAtSwitch{
766 assert(!run.isFlush());
774 std::as_const(*runPrincipal_).makeRun(invalid_module_context);
780 "EventProcessor: an exception occurred during current event processing",
785 <<
"an exception occurred during current event processing";
788 FDEBUG(1) << string(8,
' ') <<
"endRun......................(" << run
799 assert(!
r.isFlush());
801 FDEBUG(1) << string(8,
' ') <<
"writeRun....................(" <<
r 814 auto rsh =
input_->subRunRangeSetHandler();
816 auto seed_range_set = [
this, &rsh](
ScheduleID const sid) {
831 std::as_const(*subRunPrincipal_).makeSubRun(invalid_module_context);
834 FDEBUG(1) << string(8,
' ') <<
"readSubRun..................(" 850 std::as_const(*subRunPrincipal_).makeSubRun(invalid_module_context);
858 std::as_const(*subRunPrincipal_).makeSubRun(invalid_module_context);
865 "EventProcessor: an exception occurred during current event processing",
870 <<
"an exception occurred during current event processing";
873 FDEBUG(1) << string(8,
' ') <<
"beginSubRun.................(" << sr
890 FDEBUG(1) << string(8,
' ') <<
"setSubRunAuxiliaryRangeSetID(" 897 auto merge_range_sets = [
this, &mergedRS](
ScheduleID const sid) {
906 auto update_executors = [
this, &mergedRS](
ScheduleID const sid) {
952 unsigned largestEvent = 1U;
961 if (rsh.eventInfo().id().isValid() && !rsh.eventInfo().id().isFlush()) {
962 if (rsh.eventInfo().id().event() > largestEvent) {
969 unique_ptr<RangeSetHandler> rshAtSwitch{
973 unique_ptr<RangeSetHandler> runRSHAtSwitch{
979 rshAtSwitch->flushRanges();
995 assert(!sr.isFlush());
1003 std::as_const(*subRunPrincipal_).makeSubRun(invalid_module_context);
1009 "EventProcessor: an exception occurred during current event processing",
1014 <<
"an exception occurred during current event processing";
1017 FDEBUG(1) << string(8,
' ') <<
"endSubRun...................(" << sr
1028 assert(!sr.isFlush());
1030 FDEBUG(1) << string(8,
' ') <<
"writeSubRun.................(" << sr
1039 EventProcessor::process<most_deeply_nested_level()>()
1052 auto const last_schedule_index =
scheduler_->num_schedules() - 1;
1056 taskGroup_->native_group().run_and_wait([
this, last_schedule_index] {
1068 finalizeContainingLevels<most_deeply_nested_level()>();
1071 FDEBUG(1) << string(8,
' ') <<
"closeSomeOutputFiles\n";
1149 auto expected =
true;
1150 if (
firstEvent_.compare_exchange_strong(expected,
false)) {
1175 if (
schedule(sid).outputsToClose()) {
1187 TDEBUG_FUNC_SI(5, sid) <<
"Calling input_->readEvent(subRunPrincipal_)";
1197 ep->enableLookupOfProducedProducts();
1199 std::as_const(*ep).makeEvent(invalid_module_context),
sc);
1200 FDEBUG(1) << string(8,
' ') <<
"readEvent...................(" 1201 << ep->eventID() <<
")\n";
1206 if (
schedule(sid).event_principal().eventID().isFlush()) {
1222 : evp_{evp}, sid_{sid}
1231 rethrow_exception(ex);
1235 evp_->sharedException_.store<
Exception>(
1237 "EventProcessor: an exception occurred during current " 1244 <<
"exception being ignored for current event:\n" 1245 << cet::trim_right_copy(e.what(),
" \n");
1250 <<
"an exception occurred during current event processing";
1251 evp_->sharedException_.store_current();
1257 evp_->finishEventAsync(sid_);
1271 : evp_{evp}, sid_{sid}
1281 rethrow_exception(ex);
1284 auto const action = evp_->error_action(e);
1290 <<
"Skipping event due to the following exception:\n" 1291 << cet::trim_right_copy(e.what(),
" \n");
1293 <<
"skipping event because of EXCEPTION";
1296 evp_->sharedException_.store<
Exception>(
1298 "EventProcessor: an exception occurred during current " 1302 <<
"terminate event loop because of EXCEPTION";
1306 <<
"exception being ignored for current event:\n" 1307 << cet::trim_right_copy(e.what(),
" \n");
1312 <<
"an exception occurred during current event processing";
1313 evp_->sharedException_.store_current();
1315 <<
"terminate event loop because of EXCEPTION";
1320 auto finalize_event_task =
1321 make_waiting_task<EndPathRunnerTask>(evp_, sid_);
1323 evp_->schedule(sid_).process_event_observers(finalize_event_task);
1327 evp_->sharedException_.store<
Exception>(
1329 "EventProcessor: an exception occurred during current event " 1333 <<
"terminate event loop because of EXCEPTION";
1337 <<
"exception being ignored for current event:\n" 1338 << cet::trim_right_copy(e.what(),
" \n");
1343 <<
"an exception occurred during current event processing";
1344 evp_->sharedException_.store_current();
1346 <<
"terminate event loop because of EXCEPTION";
1376 assert(!
schedule(sid).event_principal().eventID().isFlush());
1378 auto endPathTask = make_waiting_task<EndPathTask>(
this, sid);
1394 "EventProcessor: an exception occurred during current event processing",
1400 <<
"exception being ignored for current event:\n" 1401 << cet::trim_right_copy(e.what(),
" \n");
1406 <<
"an exception occurred during current event processing";
1416 std::as_const(ep).makeEvent(invalid_module_context),
1421 FDEBUG(1) << string(8,
' ') <<
"processEvent................(" 1422 << ep.eventID() <<
")\n";
1427 FDEBUG(1) << string(8,
' ') <<
"shouldWeStop\n";
1428 static std::mutex m;
1429 std::lock_guard sentry{m};
1432 if (!ep.eventID().isFlush()) {
1440 auto const id = ep.eventID();
1442 FDEBUG(1) << string(8,
' ') <<
"writeEvent..................(" <<
id 1446 <<
"Calling schedules_->" 1447 "recordOutputClosureRequests(Granularity::Event)";
1454 "EventProcessor: an exception occurred " 1455 "during current event processing",
1462 <<
"exception being ignored for current event:\n" 1463 << cet::trim_right_copy(e.what(),
" \n");
1468 <<
"an exception occurred during current event processing";
1487 ec_->call([
this] { begin<L>(); });
1489 levelsToProcess<level_down(L)>()) {
1490 ec_->call([
this] { process<level_down(L)>(); });
1494 recordOutputModuleClosureRequests<L>();
1502 ec_->call([
this, &returnCode] {
1503 process<highest_level()>();
1508 if (!
ec_->empty()) {
1518 auto const itemType =
input_->nextItemType();
1519 FDEBUG(1) << string(4,
' ') <<
"*** nextItemType: " << itemType <<
" ***\n";
1533 <<
"Invalid next item type presented to the event processor.\n" 1534 <<
"Please contact artists@fnal.gov.";
1537 <<
"Unrecognized next item type presented to the event processor.\n" 1538 <<
"Please contact artists@fnal.gov.";
1546 if (ServiceRegistry::isAvailable<RandomNumberGenerator>()) {
void readAndProcessAsync(ScheduleID sid)
#define TDEBUG_BEGIN_TASK_SI(LEVEL, SI)
void closeAllOutputFiles()
void respondToCloseOutputFiles(FileBlock const &)
GlobalSignal< detail::SignalResponseType::FIFO, void()> sPreOpenFile
tsan_unique_ptr< InputSource > input_
void setSubRunAuxiliaryRangeSetID(RangeSet const &rs)
void respondToCloseInputFile(FileBlock const &)
tsan< ProductDescriptions > producedProductDescriptions_
GlobalSignal< detail::SignalResponseType::FIFO, void()> sPostBeginJob
void throw_if_stored_exception()
void respondToOpenOutputFiles()
static ConsumesInfo * instance()
void respondToCloseInputFile()
RangeSetHandler const & runRangeSetHandler()
tsan< UpdateOutputCallbacks > outputCallbacks_
tsan< Scheduler > scheduler_
GlobalSignal< detail::SignalResponseType::FIFO, void()> sPreCloseFile
MaybeLogger_< ELseverityLevel::ELsev_info, false > LogInfo
GlobalSignal< detail::SignalResponseType::LIFO, void(Event const &, ScheduleContext)> sPostSourceEvent
tsan_unique_ptr< FileBlock > fb_
void openSomeOutputFiles()
#define TDEBUG_END_TASK_SI(LEVEL, SI)
constexpr auto most_deeply_nested_level() noexcept
bool const handleEmptyRuns_
GlobalSignal< detail::SignalResponseType::LIFO, void()> sPostEndJob
GlobalSignal< detail::SignalResponseType::LIFO, void(Run const &)> sPostSourceRun
std::atomic< Level > nextLevel_
bool outputsToClose() const
void invokePostBeginJobWorkers_()
void seedSubRunRangeSet(RangeSetHandler const &rsh)
GlobalSignal< detail::SignalResponseType::FIFO, void()> sPreSourceSubRun
Schedule & main_schedule()
void process(Transition, Principal &)
void beginJob(detail::SharedResources const &resources)
static constexpr ScheduleID first()
GlobalSignal< detail::SignalResponseType::LIFO, void(Run const &)> sPostBeginRun
std::atomic< bool > firstEvent_
bool const handleEmptySubRuns_
tsan< PathManager > pathManager_
ScheduleIteration scheduleIteration_
GlobalSignal< detail::SignalResponseType::LIFO, void(SubRun const &)> sPostBeginSubRun
fhicl::ParameterSet const & triggerPSet() const
std::unique_ptr< GlobalTaskGroup > taskGroup_
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
tsan< detail::ExceptionCollector > ec_
tsan_unique_ptr< RunPrincipal > runPrincipal_
tsan< ProducingServiceSignals > psSignals_
std::atomic< int > shutdown_flag
void setOutputFileStatus(OutputFileStatus)
void processAllEventsAsync(ScheduleID sid)
void store(std::exception_ptr ex_ptr)
ProcessConfigurationID id() const
GlobalSignal< detail::SignalResponseType::LIFO, void(InputSource *, std::vector< Worker * > const &)> sPostBeginJobWorkers
EndPathRunnerTask(EventProcessor *evp, ScheduleID const sid)
void setRunAuxiliaryRangeSetID(RangeSet const &rs)
decltype(auto) constexpr end(T &&obj)
ADL-aware version of std::end.
void freeze(tbb::task_group &group)
tsan< cet::cpu_timer > timer_
EventPrincipal & event_principal()
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostSourceConstruction
EventID const & id() const
static auto instance(bool cleanup=false)
tsan< std::map< ScheduleID, Schedule > > schedules_
ScheduleID::size_type nschedules() const
detail::SharedResources sharedResources_
tsan< ProductTables > producedProductLookupTables_
#define TDEBUG_FUNC(LEVEL)
void writeSummary(PathManager &pm, bool wantSummary, cet::cpu_timer const &timer)
std::atomic< bool > finalizeSubRunEnabled_
T get(std::string const &key) const
actions::ActionCodes error_action(cet::exception &e) const
std::atomic< bool > beginRunCalled_
EndPathTask(EventProcessor *evp, ScheduleID const sid)
std::atomic< bool > finalizeRunEnabled_
std::atomic< bool > fileSwitchInProgress_
RangeSetHandler const & subRunRangeSetHandler()
#define TDEBUG_FUNC_SI(LEVEL, SI)
std::string const & getReleaseVersion()
void recordOutputClosureRequests(Granularity const granularity)
tsan_unique_ptr< ServicesManager > servicesManager_
tsan_unique_ptr< SubRunPrincipal > subRunPrincipal_
void closeSomeOutputFiles()
std::atomic< bool > beginSubRunCalled_
void setupSignals(bool want_sigint_enabled)
EventInfo const & eventInfo() const
void finishEventAsync(ScheduleID sid)
ParameterSetID id() const
void respondToOpenOutputFiles(FileBlock const &)
GlobalSignal< detail::SignalResponseType::LIFO, void(Event const &, ScheduleContext)> sPostProcessEvent
void incrementInputFileNumber()
RangeSet seenRanges() const
StatusCode runToCompletion()
bool outputsToOpen() const
GlobalSignal< detail::SignalResponseType::LIFO, void(Run const &)> sPostEndRun
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
SharedException sharedException_
GlobalSignal< detail::SignalResponseType::FIFO, void(Run const &)> sPreBeginRun
char const * what() const noexcept override
void showMissingConsumes() const
GlobalSignal< detail::SignalResponseType::FIFO, void(SubRunID const &, Timestamp const &)> sPreEndSubRun
std::string const & processName() const
void setRunAuxiliaryRangeSetID()
void processEventAsync(ScheduleID sid)
void beginRunIfNotDoneAlready()
GlobalSignal< detail::SignalResponseType::LIFO, void(std::string const &)> sPostOpenFile
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
void endJobAllSchedules()
static RangeSet invalid()
void closeSomeOutputFiles()
void setRequireConsumes(bool const)
void accept_principal(std::unique_ptr< EventPrincipal > principal)
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
GlobalSignal< detail::SignalResponseType::FIFO, void(RunID const &, Timestamp const &)> sPreEndRun
EventNumber_t event() const
RangeSetHandler * clone() const
bool erase(std::string const &key)
GlobalSignal< detail::SignalResponseType::FIFO, void(SubRun const &)> sPreBeginSubRun
Schedule & schedule(ScheduleID const id)
constexpr auto highest_level() noexcept
void for_each_schedule(F f) const
static Globals * instance()
void process_event_modifiers(hep::concurrency::WaitingTaskPtr endPathTask)
GlobalSignal< detail::SignalResponseType::LIFO, void()> sPostCloseFile
void closeAllOutputFiles()
void beginSubRunIfNotDoneAlready()
void seedRunRangeSet(RangeSetHandler const &rsh)
void writeRun(RunPrincipal &rp)
GlobalSignal< detail::SignalResponseType::FIFO, void(ScheduleContext)> sPreSourceEvent
GlobalSignal< detail::SignalResponseType::LIFO, void(SubRun const &)> sPostSourceSubRun
void respondToOpenInputFile(FileBlock const &)
void openSomeOutputFiles(FileBlock const &fb)
void operator()(exception_ptr const ex)
void respondToOpenInputFile()
void put(std::string const &key)
void setSubRunAuxiliaryRangeSetID()
void writeSubRun(SubRunPrincipal &srp)
GlobalSignal< detail::SignalResponseType::LIFO, void(SubRun const &)> sPostEndSubRun
cet::coded_exception< error, detail::translate > exception
GlobalSignal< detail::SignalResponseType::FIFO, void()> sPreSourceRun
void terminateAbnormally_()
void operator()(std::exception_ptr ex) const
void respondToCloseOutputFiles()
void setOutputFileStatus(OutputFileStatus const ofs)