LArSoft  v09_90_00
Liquid Argon Software toolkit - https://larsoft.org/
Worker.h
Go to the documentation of this file.
1 #ifndef art_Framework_Principal_Worker_h
2 #define art_Framework_Principal_Worker_h
3 // vim: set sw=2 expandtab :
4 
5 // ======================================================================
6 // Worker: this is a basic scheduling unit - an abstract base class to
7 // something that is really a producer or filter.
8 //
9 // A worker will not actually call through to the module unless it is
10 // in a Ready state. After a module is actually run, the state will
11 // not be Ready. The Ready state can only be reestablished by doing a
12 // reset().
13 //
14 // Pre/post module signals are posted only in the Ready state.
15 //
16 // Execution statistics are kept here.
17 //
18 // If a module has thrown an exception during execution, that
19 // exception will be rethrown if the worker is entered again and the
20 // state is not Ready. In other words, execution results (status) are
21 // cached and reused until the worker is reset().
22 // ======================================================================
23 
28 #include "hep_concurrency/WaitingTaskList.h"
29 
30 #include <atomic>
31 #include <exception>
32 #include <string>
33 #include <vector>
34 
35 namespace hep::concurrency {
36  class SerialTaskQueueChain;
37 }
38 
39 namespace art {
40  class ActivityRegistry;
41  class ModuleContext;
42  class FileBlock;
43  namespace detail {
44  class SharedResources;
45  }
46 
47  class Worker {
48  public:
49  enum State { Ready, Pass, Fail, Working, ExceptionThrown };
50 
51  virtual ~Worker();
52  Worker(ModuleDescription const&, WorkerParams const&);
53 
54  void beginJob(detail::SharedResources const&);
55  void endJob();
56  void respondToOpenInputFile(FileBlock const& fb);
57  void respondToCloseInputFile(FileBlock const& fb);
58  void respondToOpenOutputFiles(FileBlock const& fb);
59  void respondToCloseOutputFiles(FileBlock const& fb);
60  void doWork(Transition, Principal&, ModuleContext const&);
61 
62  void doWork_event(hep::concurrency::WaitingTaskPtr workerInPathDoneTask,
64  ModuleContext const&);
65 
66  // This is used only to do trigger results insertion.
67  void doWork_event(EventPrincipal&, ModuleContext const&);
68 
70  scheduleID() const
71  {
72  return scheduleID_;
73  }
74  // Used only by WorkerInPath.
75  bool returnCode() const;
76 
77  ModuleDescription const& description() const;
78  hep::concurrency::SerialTaskQueueChain* serialTaskQueueChain() const;
79 
80  // Used by EventProcessor
81  // Used by Schedule
82  // Used by EndPathExecutor
83  void reset();
84 
85  // Used only by writeSummary
86  std::size_t timesVisited() const;
87  std::size_t timesRun() const;
88  std::size_t timesPassed() const;
89  std::size_t timesFailed() const;
90  std::size_t timesExcept() const;
91 
92  void runWorker(EventPrincipal&, ModuleContext const&);
93  bool isUnique() const;
94 
95  protected:
96  std::string const& label() const;
97 
98  std::atomic<std::size_t> counts_visited_{};
99  std::atomic<std::size_t> counts_run_{};
100  std::atomic<std::size_t> counts_passed_{};
101  std::atomic<std::size_t> counts_failed_{};
102  std::atomic<std::size_t> counts_thrown_{};
103 
104  private:
105  virtual hep::concurrency::SerialTaskQueueChain* doSerialTaskQueueChain()
106  const = 0;
107  virtual void doBeginJob(detail::SharedResources const& resources) = 0;
108  virtual void doEndJob() = 0;
109  virtual void doBegin(RunPrincipal& rp, ModuleContext const& mc) = 0;
110  virtual void doEnd(RunPrincipal& rp, ModuleContext const& mc) = 0;
111  virtual void doBegin(SubRunPrincipal& srp, ModuleContext const& mc) = 0;
112  virtual void doEnd(SubRunPrincipal& srp, ModuleContext const& mc) = 0;
113  virtual bool doProcess(EventPrincipal&, ModuleContext const&) = 0;
114 
115  virtual void doRespondToOpenInputFile(FileBlock const& fb) = 0;
116  virtual void doRespondToCloseInputFile(FileBlock const& fb) = 0;
117  virtual void doRespondToOpenOutputFiles(FileBlock const& fb) = 0;
118  virtual void doRespondToCloseOutputFiles(FileBlock const& fb) = 0;
119 
124  std::atomic<int> state_{Ready};
125 
126  // if state is 'exception'
127  // Note: threading: There is no accessor for this data, the only
128  // way it is ever used is from the doWork* functions. Right now
129  // event processing only sets it, but run and subrun processing
130  // reads it. It is not clear that event processing needs this
131  // anymore, and if we go to multiple runs and subruns in flight,
132  // they may not need it anymore as well. For now, leave this, is
133  // not thread safe.
134  std::exception_ptr cached_exception_{};
135 
136  std::atomic<bool> workStarted_{false};
137  std::atomic<bool> returnCode_{false};
138 
139  // Holds the waiting workerInPathDone tasks. Note: For shared
140  // modules the workers are shared. For replicated modules each
141  // schedule has its own private worker copies (the whole reason
142  // schedules exist!).
143  hep::concurrency::WaitingTaskList waitingTasks_;
144  };
145 
146 } // namespace art
147 
148 #endif /* art_Framework_Principal_Worker_h */
149 
150 // Local Variables:
151 // mode: c++
152 // End:
ActionTable const & actions_
Definition: Worker.h:122
Transition
Definition: Transition.h:7
void beginJob()
Definition: Breakpoints.cc:14
ScheduleID scheduleID() const
Definition: Worker.h:70
ActivityRegistry const & actReg_
Definition: Worker.h:123
ModuleDescription const md_
Definition: Worker.h:121
ScheduleID const scheduleID_
Definition: Worker.h:120
TFile fb("Li6.root")
Definition: MVAAlg.h:12
hep::concurrency::WaitingTaskList waitingTasks_
Definition: Worker.h:143