LArSoft  v06_85_00
Liquid Argon Software toolkit - http://larsoft.org/
EventProcessor.cc
Go to the documentation of this file.
2 
31 #include "cetlib/container_algorithms.h"
34 #include "tbb/task_arena.h"
35 
36 #include <cassert>
37 #include <exception>
38 #include <iomanip>
39 #include <string>
40 #include <utility>
41 #include <vector>
42 
44 
45 namespace {
46 
48  void
49  setupAsDefaultEmptySource(ParameterSet& p)
50  {
51  p.put("module_type", "EmptyEvent");
52  p.put("module_label", "source");
53  p.put("maxEvents", 1);
54  }
55 
56  std::unique_ptr<art::InputSource>
57  makeInput(ParameterSet const& params,
58  std::string const& processName,
61  {
62  ParameterSet defaultEmptySource;
63  setupAsDefaultEmptySource(defaultEmptySource);
64  // find single source
65  bool sourceSpecified{false};
66  ParameterSet main_input{defaultEmptySource};
67  try {
68  if (!params.get_if_present("source", main_input)) {
69  mf::LogInfo("EventProcessorSourceConfig")
70  << "Could not find a source configuration: using default.";
71  }
72  // Fill in "ModuleDescription", in case the input source
73  // produces any EDproducts, which would be registered in the
74  // MasterProductRegistry. Also fill in the process history item
75  // for this process.
76  art::ModuleDescription const md{
77  main_input.id(),
78  main_input.get<std::string>("module_type"),
79  main_input.get<std::string>("module_label"),
81  processName, params.id(), art::getReleaseVersion()}};
82  sourceSpecified = true;
83  art::InputSourceDescription isd{md, preg, areg};
84  try {
85  auto source = art::InputSourceFactory::make(main_input, isd);
86  return source;
87  }
88  catch (fhicl::detail::validationException const& e) {
90  << "\n\nModule label: " << art::detail::bold_fontify(md.moduleLabel())
91  << "\nmodule_type : " << art::detail::bold_fontify(md.moduleName())
92  << "\n\n"
93  << e.what();
94  }
95  }
96  catch (art::Exception const& x) {
97  if (sourceSpecified == false &&
98  art::errors::Configuration == x.categoryCode()) {
99  throw art::Exception(art::errors::Configuration, "FailedInputSource")
100  << "Configuration of main input source has failed\n"
101  << x;
102  } else {
103  throw;
104  }
105  }
106  catch (cet::exception const& x) {
107  throw art::Exception(art::errors::Configuration, "FailedInputSource")
108  << "Configuration of main input source has failed\n"
109  << x;
110  }
111  return std::unique_ptr<art::InputSource>();
112  }
113 
/// Build an indentation string for the FDEBUG trace output.
///
/// @param n  Number of blanks requested.
/// @return   A string consisting of exactly `n` space characters
///           (empty when `n` is zero).
inline std::string
spaces(unsigned const n)
{
  std::string padding;
  padding.assign(n, ' ');
  return padding;
}
119 }
120 
122  : act_table_{pset.get<ParameterSet>("services.scheduler")}
123  , actReg_()
126  , serviceDirector_{initServices_(pset, actReg_, serviceToken_)}
127  , handleEmptyRuns_{pset.get<bool>("services.scheduler.handleEmptyRuns", true)}
129  pset.get<bool>("services.scheduler.handleEmptySubRuns", true)}
130 {
133 
134  std::string const& processName{pset.get<std::string>("process_name")};
135  ProcessConfiguration const pc{processName, pset.id(), getReleaseVersion()};
137 
138  // Services
139  // System service FileCatalogMetadata needs to know about the process name.
141  ->addMetadataString("process_name", processName);
142 
143  input_ = makeInput(pset, processName, preg_, actReg_);
144  actReg_.sPostSourceConstruction.invoke(input_->moduleDescription());
145 
146  initSchedules_(pset);
148  std::make_unique<EndPathExecutor>(pathManager_, act_table_, actReg_, preg_);
151 
153  FDEBUG(2) << pset.to_string() << std::endl;
155 }
156 
158 {
159  // Services must stay usable until they go out of scope, meaning
160  // that modules may (say) use services in their destructors.
162 }
163 
166  ActivityRegistry& areg,
167  ServiceToken& token)
168 {
169  auto services = top_pset.get<ParameterSet>("services", {});
170 
171  // Save and remove the non-standard service config, "floating_point_control",
172  // to prevent ServiceDirector from trying to make one itself.
173  auto const fpc_pset =
174  services.get<ParameterSet>("floating_point_control", {});
175  services.erase("floating_point_control");
176 
177  // Remove non-standard non-service config, "message."
178  services.erase("message");
179 
180  // Create the service director and all user-configured services.
181  ServiceDirector director{std::move(services), areg, token};
182 
183  // Services requiring special construction.
184  director.addSystemService<CurrentModule>(areg);
185  director.addSystemService<TriggerNamesService>(
186  top_pset, pathManager_.triggerPathNames());
187  director.addSystemService<FloatingPointControl>(fpc_pset, areg);
188  director.addSystemService<ScheduleContext>();
189  return director;
190 }
191 
192 void
194 {
195  // Initialize TBB with desired number of threads.
196  auto const num_threads = pset.get<int>("services.scheduler.num_threads");
197  tbbManager_.initialize(num_threads);
198  mf::LogInfo("MTdiagnostics")
199  << "TBB has been configured to use a maximum of "
200  << tbb::this_task_arena::max_concurrency() << " threads.";
201  schedule_ =
202  std::make_unique<Schedule>(ScheduleID::first(),
203  pathManager_,
204  pset,
206  preg_,
208  act_table_,
209  actReg_);
210 }
211 
212 void
214 {
215  // Need to convert multiple lists of workers into a long list that
216  // the postBeginJobWorkers callbacks can understand.
217  std::vector<Worker*> allWorkers;
218  allWorkers.reserve(
220  pathManager_.endPathInfo().workers().size());
221  auto workerStripper = [&allWorkers](WorkerMap::value_type const& val) {
222  allWorkers.emplace_back(val.second.get());
223  };
225  workerStripper);
226  cet::for_all(pathManager_.endPathInfo().workers(), workerStripper);
227  actReg_.sPostBeginJobWorkers.invoke(input_.get(), allWorkers);
228 }
229 
230 //================================================================
231 // Event-loop infrastructure
232 
233 template <art::Level L>
234 bool
236 {
239  // Consider reading right here?
240  }
241 
242  if (nextLevel_ == L) {
243  activeLevels_.push_back(nextLevel_);
245  if (endPathExecutor_->outputsToClose()) {
247  finalizeContainingLevels<L>();
249  }
250  return true;
251  } else if (nextLevel_ < L) {
252  return false;
253  } else if (nextLevel_ == highest_level()) {
254  return false;
255  }
256 
257  throw Exception{errors::LogicError} << "Incorrect level hierarchy.";
258 }
259 
260 namespace art {
261 
262  // Specializations for process function template
263 
264  template <>
265  inline void
266  EventProcessor::begin<Level::Job>()
267  {
268  timer_.start();
269  beginJob();
270  }
271 
272  template <>
273  inline void
274  EventProcessor::begin<Level::InputFile>()
275  {
276  openInputFile();
277  }
278 
279  template <>
280  void
281  EventProcessor::begin<Level::Run>()
282  {
283  finalizeRunEnabled_ = true;
284  readRun();
285  if (handleEmptyRuns_) {
286  beginRun();
287  }
288  }
289 
290  template <>
291  void
292  EventProcessor::begin<Level::SubRun>()
293  {
294  finalizeSubRunEnabled_ = true;
295  assert(runPrincipal_);
296  assert(runPrincipal_->id().isValid());
297  readSubRun();
298  if (handleEmptySubRuns_) {
300  beginSubRun();
301  }
302  }
303 
304  template <>
305  void
306  EventProcessor::finalize<Level::Event>()
307  {
308  assert(eventPrincipal_);
309  if (eventPrincipal_->id().isFlush())
310  return;
311 
313  writeEvent();
314  }
315 
316  template <>
317  void
318  EventProcessor::finalize<Level::SubRun>()
319  {
320  assert(subRunPrincipal_);
322  return;
323  if (subRunPrincipal_->id().isFlush())
324  return;
325 
328  if (beginSubRunCalled_) {
329  endSubRun();
330  }
331  writeSubRun();
332 
333  finalizeSubRunEnabled_ = false;
334  }
335 
336  template <>
337  void
338  EventProcessor::finalize<Level::Run>()
339  {
340  assert(runPrincipal_);
341  if (!finalizeRunEnabled_)
342  return;
343  if (runPrincipal_->id().isFlush())
344  return;
345 
348  if (beginRunCalled_) {
349  endRun();
350  }
351  writeRun();
352 
353  finalizeRunEnabled_ = false;
354  }
355 
356  template <>
357  void
358  EventProcessor::finalize<Level::InputFile>()
359  {
360  if (nextLevel_ == Level::Job) {
361  closeAllFiles();
362  } else {
363  closeInputFile();
364  }
365  }
366 
367  template <>
368  inline void
369  EventProcessor::finalize<Level::Job>()
370  {
371  endJob();
372  timer_.stop();
373  }
374 
375  template <>
376  inline void
377  EventProcessor::finalizeContainingLevels<Level::SubRun>()
378  {
379  finalize<Level::Run>();
380  }
381 
382  template <>
383  inline void
384  EventProcessor::finalizeContainingLevels<Level::Event>()
385  {
386  finalize<Level::SubRun>();
387  finalize<Level::Run>();
388  }
389 
390  template <>
391  inline void
392  EventProcessor::recordOutputModuleClosureRequests<Level::Run>()
393  {
394  endPathExecutor_->recordOutputClosureRequests(Granularity::Run);
395  }
396 
397  template <>
398  inline void
399  EventProcessor::recordOutputModuleClosureRequests<Level::SubRun>()
400  {
401  endPathExecutor_->recordOutputClosureRequests(Granularity::SubRun);
402  }
403 
404  template <>
405  inline void
406  EventProcessor::recordOutputModuleClosureRequests<Level::Event>()
407  {
408  endPathExecutor_->recordOutputClosureRequests(Granularity::Event);
409  }
410 
411  template <>
412  void
413  EventProcessor::process<most_deeply_nested_level()>()
414  {
415  if (shutdown_flag > 0 || !ec_.empty()) {
416  return;
417  }
418 
421  readEvent();
422 
423  assert(eventPrincipal_);
424  if (eventPrincipal_->id().isFlush())
425  return;
426  processEvent();
427 
428  if (shouldWeStop()) {
429  nextLevel_ = highest_level(); // FIXME: maybe go somewhere else?
430  }
431  finalize<most_deeply_nested_level()>();
432  recordOutputModuleClosureRequests<most_deeply_nested_level()>();
433  }
434 
435 } // namespace art
436 
437 template <art::Level L>
438 void
440 {
441  if (shutdown_flag > 0 || !ec_.empty()) {
442  return;
443  }
444 
445  ec_.call([this] { begin<L>(); });
446 
447  while (shutdown_flag == 0 && ec_.empty() &&
448  levelsToProcess<level_down(L)>()) {
449  ec_.call([this] {
450  process<level_down(L)>();
452  });
453  }
454  ec_.call([this] {
455  finalize<L>();
456  recordOutputModuleClosureRequests<L>();
457  });
458 }
459 
462 {
463  StatusCode returnCode{epSuccess};
464  // Make the services available
466 
467  ec_.call([this, &returnCode] {
468  process<highest_level()>();
469  if (art::shutdown_flag > 0) {
470  returnCode = epSignal;
471  }
472  });
473 
474  if (!ec_.empty()) {
476  ec_.rethrow();
477  }
478 
479  return returnCode;
480 }
481 
482 void
484 {
485  assert(!activeLevels_.empty());
486  activeLevels_.pop_back();
487 }
488 
491 {
492  auto const itemType = input_->nextItemType();
493  FDEBUG(1) << spaces(4) << "*** nextItemType: " << itemType << " ***\n";
494  switch (itemType) {
495  case input::IsStop:
496  return highest_level();
497  case input::IsFile:
498  return Level::InputFile;
499  case input::IsRun:
500  return Level::Run;
501  case input::IsSubRun:
502  return Level::SubRun;
503  case input::IsEvent:
504  return Level::Event;
505  case input::IsInvalid: {
507  << "Invalid next item type presented to the event processor.\n"
508  << "Please contact artists@fnal.gov.";
509  }
510  }
512  << "Unrecognized next item type presented to the event processor.\n"
513  << "Please contact artists@fnal.gov.";
514 }
515 
516 //=============================================
517 // Job level
518 
519 void
521 {
522  FDEBUG(1) << spaces(8) << "beginJob\n";
524  // make the services available
526  // NOTE: This implementation assumes 'Job' means one call to
527  // EventProcessor::run. If it really means once per 'application'
528  // then this code will have to be changed. We would also have to deal
529  // with the case where 'run' is called, a new Module is added, and
530  // 'run' is called again. In that case the newly added Module needs
531  // its 'beginJob' to be called.
532  try {
533  input_->doBeginJob();
534  }
535  catch (cet::exception& e) {
536  mf::LogError("BeginJob") << "A cet::exception happened while processing"
537  " the beginJob of the 'source'\n";
538  e << "A cet::exception happened while processing"
539  " the beginJob of the 'source'\n";
540  throw;
541  }
542  catch (std::exception const&) {
543  mf::LogError("BeginJob") << "A std::exception happened while processing"
544  " the beginJob of the 'source'\n";
545  throw;
546  }
547  catch (...) {
548  mf::LogError("BeginJob") << "An unknown exception happened while"
549  " processing the beginJob of the 'source'\n";
550  throw;
551  }
552  schedule_->beginJob();
553  endPathExecutor_->beginJob();
554  actReg_.sPostBeginJob.invoke();
555 
557 }
558 
559 void
561 {
562  FDEBUG(1) << spaces(8) << "endJob\n";
563  // Make the services available
565  ec_.call([this] { schedule_->endJob(); });
566  ec_.call([this] { endPathExecutor_->endJob(); });
567  ec_.call([this] { input_->doEndJob(); });
568  ec_.call([this] { actReg_.sPostEndJob.invoke(); });
569  ec_.call([this] {
571  pathManager_,
573  timer_);
574  });
575 }
576 
577 //====================================================
578 // File level
579 
580 void
582 {
583  actReg_.sPreOpenFile.invoke();
584  FDEBUG(1) << spaces(8) << "openInputFile\n";
585  fb_ = input_->readFile();
586  if (!fb_) {
588  << "Source readFile() did not return a valid FileBlock: FileBlock "
589  << "should be valid or readFile() should throw.\n";
590  }
591  actReg_.sPostOpenFile.invoke(fb_->fileName());
593 }
594 
595 void
597 {
599  closeInputFile();
600 }
601 
602 void
604 {
605  endPathExecutor_->incrementInputFileNumber();
606  // Output-file closing on input-file boundaries is tricky since
607  // input files must outlive the output files, which often have data
608  // copied forward from the input files. That's why the
609  // recordOutputClosureRequests call is made here instead of in a
610  // specialization of recordOutputModuleClosureRequests<>.
611  endPathExecutor_->recordOutputClosureRequests(Granularity::InputFile);
612  if (endPathExecutor_->outputsToClose()) {
614  }
616  actReg_.sPreCloseFile.invoke();
617  input_->closeFile();
618  actReg_.sPostCloseFile.invoke();
619  FDEBUG(1) << spaces(8) << "closeInputFile\n";
620 }
621 
622 void
624 {
625  endPathExecutor_->openAllOutputFiles(*fb_);
626  FDEBUG(1) << spaces(8) << "openAllOutputFiles\n";
627 }
628 
629 void
631 {
632  if (!endPathExecutor_->someOutputsOpen())
633  return;
634 
636  endPathExecutor_->closeAllOutputFiles();
637  FDEBUG(1) << spaces(8) << "closeAllOutputFiles\n";
638 }
639 
640 void
642 {
643  if (!endPathExecutor_->outputsToOpen())
644  return;
645 
646  endPathExecutor_->openSomeOutputFiles(*fb_);
647  FDEBUG(1) << spaces(8) << "openSomeOutputFiles\n";
649 }
650 
651 void
653 {
654  endPathExecutor_->setOutputFileStatus(ofs);
655  FDEBUG(1) << spaces(8) << "setOutputFileStatus\n";
656 }
657 
658 void
660 {
661  // Precondition: there are SOME output files that have been
662  // flagged as needing to close. Otherwise,
663  // 'respondToCloseOutputFiles' will be needlessly
664  // called.
665  assert(endPathExecutor_->outputsToClose());
667  endPathExecutor_->closeSomeOutputFiles();
668  FDEBUG(1) << spaces(8) << "closeSomeOutputFiles\n";
669 }
670 
671 void
673 {
674  schedule_->respondToOpenInputFile(*fb_);
675  endPathExecutor_->respondToOpenInputFile(*fb_);
676  FDEBUG(1) << spaces(8) << "respondToOpenInputFile\n";
677 }
678 
679 void
681 {
682  schedule_->respondToCloseInputFile(*fb_);
683  endPathExecutor_->respondToCloseInputFile(*fb_);
684  FDEBUG(1) << spaces(8) << "respondToCloseInputFile\n";
685 }
686 
687 void
689 {
690  schedule_->respondToOpenOutputFiles(*fb_);
691  endPathExecutor_->respondToOpenOutputFiles(*fb_);
692  FDEBUG(1) << spaces(8) << "respondToOpenOutputFiles\n";
693 }
694 
695 void
697 {
698  schedule_->respondToCloseOutputFiles(*fb_);
699  endPathExecutor_->respondToCloseOutputFiles(*fb_);
700  FDEBUG(1) << spaces(8) << "respondToCloseOutputFiles\n";
701 }
702 
703 //=============================================
704 // Run level
705 
706 void
708 {
709  {
710  actReg_.sPreSourceRun.invoke();
711  runPrincipal_ = input_->readRun();
712  // Seeding the RangeSet is necessary here in case
713  // 'sPostReadRun.invoke()' throws.
714  endPathExecutor_->seedRunRangeSet(input_->runRangeSetHandler());
715  assert(runPrincipal_);
717  }
718  runPrincipal_->setProducedProducts(producedProducts_.get(InRun));
719  Run const r{
721  actReg_.sPostSourceRun.invoke(r);
722  FDEBUG(1) << spaces(8) << "readRun.....................("
723  << runPrincipal_->id() << ")\n";
724 }
725 
726 void
728 {
729  assert(runPrincipal_);
730  RunID const r{runPrincipal_->id()};
731  if (r.isFlush())
732  return;
733 
734  finalizeRunEnabled_ = true;
735  process_<Begin<Level::Run>>(*runPrincipal_);
736  FDEBUG(1) << spaces(8) << "beginRun....................(" << r << ")\n";
737  beginRunCalled_ = true;
738 }
739 
740 void
742 {
743  if (!beginRunCalled_) {
744  beginRun();
745  }
746 }
747 
748 void
750 {
751  assert(runPrincipal_);
752  endPathExecutor_->setAuxiliaryRangeSetID(*runPrincipal_);
753  FDEBUG(1) << spaces(8) << "setRunAuxiliaryRangeSetID...("
754  << runPrincipal_->id() << ")\n";
755 }
756 
757 void
759 {
760  assert(runPrincipal_);
761  // Precondition: The RunID does not correspond to a flush ID. --
762  // N.B. The flush flag is not explicitly checked here since endRun
763  // is only called from finalizeRun, which is where the check
764  // happens.
765  RunID const run{runPrincipal_->id()};
766  assert(!run.isFlush());
767  process_<End<Level::Run>>(*runPrincipal_);
768  FDEBUG(1) << spaces(8) << "endRun......................(" << run << ")\n";
769  beginRunCalled_ = false;
770 }
771 
772 void
774 {
775  assert(runPrincipal_);
776  // Precondition: The RunID does not correspond to a flush ID.
777  RunID const r{runPrincipal_->id()};
778  assert(!r.isFlush());
779  endPathExecutor_->writeRun(*runPrincipal_);
780  FDEBUG(1) << spaces(8) << "writeRun....................(" << r << ")\n";
781 }
782 
783 //=============================================
784 // SubRun level
785 
786 void
788 {
789  {
790  actReg_.sPreSourceSubRun.invoke();
791  subRunPrincipal_ = input_->readSubRun(runPrincipal_.get());
792  // Seeding the RangeSet is necessary here in case
793  // 'sPostSubRun.invoke()' throws.
794  endPathExecutor_->seedSubRunRangeSet(input_->subRunRangeSetHandler());
795  assert(subRunPrincipal_);
797  }
798  subRunPrincipal_->setProducedProducts(producedProducts_.get(InSubRun));
799  SubRun const sr{
801  actReg_.sPostSourceSubRun.invoke(sr);
802  FDEBUG(1) << spaces(8) << "readSubRun..................("
803  << subRunPrincipal_->id() << ")\n";
804 }
805 
806 void
808 {
809  assert(subRunPrincipal_);
810  SubRunID const sr{subRunPrincipal_->id()};
811  if (sr.isFlush())
812  return;
813 
814  finalizeSubRunEnabled_ = true;
815  process_<Begin<Level::SubRun>>(*subRunPrincipal_);
816  FDEBUG(1) << spaces(8) << "beginSubRun.................(" << sr << ")\n";
817  beginSubRunCalled_ = true;
818 }
819 
820 void
822 {
823  if (!beginSubRunCalled_) {
824  beginSubRun();
825  }
826 }
827 
828 void
830 {
831  assert(subRunPrincipal_);
832  endPathExecutor_->setAuxiliaryRangeSetID(*subRunPrincipal_);
833  FDEBUG(1) << spaces(8) << "setSubRunAuxiliaryRangeSetID("
834  << subRunPrincipal_->id() << ")\n";
835 }
836 
837 void
839 {
840  assert(subRunPrincipal_);
841  // Precondition: The SubRunID does not correspond to a flush ID.
842  // Note: the flush flag is not explicitly checked here since
843  // endSubRun is only called from finalizeSubRun, which is where the
844  // check happens.
845  SubRunID const sr{subRunPrincipal_->id()};
846  assert(!sr.isFlush());
847  process_<End<Level::SubRun>>(*subRunPrincipal_);
848  FDEBUG(1) << spaces(8) << "endSubRun...................(" << sr << ")\n";
849  beginSubRunCalled_ = false;
850 }
851 
852 void
854 {
855  assert(subRunPrincipal_);
856  // Precondition: The SubRunID does not correspond to a flush ID.
857  SubRunID const& sr{subRunPrincipal_->id()};
858  assert(!sr.isFlush());
859  endPathExecutor_->writeSubRun(*subRunPrincipal_);
860  FDEBUG(1) << spaces(8) << "writeSubRun.................(" << sr << ")\n";
861 }
862 
863 //=============================================
864 // Event level
865 
866 void
868 {
869  assert(subRunPrincipal_);
870  assert(subRunPrincipal_->id().isValid());
871  {
872  actReg_.sPreSourceEvent.invoke();
873  eventPrincipal_ = input_->readEvent(subRunPrincipal_.get());
874  assert(eventPrincipal_);
876  }
877  eventPrincipal_->setProducedProducts(producedProducts_.get(InEvent));
878  Event const e{
880  actReg_.sPostSourceEvent.invoke(e);
881  FDEBUG(1) << spaces(8) << "readEvent...................("
882  << eventPrincipal_->id() << ")\n";
883 }
884 
885 void
887 {
888  assert(eventPrincipal_);
889  EventID const& id{eventPrincipal_->id()};
890  // Precondition: The EventID does not correspond to a flush ID.
891  assert(!id.isFlush());
892  process_<Do<Level::Event>>(*eventPrincipal_);
893  FDEBUG(1) << spaces(8) << "processEvent................(" << id << ")\n";
894 }
895 
896 void
898 {
899  assert(eventPrincipal_);
900  EventID const& id{eventPrincipal_->id()};
901  // Precondition: The EventID does not correspond to a flush ID.
902  assert(!id.isFlush());
903  endPathExecutor_->writeEvent(*eventPrincipal_);
904  FDEBUG(1) << spaces(8) << "writeEvent..................(" << id << ")\n";
905  eventPrincipal_.reset();
906 }
907 
908 bool
910 {
911  FDEBUG(1) << spaces(8) << "shouldWeStop\n";
912  if (shouldWeStop_) {
913  return true;
914  }
915  return endPathExecutor_->terminate();
916 }
917 
918 void
920 {
921  servicesSentry_ = std::make_unique<ServiceRegistry::Operate>(st);
922 }
923 
924 void
926 {
927  servicesSentry_.reset();
928 }
929 
930 void
932  if (ServiceRegistry::isAvailable<RandomNumberGenerator>()) {
934  ->saveToFile_();
935  }
936 }
937 catch (...) {
938 }
std::string bold_fontify(std::string const &s)
Definition: bold_fontify.h:9
Float_t x
Definition: compare.C:6
StatusCode runToCompletion()
static std::unique_ptr< InputSource > make(fhicl::ParameterSet const &, InputSourceDescription &)
std::unique_ptr< EndPathExecutor > endPathExecutor_
MaybeLogger_< ELseverityLevel::ELsev_info, false > LogInfo
bool const handleEmptyRuns_
std::atomic< int > shutdown_flag
std::unique_ptr< Schedule > schedule_
ProducingServiceSignals psSignals_
std::unique_ptr< InputSource > input_
static cet::exempt_ptr< Consumer > non_module_context()
Definition: Consumer.cc:76
Level
Definition: Level.h:12
static ScheduleID first()
Definition: ScheduleID.h:82
bool const handleEmptySubRuns_
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
MasterProductRegistry preg_
void setOutputFileStatus(OutputFileStatus)
Definition: Run.h:30
GlobalSignal< detail::SignalResponseType::LIFO, void(SubRunPrincipal &)> sPostReadSubRun
void servicesActivate_(ServiceToken st)
ServiceToken serviceToken_
std::unique_ptr< SubRunPrincipal > subRunPrincipal_
ProcessConfigurationID id() const
cet::cpu_timer timer_
std::unique_ptr< ServiceRegistry::Operate > servicesSentry_
void registerProducts(MasterProductRegistry &mpr, ProductDescriptions &productsToProduce, ProducingServiceSignals &signals, ProcessConfiguration const &pc)
Definition: ServiceToken.h:67
OutputFileStatus
WorkerMap const & workers() const
Definition: PathsInfo.h:81
ServiceDirector serviceDirector_
void beginJob()
Definition: Breakpoints.cc:14
vstring const & triggerPathNames() const
Definition: PathManager.h:91
void writeSummary(PathManager &pm, bool wantSummary, cet::cpu_timer const &timer)
Definition: writeSummary.cc:55
T get(std::string const &key) const
Definition: ParameterSet.h:231
PathManager pathManager_
MFStatusUpdater mfStatusUpdater_
tbb::task_scheduler_init tbbManager_
bool get_if_present(std::string const &key, T &value) const
Definition: ParameterSet.h:208
std::string const & getReleaseVersion()
PathsInfo & endPathInfo()
Definition: PathManager.cc:132
PathsInfo & triggerPathsInfo(ScheduleID sID)
Definition: PathManager.cc:143
void initSchedules_(fhicl::ParameterSet const &pset)
ParameterSetID id() const
ModuleDescriptionID id() const
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
char const * what() const noexcept override
auto const & get(BranchType const bt) const
Definition: ProductTables.h:42
std::unique_ptr< RunPrincipal > runPrincipal_
ProductDescriptions productsToProduce_
std::unique_ptr< FileBlock > fb_
ProductTables producedProducts_
ActionTable act_table_
bool shouldWeStop() const
HLT enums.
bool erase(std::string const &key)
Char_t n[5]
ActivityRegistry actReg_
constexpr auto highest_level() noexcept
Definition: Level.h:39
#define FDEBUG(lev)
Definition: DebugMacros.h:26
static void create_instance(MasterProductRegistry const &mpr)
std::unique_ptr< EventPrincipal > eventPrincipal_
std::vector< Level > activeLevels_
void beginSubRunIfNotDoneAlready()
detail::ExceptionCollector ec_
Float_t e
Definition: plot.C:34
ServiceDirector initServices_(fhicl::ParameterSet const &top_pset, ActivityRegistry &areg, ServiceToken &token)
void put(std::string const &key)
EventProcessor(EventProcessor const &)=delete
void setSubRunAuxiliaryRangeSetID()
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
GlobalSignal< detail::SignalResponseType::LIFO, void(EventPrincipal &)> sPostReadEvent
GlobalSignal< detail::SignalResponseType::LIFO, void(RunPrincipal &)> sPostReadRun