// LArSoft v09_90_00 -- Liquid Argon Software toolkit (https://larsoft.org/)
// EventProcessor.cc (doxygen-generated source listing; some source lines
// were dropped by the scrape -- see "[scrape gap]" markers below)
2 // vim: set sw=2 expandtab :
3 
30 #include "art/Utilities/Globals.h"
41 #include "cetlib/bold_fontify.h"
42 #include "cetlib/container_algorithms.h"
43 #include "cetlib/trim.h"
44 #include "fhiclcpp/ParameterSet.h"
46 #include "hep_concurrency/WaitingTask.h"
48 
49 #include <cassert>
50 #include <exception>
51 #include <functional>
52 #include <iostream>
53 #include <memory>
54 #include <string>
55 #include <utility>
56 #include <vector>
57 
58 using namespace hep::concurrency;
59 using namespace std;
60 using namespace string_literals;
62 
63 namespace art {
64 
65  namespace {
66  ServicesManager*
67  create_services_manager(ParameterSet services_pset,
68  ActivityRegistry& actReg,
69  detail::SharedResources& resources)
70  {
71  auto const fpcPSet =
72  services_pset.get<ParameterSet>("FloatingPointControl", {});
73  services_pset.erase("FloatingPointControl");
74  services_pset.erase("message");
75  services_pset.erase("scheduler");
76  auto mgr =
77  new ServicesManager{std::move(services_pset), actReg, resources};
78  mgr->addSystemService<FloatingPointControl>(fpcPSet, actReg);
79  return mgr;
80  }
81 
82  auto const invalid_module_context = ModuleContext::invalid();
83  }
84 
 // Out-of-line defaulted destructor: presumably kept here (not in the
 // header) so members held by smart pointer see complete types at
 // destruction time -- confirm against the header.
 EventProcessor::~EventProcessor() = default;
86 
 // Construct the event processor: services manager, path manager,
 // per-schedule structures, and finally the input source, in an order
 // that respects their initialization dependencies (see inline notes).
 // NOTE(review): this copy is a doxygen scrape with dropped source
 // lines; gaps are marked below and must be restored from upstream art.
 EventProcessor::EventProcessor(ParameterSet pset,
                                detail::EnabledModules enabled_modules)
   : scheduler_{pset.get<ParameterSet>("services.scheduler")}
   , scheduleIteration_{scheduler_->num_schedules()}
   , servicesManager_{create_services_manager(
       pset.get<ParameterSet>("services"),
       actReg_,
       // [scrape gap: trailing constructor-argument line(s) missing]
   , pathManager_{pset,
       // [scrape gap: argument line(s) missing]
                  scheduler_->actionTable(),
                  actReg_,
                  std::move(enabled_modules)}
   , handleEmptyRuns_{scheduler_->handleEmptyRuns()}
   , handleEmptySubRuns_{scheduler_->handleEmptySubRuns()}
 {
   auto services_pset = pset.get<ParameterSet>("services");
   auto const scheduler_pset = services_pset.get<ParameterSet>("scheduler");
   {
     // FIXME: Signals and threads require more effort than this! A
     // signal is delivered to only one thread, and which
     // thread is left up to the implementation to decide. To
     // get control we must block all signals in the main
     // thread, create a new thread which will handle the
     // signals we want to handle, unblock the signals in that
     // thread only, and have it use sigwaitinfo() to suspend
     // itself and wait for those signals.
     setupSignals(scheduler_pset.get<bool>("enableSigInt", true));
   }
   // [scrape gap: line(s) missing]
   // We do this late because the floating point control word, signal
   // masks, etc., are per-thread and inherited from the master
   // thread, so we want to allow system services, user services, and
   // modules to configure these things in their constructors before
   // we let tbb create any threads. This means they cannot use tbb
   // in their constructors, instead they must use the beginJob
   // callout.
   taskGroup_ = scheduler_->global_task_group();
   // Whenever we are ready to enable ROOT's implicit MT, which is
   // equivalent to its use of TBB, the call should be made after our
   // own TBB task manager has been initialized.
   // ROOT::EnableImplicitMT();
   TDEBUG_FUNC(5) << "nschedules: " << scheduler_->num_schedules()
                  << " nthreads: " << scheduler_->num_threads();

   auto const errorOnMissingConsumes = scheduler_->errorOnMissingConsumes();
   ConsumesInfo::instance()->setRequireConsumes(errorOnMissingConsumes);

   auto const& processName = Globals::instance()->processName();

   // Trigger-names
   servicesManager_->addSystemService<TriggerNamesService>(
     // [scrape gap: first argument line missing]
     pset.get<ParameterSet>("physics", {}));

   // We have delayed creating the service instances, now actually
   // create them.
   servicesManager_->forceCreation();
   ServiceHandle<FileCatalogMetadata>()->addMetadataString("art.process_name",
                                                           processName);

   // Now that the service module instances have been created we can
   // set the callbacks, set the module description, and register the
   // products for each service module instance.
   ProcessConfiguration const pc{processName, pset.id(), getReleaseVersion()};
   auto const producing_services = servicesManager_->registerProducts(
   // [scrape gap: registerProducts argument line(s) missing]
   pathManager_->createModulesAndWorkers(
     *taskGroup_, sharedResources_, producing_services);

   ServiceHandle<TriggerNamesService> trigger_names [[maybe_unused]];
   auto const end = Globals::instance()->nschedules();
   for (ScheduleID::size_type i = 0; i != end; ++i) {
     ScheduleID const sid{i};

     // The ordering of the path results in the TriggerPathsInfo (which is used
     // for the TriggerResults object), must be the same as that provided by
     // the TriggerNamesService.
     auto& trigger_paths_info [[maybe_unused]] =
       pathManager_->triggerPathsInfo(sid);
     assert(trigger_names->getTrigPaths() == trigger_paths_info.pathNames());

     schedules_->emplace(std::piecewise_construct,
                         std::forward_as_tuple(sid),
                         std::forward_as_tuple(sid,
                                               pathManager_,
                                               scheduler_->actionTable(),
                                               actReg_,
                                               // [scrape gap: argument line missing]
                                               *taskGroup_));
   }
   sharedResources_.freeze(taskGroup_->native_group());

   FDEBUG(2) << pset.to_string() << endl;
   // The input source must be created after the end path executor
   // because the end path executor registers a callback that must
   // be invoked after the first input file is opened.
   {
     // Default source is EmptyEvent with unlimited events; a "source"
     // table in the job configuration overrides it.
     ParameterSet main_input;
     main_input.put("module_type", "EmptyEvent");
     main_input.put("module_label", "source");
     main_input.put("maxEvents", -1);
     if (!pset.get_if_present("source", main_input)) {
       mf::LogInfo("EventProcessorSourceConfig")
         << "Could not find a source configuration: using default.";
     }
     ModuleDescription const md{
       main_input.id(),
       main_input.get<string>("module_type"),
       main_input.get<string>("module_label"),
       // [scrape gap: line missing]
       ProcessConfiguration{processName, pset.id(), getReleaseVersion()}};
     // [scrape gap: declaration of 'isd' (input-source description) missing]
     try {
       input_.reset(InputSourceFactory::make(main_input, isd).release());
     }
     catch (fhicl::detail::validationException const& e) {
       // [scrape gap: 'throw ...' line missing before the streamed message]
       << "\n\nModule label: " << cet::bold_fontify(md.moduleLabel())
       << "\nmodule_type : " << cet::bold_fontify(md.moduleName()) << "\n\n"
       << e.what();
     }
     catch (Exception const& x) {
       // Re-badge configuration errors with a clearer context message;
       // everything else propagates unchanged.
       if (x.categoryCode() == errors::Configuration) {
         throw Exception(errors::Configuration, "FailedInputSource")
           << "Configuration of main input source has failed\n"
           << x;
       }
       throw;
     }
     catch (cet::exception const& x) {
       throw Exception(errors::Configuration, "FailedInputSource")
         << "Configuration of main input source has failed\n"
         << x;
     }
     catch (...) {
       throw;
     }
   }
   actReg_.sPostSourceConstruction.invoke(input_->moduleDescription());
   // Create product tables used for product retrieval within modules.
   // [scrape gap: product-table creation statement(s) missing]
 }
234 
 // Collects every trigger-path and end-path worker of the first
 // schedule into one flat list and fires sPostBeginJobWorkers.
 void
 // [scrape gap: function name line missing -- presumably the
 //  post-beginJob-workers notification helper; confirm upstream]
 {
   using cet::transform_all;
   // Need to convert multiple lists of workers into a long list that
   // the postBeginJobWorkers callbacks can understand.
   vector<Worker*> allWorkers;
   transform_all(pathManager_->triggerPathsInfo(ScheduleID::first()).workers(),
                 back_inserter(allWorkers),
                 [](auto const& pr) { return pr.second.get(); });
   transform_all(pathManager_->endPathInfo(ScheduleID::first()).workers(),
                 back_inserter(allWorkers),
                 [](auto const& pr) { return pr.second.get(); });
   actReg_.sPostBeginJobWorkers.invoke(input_, allWorkers);
 }
250 
 //================================================================
 // Event-loop infrastructure

 // Decide whether the event loop should process level L next: returns
 // true when the next item to read is at level L, false when control
 // must pop up to a shallower level; throws on an impossible ordering.
 template <Level L>
 bool
 // [scrape gap: function name line missing]
 {
   if (nextLevel_.load() == Level::ReadyToAdvance) {
     // [scrape gap: advance-item-type call missing]
     // Consider reading right here?
   }
   if (nextLevel_.load() == L) {
     // [scrape gap: line missing (presumably resets nextLevel_)]
     if (main_schedule().outputsToClose()) {
       // [scrape gap: line missing]
       finalizeContainingLevels<L>();
       // [scrape gap: line missing]
     }
     return true;
   }
   if (nextLevel_.load() < L) {
     return false;
   }
   if (nextLevel_.load() == highest_level()) {
     return false;
   }
   throw Exception{errors::LogicError} << "Incorrect level hierarchy.\n"
                                       << " Current level: " << L
                                       << " Next level: " << nextLevel_;
 }
281 
 // Specializations for process function template

 // Job level: start the job timer, then run the begin-job sequence.
 template <>
 inline void
 EventProcessor::begin<Level::Job>()
 {
   timer_->start();
   beginJob();
 }

 // InputFile level: open the next input file.
 template <>
 inline void
 EventProcessor::begin<Level::InputFile>()
 {
   openInputFile();
 }

 // Run level: read the next run; beginRun is invoked immediately only
 // when empty runs are to be handled (otherwise deferred).
 template <>
 void
 EventProcessor::begin<Level::Run>()
 {
   readRun();

   // We only enable run finalization if reading was successful.
   // This appears to be a design weakness.
   finalizeRunEnabled_ = true;
   if (handleEmptyRuns_) {
     beginRun();
   }
 }

 // SubRun level: requires a valid current run; read the next subrun.
 template <>
 void
 EventProcessor::begin<Level::SubRun>()
 {
   assert(runPrincipal_);
   assert(runPrincipal_->runID().isValid());
   readSubRun();

   // We only enable subrun finalization if reading was successful.
   // This appears to be a design weakness.
   finalizeSubRunEnabled_ = true;
   if (handleEmptySubRuns_) {
     // [scrape gap: line missing (presumably beginRunIfNotDoneAlready())]
     beginSubRun();
   }
 }
329 
 // Finalize a subrun: end it (if begun), write it out, and disable
 // further subrun finalization until the next read.  Flush subruns are
 // never finalized.
 template <>
 void
 EventProcessor::finalize<Level::SubRun>()
 {
   if (!finalizeSubRunEnabled_) {
     return;
   }

   assert(subRunPrincipal_);
   if (subRunPrincipal_->subRunID().isFlush()) {
     return;
   }

   // [scrape gap: line(s) missing (presumably range-set bookkeeping
   //  before ending the subrun -- confirm upstream)]
   if (beginSubRunCalled_) {
     endSubRun();
   }
   writeSubRun();
   finalizeSubRunEnabled_ = false;
 }

 // Finalize a run: same structure as the subrun case.
 template <>
 void
 EventProcessor::finalize<Level::Run>()
 {
   if (!finalizeRunEnabled_) {
     return;
   }

   assert(runPrincipal_);
   if (runPrincipal_->runID().isFlush()) {
     return;
   }

   // [scrape gap: line(s) missing]
   if (beginRunCalled_) {
     endRun();
   }
   writeRun();
   finalizeRunEnabled_ = false;
 }

 // Finalize the input file: at end of job everything closes;
 // otherwise only the input file itself.
 template <>
 void
 EventProcessor::finalize<Level::InputFile>()
 {
   if (nextLevel_.load() == Level::Job) {
     closeAllFiles();
   } else {
     closeInputFile();
   }
 }

 // Finalize the job: run the end-job sequence, then stop the timer.
 template <>
 void
 EventProcessor::finalize<Level::Job>()
 {
   endJob();
   timer_->stop();
 }
392 
 // Finalizing an inner level must also finalize every enclosing level
 // that is still open, innermost first.
 template <>
 void
 EventProcessor::finalizeContainingLevels<Level::SubRun>()
 {
   finalize<Level::Run>();
 }

 template <>
 void
 EventProcessor::finalizeContainingLevels<Level::Event>()
 {
   finalize<Level::SubRun>();
   finalize<Level::Run>();
 }
407 
 // Record which output modules want their file closed at the given
 // granularity.  All three bodies were dropped by the scrape; each
 // presumably forwards to the schedules with the matching Granularity
 // -- confirm upstream.
 template <>
 void
 EventProcessor::recordOutputModuleClosureRequests<Level::Run>()
 {
   // [scrape gap: body line missing]
 }

 template <>
 void
 EventProcessor::recordOutputModuleClosureRequests<Level::SubRun>()
 {
   // [scrape gap: body line missing]
 }

 template <>
 void
 EventProcessor::recordOutputModuleClosureRequests<Level::Event>()
 {
   // [scrape gap: body line missing]
 }
428 
 //=============================================
 // Job level

 // Begin-job sequence: notify the input source first; the per-schedule
 // dispatch that followed was dropped by the scrape; finally fire
 // sPostBeginJob.
 void
 // [scrape gap: name line missing -- beginJob(), per the FDEBUG text]
 {
   FDEBUG(1) << string(8, ' ') << "beginJob\n";
   // [scrape gap: line missing]
   // NOTE: This implementation assumes 'Job' means one call the
   // EventProcessor::run. If it really means once per 'application'
   // then this code will have to be changed. Also have to deal with
   // case where have 'run' then new Module added and do 'run' again.
   // In that case the newly added Module needs its 'beginJob' to be
   // called.
   try {
     input_->doBeginJob();
   }
   catch (cet::exception& e) {
     // Annotate the exception with context before re-throwing.
     mf::LogError("BeginJob") << "A cet::exception happened while processing"
                                 " the beginJob of the 'source'";
     e << "A cet::exception happened while processing"
          " the beginJob of the 'source'\n";
     throw;
   }
   catch (exception const&) {
     mf::LogError("BeginJob") << "A exception happened while processing"
                                 " the beginJob of the 'source'";
     throw;
   }
   catch (...) {
     mf::LogError("BeginJob") << "An unknown exception happened while"
                                 " processing the beginJob of the 'source'";
     throw;
   }
   // [scrape gap: per-schedule beginJob dispatch missing; only the
   //  closing '});' below survives]
   });
   actReg_.sPostBeginJob.invoke();
   // [scrape gap: line missing]
 }
469 
 // End-job sequence.  Each step goes through ec_->call -- presumably so
 // a failure in one step is recorded without preventing the remaining
 // steps; confirm ec_'s semantics upstream.
 void
 // [scrape gap: name line missing -- endJob(), per the FDEBUG text]
 {
   FDEBUG(1) << string(8, ' ') << "endJob\n";
   ec_->call([this] { endJobAllSchedules(); });
   ec_->call([] { ConsumesInfo::instance()->showMissingConsumes(); });
   ec_->call([this] { input_->doEndJob(); });
   ec_->call([this] { actReg_.sPostEndJob.invoke(); });
   ec_->call([] { mf::LogStatistics(); });
   ec_->call([this] {
     // [scrape gap: lambda body line missing]
   });
 }
483 
 // Run endJob on every schedule.
 void
 // [scrape gap: name line missing -- endJobAllSchedules(); the opening
 //  'scheduleIteration_.for_each_schedule(' line is also missing]
 {
     [this](ScheduleID const sid) { schedule(sid).endJob(); });
 }
490 
 //====================================================
 // File level

 // Open the next input file via the source, bracketed by the
 // pre-/post-open-file registry signals.
 void
 // [scrape gap: name line missing -- openInputFile(), per the FDEBUG text]
 {
   actReg_.sPreOpenFile.invoke();
   FDEBUG(1) << string(8, ' ') << "openInputFile\n";
   fb_.reset(input_->readFile().release());
   if (fb_ == nullptr) {
     // [scrape gap: 'throw Exception(...)' line missing]
     << "Source readFile() did not return a valid FileBlock: FileBlock "
     << "should be valid or readFile() should throw.\n";
   }
   actReg_.sPostOpenFile.invoke(fb_->fileName());
   // [scrape gap: line missing (presumably respondToOpenInputFile())]
 }
508 
 // Close everything: outputs first (dropped line), then the input file.
 void
 // [scrape gap: name line missing -- closeAllFiles()]
 {
   // [scrape gap: line missing (presumably closeAllOutputFiles())]
   closeInputFile();
 }
515 
 // Close the current input file, first closing any outputs that have
 // requested closure (outputs must not outlive the input file).
 void
 // [scrape gap: name line missing -- closeInputFile(), per FDEBUG text]
 {
   // [scrape gap: line missing]
   // Output-file closing on input-file boundaries are tricky since
   // input files must outlive the output files, which often have
   // data copied forward from the input files. That's why the
   // recordOutputClosureRequests call is made here instead of in a
   // specialization of recordOutputModuleClosureRequests<>.
   // [scrape gap: line missing]
   if (main_schedule().outputsToClose()) {
     // [scrape gap: line missing (presumably closeSomeOutputFiles())]
   }
   // [scrape gap: line missing]
   actReg_.sPreCloseFile.invoke();
   input_->closeFile();
   actReg_.sPostCloseFile.invoke();
   FDEBUG(1) << string(8, ' ') << "closeInputFile\n";
 }
535 
 // Close every open output file; no-op when none are open.
 void
 // [scrape gap: name line missing -- closeAllOutputFiles(), per FDEBUG]
 {
   if (!main_schedule().someOutputsOpen()) {
     return;
   }
   // [scrape gap: line(s) missing (the actual close dispatch)]
   FDEBUG(1) << string(8, ' ') << "closeAllOutputFiles\n";
 }
546 
 // Query all schedules: true if any output module has a file waiting
 // to be opened.
 bool
 // [scrape gap: name line missing -- outputsToOpen()]
 {
   bool outputs_to_open{false};
   auto check_outputs_to_open = [this,
                                 &outputs_to_open](ScheduleID const sid) {
     if (schedule(sid).outputsToOpen()) {
       outputs_to_open = true;
     }
   };
   scheduleIteration_.for_each_schedule(check_outputs_to_open);
   return outputs_to_open;
 }
560 
 // Open any pending output files across all schedules; no-op when
 // nothing is pending.
 void
 // [scrape gap: name line missing -- openSomeOutputFiles(), per FDEBUG]
 {
   if (!outputsToOpen()) {
     return;
   }

   auto open_some_outputs = [this](ScheduleID const sid) {
     // [scrape gap: lambda body line missing]
   };
   scheduleIteration_.for_each_schedule(open_some_outputs);

   FDEBUG(1) << string(8, ' ') << "openSomeOutputFiles\n";
   // [scrape gap: line missing (presumably respondToOpenOutputFiles())]
 }
576 
 // Propagate an output-file status change to the schedules.
 void
 // [scrape gap: name+parameter line missing -- setOutputFileStatus(...),
 //  per the FDEBUG text]
 {
   // [scrape gap: dispatch line missing]
   FDEBUG(1) << string(8, ' ') << "setOutputFileStatus\n";
 }
583 
 // Close exactly those output files that have requested closure.
 void
 // [scrape gap: name line missing -- closeSomeOutputFiles(), per FDEBUG]
 {
   // Precondition: there are SOME output files that have been
   // flagged as needing to close. Otherwise,
   // 'respondtoCloseOutputFiles' will be needlessly
   // called.
   assert(main_schedule().outputsToClose());
   // [scrape gap: line(s) missing (the actual close dispatch)]
   FDEBUG(1) << string(8, ' ') << "closeSomeOutputFiles\n";
 }
596 
 // The four respondTo* helpers notify the schedules that an
 // input/output file has been opened or closed.  In each, the
 // dispatch was dropped by the scrape; only the closing '});'
 // survives.
 void
 // [scrape gap: name line missing -- respondToOpenInputFile(), per FDEBUG]
 {
   // [scrape gap: dispatch line(s) missing]
   });
   FDEBUG(1) << string(8, ' ') << "respondToOpenInputFile\n";
 }

 void
 // [scrape gap: name line missing -- respondToCloseInputFile()]
 {
   // [scrape gap: dispatch line(s) missing]
   });
   FDEBUG(1) << string(8, ' ') << "respondToCloseInputFile\n";
 }

 void
 // [scrape gap: name line missing -- respondToOpenOutputFiles()]
 {
   // [scrape gap: dispatch line(s) missing]
   });
   FDEBUG(1) << string(8, ' ') << "respondToOpenOutputFiles\n";
 }

 void
 // [scrape gap: name line missing -- respondToCloseOutputFiles()]
 {
   // [scrape gap: dispatch line(s) missing]
   });
   FDEBUG(1) << string(8, ' ') << "respondToCloseOutputFiles\n";
 }
632 
 //=============================================
 // Run level

 // Read the next run from the source, seed every schedule's run
 // range-set from the source's handler, then create groups for the
 // produced products; lookups are enabled only after the sPostReadRun
 // producing-service callbacks have run.
 void
 // [scrape gap: name line missing -- readRun(), per the FDEBUG text]
 {
   actReg_.sPreSourceRun.invoke();
   runPrincipal_.reset(input_->readRun().release());
   assert(runPrincipal_);
   auto rsh = input_->runRangeSetHandler();
   assert(rsh);
   auto seed_range_set = [this, &rsh](ScheduleID const sid) {
     schedule(sid).seedRunRangeSet(*rsh);
   };
   scheduleIteration_.for_each_schedule(seed_range_set);
   // The intended behavior here is that the producing services which
   // are called during the sPostReadRun cannot see each others put
   // products. We enforce this by creating the groups for the
   // produced products, but do not allow the lookups to find them
   // until after the callbacks have run.
   runPrincipal_->createGroupsForProducedProducts(
   // [scrape gap: argument line missing]
   psSignals_->sPostReadRun.invoke(*runPrincipal_);
   runPrincipal_->enableLookupOfProducedProducts();
   {
     auto const r =
       std::as_const(*runPrincipal_).makeRun(invalid_module_context);
     actReg_.sPostSourceRun.invoke(r);
   }
   FDEBUG(1) << string(8, ' ') << "readRun.....................("
             << runPrincipal_->runID() << ")\n";
 }
665 
 // Fire the pre-/post-beginRun signals around the per-schedule
 // dispatch (the dispatch itself was dropped by the scrape).  A flush
 // run is skipped entirely.
 void
 // [scrape gap: name line missing -- beginRun(), per the FDEBUG text]
 {
   assert(runPrincipal_);
   RunID const r{runPrincipal_->runID()};
   if (r.isFlush()) {
     return;
   }
   finalizeRunEnabled_ = true;
   try {
     {
       auto const run =
         std::as_const(*runPrincipal_).makeRun(invalid_module_context);
       actReg_.sPreBeginRun.invoke(run);
     }
     // [scrape gap: per-schedule dispatch missing; only '});' survives]
     });
     {
       auto const run =
         std::as_const(*runPrincipal_).makeRun(invalid_module_context);
       actReg_.sPostBeginRun.invoke(run);
     }
   }
   catch (cet::exception& ex) {
     throw Exception{
       // [scrape gap: error-code argument line missing]
       "EventProcessor: an exception occurred during current event processing",
       ex};
   }
   catch (...) {
     mf::LogError("PassingThrough")
       << "an exception occurred during current event processing";
     throw;
   }
   FDEBUG(1) << string(8, ' ') << "beginRun....................(" << r
             << ")\n";
   beginRunCalled_ = true;
 }
705 
 // Idempotent wrapper: begin the run only once per run.
 void
 // [scrape gap: name line missing -- beginRunIfNotDoneAlready()]
 {
   if (!beginRunCalled_) {
     beginRun();
   }
 }
713 
 // Assign the run auxiliary's range-set ID.  With an open-ended
 // handler (EmptyEvent source) the per-schedule range sets are merged;
 // otherwise the already-existing ranges are taken from the main
 // schedule, flushed when we are at end of job rather than switching
 // output files.
 void
 // [scrape gap: name line missing -- setRunAuxiliaryRangeSetID(), per
 //  the FDEBUG text]
 {
   assert(runPrincipal_);
   FDEBUG(1) << string(8, ' ') << "setRunAuxiliaryRangeSetID...("
             << runPrincipal_->runID() << ")\n";
   if (main_schedule().runRangeSetHandler().type() ==
   // [scrape gap: comparison RHS and opening brace missing]
     // We are using EmptyEvent source, need to merge what the
     // schedules have seen.
     auto mergedRS = RangeSet::invalid();
     auto merge_range_sets = [this, &mergedRS](ScheduleID const sid) {
       auto const& rs = schedule(sid).runRangeSetHandler().seenRanges();
       // The following constructor ensures that the range is sorted
       // before 'merge' is called.
       RangeSet const tmp{rs.run(), rs.ranges()};
       mergedRS.merge(tmp);
     };
     scheduleIteration_.for_each_schedule(merge_range_sets);
     runPrincipal_->updateSeenRanges(mergedRS);
     auto update_executors = [this, &mergedRS](ScheduleID const sid) {
       schedule(sid).setRunAuxiliaryRangeSetID(mergedRS);
     };
     scheduleIteration_.for_each_schedule(update_executors);
     return;
   }

   // Since we are using already existing ranges, all the range set
   // handlers have the same ranges, use the first one. handler with
   // the largest event number, that will be the one which we will
   // use as the file switch boundary. Note that is may not match
   // the exactly the schedule that triggered the switch. Do we need
   // to fix this?
   unique_ptr<RangeSetHandler> rshAtSwitch{
   // [scrape gap: initializer line missing (presumably cloning the main
   //  schedule's handler -- confirm upstream)]
   if (main_schedule().fileStatus() != OutputFileStatus::Switching) {
     // We are at the end of the job.
     rshAtSwitch->flushRanges();
   }
   runPrincipal_->updateSeenRanges(rshAtSwitch->seenRanges());
   main_schedule().setRunAuxiliaryRangeSetID(rshAtSwitch->seenRanges());
 }
756 
 // Fire pre-/post-endRun signals around the per-schedule dispatch
 // (dropped by the scrape).  Precondition: not a flush run -- checked
 // by the caller (finalizeRun), asserted here.
 void
 // [scrape gap: name line missing -- endRun(), per the FDEBUG text]
 {
   assert(runPrincipal_);
   // Precondition: The RunID does not correspond to a flush ID. --
   // N.B. The flush flag is not explicitly checked here since endRun
   // is only called from finalizeRun, which is where the check
   // happens.
   RunID const run{runPrincipal_->runID()};
   assert(!run.isFlush());
   try {
     actReg_.sPreEndRun.invoke(runPrincipal_->runID(),
                               runPrincipal_->endTime());
     // [scrape gap: per-schedule dispatch missing; only '});' survives]
     });
     auto const r =
       std::as_const(*runPrincipal_).makeRun(invalid_module_context);
     actReg_.sPostEndRun.invoke(r);
   }
   catch (cet::exception& ex) {
     throw Exception{
       // [scrape gap: error-code argument line missing]
       "EventProcessor: an exception occurred during current event processing",
       ex};
   }
   catch (...) {
     mf::LogError("PassingThrough")
       << "an exception occurred during current event processing";
     throw;
   }
   FDEBUG(1) << string(8, ' ') << "endRun......................(" << run
             << ")\n";
   beginRunCalled_ = false;
 }
792 
 // Write the current (non-flush) run to the output files.
 void
 // [scrape gap: name line missing -- writeRun(), per the FDEBUG text]
 {
   assert(runPrincipal_);
   // Precondition: The RunID does not correspond to a flush ID.
   RunID const r{runPrincipal_->runID()};
   assert(!r.isFlush());
   // [scrape gap: write dispatch line missing]
   FDEBUG(1) << string(8, ' ') << "writeRun....................(" << r
             << ")\n";
 }
804 
 //=============================================
 // SubRun level

 // Read the next subrun (within the current run), seed each schedule's
 // subrun range-set, then create groups for the produced products;
 // lookups are enabled only after the sPostReadSubRun
 // producing-service callbacks have run.
 void
 // [scrape gap: name line missing -- readSubRun(), per the FDEBUG text]
 {
   actReg_.sPreSourceSubRun.invoke();
   subRunPrincipal_.reset(input_->readSubRun(runPrincipal_.get()).release());
   assert(subRunPrincipal_);
   auto rsh = input_->subRunRangeSetHandler();
   assert(rsh);
   auto seed_range_set = [this, &rsh](ScheduleID const sid) {
     schedule(sid).seedSubRunRangeSet(*rsh);
   };
   scheduleIteration_.for_each_schedule(seed_range_set);
   // The intended behavior here is that the producing services which
   // are called during the sPostReadSubRun cannot see each others
   // put products. We enforce this by creating the groups for the
   // produced products, but do not allow the lookups to find them
   // until after the callbacks have run.
   subRunPrincipal_->createGroupsForProducedProducts(
   // [scrape gap: argument line missing]
   psSignals_->sPostReadSubRun.invoke(*subRunPrincipal_);
   subRunPrincipal_->enableLookupOfProducedProducts();
   {
     auto const sr =
       std::as_const(*subRunPrincipal_).makeSubRun(invalid_module_context);
     actReg_.sPostSourceSubRun.invoke(sr);
   }
   FDEBUG(1) << string(8, ' ') << "readSubRun..................("
             << subRunPrincipal_->subRunID() << ")\n";
 }
837 
 // Fire pre-/post-beginSubRun signals around the per-schedule dispatch
 // (dropped by the scrape).  A flush subrun is skipped entirely.
 void
 // [scrape gap: name line missing -- beginSubRun(), per the FDEBUG text]
 {
   assert(subRunPrincipal_);
   SubRunID const sr{subRunPrincipal_->subRunID()};
   if (sr.isFlush()) {
     return;
   }
   finalizeSubRunEnabled_ = true;
   try {
     {
       auto const srun =
         std::as_const(*subRunPrincipal_).makeSubRun(invalid_module_context);
       actReg_.sPreBeginSubRun.invoke(srun);
     }
     // [scrape gap: per-schedule dispatch missing; only '});' survives]
     });
     {
       auto const srun =
         std::as_const(*subRunPrincipal_).makeSubRun(invalid_module_context);
       actReg_.sPostBeginSubRun.invoke(srun);
     }
   }
   catch (cet::exception& ex) {
     throw Exception{
       // [scrape gap: error-code argument line missing]
       "EventProcessor: an exception occurred during current event processing",
       ex};
   }
   catch (...) {
     mf::LogError("PassingThrough")
       << "an exception occurred during current event processing";
     throw;
   }
   FDEBUG(1) << string(8, ' ') << "beginSubRun.................(" << sr
             << ")\n";
   beginSubRunCalled_ = true;
 }
877 
 // Idempotent wrapper: begin the subrun only once per subrun.
 void
 // [scrape gap: name line missing -- beginSubRunIfNotDoneAlready()]
 {
   if (!beginSubRunCalled_) {
     beginSubRun();
   }
 }
885 
 // Assign the subrun auxiliary's range-set ID.  With an open-ended
 // handler (EmptyEvent source) the per-schedule range sets are merged;
 // otherwise the closed ranges are cloned (dropped line) and either
 // split at the file-switch point or flushed at end of job.
 void
 // [scrape gap: name line missing -- setSubRunAuxiliaryRangeSetID(),
 //  per the FDEBUG text]
 {
   assert(subRunPrincipal_);
   FDEBUG(1) << string(8, ' ') << "setSubRunAuxiliaryRangeSetID("
             << subRunPrincipal_->subRunID() << ")\n";
   if (main_schedule().subRunRangeSetHandler().type() ==
   // [scrape gap: comparison RHS and opening brace missing]
     // We are using EmptyEvent source, need to merge what the
     // schedules have seen.
     auto mergedRS = RangeSet::invalid();
     auto merge_range_sets = [this, &mergedRS](ScheduleID const sid) {
       auto const& rs = schedule(sid).subRunRangeSetHandler().seenRanges();
       // The following constructor ensures that the range is sorted
       // before 'merge' is called.
       RangeSet const tmp{rs.run(), rs.ranges()};
       mergedRS.merge(tmp);
     };
     scheduleIteration_.for_each_schedule(merge_range_sets);
     subRunPrincipal_->updateSeenRanges(mergedRS);
     auto update_executors = [this, &mergedRS](ScheduleID const sid) {
       schedule(sid).setSubRunAuxiliaryRangeSetID(mergedRS);
     };
     scheduleIteration_.for_each_schedule(update_executors);
     return;
   }
   // Ranges are split/flushed only for a RangeSetHandler whose
   // dynamic type is 'ClosedRangeSetHandler'.
   //
   // Consider the following range-sets
   //
   // SubRun RangeSet:
   //
   // { Run 1 : SubRun 1 : Events [1,7) } <-- Current
   //
   // Run RangeSet:
   //
   // { Run 1 : SubRun 0 : Events [5,11)
   // SubRun 1 : Events [1,7) <-- Current
   // SubRun 1 : Events [9,15) }
   //
   // For a range split just before SubRun 1, Event 6, the
   // range sets should become:
   //
   // SubRun RangeSet:
   //
   // { Run 1 : SubRun 1 : Events [1,6)
   // SubRun 1 : Events [6,7) } <-- Updated
   //
   // Run RangeSet:
   //
   // { Run 1 : SubRun 0 : Events [5,11)
   // SubRun 1 : Events [1,6)
   // SubRun 1 : Events [6,7) <-- Updated
   // SubRun 1 : Events [9,15) }
   //
   // Since we are using already existing ranges, all the range set
   // handlers have the same ranges. Find the closed range set
   // handler with the largest event number, that will be the one
   // which we will use as the file switch boundary. Note that is
   // may not match the exactly the schedule that triggered the
   // switch. Do we need to fix this?
   //
   // If we do not find any handlers with valid event info then we
   // use the first one, which is just fine. This happens for
   // example when we are dropping all events.
   unsigned largestEvent = 1U;
   ScheduleID idxOfMax{ScheduleID::first()};
   // [scrape gap: loop over schedules missing -- 'idx' below is its
   //  (dropped) loop variable; confirm upstream]
   auto& val = main_schedule().subRunRangeSetHandler();
   auto& rsh = dynamic_cast<ClosedRangeSetHandler const&>(val);
   // Make sure the event number is a valid event number before using
   // it. It can be invalid in the handler if we have not yet read an
   // event, which happens with empty subruns and when we are
   // dropping all events.
   if (rsh.eventInfo().id().isValid() && !rsh.eventInfo().id().isFlush()) {
     if (rsh.eventInfo().id().event() > largestEvent) {
       largestEvent = rsh.eventInfo().id().event();
       idxOfMax = idx;
     }
   }
   idx = idx.next();

   unique_ptr<RangeSetHandler> rshAtSwitch{
   // [scrape gap: initializer line missing (presumably cloning
   //  schedule(idxOfMax)'s subrun handler -- confirm upstream)]
   if (main_schedule().fileStatus() == OutputFileStatus::Switching) {
     rshAtSwitch->maybeSplitRange();
     unique_ptr<RangeSetHandler> runRSHAtSwitch{
       schedule(idxOfMax).runRangeSetHandler().clone()};
     runRSHAtSwitch->maybeSplitRange();
     main_schedule().seedRunRangeSet(*runRSHAtSwitch);
   } else {
     // We are at the end of the job.
     rshAtSwitch->flushRanges();
   }
   main_schedule().seedSubRunRangeSet(*rshAtSwitch);
   subRunPrincipal_->updateSeenRanges(rshAtSwitch->seenRanges());
   main_schedule().setSubRunAuxiliaryRangeSetID(rshAtSwitch->seenRanges());
 }
985 
 // Fire pre-/post-endSubRun signals around the per-schedule dispatch
 // (dropped by the scrape).  Precondition: not a flush subrun --
 // checked by the caller (finalizeSubRun), asserted here.
 void
 // [scrape gap: name line missing -- endSubRun(), per the FDEBUG text]
 {
   assert(subRunPrincipal_);
   // Precondition: The SubRunID does not correspond to a flush ID.
   // Note: the flush flag is not explicitly checked here since
   // endSubRun is only called from finalizeSubRun, which is where the
   // check happens.
   SubRunID const sr{subRunPrincipal_->subRunID()};
   assert(!sr.isFlush());
   try {
     actReg_.sPreEndSubRun.invoke(subRunPrincipal_->subRunID(),
                                  subRunPrincipal_->endTime());
     // [scrape gap: per-schedule dispatch missing; only '});' survives]
     });
     auto const srun =
       std::as_const(*subRunPrincipal_).makeSubRun(invalid_module_context);
     actReg_.sPostEndSubRun.invoke(srun);
   }
   catch (cet::exception& ex) {
     throw Exception{
       // [scrape gap: error-code argument line missing]
       "EventProcessor: an exception occurred during current event processing",
       ex};
   }
   catch (...) {
     mf::LogError("PassingThrough")
       << "an exception occurred during current event processing";
     throw;
   }
   FDEBUG(1) << string(8, ' ') << "endSubRun...................(" << sr
             << ")\n";
   beginSubRunCalled_ = false;
 }
1021 
 // Write the current (non-flush) subrun to the output files.
 void
 // [scrape gap: name line missing -- writeSubRun(), per the FDEBUG text]
 {
   assert(subRunPrincipal_);
   // Precondition: The SubRunID does not correspond to a flush ID.
   SubRunID const& sr{subRunPrincipal_->subRunID()};
   assert(!sr.isFlush());
   // [scrape gap: write dispatch line missing]
   FDEBUG(1) << string(8, ' ') << "writeSubRun.................(" << sr
             << ")\n";
 }
1033 
 // ==============================================================================
 // Event level

 // The outer event loop for one subrun.  All schedules but the last
 // are launched onto the task group; the last runs on the calling
 // thread via run_and_wait.  The while-loop exists solely so that
 // output-file switching happens on the main thread between passes.
 template <>
 void
 EventProcessor::process<most_deeply_nested_level()>()
 {
   if ((shutdown_flag > 0) || !ec_->empty()) {
     return;
   }
   // Note: This loop is to allow output file switching to happen in
   // the main thread.
   firstEvent_ = true;
   bool done = false;
   while (!done) {
     // [scrape gap: line(s) missing]

     auto const last_schedule_index = scheduler_->num_schedules() - 1;
     for (ScheduleID::size_type i = 0; i != last_schedule_index; ++i) {
       taskGroup_->run([this, i] { processAllEventsAsync(ScheduleID(i)); });
     }
     taskGroup_->native_group().run_and_wait([this, last_schedule_index] {
       processAllEventsAsync(ScheduleID(last_schedule_index));
     });

     // If anything bad happened during event processing, let the
     // user know.
     // [scrape gap: line missing]
     if (!fileSwitchInProgress_.load()) {
       done = true;
       continue;
     }
     // [scrape gap: line missing]
     finalizeContainingLevels<most_deeply_nested_level()>();
     // [scrape gap: line(s) missing (the actual file-switch calls)]
     FDEBUG(1) << string(8, ' ') << "closeSomeOutputFiles\n";
     // We started the switch after advancing to the next item type;
     // we must make sure that we read that event before advancing
     // the item type again.
     firstEvent_ = true;
     fileSwitchInProgress_ = false;
   }
 }
1079 
 // This is the event loop (also known as the schedule head). It
 // calls readAndProcessAsync, which reads and processes a single
 // event, creates itself again as a continuation task, and then
 // exits.
 void
 // [scrape gap: name+parameter line missing -- presumably
 //  processAllEventsAsync(ScheduleID sid), per the calls above]
 {
   // Note: We are part of the processAllEventsTask (schedule head
   // task), and our parent is the eventLoopTask.
   TDEBUG_BEGIN_FUNC_SI(4, sid);
   try {
     readAndProcessAsync(sid);
   }
   catch (...) {
     // [scrape gap: exception-recording line missing]
     TDEBUG_END_FUNC_SI(4, sid) << "terminate event loop because of EXCEPTION";
     return;
   }
   // If no exception, then end this task, which does not terminate
   // event processing because readAndProcessAsync creates a
   // continuation task.
   TDEBUG_END_FUNC_SI(4, sid);
 }
1103 
1104  // This function is executed as part of the readAndProcessEvent
1105  // task, our parent task is the EventLoopTask. Here we advance to
1106  // the next item in the file index, end event processing if it is
1107  // not an event, or if the user has requested a shutdown, read the
1108  // event, and then call another function to do the processing.
1109  void
  // NOTE(review): the extraction dropped Doxygen source line 1110 here — the
  // function signature. Per the cross-reference index at the bottom of this
  // dump it is: EventProcessor::readAndProcessAsync(ScheduleID sid).
  // Restore the missing line from the upstream art repository before building.
1111  {
1112  // Note: We are part of the readAndProcessEventTask (schedule head
1113  // task), and our parent task is the eventLoopTask.
1114  TDEBUG_BEGIN_FUNC_SI(4, sid);
1115  // Note: shutdown_flag is an extern global atomic int in
1116  // art/art/Utilities/UnixSignalHandlers.cc
1117  if (shutdown_flag) {
1118  // User called for a clean shutdown using a signal or ctrl-c,
1119  // end event processing and this task.
1120  TDEBUG_END_FUNC_SI(4, sid) << "CLEAN SHUTDOWN";
1121  return;
1122  }
1123 
1124  // The item type advance and the event read must be done with the
1125  // input source lock held; however event-processing must not be
1126  // serialized.
1127  {
1128  InputSourceMutexSentry lock_input;
1129  if (fileSwitchInProgress_.load()) {
1130  // We must avoid advancing the iterator after a schedule has
1131  // noticed it is time to switch files. After the switch, we
1132  // will need to set firstEvent_ true so that the first
1133  // schedule that resumes after the switch actually reads the
1134  // event that the first schedule which noticed we needed a
1135  // switch had advanced the iterator to.
1136 
1137  // Note: We still have the problem that because the schedules
1138  // do not read events at the same time the file switch point
1139  // can be up to nschedules-1 ahead of where it would have been
1140  // if there was only one schedule. If we are switching output
1141  // files every event in an attempt to create single event
1142  // files, this really does not work out too well.
1143  TDEBUG_END_FUNC_SI(4, sid) << "FILE SWITCH";
1144  return;
1145  }
1146  // Check the next item type and exit this task if it is not an
1147  // event, or if the user has asynchronously requested a
1148  // shutdown.
1149  auto expected = true;
  // Atomically test-and-clear firstEvent_: exactly one schedule observes
  // the "first event" state, so the item type is advanced at most once.
1150  if (firstEvent_.compare_exchange_strong(expected, false)) {
1151  // Do not advance the item type on the first event.
1152  } else {
1153  // Do the advance item type.
1154  if (nextLevel_.load() == Level::ReadyToAdvance) {
1155  // See what the next item is.
1156  TDEBUG_FUNC_SI(5, sid) << "Calling advanceItemType()";
  // NOTE(review): Doxygen source line 1157 is missing from this extraction —
  // presumably the statement that stores the result of advanceItemType()
  // into nextLevel_. Restore from upstream; as written the advance is lost.
1158  }
1159  if ((nextLevel_.load() < most_deeply_nested_level()) ||
1160  (nextLevel_.load() == highest_level())) {
1161  // We are popping up, end event processing and this task.
1162  TDEBUG_END_FUNC_SI(4, sid) << "END OF SUBRUN";
1163  return;
1164  }
1165  if (nextLevel_.load() != most_deeply_nested_level()) {
1166  // Error: incorrect level hierarchy
1167  TDEBUG_END_FUNC_SI(4, sid) << "BAD HIERARCHY";
1168  throw Exception{errors::LogicError} << "Incorrect level hierarchy.";
1169  }
  // NOTE(review): Doxygen source line 1170 (likely a comment or
  // nextLevel_ reset) is also missing here — confirm against upstream.
1171  // At this point we have determined that we are going to read
1172  // an event and we must do that before dropping the lock on
1173  // the input source which is what is protecting us against a
1174  // double-advance caused by a different schedule.
1175  if (schedule(sid).outputsToClose()) {
1176  fileSwitchInProgress_ = true;
1177  TDEBUG_END_FUNC_SI(4, sid) << "FILE SWITCH INITIATED";
1178  return;
1179  }
1180  }
1181 
1182  // Now we can read the event from the source.
1183  ScheduleContext const sc{sid};
1184  assert(subRunPrincipal_);
1185  assert(subRunPrincipal_->subRunID().isValid());
1186  actReg_.sPreSourceEvent.invoke(sc);
1187  TDEBUG_FUNC_SI(5, sid) << "Calling input_->readEvent(subRunPrincipal_)";
1188  auto ep = input_->readEvent(subRunPrincipal_.get());
1189  assert(ep);
1190  // The intended behavior here is that the producing services
1191  // which are called during the sPostReadEvent cannot see each
1192  // others put products. We enforce this by creating the groups
1193  // for the produced products, but do not allow the lookups to
1194  // find them until after the callbacks have run.
1195  ep->createGroupsForProducedProducts(producedProductLookupTables_);
1196  psSignals_->sPostReadEvent.invoke(*ep);
1197  ep->enableLookupOfProducedProducts();
1198  actReg_.sPostSourceEvent.invoke(
1199  std::as_const(*ep).makeEvent(invalid_module_context), sc);
1200  FDEBUG(1) << string(8, ' ') << "readEvent...................("
1201  << ep->eventID() << ")\n";
  // Hand ownership of the event principal to this schedule.
1202  schedule(sid).accept_principal(std::move(ep));
1203  // Now we drop the input source lock by exiting the guarded
1204  // scope.
1205  }
1206  if (schedule(sid).event_principal().eventID().isFlush()) {
1207  // No processing to do, start next event handling task.
1208  processAllEventsAsync(sid);
1209  TDEBUG_END_FUNC_SI(4, sid) << "FLUSH EVENT";
1210  return;
1211  }
1212 
1213  // Now process the event.
1214  processEventAsync(sid);
1215  TDEBUG_END_FUNC_SI(4, sid);
1216  }
1217 
1218  // ----------------------------------------------------------------------------
  // NOTE(review): the extraction dropped Doxygen source line 1219 — the class
  // head. Per the cross-reference index this type is EndPathRunnerTask, a
  // waiting-task functor that finalizes an event after end-path processing.
1220  public:
  // NOTE(review): line 1221 (the constructor signature) is missing; the index
  // shows it verbatim as:
  //   EndPathRunnerTask(EventProcessor* evp, ScheduleID const sid)
1222  : evp_{evp}, sid_{sid}
1223  {}
1224 
  // Task body, invoked by the concurrency framework with any exception that
  // escaped the end-path processing (null if none).
1225  void
1226  operator()(std::exception_ptr ex) const
1227  {
1228  TDEBUG_BEGIN_TASK_SI(4, sid_);
1229  if (ex) {
1230  try {
1231  rethrow_exception(ex);
1232  }
1233  catch (cet::exception& e) {
1234  if (evp_->error_action(e) != actions::IgnoreCompletely) {
1235  evp_->sharedException_.store<Exception>(
  // NOTE(review): line 1236 (the error-code argument to
  // sharedException_.store<Exception>) is missing from this extraction —
  // restore from upstream.
1237  "EventProcessor: an exception occurred during current "
1238  "event processing",
1239  e);
1240  TDEBUG_END_TASK_SI(4, sid_);
1241  return;
1242  }
1243  mf::LogWarning(e.category())
1244  << "exception being ignored for current event:\n"
1245  << cet::trim_right_copy(e.what(), " \n");
1246  // WARNING: We continue processing after the catch blocks!!!
1247  }
1248  catch (...) {
  // Non-cet exceptions always terminate the event loop: record the
  // exception for rethrow on the main thread and end this task.
1249  mf::LogError("PassingThrough")
1250  << "an exception occurred during current event processing";
1251  evp_->sharedException_.store_current();
1252  TDEBUG_END_TASK_SI(4, sid_);
1253  return;
1254  }
1255  }
1256 
  // No (unignored) exception: finalize the event (write it and schedule
  // the next read/process task) on this schedule.
1257  evp_->finishEventAsync(sid_);
1258 
1259  TDEBUG_END_TASK_SI(4, sid_);
1260  }
1261 
1262  private:
  // NOTE(review): lines 1263-1264 (the data members, presumably
  // EventProcessor* evp_; and ScheduleID const sid_; given the initializer
  // list above) are missing from this extraction — confirm against upstream.
1265  };
1266 
1267  // ----------------------------------------------------------------------------
  // NOTE(review): the extraction dropped Doxygen source line 1268 — the class
  // head. Per the cross-reference index this type is EndPathTask, the
  // waiting-task functor spawned after trigger-path processing; it runs the
  // end-path observers and chains an EndPathRunnerTask to finalize the event.
1269  public:
  // NOTE(review): line 1270 (the constructor signature) is missing; the index
  // shows it verbatim as:
  //   EndPathTask(EventProcessor* evp, ScheduleID const sid)
1271  : evp_{evp}, sid_{sid}
1272  {}
1273 
  // Task body, invoked with any exception that escaped trigger-path
  // processing (null if none).
1274  void
1275  operator()(exception_ptr const ex)
1276  {
1277  // Note: When we start our parent is the eventLoopTask.
1278  TDEBUG_BEGIN_TASK_SI(4, sid_);
1279  if (ex) {
1280  try {
1281  rethrow_exception(ex);
1282  }
1283  catch (cet::exception& e) {
1284  auto const action = evp_->error_action(e);
1285  if (action != actions::IgnoreCompletely) {
  // FailModule/FailPath must have been handled further down the call
  // chain; only SkipEvent or a fatal action can reach this point.
1286  assert(action != actions::FailModule);
1287  assert(action != actions::FailPath);
1288  if (action == actions::SkipEvent) {
1289  mf::LogWarning(e.category())
1290  << "Skipping event due to the following exception:\n"
1291  << cet::trim_right_copy(e.what(), " \n");
1292  TDEBUG_END_TASK_SI(4, sid_)
1293  << "skipping event because of EXCEPTION";
1294  return;
1295  }
1296  evp_->sharedException_.store<Exception>(
  // NOTE(review): line 1297 (the error-code argument to
  // sharedException_.store<Exception>) is missing from this extraction —
  // restore from upstream.
1298  "EventProcessor: an exception occurred during current "
1299  "event processing",
1300  e);
1301  TDEBUG_END_TASK_SI(4, sid_)
1302  << "terminate event loop because of EXCEPTION";
1303  return;
1304  }
1305  mf::LogWarning(e.category())
1306  << "exception being ignored for current event:\n"
1307  << cet::trim_right_copy(e.what(), " \n");
1308  // WARNING: We continue processing after the catch blocks!!!
1309  }
1310  catch (...) {
1311  mf::LogError("PassingThrough")
1312  << "an exception occurred during current event processing";
1313  evp_->sharedException_.store_current();
1314  TDEBUG_END_TASK_SI(4, sid_)
1315  << "terminate event loop because of EXCEPTION";
1316  return;
1317  }
1318  }
1319 
  // Run the end-path observers; when they complete the framework invokes
  // finalize_event_task (an EndPathRunnerTask) to write the event and
  // start the next read/process cycle.
1320  auto finalize_event_task =
1321  make_waiting_task<EndPathRunnerTask>(evp_, sid_);
1322  try {
1323  evp_->schedule(sid_).process_event_observers(finalize_event_task);
1324  }
1325  catch (cet::exception& e) {
1326  if (evp_->error_action(e) != actions::IgnoreCompletely) {
1327  evp_->sharedException_.store<Exception>(
  // NOTE(review): line 1328 (the error-code argument) is missing from
  // this extraction — restore from upstream.
1329  "EventProcessor: an exception occurred during current event "
1330  "processing",
1331  e);
1332  TDEBUG_END_TASK_SI(4, sid_)
1333  << "terminate event loop because of EXCEPTION";
1334  return;
1335  }
1336  mf::LogWarning(e.category())
1337  << "exception being ignored for current event:\n"
1338  << cet::trim_right_copy(e.what(), " \n");
1339  // WARNING: We continue processing after the catch blocks!!!
1340  }
1341  catch (...) {
1342  mf::LogError("PassingThrough")
1343  << "an exception occurred during current event processing";
1344  evp_->sharedException_.store_current();
1345  TDEBUG_END_TASK_SI(4, sid_)
1346  << "terminate event loop because of EXCEPTION";
1347  return;
1348  }
1349 
1350  // Once the end path processing is done, exit this task, which
1351  // does not end event-processing because of the continuation
1352  // task.
1353  TDEBUG_END_TASK_SI(4, sid_);
1354  }
1355 
1356  private:
  // NOTE(review): lines 1357-1358 (the data members, presumably
  // EventProcessor* evp_; and ScheduleID const sid_; given the initializer
  // list above) are missing from this extraction — confirm against upstream.
1359  };
1360 
1361  // This function is a continuation of the body of the
1362  // readAndProcessEvent task. Here we call down to Schedule to do the
1363  // trigger path processing, passing it a waiting task which will do
1364  // the end path processing, finalize the event, and start the next
1365  // read and process event task. Note that Schedule will spawn a
1366  // task to process each of the trigger paths, and then when they are
1367  // finished, insert the trigger results, and then spawn the waiting
1368  // task we gave it to do the end path processing, write the event,
1369  // and then start the next event processing task.
1370  void
  // NOTE(review): Doxygen source line 1371 (the function signature) is
  // missing here; per the cross-reference index it is:
  //   EventProcessor::processEventAsync(ScheduleID sid)
  // The body is a function-try-block: the try below wraps the whole function.
1372  try {
1373  // Note: We are part of the readAndProcessEventTask (schedule head
1374  // task), and our parent task is the eventLoopTask.
1375  TDEBUG_BEGIN_FUNC_SI(4, sid);
1376  assert(!schedule(sid).event_principal().eventID().isFlush());
1377  // Continue processing via the creation of a continuation.
1378  auto endPathTask = make_waiting_task<EndPathTask>(this, sid);
1379  // Start the trigger paths running. When they finish they will
1380  // spawn the endPathTask which will run the end path, write the
1381  // event, and start the next event processing task.
1382  schedule(sid).process_event_modifiers(endPathTask);
1383  TDEBUG_END_FUNC_SI(4, sid);
1384  }
1385  catch (cet::exception& e) {
1386  // Upon exiting this scope, end this task, terminating event
1387  // processing.
1388  auto const action = error_action(e);
1389  if (action != actions::IgnoreCompletely) {
1390  assert(action != actions::FailModule);
1391  assert(action != actions::FailPath);
  // NOTE(review): lines 1392-1393 are missing from this extraction —
  // presumably sharedException_.store<Exception>( plus its error-code
  // argument, matching the pattern used in the task classes above.
1394  "EventProcessor: an exception occurred during current event processing",
1395  e);
1396  TDEBUG_END_FUNC_SI(4, sid) << "terminate event loop because of EXCEPTION";
1397  return;
1398  }
1399  mf::LogWarning(e.category())
1400  << "exception being ignored for current event:\n"
1401  << cet::trim_right_copy(e.what(), " \n");
1402  TDEBUG_END_FUNC_SI(4, sid) << "Ignoring exception.";
1403  }
1404  catch (...) {
1405  mf::LogError("PassingThrough")
1406  << "an exception occurred during current event processing";
  // NOTE(review): line 1407 is missing — presumably
  // sharedException_.store_current(); as in the equivalent catch blocks above.
1408  TDEBUG_END_FUNC_SI(4, sid) << "terminate event loop because of EXCEPTION";
1409  }
1410 
1411  void
  // NOTE(review): Doxygen source line 1412 (the function signature) is
  // missing here; per the cross-reference index it is:
  //   EventProcessor::finishEventAsync(ScheduleID sid)
  // Fires the post-process-event signal, writes the event under a file-
  // transition mutex, then chains the next read/process task.
1413  {
1414  auto& ep = schedule(sid).event_principal();
1415  actReg_.sPostProcessEvent.invoke(
1416  std::as_const(ep).makeEvent(invalid_module_context),
1417  ScheduleContext{sid});
1418 
1419  // Note: We are part of the endPathTask.
1420  TDEBUG_BEGIN_FUNC_SI(4, sid);
1421  FDEBUG(1) << string(8, ' ') << "processEvent................("
1422  << ep.eventID() << ")\n";
1423  try {
1424  // Ask the output workers if they have reached their limits, and
1425  // if so setup to end the job the next time around the event
1426  // loop.
1427  FDEBUG(1) << string(8, ' ') << "shouldWeStop\n";
  // Function-local mutex: serializes output-file handling across all
  // schedules that reach this point concurrently.
1428  static std::mutex m;
1429  std::lock_guard sentry{m};
1430  // Now we can write the results of processing to the outputs,
1431  // and delete the event principal.
1432  if (!ep.eventID().isFlush()) {
1433  // Possibly open new output files. This is safe to do because
1434  // EndPathExecutor functions are called in a serialized
1435  // context.
1436  TDEBUG_FUNC_SI(5, sid) << "Calling openSomeOutputFiles()";
  // NOTE(review): line 1437 is missing from this extraction — presumably
  // the openSomeOutputFiles() call announced by the debug line above.
1438  TDEBUG_FUNC_SI(5, sid) << "Calling schedule(sid).writeEvent()";
1439 
1440  auto const id = ep.eventID();
1441  schedule(sid).writeEvent();
1442  FDEBUG(1) << string(8, ' ') << "writeEvent..................(" << id
1443  << ")\n";
1444  }
1445  TDEBUG_FUNC_SI(5, sid)
1446  << "Calling schedules_->"
1447  "recordOutputClosureRequests(Granularity::Event)";
  // NOTE(review): line 1448 is missing — presumably the
  // recordOutputClosureRequests(Granularity::Event) call announced above.
1449  }
1450  catch (cet::exception& e) {
  // NOTE(review): lines 1451-1453 are missing — presumably the
  // error_action(e) check and sharedException_.store<Exception>( with its
  // error-code argument, matching the catch blocks elsewhere in this file.
1454  "EventProcessor: an exception occurred "
1455  "during current event processing",
1456  e);
1457  // And then end this task, terminating event processing.
1458  TDEBUG_END_FUNC_SI(4, sid) << "EXCEPTION";
1459  return;
1460  }
1461  mf::LogWarning(e.category())
1462  << "exception being ignored for current event:\n"
1463  << cet::trim_right_copy(e.what(), " \n");
1464  // WARNING: We continue processing after the catch blocks!!!
1465  }
1466  catch (...) {
1467  mf::LogError("PassingThrough")
1468  << "an exception occurred during current event processing";
  // NOTE(review): line 1469 is missing — presumably
  // sharedException_.store_current(); as in the equivalent catch blocks.
1470  // And then end this task, terminating event processing.
1471  TDEBUG_END_FUNC_SI(4, sid) << "EXCEPTION";
1472  return;
1473  }
1474 
1475  // The next event processing task is a continuation of this task.
1476  processAllEventsAsync(sid);
1477  TDEBUG_END_FUNC_SI(4, sid);
1478  }
1479 
1480  template <Level L>
1481  void
  // NOTE(review): Doxygen source line 1482 (the function signature,
  // presumably EventProcessor::process()) is missing from this extraction.
  // Drives one level of the Run/SubRun/Event hierarchy: begin<L>, then
  // process each child level, then finalize<L>. Exceptions are collected in
  // ec_ (an ExceptionCollector) rather than propagated, so the loop guards
  // check ec_->empty() as well as the shutdown flag.
1483  {
1484  if ((shutdown_flag > 0) || !ec_->empty()) {
1485  return;
1486  }
1487  ec_->call([this] { begin<L>(); });
1488  while ((shutdown_flag == 0) && ec_->empty() &&
1489  levelsToProcess<level_down(L)>()) {
1490  ec_->call([this] { process<level_down(L)>(); });
1491  }
  // Finalization runs even after an error in the loop above, so that
  // output-closure bookkeeping for this level is still recorded.
1492  ec_->call([this] {
1493  finalize<L>();
1494  recordOutputModuleClosureRequests<L>();
1495  });
1496  }
1497 
  // NOTE(review): Doxygen source lines 1498-1499 (the return type and
  // signature) are missing here; per the cross-reference index this is:
  //   EventProcessor::StatusCode EventProcessor::runToCompletion()
  // Top-level entry point: processes the whole job and returns epSuccess,
  // or epSignal if a shutdown signal was received.
1500  {
1501  StatusCode returnCode{epSuccess};
1502  ec_->call([this, &returnCode] {
1503  process<highest_level()>();
1504  if (shutdown_flag > 0) {
1505  returnCode = epSignal;
1506  }
1507  });
1508  if (!ec_->empty()) {
  // NOTE(review): line 1509 is missing from this extraction (likely a
  // comment or cleanup call preceding the rethrow) — confirm upstream.
1510  ec_->rethrow();
1511  }
1512  return returnCode;
1513  }
1514 
1515  Level
  // NOTE(review): Doxygen source line 1516 (the function signature) is
  // missing here — presumably EventProcessor::advanceItemType(), the call
  // announced by the debug message in readAndProcessAsync above. Maps the
  // input source's next item type onto the processing Level hierarchy.
1517  {
1518  auto const itemType = input_->nextItemType();
1519  FDEBUG(1) << string(4, ' ') << "*** nextItemType: " << itemType << " ***\n";
1520  switch (itemType) {
1521  case input::IsStop:
1522  return highest_level();
1523  case input::IsFile:
1524  return Level::InputFile;
1525  case input::IsRun:
1526  return Level::Run;
1527  case input::IsSubRun:
1528  return Level::SubRun;
1529  case input::IsEvent:
1530  return Level::Event;
1531  case input::IsInvalid:
  // NOTE(review): line 1532 is missing — presumably a
  // throw Exception{errors::...} whose message continues below.
1533  << "Invalid next item type presented to the event processor.\n"
1534  << "Please contact artists@fnal.gov.";
1535  }
  // NOTE(review): line 1536 is missing — presumably the throw for the
  // fall-through (unrecognized item type) case, continued below.
1537  << "Unrecognized next item type presented to the event processor.\n"
1538  << "Please contact artists@fnal.gov.";
1539  }
1540 
1541  // ===============================================================================
1542 
1543  void
  // NOTE(review): Doxygen source line 1544 (the function signature) is
  // missing from this extraction — confirm the name against upstream.
  // Best-effort save of the random-number-generator state to file; the body
  // is a function-try-block whose catch(...) deliberately swallows every
  // exception, since this runs during (abnormal) termination where a
  // secondary failure must not mask the original error.
1545  try {
1546  if (ServiceRegistry::isAvailable<RandomNumberGenerator>()) {
1547  ServiceHandle<RandomNumberGenerator>()->saveToFile_();
1548  }
1549  }
1550  catch (...) {
1551  }
1552 
1553 } // namespace art
void readAndProcessAsync(ScheduleID sid)
Float_t x
Definition: compare.C:6
#define TDEBUG_BEGIN_TASK_SI(LEVEL, SI)
TRandom r
Definition: spectrum.C:23
void endJob()
Definition: Schedule.cc:40
void closeAllOutputFiles()
Definition: Schedule.h:85
void respondToCloseOutputFiles(FileBlock const &)
Definition: Schedule.cc:68
GlobalSignal< detail::SignalResponseType::FIFO, void()> sPreOpenFile
tsan_unique_ptr< InputSource > input_
void setSubRunAuxiliaryRangeSetID(RangeSet const &rs)
Definition: Schedule.h:161
void respondToCloseInputFile(FileBlock const &)
Definition: Schedule.cc:54
tsan< ProductDescriptions > producedProductDescriptions_
GlobalSignal< detail::SignalResponseType::FIFO, void()> sPostBeginJob
static ConsumesInfo * instance()
Definition: ConsumesInfo.cc:27
RangeSetHandler const & runRangeSetHandler()
Definition: Schedule.h:149
tsan< UpdateOutputCallbacks > outputCallbacks_
std::unique_ptr< InputSource > make(fhicl::ParameterSet const &conf, InputSourceDescription &desc)
tsan< Scheduler > scheduler_
GlobalSignal< detail::SignalResponseType::FIFO, void()> sPreCloseFile
MaybeLogger_< ELseverityLevel::ELsev_info, false > LogInfo
GlobalSignal< detail::SignalResponseType::LIFO, void(Event const &, ScheduleContext)> sPostSourceEvent
tsan_unique_ptr< FileBlock > fb_
#define TDEBUG_END_TASK_SI(LEVEL, SI)
constexpr auto most_deeply_nested_level() noexcept
Definition: Level.h:44
bool const handleEmptyRuns_
GlobalSignal< detail::SignalResponseType::LIFO, void()> sPostEndJob
GlobalSignal< detail::SignalResponseType::LIFO, void(Run const &)> sPostSourceRun
std::atomic< Level > nextLevel_
bool outputsToClose() const
Definition: Schedule.h:67
void seedSubRunRangeSet(RangeSetHandler const &rsh)
Definition: Schedule.h:156
GlobalSignal< detail::SignalResponseType::FIFO, void()> sPreSourceSubRun
Schedule & main_schedule()
void process(Transition, Principal &)
Definition: Schedule.cc:75
Level
Definition: Level.h:13
void beginJob(detail::SharedResources const &resources)
Definition: Schedule.cc:33
static constexpr ScheduleID first()
Definition: ScheduleID.h:50
STL namespace.
GlobalSignal< detail::SignalResponseType::LIFO, void(Run const &)> sPostBeginRun
std::atomic< bool > firstEvent_
bool const handleEmptySubRuns_
tsan< PathManager > pathManager_
ScheduleIteration scheduleIteration_
GlobalSignal< detail::SignalResponseType::LIFO, void(SubRun const &)> sPostBeginSubRun
Float_t tmp
Definition: plot.C:35
fhicl::ParameterSet const & triggerPSet() const
Definition: Globals.cc:60
std::unique_ptr< GlobalTaskGroup > taskGroup_
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
tsan< detail::ExceptionCollector > ec_
tsan_unique_ptr< RunPrincipal > runPrincipal_
tsan< ProducingServiceSignals > psSignals_
std::atomic< int > shutdown_flag
void setOutputFileStatus(OutputFileStatus)
void processAllEventsAsync(ScheduleID sid)
void store(std::exception_ptr ex_ptr)
ProcessConfigurationID id() const
GlobalSignal< detail::SignalResponseType::LIFO, void(InputSource *, std::vector< Worker * > const &)> sPostBeginJobWorkers
EndPathRunnerTask(EventProcessor *evp, ScheduleID const sid)
void setRunAuxiliaryRangeSetID(RangeSet const &rs)
Definition: Schedule.h:137
decltype(auto) constexpr end(T &&obj)
ADL-aware version of std::end.
Definition: StdUtils.h:77
void freeze(tbb::task_group &group)
tsan< cet::cpu_timer > timer_
OutputFileStatus
EventPrincipal & event_principal()
Definition: Schedule.h:185
GlobalSignal< detail::SignalResponseType::LIFO, void(ModuleDescription const &)> sPostSourceConstruction
static auto instance(bool cleanup=false)
tsan< std::map< ScheduleID, Schedule > > schedules_
void beginJob()
Definition: Breakpoints.cc:14
ScheduleID::size_type nschedules() const
Definition: Globals.cc:24
detail::SharedResources sharedResources_
tsan< ProductTables > producedProductLookupTables_
#define TDEBUG_FUNC(LEVEL)
void writeSummary(PathManager &pm, bool wantSummary, cet::cpu_timer const &timer)
Definition: writeSummary.cc:89
std::atomic< bool > finalizeSubRunEnabled_
T get(std::string const &key) const
Definition: ParameterSet.h:314
actions::ActionCodes error_action(cet::exception &e) const
std::atomic< bool > beginRunCalled_
EndPathTask(EventProcessor *evp, ScheduleID const sid)
void writeEvent()
Definition: Schedule.h:103
std::atomic< bool > finalizeRunEnabled_
std::atomic< bool > fileSwitchInProgress_
RangeSetHandler const & subRunRangeSetHandler()
Definition: Schedule.h:172
#define TDEBUG_FUNC_SI(LEVEL, SI)
std::string const & getReleaseVersion()
void recordOutputClosureRequests(Granularity const granularity)
Definition: Schedule.h:73
tsan_unique_ptr< ServicesManager > servicesManager_
tsan_unique_ptr< SubRunPrincipal > subRunPrincipal_
std::atomic< bool > beginSubRunCalled_
void setupSignals(bool want_sigint_enabled)
EventInfo const & eventInfo() const
void finishEventAsync(ScheduleID sid)
void LogStatistics()
ParameterSetID id() const
void respondToOpenOutputFiles(FileBlock const &)
Definition: Schedule.cc:61
GlobalSignal< detail::SignalResponseType::LIFO, void(Event const &, ScheduleContext)> sPostProcessEvent
void incrementInputFileNumber()
Definition: Schedule.h:112
RangeSet seenRanges() const
StatusCode runToCompletion()
bool outputsToOpen() const
Definition: Schedule.h:61
GlobalSignal< detail::SignalResponseType::LIFO, void(Run const &)> sPostEndRun
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
SharedException sharedException_
GlobalSignal< detail::SignalResponseType::FIFO, void(Run const &)> sPreBeginRun
char const * what() const noexcept override
void showMissingConsumes() const
GlobalSignal< detail::SignalResponseType::FIFO, void(SubRunID const &, Timestamp const &)> sPreEndSubRun
std::string const & processName() const
Definition: Globals.cc:48
id_type size_type
Definition: ScheduleID.h:25
void processEventAsync(ScheduleID sid)
Float_t sc
Definition: plot.C:23
GlobalSignal< detail::SignalResponseType::LIFO, void(std::string const &)> sPostOpenFile
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
static RangeSet invalid()
Definition: RangeSet.cc:45
void closeSomeOutputFiles()
Definition: Schedule.h:97
void setRequireConsumes(bool const)
Definition: ConsumesInfo.cc:96
void accept_principal(std::unique_ptr< EventPrincipal > principal)
Definition: Schedule.h:178
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
Definition: MVAAlg.h:12
GlobalSignal< detail::SignalResponseType::FIFO, void(RunID const &, Timestamp const &)> sPreEndRun
EventNumber_t event() const
Definition: EventID.h:116
RangeSetHandler * clone() const
bool erase(std::string const &key)
GlobalSignal< detail::SignalResponseType::FIFO, void(SubRun const &)> sPreBeginSubRun
Schedule & schedule(ScheduleID const id)
#define FDEBUG(LEVEL)
Definition: DebugMacros.h:25
ActivityRegistry actReg_
constexpr auto highest_level() noexcept
Definition: Level.h:32
void for_each_schedule(F f) const
static Globals * instance()
Definition: Globals.cc:17
void process_event_modifiers(hep::concurrency::WaitingTaskPtr endPathTask)
Definition: Schedule.cc:82
GlobalSignal< detail::SignalResponseType::LIFO, void()> sPostCloseFile
void beginSubRunIfNotDoneAlready()
Float_t e
Definition: plot.C:35
void seedRunRangeSet(RangeSetHandler const &rsh)
Definition: Schedule.h:131
void writeRun(RunPrincipal &rp)
Definition: Schedule.h:143
GlobalSignal< detail::SignalResponseType::FIFO, void(ScheduleContext)> sPreSourceEvent
GlobalSignal< detail::SignalResponseType::LIFO, void(SubRun const &)> sPostSourceSubRun
void respondToOpenInputFile(FileBlock const &)
Definition: Schedule.cc:47
void openSomeOutputFiles(FileBlock const &fb)
Definition: Schedule.h:91
void operator()(exception_ptr const ex)
void put(std::string const &key)
void setSubRunAuxiliaryRangeSetID()
void writeSubRun(SubRunPrincipal &srp)
Definition: Schedule.h:166
GlobalSignal< detail::SignalResponseType::LIFO, void(SubRun const &)> sPostEndSubRun
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
GlobalSignal< detail::SignalResponseType::FIFO, void()> sPreSourceRun
void operator()(std::exception_ptr ex) const
void setOutputFileStatus(OutputFileStatus const ofs)
Definition: Schedule.h:118