LArSoft  v09_90_00
Liquid Argon Software toolkit - https://larsoft.org/
EventProcessor.h
Go to the documentation of this file.
1 #ifndef art_Framework_EventProcessor_EventProcessor_h
2 #define art_Framework_EventProcessor_EventProcessor_h
3 // vim: set sw=2 expandtab :
4 
5 // ===========================
6 // The art application object.
7 // ===========================
8 
18 #include "art/Framework/Core/fwd.h"
34 #include "cetlib/cpu_timer.h"
35 #include "fhiclcpp/fwd.h"
36 #include "hep_concurrency/thread_sanitize.h"
37 
38 #include <atomic>
39 #include <memory>
40 
41 namespace art {
42 
44  public:
45  // Status codes:
46  // 0 successful completion
47  // 3 signal received
48  // values are for historical reasons.
49  enum StatusCode { epSuccess = 0, epSignal = 3 };
50 
51  // Special Member Functions
52  explicit EventProcessor(fhicl::ParameterSet pset,
53  detail::EnabledModules enabled_modules);
55  EventProcessor(EventProcessor const&) = delete;
56  EventProcessor(EventProcessor&&) = delete;
57  EventProcessor& operator=(EventProcessor const&) = delete;
59 
60  // API to run_art
61  //
62  // Run the job until done, which means:
63  //
64  // - no more input data, or
65  // - input maxEvents parameter limit reached, or
66  // - output maxEvents parameter limit reached, or
67  // - input maxSubRuns parameter limit reached.
68  //
69  // Return values:
70  //
71  // epSignal: processing terminated early, SIGUSR2 encountered
72  // epSuccess: all other cases
73  //
75 
76  private:
77  class EndPathTask;
78  class EndPathRunnerTask;
79 
80  // Event-loop infrastructure
83  void processEventAsync(ScheduleID sid);
84  void finishEventAsync(ScheduleID sid);
85 
86  template <Level L>
87  bool levelsToProcess();
88  template <Level L>
89  std::enable_if_t<is_above_most_deeply_nested_level(L)> begin();
90  template <Level L>
91  void process();
92  template <Level L>
93  void finalize();
94  template <Level L>
95  void
97  {}
98  template <Level L>
99  void
101  {}
103 
104  // Level-specific member functions
105  void beginJob();
106  void endJob();
107  void endJobAllSchedules();
108  void openInputFile();
109  bool outputsToOpen();
110  void openSomeOutputFiles();
111  void closeInputFile();
112  void closeSomeOutputFiles();
113  void closeAllOutputFiles();
114  void closeAllFiles();
115  void respondToOpenInputFile();
119  void readRun();
120  void beginRun();
123  void endRun();
124  void writeRun();
125  void readSubRun();
126  void beginSubRun();
129  void endSubRun();
130  void writeSubRun();
131  void readEvent();
132  void processEvent();
133  void writeEvent();
136  void terminateAbnormally_();
137 
138  private:
139  template <typename T>
140  using tsan = hep::concurrency::thread_sanitize<T>;
141 
142  template <typename T>
143  using tsan_unique_ptr = hep::concurrency::thread_sanitize_unique_ptr<T>;
144 
145  Schedule&
147  {
148  return schedules_->at(id);
149  }
150 
151  Schedule&
153  {
154  return schedule(ScheduleID::first());
155  }
156 
159  {
160  return scheduler_->actionTable().find(e.root_cause());
161  }
162 
163  // Next containment level to move to.
164  std::atomic<Level> nextLevel_{Level::ReadyToAdvance};
165 
166  // Utility object to run a functor and collect any exceptions thrown.
168 
169  // Used for timing the job.
171 
172  // Used to keep track of whether or not we have already called beginRun.
173  std::atomic<bool> beginRunCalled_{false};
174 
175  // Used to keep track of whether or not we have already called beginSubRun.
176  std::atomic<bool> beginSubRunCalled_{false};
177 
178  // When set allows runs to end.
179  std::atomic<bool> finalizeRunEnabled_{false};
180 
181  // When set allows subruns to end.
182  std::atomic<bool> finalizeSubRunEnabled_{false};
183 
184  // A signal/slot system for registering a callback to be called
185  // when a specific action is taken by the framework.
187 
188  // Used to update various output fields in logged messages.
190 
191  // List of callbacks which, when invoked, can update the state of
192  // any output modules.
193  // FIXME: Used only in the ctor!
195 
196  // Product descriptions for the products that appear in
197  // produces<T>() clauses in modules. Note that this is the master
198  // copy and must be kept alive until producedProductLookupTables_
199  // is destroyed because it has references to us.
201 
202  // Product lookup tables for the products that appear in
203  // produces<T>() clauses in modules. Note that this also serves as
204  // the master list of produced products and must be kept alive
205  // until no more principals that might use it exist. Also note
206  // that we keep references to the internals of
207  // producedProductDescriptions_.
209 
211 
212  // The entity that manages all configuration data from the
213  // services.scheduler block and (eventually) sets up the TBB task
214  // scheduler.
216 
217  std::unique_ptr<GlobalTaskGroup> taskGroup_{nullptr};
218 
220 
222 
223  // The service subsystem.
225 
226  // Despite the name, this is what parses the paths and modules in
227  // the FHiCL file and creates and owns them.
229 
230  // The source of input data.
232 
233  // The schedule runners.
235 
236  // The currently open primary input file.
238 
239  // The currently active RunPrincipal.
241 
242  // The currently active SubRunPrincipal.
244 
245  // The currently active EventPrincipals.
247 
248  // Are we configured to process empty runs?
249  bool const handleEmptyRuns_;
250 
251  // Are we configured to process empty subruns?
253 
254  // Used to communicate exceptions from worker threads to the main
255  // thread.
257 
258  // Set to true for the first event in a subRun to signal that we
259  // should not advance to the next entry. Note that this is shared
260  // in common between all the schedules. This is only needed
261  // because we cannot peek ahead to see that the next entry is an
262  // event, we actually must advance to it before we can know.
263  std::atomic<bool> firstEvent_{true};
264 
265  // Are we currently switching output files?
266  std::atomic<bool> fileSwitchInProgress_{false};
267  };
268 
269 } // namespace art
270 
271 #endif /* art_Framework_EventProcessor_EventProcessor_h */
272 
273 // Local Variables:
274 // mode: c++
275 // End:
void readAndProcessAsync(ScheduleID sid)
tsan_unique_ptr< InputSource > input_
tsan< ProductDescriptions > producedProductDescriptions_
tsan< UpdateOutputCallbacks > outputCallbacks_
tsan< Scheduler > scheduler_
tsan_unique_ptr< FileBlock > fb_
bool const handleEmptyRuns_
std::atomic< Level > nextLevel_
Schedule & main_schedule()
Level
Definition: Level.h:13
static constexpr ScheduleID first()
Definition: ScheduleID.h:50
std::atomic< bool > firstEvent_
bool const handleEmptySubRuns_
tsan< PathManager > pathManager_
ScheduleIteration scheduleIteration_
std::unique_ptr< GlobalTaskGroup > taskGroup_
tsan< detail::ExceptionCollector > ec_
tsan_unique_ptr< RunPrincipal > runPrincipal_
tsan< ProducingServiceSignals > psSignals_
hep::concurrency::thread_sanitize_unique_ptr< T > tsan_unique_ptr
EventProcessor & operator=(EventProcessor const &)=delete
void setOutputFileStatus(OutputFileStatus)
void processAllEventsAsync(ScheduleID sid)
std::enable_if_t< is_above_most_deeply_nested_level(L)> begin()
void recordOutputModuleClosureRequests()
tsan< cet::cpu_timer > timer_
OutputFileStatus
void finalizeContainingLevels()
tsan< std::map< ScheduleID, Schedule > > schedules_
detail::SharedResources sharedResources_
tsan< ProductTables > producedProductLookupTables_
EventProcessor(fhicl::ParameterSet pset, detail::EnabledModules enabled_modules)
std::atomic< bool > finalizeSubRunEnabled_
PerScheduleContainer< std::unique_ptr< EventPrincipal > > eventPrincipals_
actions::ActionCodes error_action(cet::exception &e) const
std::atomic< bool > beginRunCalled_
std::atomic< bool > finalizeRunEnabled_
std::atomic< bool > fileSwitchInProgress_
hep::concurrency::thread_sanitize< T > tsan
tsan_unique_ptr< ServicesManager > servicesManager_
tsan_unique_ptr< SubRunPrincipal > subRunPrincipal_
std::atomic< bool > beginSubRunCalled_
void finishEventAsync(ScheduleID sid)
static ProductTables invalid()
StatusCode runToCompletion()
SharedException sharedException_
void processEventAsync(ScheduleID sid)
Definition: MVAAlg.h:12
Schedule & schedule(ScheduleID const id)
ActivityRegistry actReg_
void beginSubRunIfNotDoneAlready()
Float_t e
Definition: plot.C:35
tsan< MFStatusUpdater > mfStatusUpdater_
void setSubRunAuxiliaryRangeSetID()
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33