27 #include "cetlib/HorizontalRule.h" 28 #include "cetlib/LibraryManager.h" 29 #include "cetlib/bold_fontify.h" 30 #include "cetlib/container_algorithms.h" 31 #include "cetlib/detail/wrapLibraryManagerException.h" 32 #include "cetlib/ostream_handle.h" 37 #include "range/v3/action.hpp" 38 #include "range/v3/view.hpp" 58 std::vector<std::string>
59 sorted_module_labels(std::vector<WorkerInPath::ConfigInfo>
const& wcis)
61 auto to_label = [](
auto const& wci) {
62 return wci.moduleConfigInfo->modDescription.moduleLabel();
65 return wcis | views::transform(to_label) | to<std::vector>() |
66 ::ranges::actions::sort;
76 : outputCallbacks_{outputCallbacks}
89 auto const& trigger_path_specs = enabled_modules.trigger_path_specs();
91 std::set<std::string> recorded_path_name;
92 for (
auto const& [
path_spec, entries] : trigger_path_specs) {
100 for (
auto const& [label, action] : entries) {
102 auto const mci_p = cet::make_exempt_ptr(&mci);
103 worker_config_infos.emplace_back(mci_p, action);
106 std::move(worker_config_infos));
119 auto const& end_paths = enabled_modules.end_paths();
121 for (
auto const& [
path_spec, entries] : end_paths) {
128 for (
auto const& [label, action] : entries) {
131 auto const mci_p = cet::make_exempt_ptr(&mci);
136 if (
size(end_paths) > 1u) {
138 <<
"Multiple end paths have been combined into one end path,\n" 139 <<
"\"end_path\" since order is irrelevant.";
143 std::vector<PathSpec>
150 std::vector<std::string>
155 views::transform([](
auto const& spec) {
return spec.name; }) |
159 std::vector<std::string>
164 views::transform([](
auto const& spec) {
return to_string(spec); }) |
172 std::vector<std::string>
const& producing_services)
176 auto const nschedules =
199 for (
auto const& [
path_spec, worker_config_infos] :
203 sc,
path_spec, sorted_module_labels(worker_config_infos)};
205 pc, worker_config_infos, pinfo.workers(), task_group, resources);
230 if (!results_inserter)
241 trigger_results[sid] = std::move(results_inserter);
246 using namespace detail;
247 auto const graph_info_collection =
250 auto const module_graph =
252 auto const graph_filename =
253 procPS_.
get<
string>(
"services.scheduler.dataDependencyGraph", {});
254 if (!graph_filename.empty()) {
255 cet::ostream_handle osh{graph_filename};
257 cerr <<
"Generated data-dependency graph file: " << graph_filename
260 auto const& err = module_graph.second;
295 std::map<std::string, detail::ModuleConfigInfo>
299 std::map<std::string, detail::ModuleConfigInfo> result{};
301 for (
auto const& [module_label, key_and_type] : enabled_modules.
modules()) {
305 auto const lib_spec = module_pset.
get<
string>(
"module_type");
308 es <<
" ERROR: Module with label " << module_label <<
" of type " 310 <<
" but defined in code as a " <<
to_string(actual_mod_type)
323 result.try_emplace(module_label, std::move(mci));
326 es <<
" ERROR: Configuration of module with label " << module_label
327 <<
" encountered the following error:\n" 331 if (
auto err_msg = es.str(); not
empty(err_msg)) {
333 <<
"The following were encountered while processing the module " 344 vector<string> configErrMsgs;
345 for (
auto const& [module_label, mci] :
allModules_) {
346 auto const& modPS = mci.modPS;
347 auto const& md = mci.modDescription;
349 auto const module_threading_type = md.moduleThreadingType();
356 if (
auto err_msg = get_if<std::string>(&mod)) {
357 configErrMsgs.push_back(*err_msg);
361 assert(std::holds_alternative<ModuleBase*>(mod));
362 auto module = std::get<ModuleBase*>(mod);
366 modules.shared.emplace(module_label,
367 std::unique_ptr<ModuleBase>{
module});
371 replicated_modules[sid].reset(
module);
375 auto fill_replicated_module = [&,
this](
ScheduleID const sid) {
377 if (
auto mod_ptr = get_if<ModuleBase*>(&repl_mod)) {
378 replicated_modules[sid].reset(*mod_ptr);
381 schedule_iteration.for_each_schedule(fill_replicated_module);
382 modules.replicated.emplace(module_label, std::move(replicated_modules));
394 module->getConsumables());
397 if (!configErrMsgs.empty()) {
398 constexpr cet::HorizontalRule rule{100};
401 << rule(
'=') <<
"\n\n" 402 <<
"!! The following modules have been misconfigured: !!" <<
'\n';
403 for (
auto const& err : configErrMsgs) {
404 msg <<
'\n' << rule(
'-') <<
'\n' << err;
406 msg <<
'\n' << rule(
'=') <<
'\n';
420 lm_.getSymbolByLibspec(
module_type,
"make_module", module_factory_func);
423 cet::detail::wrapLibraryManagerException(
426 if (module_factory_func ==
nullptr) {
429 <<
" has internal symbol definition problems: consult an " 433 mod->setModuleDescription(md);
438 es <<
"\n\nModule label: " << cet::bold_fontify(md.moduleLabel())
439 <<
"\nmodule_type : " << cet::bold_fontify(md.moduleName()) <<
"\n\n" 444 std::unique_ptr<ReplicatedProducer>
448 if (pathsInfo.paths().empty()) {
455 "TriggerResultInserter",
460 auto producer = std::make_unique<TriggerResultInserter>(
461 trig_pset, scheduleID, pathsInfo.pathResults());
467 std::unique_ptr<Worker>
475 vector<WorkerInPath::ConfigInfo>
const& wci_list,
476 map<
string, std::shared_ptr<Worker>>& workers,
482 vector<WorkerInPath> wips;
483 for (
auto const& wci : wci_list) {
484 auto const& mci = *wci.moduleConfigInfo;
485 auto const filterAction = wci.filterAction;
486 auto const& module_label = mci.modDescription.moduleLabel();
488 auto const& md = mci.modDescription;
489 std::shared_ptr<Worker> worker{
nullptr};
492 if (
auto it = workers.find(module_label); it != workers.end()) {
494 <<
"Reusing worker " << hex << it->second << dec
495 <<
" path: " <<
to_string(
pi) <<
" type: " << md.moduleName()
496 <<
" label: " << module_label;
507 TDEBUG(5) <<
"Made worker " << hex << worker << dec <<
" (" << sid
508 <<
") path: " <<
to_string(
pi) <<
" type: " << md.moduleName()
509 <<
" label: " << module_label <<
'\n';
513 workers.emplace(module_label, worker);
515 cet::make_exempt_ptr(worker.get()), filterAction, pc, task_group);
520 std::shared_ptr<Worker>
523 auto get_module = [
this](std::string
const& module_label,
536 return module->makeWorker(wp);
544 lm_.getSymbolByLibspec(lib_spec,
"moduleType", mod_type_func);
547 cet::detail::wrapLibraryManagerException(
550 if (mod_type_func ==
nullptr) {
553 <<
" has internal symbol definition problems: consult an expert.";
555 return mod_type_func();
563 lm_.getSymbolByLibspec(
564 lib_spec,
"moduleThreadingType", mod_threading_type_func);
567 cet::detail::wrapLibraryManagerException(
570 if (mod_threading_type_func ==
nullptr) {
573 <<
" has internal symbol definition problems: consult an expert.";
575 return mod_threading_type_func();
579 using namespace detail;
583 std::vector<std::string>
const& producing_services)
586 auto& source_info = result[
"input_source"];
588 set<string> path_names;
590 path_names.insert(spec.name);
592 source_info.paths = path_names;
595 source_info.paths = {
"end_path"};
599 std::map<std::string, std::set<ProductInfo>> produced_products_per_module;
600 std::map<std::string, std::set<std::string>> viewable_products_per_module;
602 auto const& module_name = pd.moduleLabel();
603 produced_products_per_module[module_name].emplace(
605 pd.friendlyClassName(),
607 pd.productInstanceName(),
608 ProcessTag{pd.processName(), pd.processName()});
609 if (pd.supportsView()) {
613 viewable_products_per_module[module_name].insert(
614 pd.productInstanceName());
619 for (
auto const& service_name : producing_services) {
620 auto& graph_info = result[service_name];
623 auto found = produced_products_per_module.find(service_name);
624 if (found ==
cend(produced_products_per_module)) {
628 graph_info.produced_products = found->second;
634 produced_products_per_module,
635 viewable_products_per_module,
640 produced_products_per_module,
641 viewable_products_per_module,
649 string const& path_name,
651 std::map<std::string, std::set<ProductInfo>>
const& produced_products,
652 std::map<std::string, std::set<std::string>>
const& viewable_products,
655 auto const worker_config_begin =
cbegin(worker_configs);
657 for (
auto it = worker_config_begin,
end =
cend(worker_configs); it !=
end;
659 auto const& mci = *it->moduleConfigInfo;
660 auto const& module_name = mci.modDescription.moduleLabel();
661 auto& graph_info = info_collection[module_name];
662 graph_info.paths.insert(path_name);
663 graph_info.module_type = mci.moduleType;
665 auto found = produced_products.find(module_name);
666 if (found !=
cend(produced_products)) {
667 graph_info.produced_products = found->second;
670 auto const& consumables =
672 graph_info.consumed_products =
688 string const allowed_path_spec{R
"([\w\*\?]+)"}; 689 regex const regex{
"(\\w+:)?(!|exception@)?(" + allowed_path_spec +
697 for (
auto const& worker_config : worker_configs) {
698 auto const& mci = *worker_config.moduleConfigInfo;
699 auto const& module_name = mci.modDescription.moduleLabel();
700 auto const& ps = mci.modPS;
701 auto& graph_info = info_collection[module_name];
703 auto path_specs = ps.get<vector<string>>(
"SelectEvents", {});
717 assert(matches.size() == 5);
718 graph_info.select_events.insert(matches[3]);
consumables_t::mapped_type const & consumables(std::string const &module_label) const
void collectConsumes(std::string const &module_label, consumables_t::mapped_type const &consumables)
ProductDescriptions & productsToProduce_
void setProcessName(std::string const &)
static std::string end_path()
static ParameterSetID const & put(ParameterSet const &ps)
ModuleBase *(fhicl::ParameterSet const &, ProcessingFrame const &) ModuleMaker_t
decltype(auto) constexpr cend(T &&obj)
ADL-aware version of std::cend.
keytype_for_name_t const & modules() const noexcept
static ConsumesInfo * instance()
std::map< module_label_t, PerScheduleContainer< std::unique_ptr< ModuleBase > > > replicated
void fillModuleOnlyDeps_(std::string const &path_name, detail::configs_t const &worker_configs, std::map< std::string, std::set< ProductInfo >> const &produced_products, std::map< std::string, std::set< std::string >> const &viewable_products, detail::collection_map_t &info_collection) const
MaybeLogger_< ELseverityLevel::ELsev_info, false > LogInfo
GlobalSignal< detail::SignalResponseType::FIFO, void(ModuleDescription const &)> sPreModuleConstruction
std::unique_ptr< Worker > releaseTriggerResultsInserter(ScheduleID)
std::string const & moduleLabel() const
std::map< module_name_t, ModuleGraphInfo > collection_map_t
std::vector< std::string > triggerPathNames_() const
PerScheduleContainer< std::unique_ptr< Worker > > triggerResultsWorkers_
static constexpr ScheduleID first()
art::detail::paths_to_modules_t protoTrigPathLabels_
void setTriggerPSet(fhicl::ParameterSet const &)
std::vector< BranchDescription > ProductDescriptions
ModuleType module_type(std::string const &full_key)
fhicl::ParameterSet const & triggerPSet() const
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostModuleConstruction
maybe_module_t makeModule_(fhicl::ParameterSet const &module_pset, ModuleDescription const &md, ScheduleID) const
std::shared_ptr< Worker > makeWorker_(ModuleDescription const &md, WorkerParams const &wp)
decltype(auto) constexpr end(T &&obj)
ADL-aware version of std::end.
decltype(auto) constexpr size(T &&obj)
ADL-aware version of std::size.
PerScheduleContainer< PathsInfo > triggerPathsInfo_
std::vector< WorkerInPath::ConfigInfo > configs_t
std::vector< PathSpec > path_specs(std::vector< std::string > const &path_spec_strs)
std::variant< ModuleBase *, std::string > maybe_module_t
ScheduleID::size_type nschedules() const
std::vector< std::string > prependedTriggerPathNames_() const
T get(std::string const &key) const
PerScheduleContainer< PathsInfo > const & endPathInfo()
PerScheduleContainer< PathsInfo > endPathInfo_
std::vector< WorkerInPath > fillWorkers_(PathContext const &pc, std::vector< WorkerInPath::ConfigInfo > const &wci_list, std::map< std::string, std::shared_ptr< Worker >> &workers, GlobalTaskGroup &task_group, detail::SharedResources &resources)
ModuleThreadingType loadModuleThreadingType_(std::string const &lib_spec) const
std::map< std::string, detail::ModuleConfigInfo > allModules_
#define TDEBUG_FUNC_SI(LEVEL, SI)
void print_module_graph(std::ostream &os, ModuleGraphInfoMap const &modInfos, ModuleGraph const &graph)
std::string const & getReleaseVersion()
ModulesByThreadingType makeModules_(ScheduleID::size_type n)
ModuleType( ModuleTypeFunc_t)
static auto end_path_spec()
void fillSelectEventsDeps_(detail::configs_t const &worker_configs, detail::collection_map_t &info_collection) const
ParameterSetID id() const
std::vector< PathSpec > triggerPathSpecs() const
ModuleThreadingType moduleThreadingType() const
std::map< std::string, detail::ModuleConfigInfo > moduleInformation_(detail::EnabledModules const &enabled_modules) const
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
UpdateOutputCallbacks & outputCallbacks_
ActionTable const & exceptActions_
char const * what() const noexcept override
PathID pathID() const noexcept
void remove_whitespace(std::string &str)
detail::collection_map_t getModuleGraphInfoCollection_(std::vector< std::string > const &producing_services)
std::map< module_label_t, std::unique_ptr< ModuleBase > > shared
art::detail::module_entries_for_ordered_path_t triggerPathSpecs_
constexpr T pi()
Returns the constant pi (up to 35 decimal digits of precision)
void setTriggerPathNames(std::vector< std::string > const &)
tbb::task_group & native_group()
PerScheduleContainer< PathsInfo > const & triggerPathsInfo()
std::set< ProductInfo > consumed_products_for_module(std::string const ¤t_process, ConsumesInfo::consumables_t::mapped_type const &consumables, std::map< std::string, std::set< ProductInfo >> const &produced_products, std::map< std::string, std::set< std::string >> const &viewable_products, config_const_iterator const config_begin, config_const_iterator const config_it)
decltype(auto) constexpr cbegin(T &&obj)
ADL-aware version of std::cbegin.
void createModulesAndWorkers(GlobalTaskGroup &task_group, detail::SharedResources &resources, std::vector< std::string > const &producing_services)
ModuleThreadingType( ModuleThreadingTypeFunc_t)
PathSpec path_spec(std::string const &path_spec)
bool is_observer(ModuleType const mt)
art::detail::configs_t protoEndPathLabels_
static Globals * instance()
ModulesByThreadingType modules_
ActivityRegistry const & actReg_
std::unique_ptr< ReplicatedProducer > makeTriggerResultsInserter_(ScheduleID scheduleID)
ModuleType loadModuleType_(std::string const &lib_spec) const
std::string to_string(ModuleType const mt)
fhicl::ParameterSet procPS_
void put(std::string const &key)
cet::coded_exception< error, detail::translate > exception
decltype(auto) constexpr empty(T &&obj)
ADL-aware version of std::empty.
std::pair< ModuleGraph, std::string > make_module_graph(ModuleGraphInfoMap const &modInfos, paths_to_modules_t const &trigger_paths, configs_t const &end_path)