LArSoft  v09_90_00
Liquid Argon Software toolkit - https://larsoft.org/
EndPathExecutor.h
Go to the documentation of this file.
1 #ifndef art_Framework_Core_EndPathExecutor_h
2 #define art_Framework_Core_EndPathExecutor_h
3 // vim: set sw=2 expandtab :
4 
5 // =========================================================================
6 // Class to handle the execution of the end path. Invoked in all the
7 // right places by the event processor.
8 //
9 // The RangeSetHandlers manage the RangeSets that are to be assigned
10 // to (a) the (Sub)RunAuxiliaries and (b) the (Sub)Run products
11 // produced in the current process. Since all (Sub)Run
12 // products/auxiliaries produced in the current process are written to
13 // all output modules during write(Sub)Run, there is only one relevant
14 // RangeSet for the (Sub)Run at any given time. RangeSets
15 // corresponding to multiple (Sub)Run fragments are aggregated on
16 // input.
17 // =========================================================================
18 
21 #include "art/Framework/Core/fwd.h"
28 #include "hep_concurrency/WaitingTask.h"
29 
30 #include <atomic>
31 #include <memory>
32 #include <set>
33 #include <vector>
34 
35 namespace art {
36  class GlobalTaskGroup;
37  namespace detail {
38  class SharedResources;
39  }
40 
42  friend class Schedule;
43 
44  public:
46  PathManager& pm,
47  ActionTable const& actions,
48  UpdateOutputCallbacks& callbacks,
49  GlobalTaskGroup& task_group);
50 
51  EndPathExecutor(EndPathExecutor&&) = delete;
52  EndPathExecutor& operator=(EndPathExecutor&&) = delete;
53  EndPathExecutor(EndPathExecutor const&) = delete;
54  EndPathExecutor& operator=(EndPathExecutor const&) = delete;
55 
56  void beginJob(detail::SharedResources const& resources);
57  void endJob();
58 
59  // Input File Open/Close.
60  void selectProducts(ProductTables const&);
61  void respondToOpenInputFile(FileBlock const& fb);
62  void respondToCloseInputFile(FileBlock const& fb);
63  void respondToOpenOutputFiles(FileBlock const& fb);
64  void respondToCloseOutputFiles(FileBlock const& fb);
65  bool someOutputsOpen() const;
66  void closeAllOutputFiles();
67 
68  void seedRunRangeSet(RangeSetHandler const&);
69  void setRunAuxiliaryRangeSetID(RangeSet const& rs);
70  void writeRun(RunPrincipal& rp);
71 
72  void seedSubRunRangeSet(RangeSetHandler const&);
73  void setSubRunAuxiliaryRangeSetID(RangeSet const& rs);
74  void writeSubRun(SubRunPrincipal& srp);
75 
76  // Process Run/SubRun
77  void process(Transition, Principal&);
78 
79  // Process Event
80  //
81  // Used to make sure only one event is being processed at a time.
82  // The schedules take turns having their events processed on a
83  // first-come first-served basis (FIFO).
84  void process_event(hep::concurrency::WaitingTaskPtr finalizeEventTask,
86  void writeEvent(EventPrincipal&);
87 
88  // Output File Switching API
89  //
90  // Called by EventProcessor::closeSomeOutputFiles(), which is called when
91  // output file switching is happening. Note: This is really returns
92  // !outputWorkersToClose_.empty()
93  bool outputsToClose() const;
94  // MT note: This is where we need to get all the schedules
95  // synchronized, and then have all schedules do the file
96  // close, and then the file open, then the schedules can
97  // proceed. A nasty complication is that a great deal of
98  // time can go by between the file close and the file
99  // open because artdaq may pause the run in between, and
100  // wants to have all output files closed while the run is
101  // paused. They probably want the input file closed too.
102  void closeSomeOutputFiles();
103  // Note: This really just returns !outputWorkersToOpen_.empty()
104  bool outputsToOpen() const;
105  void openSomeOutputFiles(FileBlock const& fb);
106  // Note: When we are passed OutputFileStatus::Switching, we must close
107  // the file and call openSomeOutputFiles which changes it back
108  // to OutputFileStatus::Open.
109  // A side effect of switching status is the run/subrun/event writes
110  // are not counted in the overall counting by
111  // RootOutputClosingCriteria. However, they are still counted by the
112  // individual counters.
113  void setOutputFileStatus(OutputFileStatus);
114  // Note: What this is really used for is to push workers into
115  // the outputWorkersToClose_ data member.
116  void recordOutputClosureRequests(Granularity);
117  void incrementInputFileNumber();
118 
119  private:
121 
122  // Filled by ctor, const after that.
123  ScheduleContext const sc_;
127  // Filled by ctor, const after that.
128  std::vector<OutputWorker*> outputWorkers_{};
129  // Dynamic, updated by run processing.
130  std::unique_ptr<RangeSetHandler> runRangeSetHandler_{nullptr};
131  // Dynamic, updated by subrun processing.
132  std::unique_ptr<RangeSetHandler> subRunRangeSetHandler_{nullptr};
133 
134  // Output File Switching
135  std::atomic<OutputFileStatus> fileStatus_{OutputFileStatus::Closed};
136  std::set<OutputWorker*> outputWorkersToOpen_{};
137  // Note: During an output file switch, after the closes happen, the entire
138  // contents of this is moved to outputWorkersToOpen_.
139  // FIXME: The move to outputWorkersToOpen_ is not really necessary, a flag
140  // is all we need, something that says whether we should close or open what
141  // is in the list. Basically EventProcessor uses recordOutputClosureRequests
142  // to populate the list, then uses the list to do closes, then uses the same
143  // list to do opens, then clears the list.
144  std::set<OutputWorker*> outputWorkersToClose_{};
145  };
146 } // namespace art
147 
148 // Local Variables:
149 // mode: c++
150 // End:
151 
152 #endif /* art_Framework_Core_EndPathExecutor_h */
OutputFileStatus
Transition
Definition: Transition.h:7
void beginJob()
Definition: Breakpoints.cc:14
ScheduleContext const sc_
TFile fb("Li6.root")
ActionTable const & actionTable_
Definition: MVAAlg.h:12
GlobalTaskGroup & taskGroup_