LArSoft  v09_90_00
Liquid Argon Software toolkit - https://larsoft.org/
InputFileCatalog.cc
Go to the documentation of this file.
1 //
3 // InputFileCatalog
4 //
6 
12 #include "boost/algorithm/string.hpp"
14 #include "cetlib_except/exception.h"
15 
16 #include <cassert>
17 
18 namespace art {
19 
// Constructor of InputFileCatalog (the signature line is elided from this
// listing). The member initializer fills fileSources_ from
// config().namesParameter().
22  : fileSources_{config().namesParameter()}
23  {
24 
// An empty source list is an error. The streamed text below builds the
// message; the line that opens this statement (presumably a throw of an
// art Exception) is not visible in this listing.
25  if (fileSources_.empty()) {
27  "InputFileCatalog::InputFileCatalog()\n")
28  << "Empty '" << config().namesParameter.name()
29  << "' parameter specified for input source.\n";
30  }
31 
32  // Configure FileDelivery service
33  ci_->configure(fileSources_);
// Remember whether the configured catalog reports itself as searchable.
34  searchable_ = ci_->isSearchable();
35 
// Only a searchable catalog gets its item cache sized up front, with one
// slot per configured source.
36  if (searchable_)
37  fileCatalogItems_.resize(fileSources_.size());
38  }
39 
// Accessor for the catalog item at the current position. The line naming the
// function is elided from this listing -- presumably
// InputFileCatalog::currentFile() const; confirm against the header.
40  FileCatalogItem const&
42  {
// fileIdx_ == indexEnd is treated as "no current file" and raises an
// Exception.
43  if (fileIdx_ == indexEnd) {
44  throw Exception(
46  "Cannot access the current file while the file catalog is empty!");
47  }
// Sanity check (active only when assertions are enabled): the current index
// must not exceed maxIdx_.
48  assert(fileIdx_ <= maxIdx_);
// NOTE(review): the return statement (listing line 49) is not visible here.
50  }
51 
// Returns the raw current-file index (fileIdx_) with no validation.
// The line naming the function is elided from this listing -- presumably
// InputFileCatalog::currentIndex() const noexcept; confirm against the header.
52  size_t
54  {
55  return fileIdx_;
56  }
57 
// Advances the catalog to the next input file.
// attempts: forwarded to retrieveNextFile().
// Returns true when the current index was advanced, false otherwise.
58  bool
59  InputFileCatalog::getNextFile(int const attempts)
60  {
61  // Get the next file from the FileDelivery service and give it to the
62  // currentFile_ object; returns false if there is no more file.
63  //
64  // If hasNextFile() has been called prior to getNextFile(), this does
65  // not actually go fetch the next file from the FileDelivery
66  // service; instead, it advances the iterator by one and makes the
67  // "hidden" next file current.
68 
// NOTE(review): the condition guarding this early return (listing line 69)
// is not visible here.
70  return false;
71 
// Short-circuit: when an earlier probe already succeeded,
// retrieveNextFile() is not called again.
72  if ((nextFileProbed_ && hasNextFile_) ||
73  retrieveNextFile(nextItem_, attempts)) {
74  nextFileProbed_ = false;
// The index becomes 0 when there was no current file or when the catalog is
// not searchable; otherwise it moves forward by one.
75  fileIdx_ =
76  (fileIdx_ == indexEnd) ? 0 : (searchable_ ? (fileIdx_ + 1) : 0);
// maxIdx_ tracks the highest index reached so far.
77  if (fileIdx_ > maxIdx_)
78  maxIdx_ = fileIdx_;
// NOTE(review): listing line 79 is not visible here.
80  return true;
81  }
82 
83  return false;
84  }
85 
// Probes whether a next file exists without making it current.
// attempts: not used on any line visible here; presumably consumed by the
// elided listing line 95 -- confirm.
// Returns the hasNextFile_ flag.
86  bool
87  InputFileCatalog::hasNextFile(int const attempts)
88  {
89  // A probe. It tries to (and actually does) retrieve the next file
90  // from the FileDelivery service, but does not advance the current
91  // file pointer.
// A pending probe result is reused rather than probing again.
92  if (nextFileProbed_)
93  return hasNextFile_;
94 
// NOTE(review): listing line 95, which presumably assigns hasNextFile_, is
// not visible here.
96  nextFileProbed_ = true;
97 
98  return hasNextFile_;
99  }
100 
// Obtains the next file, retrying by recursion while attempts remain.
// The line naming the function and its first parameter is elided from this
// listing; the body refers to that parameter as `item`.
// attempts: remaining tries; each recursive call passes attempts - 1.
// transferOnly: when true, only transferNextFile() is run for `item`.
// Returns true or false depending on the resulting status (see the switch).
// NOTE(review): every case label of the switch is elided from this listing,
// so the status value handled by each branch cannot be confirmed here.
101  bool
103  int const attempts,
104  bool const transferOnly)
105  {
106  // Tell the service the current opened file (if there is one) is consumed
107  finish();
108 
109  // Retrieve (deliver and transfer) the next file from the service,
110  // or do the transfer only.
111  FileCatalogStatus status;
112  if (transferOnly) {
113  status = transferNextFile(item);
114  } else {
115  status = retrieveNextFileFromCacheOrService(item);
116  }
117 
118  switch (status) {
120  // mark the file as transferred
121  ci_->updateStatus(item.uri(), FileDisposition::TRANSFERRED);
122  return true;
123  }
125  return false;
126  }
// Out of attempts: the streamed text below builds an error message (the
// line opening the statement is not visible). Otherwise retry the full
// retrieval with one attempt fewer.
// NOTE(review): the message below misspells "attempts"; it is a runtime
// string and is left untouched here.
128  if (attempts <= 1) {
130  "InputFileCatalog::retrieveNextFile()\n")
131  << "Delivery error encountered after reaching maximum number of "
132  "attemtps!";
133  }
134  return retrieveNextFile(item, attempts - 1, false);
135  }
137  if (attempts <= 1) {
138  // If we end up with a transfer error, the method returns
139  // with a true flag and an empty filename. Weird enough, but
140  // the next file does exist; we just cannot retrieve it. Therefore
141  // we notify the service that the file has been skipped.
142  ci_->updateStatus(item.uri(), FileDisposition::SKIPPED);
143  return true;
144  }
// Retry only the transfer step, with one attempt fewer.
145  return retrieveNextFile(item, attempts - 1, true);
146  }
147  }
148  assert(false);
149  return false; // Unreachable, but keeps the compiler happy
150  }
151 
// Body of a function whose return type and signature lines are elided from
// this listing -- presumably
// InputFileCatalog::retrieveNextFileFromCacheOrService(FileCatalogItem& item);
// confirm against the header.
// It first looks for the next item in fileCatalogItems_, and otherwise asks
// the catalog interface (ci_) for the next URI and hands it to
// transferNextFile().
154  {
155  // Try to get it from cached files
// A cached entry exists when the current index is below maxIdx_.
156  if (fileIdx_ < maxIdx_) {
157  item = fileCatalogItems_[fileIdx_ + 1];
// NOTE(review): listing line 158 (presumably a return) is not visible here.
159  }
160 
161  // Try to get it from the service
162  std::string uri;
163  double wait = 0.0;
164 
165  // get file delivered
166  int const result = ci_->getNextFileURI(uri, wait);
167 
// NOTE(review): the statements controlled by the next two conditions
// (listing lines 169 and 172) are not visible here.
168  if (result == FileDeliveryStatus::NO_MORE_FILES)
170 
171  if (result != FileDeliveryStatus::SUCCESS)
173 
// Start from an item that carries only the delivered URI; the first two
// constructor arguments are empty strings.
174  item = FileCatalogItem("", "", uri);
175 
176  // get file transferred
177  return transferNextFile(item);
178  }
179 
// Body of a function whose return type and signature lines are elided from
// this listing -- presumably
// InputFileCatalog::transferNextFile(FileCatalogItem& item); confirm against
// the header.
// It asks the file-transfer service (ft_) to translate item.uri() into a
// local file name and stores the resulting names on `item`.
182  {
183  std::string pfn;
184  int const result = ft_->translateToLocalFilename(item.uri(), pfn);
185 
// On a failed translation the item's names are blanked and it is marked as
// skipped.
186  if (result != FileTransferStatus::SUCCESS) {
187  item.fileName("");
188  item.logicalFileName("");
189  item.skip();
// NOTE(review): listing line 190 (presumably a return) is not visible here.
191  }
192 
193  // successfully retrieved the file
// lfn is copied before pfn is trimmed, so lfn keeps any surrounding
// whitespace.
194  std::string lfn = pfn;
195  boost::trim(pfn);
196 
// An all-whitespace or empty name is an error; the line opening this
// statement is not visible here.
// NOTE(review): the context string below names
// retrieveNextFileFromCacheService(), which does not appear to be this
// function -- confirm whether that is intended.
197  if (pfn.empty()) {
199  "InputFileCatalog::retrieveNextFileFromCacheService()\n")
200  << "An empty string specified in parameter for input source.\n";
201  }
202 
// When isPhysical(pfn) is true, the logical file name is left empty.
203  if (isPhysical(pfn)) {
204  lfn.clear();
205  }
206 
207  item.fileName(pfn);
208  item.logicalFileName(lfn);
// NOTE(review): the final return (listing line 209) is not visible here.
210  }
211 
// Resets the current-file index to 0. The line naming the function is elided
// from this listing; the exception context string below identifies it as
// InputFileCatalog::rewind().
// Throws Exception(errors::LogicError) when the catalog is not searchable.
212  void
214  {
215  if (!searchable_) {
216  throw Exception(errors::LogicError, "InputFileCatalog::rewind()\n")
217  << "A non-searchable catalog is not allowed to rewind!";
218  }
219  fileIdx_ = 0;
220  }
221 
222  void
223  InputFileCatalog::rewindTo(size_t const index)
224  {
225  // rewind to a previous file location in the catalog service is
226  // not rewinded. only usable when
227  // FileDeliveryService::areFilesPersistent() is true
228  if (!searchable_) {
229  throw Exception(errors::LogicError, "InputFileCatalog::rewindTo()\n")
230  << "A non-searchable catalog is not allowed to rewind!";
231  }
232 
233  if (index > maxIdx_) {
234  throw Exception(errors::InvalidNumber, "InputFileCatalog::rewindTo()\n")
235  << "Index " << index << " is out of range!";
236  }
237 
238  fileIdx_ = index;
239  }
240 
// Marks the current file as consumed, both with the catalog interface (ci_)
// and on the cached item. The line naming the function is elided from this
// listing -- presumably InputFileCatalog::finish(), which retrieveNextFile()
// calls for this purpose; confirm against the header.
// Does nothing when there is no current file, or when the current file is
// already skipped or consumed.
241  void
243  {
244  if (fileIdx_ != indexEnd // there is a current file
245  && !currentFile().skipped() // not skipped
246  && !currentFile().consumed()) // not consumed
247  {
248  ci_->updateStatus(currentFile().uri(), FileDisposition::CONSUMED);
249  fileCatalogItems_[fileIdx_].consume();
250  }
251  }
252 
253 } // namespace art
std::vector< std::string > fileSources_
bool hasNextFile(int attempts=5)
static constexpr size_t indexEnd
FileCatalogStatus retrieveNextFileFromCacheOrService(FileCatalogItem &item)
bool retrieveNextFile(FileCatalogItem &item, int attempts, bool transferOnly=false)
bool getNextFile(int attempts=5)
ServiceHandle< FileTransfer > ft_
ServiceHandle< CatalogInterface > ci_
std::string const & uri() const noexcept
Definition: FileCatalog.h:35
FileCatalogStatus
std::string const & fileName() const noexcept
Definition: FileCatalog.h:25
size_t currentIndex() const noexcept
std::string const & logicalFileName() const noexcept
Definition: FileCatalog.h:30
FileCatalogItem const & currentFile() const
std::vector< FileCatalogItem > fileCatalogItems_
FileCatalogItem nextItem_
void rewindTo(size_t index)
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
Definition: MVAAlg.h:12
InputFileCatalog(fhicl::TableFragment< Config > const &config)
static bool isPhysical(std::string const &name) noexcept
Definition: FileCatalog.h:93
FileCatalogStatus transferNextFile(FileCatalogItem &item)