LArSoft  v09_90_00
Liquid Argon Software toolkit - https://larsoft.org/
EmptyEvent_source.cc
Go to the documentation of this file.
1 // vim: set sw=2 expandtab :
2 
23 #include "cetlib/BasicPluginFactory.h"
24 #include "fhiclcpp/ParameterSet.h"
25 #include "fhiclcpp/types/Atom.h"
30 
31 #include <cassert>
32 #include <chrono>
33 #include <cstdint>
34 #include <memory>
35 #include <thread>
36 
37 using namespace fhicl;
38 using namespace std;
39 using namespace std::chrono_literals;
40 using std::chrono::steady_clock;
41 
42 namespace art {
43 
44  class EmptyEvent final : public InputSource {
45  public:
46  struct Config {
49  Atom<int> numberEventsInRun{Name("numberEventsInRun"),
50  limitsConfig().maxEvents()};
51  Atom<int> numberEventsInSubRun{Name("numberEventsInSubRun"),
52  limitsConfig().maxSubRuns()};
53  Atom<uint32_t> maxTime{
54  Name("maxTime"),
55  Comment(
56  "If specified, the 'maxTime' parameter indicates the maximum "
57  "allowed\n"
58  "wall-clock time (in seconds) for which new events may be created.\n"
59  "This option is mutually exclusive with the 'maxEvents' and "
60  "'maxSubRuns'\n"
61  "configuration parameters."),
62  std::numeric_limits<uint32_t>::max()};
63  Atom<uint32_t> eventCreationDelay{
64  Name("eventCreationDelay"),
65  Comment("The 'eventCreationDelay' parameter is an integral value\n"
66  "in the range [0, 1000000), corresponding to microseconds.\n"
67  "If specified, the input source will sleep for the specified "
68  "duration\n"
69  "of time before each new event, subrun, or run is created.\n"),
70  0u};
71  Atom<bool> resetEventOnSubRun{Name("resetEventOnSubRun"), true};
72  OptionalAtom<RunNumber_t> firstRun{Name("firstRun")};
73  OptionalAtom<SubRunNumber_t> firstSubRun{Name("firstSubRun")};
74  OptionalAtom<EventNumber_t> firstEvent{Name("firstEvent")};
75  OptionalDelegatedParameter timestampPlugin{
76  Name("timestampPlugin"),
77  Comment(
78  "The 'timestampPlugin' parameter must be a FHiCL table\n"
79  "of the form:\n\n"
80  " timestampPlugin: {\n"
81  " plugin_type: <plugin specification>\n"
82  " ...\n"
83  " }\n\n"
84  "See the notes in art/Framework/Core/EmptyEventTimestampPlugin.h\n"
85  "for more details.")};
86 
87  struct KeysToIgnore {
88  std::set<std::string>
89  operator()() const
90  {
91  return {"module_label"};
92  }
93  };
94  };
95 
97 
98  EmptyEvent(Parameters const& config, InputSourceDescription& desc);
99 
100  EmptyEvent(EmptyEvent const&) = delete;
101  EmptyEvent(EmptyEvent&&) = delete;
102  EmptyEvent& operator=(EmptyEvent const&) = delete;
103  EmptyEvent& operator=(EmptyEvent&&) = delete;
104 
105  private:
106  unique_ptr<RangeSetHandler> runRangeSetHandler() override;
107  unique_ptr<RangeSetHandler> subRunRangeSetHandler() override;
108  void doBeginJob() override;
109  void doEndJob() override;
110  void skipEvents(int offset) override;
111  unique_ptr<FileBlock> readFile() override;
112  void closeFile() override;
113  unique_ptr<RunPrincipal> readRun() override;
114  input::ItemType nextItemType() override;
115  unique_ptr<SubRunPrincipal> readSubRun(
116  cet::exempt_ptr<RunPrincipal const>) override;
117  unique_ptr<EventPrincipal> readEvent(
118  cet::exempt_ptr<SubRunPrincipal const>) override;
119 
120  input::ItemType nextItemType_();
121 
122  unique_ptr<EmptyEventTimestampPlugin> makePlugin_(
123  OptionalDelegatedParameter const& maybeConfig);
124 
126  unsigned const numberEventsInRun_;
127  unsigned const numberEventsInSubRun_;
128  steady_clock::time_point const beginTime_{steady_clock::now()};
131  unsigned numberEventsInThisRun_{};
132  unsigned numberEventsInThisSubRun_{};
133  EventID origEventID_{};
134  EventID eventID_{};
135  bool firstTime_{true};
136  bool newFile_{true};
137  bool newRun_{true};
138  bool newSubRun_{true};
140  cet::BasicPluginFactory pluginFactory_{};
141  unique_ptr<EmptyEventTimestampPlugin> plugin_;
142  };
143 
144 } // namespace art
145 
149  , limits_{config().limitsConfig(), [this] { return nextItemType_(); }}
150  , numberEventsInRun_{static_cast<uint32_t>(config().numberEventsInRun())}
151  , numberEventsInSubRun_{static_cast<uint32_t>(
152  config().numberEventsInSubRun())}
153  , maxTime_{config().maxTime()}
154  , eventCreationDelay_{config().eventCreationDelay()}
155  , resetEventOnSubRun_{config().resetEventOnSubRun()}
156  , plugin_{makePlugin_(config().timestampPlugin)}
157 {
158  // Additional configuration checking which is cumbersome to do with
159  // the FHiCL validation system.
160  auto const& pset = config.get_PSet();
161  if (pset.has_key("maxTime") &&
162  (pset.has_key("maxEvents") || pset.has_key("maxSubRuns"))) {
163  throw Exception{
165  "An error occurred while configuring the EmptyEvent source.\n"}
166  << "The 'maxTime' parameter cannot be used with the 'maxEvents' or "
167  "'maxSubRuns' parameters.\n"
168  "Type 'art --print-description EmptyEvent' for the allowed "
169  "configuration.\n";
170  }
171 
172  RunNumber_t firstRun{};
173  bool haveFirstRun = config().firstRun(firstRun);
174  SubRunNumber_t firstSubRun{};
175  bool haveFirstSubRun = config().firstSubRun(firstSubRun);
176  EventNumber_t firstEvent{};
177  bool haveFirstEvent = config().firstEvent(firstEvent);
178  RunID firstRunID = haveFirstRun ? RunID(firstRun) : RunID::firstRun();
179  SubRunID firstSubRunID = haveFirstSubRun ?
180  SubRunID(firstRunID.run(), firstSubRun) :
181  SubRunID::firstSubRun(firstRunID);
182  origEventID_ =
183  haveFirstEvent ?
184  EventID(firstSubRunID.run(), firstSubRunID.subRun(), firstEvent) :
185  EventID::firstEvent(firstSubRunID);
187 }
188 
191 {
192  return limits_.nextItemType();
193 }
194 
197 {
198  // Called by ProcessingLimits::nextItemType.
199 
200  // Trigger framework stop if max allowed time is exceeded.
201  // N.B. Since the begin time corresponds to source construction and
202  // not the actual event loop, there will be minor differences wrt
203  // the time reported for executing a given job.
204  if (steady_clock::now() - beginTime_ > maxTime_) {
205  return input::IsStop;
206  }
207  // First check for sanity because skip(offset) can be abused and so can the
208  // ctor.
209  if (!eventID_.runID().isValid()) {
210  return input::IsStop;
211  }
212  if (!eventID_.subRunID().isValid()) {
213  return input::IsStop;
214  }
215  if (!eventID_.isValid()) {
216  return input::IsStop;
217  }
218  if (newFile_) {
219  newFile_ = false;
220  return input::IsFile;
221  }
222  if (newRun_) {
223  newRun_ = false;
224  if (eventCreationDelay_ > 0ms) {
225  std::this_thread::sleep_for(eventCreationDelay_);
226  }
227  return input::IsRun;
228  }
229  if (newSubRun_) {
230  newSubRun_ = false;
231  if (eventCreationDelay_ > 0ms) {
232  std::this_thread::sleep_for(eventCreationDelay_);
233  }
234  return input::IsSubRun;
235  }
236  if ((numberEventsInRun_ > 0) &&
238  // Time to switch runs.
239  newRun_ = false;
240  newSubRun_ = true;
243  eventID_ = EventID(
245  firstTime_ = true;
246  if (eventCreationDelay_ > 0ms) {
247  std::this_thread::sleep_for(eventCreationDelay_);
248  }
249  return input::IsRun;
250  }
251  if ((numberEventsInSubRun_ > 0) &&
253  // Time to switch subruns.
254  newRun_ = false;
255  newSubRun_ = false;
257  if (resetEventOnSubRun_) {
259  } else {
261  }
262  firstTime_ = true;
263  if (eventCreationDelay_ > 0ms) {
264  std::this_thread::sleep_for(eventCreationDelay_);
265  }
266  return input::IsSubRun;
267  }
268  // same run and subrun
269  if (!firstTime_) {
270  eventID_ = eventID_.next();
271  if (!eventID_.runID().isValid()) {
272  return input::IsStop;
273  }
274  }
275  firstTime_ = false;
278  if (eventCreationDelay_ > 0ms) {
279  std::this_thread::sleep_for(eventCreationDelay_);
280  }
281  return input::IsEvent;
282 }
283 
284 unique_ptr<art::FileBlock>
286 {
287  return make_unique<FileBlock>();
288 }
289 
290 void
292 {}
293 
294 unique_ptr<art::RunPrincipal>
296 {
297  auto ts = plugin_ ? plugin_->doBeginRunTimestamp(eventID_.runID()) :
299  RunAuxiliary const runAux{
301  auto result =
302  make_unique<RunPrincipal>(runAux, processConfiguration(), nullptr);
303  if (plugin_) {
304  ModuleContext const mc{moduleDescription()};
305  plugin_->doBeginRun(std::as_const(*result).makeRun(mc));
306  }
307  return result;
308 }
309 
310 unique_ptr<art::SubRunPrincipal>
311 art::EmptyEvent::readSubRun(cet::exempt_ptr<RunPrincipal const> rp)
312 {
313  auto ts = plugin_ ? plugin_->doBeginSubRunTimestamp(eventID_.subRunID()) :
315  SubRunAuxiliary const subRunAux{
317  auto result =
318  make_unique<SubRunPrincipal>(subRunAux, processConfiguration(), nullptr);
319  result->setRunPrincipal(rp);
320  if (plugin_) {
321  ModuleContext const mc{moduleDescription()};
322  plugin_->doBeginSubRun(std::as_const(*result).makeSubRun(mc));
323  }
324  limits_.update(result->subRunID());
325  return result;
326 }
327 
328 unique_ptr<art::EventPrincipal>
329 art::EmptyEvent::readEvent(cet::exempt_ptr<SubRunPrincipal const> srp)
330 {
331  auto timestamp = plugin_ ? plugin_->doEventTimestamp(eventID_) :
333  EventAuxiliary const eventAux{eventID_, timestamp, false};
334  auto result = make_unique<EventPrincipal>(eventAux,
336  nullptr,
337  make_unique<NoDelayedReader>(),
340  result->setSubRunPrincipal(srp);
341  limits_.update(result->eventID());
342  return result;
343 }
344 
345 unique_ptr<art::RangeSetHandler>
347 {
348  return make_unique<OpenRangeSetHandler>(eventID_.run());
349 }
350 
351 unique_ptr<art::RangeSetHandler>
353 {
354  return make_unique<OpenRangeSetHandler>(eventID_.run());
355 }
356 
357 void
359 {
360  if (plugin_) {
361  plugin_->doBeginJob();
362  }
363 }
364 
365 void
367 {
368  if (plugin_) {
369  plugin_->doEndJob();
370  }
371 }
372 
373 std::unique_ptr<art::EmptyEventTimestampPlugin>
375 {
376  auto pset = maybeConfig.get_if_present<ParameterSet>();
377  if (!pset) {
378  return nullptr;
379  }
380 
381  try {
382  auto const libspec = pset->get<std::string>("plugin_type");
383  auto const pluginType = pluginFactory_.pluginType(libspec);
384  if (pluginType ==
386  return pluginFactory_
387  .makePlugin<std::unique_ptr<EmptyEventTimestampPlugin>>(libspec, *pset);
388  }
389  throw Exception(errors::Configuration, "EmptyEvent: ")
390  << "unrecognized plugin type " << pluginType << "for plugin " << libspec
391  << ".\n";
392  }
393  catch (cet::exception& e) {
394  throw Exception(errors::Configuration, "EmptyEvent: ", e)
395  << "Exception caught while processing plugin spec.\n";
396  }
397 }
398 
399 void
401 {
402  for (; offset < 0; ++offset) {
404  }
405  for (; offset > 0; --offset) {
406  eventID_ = eventID_.next();
407  }
408 }
409 
EventID previous() const
Definition: EventID.h:159
bool isValid() const
Definition: EventID.h:122
RunID const & runID() const
Definition: EventID.h:92
SubRunID const & subRunID() const
Definition: EventID.h:104
void doEndJob() override
std::chrono::microseconds const eventCreationDelay_
std::optional< T > get_if_present() const
EventID next() const
Definition: EventID.h:134
EmptyEvent(Parameters const &config, InputSourceDescription &desc)
unsigned numberEventsInThisRun_
ModuleDescription const & moduleDescription() const
Definition: InputSource.cc:31
unique_ptr< EmptyEventTimestampPlugin > makePlugin_(OptionalDelegatedParameter const &maybeConfig)
unsigned const numberEventsInRun_
void closeFile() override
STL namespace.
input::ItemType nextItemType()
ModuleType module_type(std::string const &full_key)
microsecond microseconds
Alias for common language habits.
Definition: spacetime.h:119
unique_ptr< RangeSetHandler > runRangeSetHandler() override
RunNumber_t run() const
Definition: RunID.h:64
RunNumber_t run() const
Definition: EventID.h:98
bool isValid() const
Definition: SubRunID.h:97
steady_clock::time_point const beginTime_
unique_ptr< EventPrincipal > readEvent(cet::exempt_ptr< SubRunPrincipal const >) override
unique_ptr< FileBlock > readFile() override
second seconds
Alias for common language habits.
Definition: spacetime.h:85
unsigned const numberEventsInSubRun_
std::chrono::seconds const maxTime_
parameter set interface
input::ItemType nextItemType() override
RunNumber_t run() const
Definition: SubRunID.h:85
T get(std::string const &key) const
Definition: ParameterSet.h:314
cet::BasicPluginFactory pluginFactory_
IDNumber_t< Level::SubRun > SubRunNumber_t
Definition: IDNumber.h:119
EventID nextSubRun(EventNumber_t first=IDNumber< Level::Event >::first()) const
Definition: EventID.h:147
unique_ptr< EmptyEventTimestampPlugin > plugin_
static SubRunID firstSubRun()
Definition: SubRunID.h:153
double value
Definition: spectrum.C:18
TableFragment< ProcessingLimits::Config > limitsConfig
static constexpr Timestamp invalidTimestamp()
Definition: Timestamp.h:82
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
std::set< std::string > operator()() const
IDNumber_t< Level::Event > EventNumber_t
Definition: IDNumber.h:118
ProcessConfiguration const & processConfiguration() const
Definition: InputSource.cc:37
bool isValid() const
Definition: RunID.h:70
ModuleDescription const & moduleDescription
Definition: MVAAlg.h:12
EventNumber_t event() const
Definition: EventID.h:116
SubRunNumber_t subRun() const
Definition: SubRunID.h:91
static RunID firstRun()
Definition: RunID.h:116
bool const resetEventOnSubRun_
void doBeginJob() override
EventID nextRun() const
Definition: EventID.h:153
Float_t e
Definition: plot.C:35
static EventID firstEvent()
Definition: EventID.h:190
#define DEFINE_ART_INPUT_SOURCE(klass)
ProcessingLimits limits_
unique_ptr< SubRunPrincipal > readSubRun(cet::exempt_ptr< RunPrincipal const >) override
SubRunNumber_t subRun() const
Definition: EventID.h:110
void update(EventID const &id)
cet::coded_exception< error, detail::translate > exception
Definition: exception.h:33
input::ItemType nextItemType_()
unique_ptr< RunPrincipal > readRun() override
unsigned numberEventsInThisSubRun_
unique_ptr< RangeSetHandler > subRunRangeSetHandler() override
void skipEvents(int offset) override
IDNumber_t< Level::Run > RunNumber_t
Definition: IDNumber.h:120