LArSoft  v06_85_00
Liquid Argon Software toolkit - http://larsoft.org/
DecrepitRelicInputSourceImplementation.cc
Go to the documentation of this file.
1 // ======================================================================
2 //
3 // DecrepitRelicInputSourceImplementation
4 //
5 // ======================================================================
6 
8 
18 #include "fhiclcpp/ParameterSet.h"
20 
21 #include <cassert>
22 #include <ctime>
23 
25 
26 // ----------------------------------------------------------------------
27 
28 namespace art {
29 
namespace {
  /// Returns the English ordinal suffix for @p count.
  ///
  /// @param count  The number to be rendered as an ordinal (e.g. a record
  ///               count). Negative values are classified by their
  ///               magnitude, so -1 -> "st", -12 -> "th".
  /// @return A reference to a function-local static string holding one of
  ///         "st", "nd", "rd" or "th"; the reference stays valid for the
  ///         lifetime of the program.
  std::string const&
  suffix(int count)
  {
    static std::string const st("st");
    static std::string const nd("nd");
    static std::string const rd("rd");
    static std::string const th("th");
    // Only the last two decimal digits matter.  In C++ '%' keeps the sign
    // of the dividend, so take the magnitude of the remainder; negating
    // the remainder (range -99..99) cannot overflow, unlike negating
    // 'count' itself.
    int const remainder = count % 100;
    int const lastTwo = remainder < 0 ? -remainder : remainder;
    int const lastDigit = lastTwo % 10;
    // *0, *4 - *9 use "th".
    if (lastDigit >= 4 || lastDigit == 0)
      return th;
    // *11, *12, or *13 use "th".
    if (lastTwo - lastDigit == 10)
      return th;
    return (lastDigit == 1 ? st : (lastDigit == 2 ? nd : rd));
  }
} // namespace
48 
49  // ----------------------------------------------------------------------
50 
// NOTE(review): the declarator belonging to this empty body is not visible
// in this listing (its source lines are missing).  Presumably a special
// member function with nothing to do -- confirm against the real file.
53  {}
54 
56 
// Constructor (the leading part of the signature is missing from this
// listing; only the trailing ModuleDescription parameter is visible).
//
// Initialises the InputSource base with 'desc' and copies maxEvents,
// maxSubRuns and reportFrequency from the configuration object returned
// by config().  It then validates reportFrequency and processingMode.
// NOTE(review): the statements that originally preceded each '<<' chain
// below (listing lines 67, 74, 76 and 78) are not visible here; presumably
// they throw a configuration exception / set the processing mode -- confirm
// against the real file.
60  ModuleDescription const& desc)
61  : InputSource{desc}
62  , maxEvents_{config().maxEvents()}
63  , maxSubRuns_{config().maxSubRuns()}
64  , reportFrequency_{config().reportFrequency()}
65  {
// A negative report frequency is rejected with the message below.
66  if (reportFrequency_ < 0) {
68  << "reportFrequency has a negative value, which is not meaningful.";
69  }
// The configured processingMode string must be "Runs", "RunsAndSubRuns"
// or the value of Config::defaultMode(); anything else produces the
// "illegal value" diagnostic assembled below.
70  std::string const runMode{"Runs"};
71  std::string const runSubRunMode{"RunsAndSubRuns"};
72  std::string const processingMode = config().processingMode();
73  if (processingMode == runMode) {
75  } else if (processingMode == runSubRunMode) {
77  } else if (processingMode != Config::defaultMode()) {
79  << "DecrepitRelicInputSourceImplementation::"
80  "DecrepitRelicInputSourceImplementation()\n"
81  << "The 'processingMode' parameter for sources has an illegal value '"
82  << processingMode << "'\n"
83  << "Legal values are '" << Config::defaultMode() << "', '"
84  << runSubRunMode << "', or '" << runMode << "'.\n";
85  }
86  }
87 
// Takes ownership of the supplied RunPrincipal by moving it into
// runPrincipal_.
// NOTE(review): the function-name line is missing from this listing; the
// parameter list matches setRunPrincipal(std::unique_ptr<RunPrincipal>&&).
88  void
90  std::unique_ptr<RunPrincipal>&& rp)
91  {
92  runPrincipal_ = std::move(rp);
93  }
94 
// Takes ownership of the supplied SubRunPrincipal by moving it into
// subRunPrincipal_, then links it to the cached run principal.
// Precondition (checked only by assert): cachedRunPrincipal_ is set.
// NOTE(review): the function-name line is missing from this listing; the
// parameter list matches
// setSubRunPrincipal(std::unique_ptr<SubRunPrincipal>&&).
95  void
97  std::unique_ptr<SubRunPrincipal>&& srp)
98  {
99  assert(cachedRunPrincipal_);
100  subRunPrincipal_ = std::move(srp);
101  subRunPrincipal_->setRunPrincipal(cachedRunPrincipal_);
102  }
103 
// Takes ownership of the supplied EventPrincipal by moving it into
// eventPrincipal_, then links it to the cached subRun principal.
// Precondition (checked only by assert): cachedSubRunPrincipal_ is set.
// NOTE(review): the function-name line is missing from this listing; the
// parameter list matches
// setEventPrincipal(std::unique_ptr<EventPrincipal>&&).
104  void
106  std::unique_ptr<EventPrincipal>&& ep)
107  {
108  assert(cachedSubRunPrincipal_);
109  eventPrincipal_ = std::move(ep);
110  eventPrincipal_->setSubRunPrincipal(cachedSubRunPrincipal_);
111  }
112 
113  // This next function is to guarantee that "runs only" mode does not
114  // return events or subRuns, and that "runs and subRuns only" mode
115  // does not return events. For input sources that are not random
116  // access (e.g. you need to read through the events to get to the
117  // subRuns and runs), this is all that is involved to implement
118  // these modes. For input sources where events or subRuns can be
119  // skipped, getNextItemType() should implement the skipping
120  // internally, so that the performance gain is realized. If this is
121  // done for a source, the 'if' blocks in this function will never be
122  // entered for that source.
//
// Mechanics: asks getNextItemType() for the next item; when that item is
// one the current processing mode must not deliver, it is read (and the
// result discarded) via readEvent_() / readSubRun_() and the function
// calls itself again to look at the following item.
// NOTE(review): the signature lines and the second half of the first
// 'if' condition (listing line 128) are missing from this listing.
125  {
126  input::ItemType itemType = getNextItemType();
127  if (itemType == input::IsEvent &&
129  readEvent_();
130  return nextItemType_();
131  }
132  if (itemType == input::IsSubRun && processingMode() == Runs) {
133  readSubRun_();
134  return nextItemType_();
135  }
136  return itemType;
137  }
138 
// Read-ahead state machine: determines the next item type and caches it.
//
// If a read-ahead has already been done (doneReadAhead_), the cached
// state_ is returned unchanged.  Otherwise doneReadAhead_ is set and the
// next state is decided from the event/subRun limits, the previous state
// (oldState) and the item reported by nextItemType_().  When the resulting
// state_ is input::IsStop the held subRun and run principals are released
// and finish() is called.  Returns state_.
// NOTE(review): the signature and every line that assigns state_ inside
// the branches below are missing from this listing, so the exact
// transitions cannot be read off here -- consult the real file.
141  {
142  if (doneReadAhead_) {
143  return state_;
144  }
145  doneReadAhead_ = true;
146  input::ItemType oldState = state_;
147  if (eventLimitReached()) {
148  // If the maximum event limit has been reached, stop.
150  } else if (subRunLimitReached()) {
151  // If the maximum subRun limit has been reached, stop
152  // when reaching a new file, run, or subRun.
153  if (oldState == input::IsInvalid || oldState == input::IsFile ||
154  oldState == input::IsRun ||
157  } else {
158  input::ItemType newState = nextItemType_();
159  if (newState == input::IsEvent) {
162  } else {
164  }
165  }
166  } else {
167  input::ItemType newState = nextItemType_();
168  if (newState == input::IsStop) {
170  } else if (newState == input::IsFile || oldState == input::IsInvalid) {
172  } else if (newState == input::IsRun || oldState == input::IsFile) {
175  } else if (newState == input::IsSubRun || oldState == input::IsRun) {
176  assert(processingMode() != Runs);
179  } else {
182  }
183  }
184  if (state_ == input::IsStop) {
185  subRunPrincipal_.reset();
186  runPrincipal_.reset();
187  // FIXME: upon the advent of a catalog system which can do
188  // something intelligent with the difference between whole-file
189  // success, partial-file success, partial-file failure and
190  // whole-file failure (such as file-open failure), we will need to
191  // communicate that difference here. The file disposition options
192  // as they are now (and the mapping to any concrete implementation
193  // we are aware of currently) are not sufficient to the task,
194  // so we deliberately do not distinguish here between partial-file
195  // and whole-file success in particular.
196  finish();
197  }
198  return state_;
199  }
200 
// Forwards to beginJob().
// NOTE(review): the function-name line is missing from this listing;
// presumably this is doBeginJob() -- confirm.
201  void
203  {
204  beginJob();
205  }
206 
// Forwards to endJob().
// NOTE(review): the function-name line is missing from this listing;
// presumably this is doEndJob() -- confirm.
207  void
209  {
210  endJob();
211  }
212 
213  // Return the file block produced by readFile_().
// Preconditions (checked only by assert): a read-ahead has been done, the
// cached state is input::IsFile, and limitReached() is false.  Clears
// doneReadAhead_ so the next item type is determined afresh.
// NOTE(review): the function-name line is missing from this listing;
// presumably this is readFile() -- confirm.
214  std::unique_ptr<FileBlock>
216  {
217  assert(doneReadAhead_);
218  assert(state_ == input::IsFile);
219  assert(!limitReached());
220  doneReadAhead_ = false;
221  return readFile_();
222  }
223 
// Forwards to closeFile_().
// NOTE(review): the function-name line is missing from this listing;
// presumably this is closeFile() -- confirm.
224  void
226  {
227  return closeFile_();
228  }
229 
230  // Return a dummy file block.
231  // This function must be overridden for any input source that reads a file
232  // containing Products. Such a function should update the
233  // MasterProductRegistry to reflect the products found in this new file.
// The implementation here simply returns a default-constructed FileBlock.
// NOTE(review): the function-name line is missing from this listing;
// presumably this is readFile_() -- confirm.
234  std::unique_ptr<FileBlock>
236  {
237  return std::make_unique<FileBlock>();
238  }
239 
// Hands the current run principal to the caller by moving it out of
// runPrincipal_.
// Preconditions (checked only by assert): a read-ahead has been done, the
// cached state is input::IsRun, and limitReached() is false.  Clears
// doneReadAhead_ so the next item type is determined afresh.
// NOTE(review): the function-name line and listing line 250 are missing
// here; presumably this is readRun() -- confirm.
240  std::unique_ptr<RunPrincipal>
242  {
243  // Note: For the moment, we do not support saving and restoring the state of
244  // the random number generator if random numbers are generated during
245  // processing of runs (e.g. beginRun(), endRun())
246  assert(doneReadAhead_);
247  assert(state_ == input::IsRun);
248  assert(!limitReached());
249  doneReadAhead_ = false;
251  return std::move(runPrincipal_);
252  }
253 
// Hands the current subRun principal to the caller by moving it out of
// subRunPrincipal_, after attaching the supplied run principal 'rp' to it.
// Preconditions (checked only by assert): a read-ahead has been done, the
// cached state is input::IsSubRun, limitReached() is false, and the
// subRun's run() equals rp->run().  Clears doneReadAhead_.
// NOTE(review): the function-name line and listing lines 265 and 268 are
// missing here; presumably this is readSubRun(rp) -- confirm.
254  std::unique_ptr<SubRunPrincipal>
256  cet::exempt_ptr<RunPrincipal const> rp)
257  {
258  // Note: For the moment, we do not support saving and restoring the state of
259  // the random number generator if random numbers are generated during
260  // processing of subRuns (e.g. beginSubRun(), endSubRun())
261  assert(doneReadAhead_);
262  assert(state_ == input::IsSubRun);
263  assert(!limitReached());
264  doneReadAhead_ = false;
266  assert(subRunPrincipal_->run() == rp->run());
267  subRunPrincipal_->setRunPrincipal(rp);
269  return std::move(subRunPrincipal_);
270  }
271 
// Hands the current event principal to the caller by moving it out of
// eventPrincipal_, after attaching the supplied subRun principal 'srp'.
// Preconditions (checked only by assert): a read-ahead has been done, the
// cached state is input::IsEvent, eventLimitReached() is false, and the
// event's run()/subRun() match those of 'srp'.  Clears doneReadAhead_.
// For a non-null event, readCount_ is incremented, the source timestamp
// is set from the event's time(), and a periodic-report branch is taken
// every reportFrequency_ events (when reportFrequency_ > 0).
// NOTE(review): the function-name line and listing lines 281, 287 and 291
// are missing here, so the statements guarded by 'remainingEvents_ > 0'
// and by the report-frequency test are not visible -- confirm against the
// real file.
// NOTE(review): eventPrincipal_ is dereferenced (in the asserts and in
// setSubRunPrincipal) before the null check below; if it can be null at
// this point that is a null dereference -- confirm.
272  std::unique_ptr<EventPrincipal>
274  cet::exempt_ptr<SubRunPrincipal const> srp)
275  {
276  assert(doneReadAhead_);
277  assert(state_ == input::IsEvent);
278  assert(!eventLimitReached());
279  doneReadAhead_ = false;
280 
282  assert(srp->run() == eventPrincipal_->run());
283  assert(srp->subRun() == eventPrincipal_->subRun());
284  eventPrincipal_->setSubRunPrincipal(srp);
285  if (eventPrincipal_.get() != nullptr) {
286  if (remainingEvents_ > 0)
288  ++readCount_;
289  setTimestamp(eventPrincipal_->time());
290  if ((reportFrequency_ > 0) && !(readCount_ % reportFrequency_)) {
292  }
293  }
294  return std::move(eventPrincipal_);
295  }
296 
// Random-access read of an event: not supported by this source.  The only
// visible content is the diagnostic text below.
// NOTE(review): the function-name line and the statement that begins the
// '<<' chain (listing line 300) are missing from this listing; presumably
// an exception is thrown with this message -- confirm.
297  std::unique_ptr<EventPrincipal>
299  {
301  << "DecrepitRelicInputSourceImplementation::readEvent()\n"
302  << "Random access is not implemented for this type of Input Source\n"
303  << "Contact a Framework Developer\n";
304  }
305 
// Forwards the requested offset to this->skip(offset).
// NOTE(review): the function-name/parameter line is missing from this
// listing; presumably this is skipEvents(int offset) -- confirm.
306  void
308  {
309  this->skip(offset);
310  }
311 
// Emits a one-line progress report through mf::LogVerbatim("ArtReport"):
// the ordinal of the record being processed (readCount_ plus its English
// suffix), the event ID, and the current local time.
// NOTE(review): the function-name line is missing from this listing; the
// use of 'eventID' matches issueReports(EventID const& eventID).
312  void
314  {
315  time_t t = time(0);
// The literal below only sizes the buffer (and provides a fallback text);
// strftime overwrites it with the formatted local time.
// NOTE(review): the buffer leaves room for a short time-zone name only; if
// the formatted text does not fit, strftime returns 0 and the buffer
// contents are unspecified.  localtime() also returns a pointer to shared
// static storage -- confirm neither matters for the callers.
316  char ts[] = "dd-Mon-yyyy hh:mm:ss TZN ";
317  strftime(ts, strlen(ts) + 1, "%d-%b-%Y %H:%M:%S %Z", localtime(&t));
318  mf::LogVerbatim("ArtReport")
319  << "Begin processing the " << readCount_ << suffix(readCount_)
320  << " record. " << eventID << " at " << ts;
321  // At some point we may want to initiate checkpointing here
322  }
323 
// Skipping: not supported by this source.  The only visible content is
// the diagnostic text below.
// NOTE(review): the function-name line and the statement that begins the
// '<<' chain (listing line 327) are missing from this listing; presumably
// an exception is thrown with this message -- confirm.
324  void
326  {
328  << "DecrepitRelicInputSourceImplementation::skip()\n"
329  << "Random access is not implemented for this type of Input Source\n"
330  << "Contact a Framework Developer\n";
331  }
332 
// Rewinding: not supported by this source.  The only visible content is
// the diagnostic text below.
// NOTE(review): the function-name line and the statement that begins the
// '<<' chain (listing line 336) are missing from this listing; presumably
// an exception is thrown with this message -- confirm.
333  void
335  {
337  << "DecrepitRelicInputSourceImplementation::rewind()\n"
338  << "Rewind is not implemented for this type of Input Source\n"
339  << "Contact a Framework Developer\n";
340  }
341 
// Empty hook: does nothing.
// NOTE(review): the function-name line is missing from this listing;
// presumably this is beginJob(), the function the begin-of-job wrapper
// calls -- confirm.
342  void
344  {}
345 
// Empty hook: does nothing.
// NOTE(review): the function-name line is missing from this listing;
// presumably this is endJob(), the function the end-of-job wrapper
// calls -- confirm.
346  void
348  {}
349 
350 } // art
351 
352 // ======================================================================
MaybeLogger_< ELseverityLevel::ELsev_info, true > LogVerbatim
void setEventPrincipal(std::unique_ptr< EventPrincipal > &&ep)
std::unique_ptr< EventPrincipal > readEvent(cet::exempt_ptr< SubRunPrincipal const > srp) override
void issueReports(EventID const &eventID)
issue an event report
virtual std::unique_ptr< SubRunPrincipal > readSubRun_()=0
virtual std::unique_ptr< EventPrincipal > readEvent_()=0
std::unique_ptr< SubRunPrincipal > readSubRun(cet::exempt_ptr< RunPrincipal const > rp) override
Read next subRun.
virtual input::ItemType getNextItemType()=0
virtual std::unique_ptr< RunPrincipal > readRun_()=0
void setSubRunPrincipal(std::unique_ptr< SubRunPrincipal > &&srp)
void setRunPrincipal(std::unique_ptr< RunPrincipal > &&rp)
void setTimestamp(Timestamp const &theTime)
To set the current time, as seen by the input source.
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
void doBeginJob() override
Called by framework at beginning of job.
HLT enums.
ProcessingMode processingMode() const
RunsSubRunsAndEvents (default), RunsAndSubRuns, or Runs.
void doEndJob() override
Called by framework at end of job.
std::unique_ptr< FileBlock > readFile() override
Read next file.
std::unique_ptr< RunPrincipal > readRun() override
Read next run.
DecrepitRelicInputSourceImplementation(DecrepitRelicInputSourceImplementation const &)=delete