1 // ======================================================================
2 //
3 // DecrepitRelicInputSourceImplementation
4 //
5 // ======================================================================
18 #include "fhiclcpp/ParameterSet.h"
21 #include <cassert>
22 #include <ctime>
26 // ----------------------------------------------------------------------
28 namespace art {
30  namespace {
31  std::string const&
32  suffix(int count)
33  {
34  static std::string const st("st");
35  static std::string const nd("nd");
36  static std::string const rd("rd");
37  static std::string const th("th");
38  // *0, *4 - *9 use "th".
39  int lastDigit = count % 10;
40  if (lastDigit >= 4 || lastDigit == 0)
41  return th;
42  // *11, *12, or *13 use "th".
43  if (count % 100 - lastDigit == 10)
44  return th;
45  return (lastDigit == 1 ? st : (lastDigit == 2 ? nd : rd));
46  }
47  } // namespace
49  // ----------------------------------------------------------------------
53  {}
60  ModuleDescription const& desc)
61  : InputSource{desc}
62  , maxEvents_{config().maxEvents()}
63  , maxSubRuns_{config().maxSubRuns()}
64  , reportFrequency_{config().reportFrequency()}
65  {
66  if (reportFrequency_ < 0) {
68  << "reportFrequency has a negative value, which is not meaningful.";
69  }
70  std::string const runMode{"Runs"};
71  std::string const runSubRunMode{"RunsAndSubRuns"};
72  std::string const processingMode = config().processingMode();
73  if (processingMode == runMode) {
75  } else if (processingMode == runSubRunMode) {
77  } else if (processingMode != Config::defaultMode()) {
79  << "DecrepitRelicInputSourceImplementation::"
80  "DecrepitRelicInputSourceImplementation()\n"
81  << "The 'processingMode' parameter for sources has an illegal value '"
82  << processingMode << "'\n"
83  << "Legal values are '" << Config::defaultMode() << "', '"
84  << runSubRunMode << "', or '" << runMode << "'.\n";
85  }
86  }
88  void
90  std::unique_ptr<RunPrincipal>&& rp)
91  {
92  runPrincipal_ = std::move(rp);
93  }
95  void
97  std::unique_ptr<SubRunPrincipal>&& srp)
98  {
99  assert(cachedRunPrincipal_);
100  subRunPrincipal_ = std::move(srp);
101  subRunPrincipal_->setRunPrincipal(cachedRunPrincipal_);
102  }
104  void
106  std::unique_ptr<EventPrincipal>&& ep)
107  {
108  assert(cachedSubRunPrincipal_);
109  eventPrincipal_ = std::move(ep);
110  eventPrincipal_->setSubRunPrincipal(cachedSubRunPrincipal_);
111  }
113  // This next function is to guarantee that "runs only" mode does not
114  // return events or subRuns, and that "runs and subRuns only" mode
115  // does not return events. For input sources that are not random
116  // access (e.g. you need to read through the events to get to the
117  // subRuns and runs), this is all that is involved to implement
118  // these modes. For input sources where events or subRuns can be
119  // skipped, getNextItemType() should implement the skipping
120  // internally, so that the performance gain is realized. If this is
121  // done for a source, the 'if' blocks in this function will never be
122  // entered for that source.
125  {
126  input::ItemType itemType = getNextItemType();
127  if (itemType == input::IsEvent &&
129  readEvent_();
130  return nextItemType_();
131  }
132  if (itemType == input::IsSubRun && processingMode() == Runs) {
133  readSubRun_();
134  return nextItemType_();
135  }
136  return itemType;
137  }
141  {
142  if (doneReadAhead_) {
143  return state_;
144  }
145  doneReadAhead_ = true;
146  input::ItemType oldState = state_;
147  if (eventLimitReached()) {
148  // If the maximum event limit has been reached, stop.
150  } else if (subRunLimitReached()) {
151  // If the maximum subRun limit has been reached, stop
152  // when reaching a new file, run, or subRun.
153  if (oldState == input::IsInvalid || oldState == input::IsFile ||
154  oldState == input::IsRun ||
157  } else {
158  input::ItemType newState = nextItemType_();
159  if (newState == input::IsEvent) {
162  } else {
164  }
165  }
166  } else {
167  input::ItemType newState = nextItemType_();
168  if (newState == input::IsStop) {
170  } else if (newState == input::IsFile || oldState == input::IsInvalid) {
172  } else if (newState == input::IsRun || oldState == input::IsFile) {
175  } else if (newState == input::IsSubRun || oldState == input::IsRun) {
176  assert(processingMode() != Runs);
179  } else {
182  }
183  }
184  if (state_ == input::IsStop) {
185  subRunPrincipal_.reset();
186  runPrincipal_.reset();
187  // FIXME: upon the advent of a catalog system which can do
188  // something intelligent with the difference between whole-file
189  // success, partial-file success, partial-file failure and
190  // whole-file failure (such as file-open failure), we will need to
191  // communicate that difference here. The file disposition options
192  // as they are now (and the mapping to any concrete implementation
193  // we are are aware of currently) are not sufficient to the task,
194  // so we deliberately do not distinguish here between partial-file
195  // and whole-file success in particular.
196  finish();
197  }
198  return state_;
199  }
201  void
203  {
204  beginJob();
205  }
207  void
209  {
210  endJob();
211  }
213  // Return a dummy file block.
214  std::unique_ptr<FileBlock>
216  {
217  assert(doneReadAhead_);
218  assert(state_ == input::IsFile);
219  assert(!limitReached());
220  doneReadAhead_ = false;
221  return readFile_();
222  }
224  void
226  {
227  return closeFile_();
228  }
230  // Return a dummy file block.
231  // This function must be overridden for any input source that reads a file
232  // containing Products. Such a function should update the
233  // MasterProductRegistry to reflect the products found in this new file.
234  std::unique_ptr<FileBlock>
236  {
237  return std::make_unique<FileBlock>();
238  }
240  std::unique_ptr<RunPrincipal>
242  {
243  // Note: For the moment, we do not support saving and restoring the state of
244  // the random number generator if random numbers are generated during
245  // processing of runs (e.g. beginRun(), endRun())
246  assert(doneReadAhead_);
247  assert(state_ == input::IsRun);
248  assert(!limitReached());
249  doneReadAhead_ = false;
251  return std::move(runPrincipal_);
252  }
254  std::unique_ptr<SubRunPrincipal>
256  cet::exempt_ptr<RunPrincipal const> rp)
257  {
258  // Note: For the moment, we do not support saving and restoring the state of
259  // the random number generator if random numbers are generated during
260  // processing of subRuns (e.g. beginSubRun(), endSubRun())
261  assert(doneReadAhead_);
262  assert(state_ == input::IsSubRun);
263  assert(!limitReached());
264  doneReadAhead_ = false;
266  assert(subRunPrincipal_->run() == rp->run());
267  subRunPrincipal_->setRunPrincipal(rp);
269  return std::move(subRunPrincipal_);
270  }
272  std::unique_ptr<EventPrincipal>
274  cet::exempt_ptr<SubRunPrincipal const> srp)
275  {
276  assert(doneReadAhead_);
277  assert(state_ == input::IsEvent);
278  assert(!eventLimitReached());
279  doneReadAhead_ = false;
282  assert(srp->run() == eventPrincipal_->run());
283  assert(srp->subRun() == eventPrincipal_->subRun());
284  eventPrincipal_->setSubRunPrincipal(srp);
285  if (eventPrincipal_.get() != nullptr) {
286  if (remainingEvents_ > 0)
288  ++readCount_;
289  setTimestamp(eventPrincipal_->time());
290  if ((reportFrequency_ > 0) && !(readCount_ % reportFrequency_)) {
292  }
293  }
294  return std::move(eventPrincipal_);
295  }
297  std::unique_ptr<EventPrincipal>
299  {
301  << "DecrepitRelicInputSourceImplementation::readEvent()\n"
302  << "Random access is not implemented for this type of Input Source\n"
303  << "Contact a Framework Developer\n";
304  }
306  void
308  {
309  this->skip(offset);
310  }
312  void
314  {
315  time_t t = time(0);
316  char ts[] = "dd-Mon-yyyy hh:mm:ss TZN ";
317  strftime(ts, strlen(ts) + 1, "%d-%b-%Y %H:%M:%S %Z", localtime(&t));
318  mf::LogVerbatim("ArtReport")
319  << "Begin processing the " << readCount_ << suffix(readCount_)
320  << " record. " << eventID << " at " << ts;
321  // At some point we may want to initiate checkpointing here
322  }
324  void
326  {
328  << "DecrepitRelicInputSourceImplementation::skip()\n"
329  << "Random access is not implemented for this type of Input Source\n"
330  << "Contact a Framework Developer\n";
331  }
333  void
335  {
337  << "DecrepitRelicInputSourceImplementation::rewind()\n"
338  << "Rewind is not implemented for this type of Input Source\n"
339  << "Contact a Framework Developer\n";
340  }
342  void
344  {}
346  void
348  {}
350 } // art
352 // ======================================================================
