31 #include "cetlib/container_algorithms.h" 34 #include "tbb/task_arena.h" 51 p.
put(
"module_type",
"EmptyEvent");
52 p.
put(
"module_label",
"source");
53 p.
put(
"maxEvents", 1);
56 std::unique_ptr<art::InputSource>
58 std::string
const& processName,
63 setupAsDefaultEmptySource(defaultEmptySource);
65 bool sourceSpecified{
false};
70 <<
"Could not find a source configuration: using default.";
78 main_input.get<std::string>(
"module_type"),
79 main_input.get<std::string>(
"module_label"),
82 sourceSpecified =
true;
97 if (sourceSpecified ==
false &&
100 <<
"Configuration of main input source has failed\n" 108 <<
"Configuration of main input source has failed\n" 111 return std::unique_ptr<art::InputSource>();
115 spaces(
unsigned const n)
117 return std::string(n,
' ');
127 ,
handleEmptyRuns_{pset.get<
bool>(
"services.scheduler.handleEmptyRuns",
true)}
129 pset.get<
bool>(
"services.scheduler.handleEmptySubRuns",
true)}
134 std::string
const& processName{pset.get<std::string>(
"process_name")};
141 ->addMetadataString(
"process_name", processName);
143 input_ = makeInput(pset, processName,
preg_, actReg_);
144 actReg_.sPostSourceConstruction.invoke(
input_->moduleDescription());
153 FDEBUG(2) << pset.to_string() << std::endl;
173 auto const fpc_pset =
175 services.
erase(
"floating_point_control");
178 services.erase(
"message");
196 auto const num_threads = pset.
get<
int>(
"services.scheduler.num_threads");
199 <<
"TBB has been configured to use a maximum of " 200 << tbb::this_task_arena::max_concurrency() <<
" threads.";
217 std::vector<Worker*> allWorkers;
221 auto workerStripper = [&allWorkers](WorkerMap::value_type
const& val) {
222 allWorkers.emplace_back(val.second.get());
227 actReg_.sPostBeginJobWorkers.invoke(
input_.get(), allWorkers);
233 template <art::Level L>
247 finalizeContainingLevels<L>();
266 EventProcessor::begin<Level::Job>()
274 EventProcessor::begin<Level::InputFile>()
281 EventProcessor::begin<Level::Run>()
292 EventProcessor::begin<Level::SubRun>()
306 EventProcessor::finalize<Level::Event>()
318 EventProcessor::finalize<Level::SubRun>()
338 EventProcessor::finalize<Level::Run>()
358 EventProcessor::finalize<Level::InputFile>()
369 EventProcessor::finalize<Level::Job>()
377 EventProcessor::finalizeContainingLevels<Level::SubRun>()
379 finalize<Level::Run>();
384 EventProcessor::finalizeContainingLevels<Level::Event>()
386 finalize<Level::SubRun>();
387 finalize<Level::Run>();
392 EventProcessor::recordOutputModuleClosureRequests<Level::Run>()
399 EventProcessor::recordOutputModuleClosureRequests<Level::SubRun>()
406 EventProcessor::recordOutputModuleClosureRequests<Level::Event>()
413 EventProcessor::process<most_deeply_nested_level()>()
431 finalize<most_deeply_nested_level()>();
432 recordOutputModuleClosureRequests<most_deeply_nested_level()>();
437 template <art::Level L>
445 ec_.
call([
this] { begin<L>(); });
448 levelsToProcess<level_down(L)>()) {
450 process<level_down(L)>();
456 recordOutputModuleClosureRequests<L>();
468 process<highest_level()>();
492 auto const itemType =
input_->nextItemType();
493 FDEBUG(1) << spaces(4) <<
"*** nextItemType: " << itemType <<
" ***\n";
507 <<
"Invalid next item type presented to the event processor.\n" 508 <<
"Please contact artists@fnal.gov.";
512 <<
"Unrecognized next item type presented to the event processor.\n" 513 <<
"Please contact artists@fnal.gov.";
522 FDEBUG(1) << spaces(8) <<
"beginJob\n";
536 mf::LogError(
"BeginJob") <<
"A cet::exception happened while processing" 537 " the beginJob of the 'source'\n";
538 e <<
"A cet::exception happened while processing" 539 " the beginJob of the 'source'\n";
543 mf::LogError(
"BeginJob") <<
"A std::exception happened while processing" 544 " the beginJob of the 'source'\n";
548 mf::LogError(
"BeginJob") <<
"An unknown exception happened while" 549 " processing the beginJob of the 'source'\n";
554 actReg_.sPostBeginJob.invoke();
562 FDEBUG(1) << spaces(8) <<
"endJob\n";
568 ec_.
call([
this] { actReg_.sPostEndJob.invoke(); });
583 actReg_.sPreOpenFile.invoke();
584 FDEBUG(1) << spaces(8) <<
"openInputFile\n";
588 <<
"Source readFile() did not return a valid FileBlock: FileBlock " 589 <<
"should be valid or readFile() should throw.\n";
591 actReg_.sPostOpenFile.invoke(
fb_->fileName());
616 actReg_.sPreCloseFile.invoke();
618 actReg_.sPostCloseFile.invoke();
619 FDEBUG(1) << spaces(8) <<
"closeInputFile\n";
626 FDEBUG(1) << spaces(8) <<
"openAllOutputFiles\n";
637 FDEBUG(1) << spaces(8) <<
"closeAllOutputFiles\n";
647 FDEBUG(1) << spaces(8) <<
"openSomeOutputFiles\n";
655 FDEBUG(1) << spaces(8) <<
"setOutputFileStatus\n";
668 FDEBUG(1) << spaces(8) <<
"closeSomeOutputFiles\n";
676 FDEBUG(1) << spaces(8) <<
"respondToOpenInputFile\n";
684 FDEBUG(1) << spaces(8) <<
"respondToCloseInputFile\n";
692 FDEBUG(1) << spaces(8) <<
"respondToOpenOutputFiles\n";
700 FDEBUG(1) << spaces(8) <<
"respondToCloseOutputFiles\n";
710 actReg_.sPreSourceRun.invoke();
721 actReg_.sPostSourceRun.invoke(r);
722 FDEBUG(1) << spaces(8) <<
"readRun.....................(" 735 process_<Begin<Level::Run>>(*runPrincipal_);
736 FDEBUG(1) << spaces(8) <<
"beginRun....................(" << r <<
")\n";
753 FDEBUG(1) << spaces(8) <<
"setRunAuxiliaryRangeSetID...(" 766 assert(!run.isFlush());
767 process_<End<Level::Run>>(*runPrincipal_);
768 FDEBUG(1) << spaces(8) <<
"endRun......................(" << run <<
")\n";
778 assert(!r.isFlush());
780 FDEBUG(1) << spaces(8) <<
"writeRun....................(" << r <<
")\n";
790 actReg_.sPreSourceSubRun.invoke();
801 actReg_.sPostSourceSubRun.invoke(sr);
802 FDEBUG(1) << spaces(8) <<
"readSubRun..................(" 815 process_<Begin<Level::SubRun>>(*subRunPrincipal_);
816 FDEBUG(1) << spaces(8) <<
"beginSubRun.................(" << sr <<
")\n";
833 FDEBUG(1) << spaces(8) <<
"setSubRunAuxiliaryRangeSetID(" 846 assert(!sr.isFlush());
847 process_<End<Level::SubRun>>(*subRunPrincipal_);
848 FDEBUG(1) << spaces(8) <<
"endSubRun...................(" << sr <<
")\n";
858 assert(!sr.isFlush());
860 FDEBUG(1) << spaces(8) <<
"writeSubRun.................(" << sr <<
")\n";
872 actReg_.sPreSourceEvent.invoke();
880 actReg_.sPostSourceEvent.invoke(e);
881 FDEBUG(1) << spaces(8) <<
"readEvent...................(" 891 assert(!
id.isFlush());
892 process_<Do<Level::Event>>(*eventPrincipal_);
893 FDEBUG(1) << spaces(8) <<
"processEvent................(" <<
id <<
")\n";
902 assert(!
id.isFlush());
904 FDEBUG(1) << spaces(8) <<
"writeEvent..................(" <<
id <<
")\n";
911 FDEBUG(1) << spaces(8) <<
"shouldWeStop\n";
932 if (ServiceRegistry::isAvailable<RandomNumberGenerator>()) {
std::string bold_fontify(std::string const &s)
bool empty() const noexcept
StatusCode runToCompletion()
void respondToOpenOutputFiles()
std::unique_ptr< EndPathExecutor > endPathExecutor_
void respondToCloseInputFile()
MaybeLogger_< ELseverityLevel::ELsev_info, false > LogInfo
void openSomeOutputFiles()
bool const handleEmptyRuns_
std::atomic< int > shutdown_flag
std::unique_ptr< Schedule > schedule_
ProducingServiceSignals psSignals_
void invokePostBeginJobWorkers_()
std::unique_ptr< InputSource > input_
static cet::exempt_ptr< Consumer > non_module_context()
static ScheduleID first()
bool const handleEmptySubRuns_
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
MasterProductRegistry preg_
void setOutputFileStatus(OutputFileStatus)
GlobalSignal< detail::SignalResponseType::LIFO, void(SubRunPrincipal &)> sPostReadSubRun
void servicesActivate_(ServiceToken st)
ServiceToken serviceToken_
std::unique_ptr< SubRunPrincipal > subRunPrincipal_
ProcessConfigurationID id() const
std::unique_ptr< ServiceRegistry::Operate > servicesSentry_
void registerProducts(MasterProductRegistry &mpr, ProductDescriptions &productsToProduce, ProducingServiceSignals &signals, ProcessConfiguration const &pc)
WorkerMap const & workers() const
ServiceDirector serviceDirector_
vstring const & triggerPathNames() const
void writeSummary(PathManager &pm, bool wantSummary, cet::cpu_timer const &timer)
T get(std::string const &key) const
MFStatusUpdater mfStatusUpdater_
tbb::task_scheduler_init tbbManager_
void finalizeForProcessing()
bool get_if_present(std::string const &key, T &value) const
std::string const & getReleaseVersion()
PathsInfo & endPathInfo()
void closeSomeOutputFiles()
PathsInfo & triggerPathsInfo(ScheduleID sID)
void initSchedules_(fhicl::ParameterSet const &pset)
ParameterSetID id() const
ModuleDescriptionID id() const
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
char const * what() const noexcept override
auto const & get(BranchType const bt) const
std::unique_ptr< RunPrincipal > runPrincipal_
void markLevelAsProcessed()
void setRunAuxiliaryRangeSetID()
ProductDescriptions productsToProduce_
void beginRunIfNotDoneAlready()
std::unique_ptr< FileBlock > fb_
ProductTables producedProducts_
bool shouldWeStop() const
bool finalizeSubRunEnabled_
bool erase(std::string const &key)
constexpr auto highest_level() noexcept
void closeAllOutputFiles()
void servicesDeactivate_()
std::unique_ptr< EventPrincipal > eventPrincipal_
std::vector< Level > activeLevels_
void beginSubRunIfNotDoneAlready()
detail::ExceptionCollector ec_
ServiceDirector initServices_(fhicl::ParameterSet const &top_pset, ActivityRegistry &areg, ServiceToken &token)
void respondToOpenInputFile()
void put(std::string const &key)
EventProcessor(EventProcessor const &)=delete
void setSubRunAuxiliaryRangeSetID()
void openAllOutputFiles()
cet::coded_exception< error, detail::translate > exception
GlobalSignal< detail::SignalResponseType::LIFO, void(EventPrincipal &)> sPostReadEvent
void terminateAbnormally_()
GlobalSignal< detail::SignalResponseType::LIFO, void(RunPrincipal &)> sPostReadRun
void respondToCloseOutputFiles()