LArSoft  v07_13_02
Liquid Argon Software toolkit - http://larsoft.org/
InputFileCatalog.cc
Go to the documentation of this file.
1 //
3 // InputFileCatalog
4 //
6 
10 #include "boost/algorithm/string.hpp"
12 #include "cetlib_except/exception.h"
13 #include "fhiclcpp/ParameterSet.h"
14 
15 #include <limits>
16 
17 namespace art {
18 
21  : fileSources_{config().namesParameter()}
22  {
23 
24  if (fileSources_.empty()) {
26  "InputFileCatalog::InputFileCatalog()\n")
27  << "Empty '" << config().namesParameter.name()
28  << "' parameter specified for input source.\n";
29  }
30 
31  // Configure FileDelivery service
32  ci_->configure(fileSources_);
33  searchable_ = ci_->isSearchable();
34 
35  if (searchable_)
36  fileCatalogItems_.resize(fileSources_.size());
37  }
38 
39  FileCatalogItem const&
41  {
42  if (fileIdx_ == indexEnd) {
43  throw art::Exception(
45  "Cannot access the current file while the file catalog is empty!");
46  }
47  assert(fileIdx_ <= maxIdx_);
49  }
50 
51  size_t
53  {
54  return fileIdx_;
55  }
56 
57  bool
59  {
60  // get next file from FileDelivery service
61  // and give it to currentFile_ object
62  // returns false if theres no more file
63  //
64  // If hasNextFile() has been called prior to
65  // getNextFile(), it does not actually go fetch
66  // the next file from FileDelivery service,
67  // instead, it advances the iterator by one and
68  // make the "hidden" next file current.
69 
71  return false;
72 
73  if ((nextFileProbed_ && hasNextFile_) ||
74  retrieveNextFile(nextItem_, attempts)) {
75  nextFileProbed_ = false;
76  fileIdx_ =
77  (fileIdx_ == indexEnd) ? 0 : (searchable_ ? (fileIdx_ + 1) : 0);
78  if (fileIdx_ > maxIdx_)
79  maxIdx_ = fileIdx_;
81  return true;
82  }
83 
84  return false;
85  }
86 
87  bool
89  {
90  // A probe. It tries(and actually does) retreive
91  // the next file from the FileDelivery service. But
92  // does not advance the current file pointer
93  if (nextFileProbed_)
94  return hasNextFile_;
95 
97  nextFileProbed_ = true;
98 
99  return hasNextFile_;
100  }
101 
102  bool
104  int attempts,
105  bool transferOnly)
106  {
107 
108  // Tell the service the current opened file (if theres one) is consumed
109  finish();
110 
111  // retrieve (deliver and transfer) next file from service
112  // or, do the transfer only
113  FileCatalogStatus status;
114  if (transferOnly) {
115  status = transferNextFile(item);
116  } else {
117  status = retrieveNextFileFromCacheOrService(item);
118  }
119 
120  if (status == FileCatalogStatus::SUCCESS) {
121  // mark the file as transferred
122  ci_->updateStatus(item.uri(), FileDisposition::TRANSFERRED);
123  return true;
124  }
125 
126  if (status == FileCatalogStatus::NO_MORE_FILES) {
127  return false;
128  }
129 
130  if (status == FileCatalogStatus::DELIVERY_ERROR) {
131  if (attempts <= 1) {
133  "InputFileCatalog::retreiveNextFile()\n")
134  << "Delivery error encountered after reaching maximum number of "
135  "attemtps!";
136  } else {
137  return retrieveNextFile(item, attempts - 1, false);
138  }
139  }
140 
141  if (status == FileCatalogStatus::TRANSFER_ERROR) {
142  if (attempts <= 1) {
143  // if we end up with a transfer error, the method returns
144  // with a true flag and empty filename. Weired enough, but
145  // the next file does exist we just cannot retrieve it. Therefore
146  // we notify the service that the file has been skipped
147  ci_->updateStatus(item.uri(), FileDisposition::SKIPPED);
148  return true;
149  } else {
150  return retrieveNextFile(item, attempts - 1, true);
151  }
152  }
153 
154  // should never reach here
155  assert(0);
156  return false;
157  }
158 
161  {
162  // Try to get it from cached files
163  if (fileIdx_ < maxIdx_) {
164  item = fileCatalogItems_[fileIdx_ + 1];
166  }
167 
168  // Try to get it from the service
169  std::string uri;
170  double wait = 0.0;
171 
172  // get file delivered
173  int result = ci_->getNextFileURI(uri, wait);
174 
175  if (result == FileDeliveryStatus::NO_MORE_FILES)
177 
178  if (result != FileDeliveryStatus::SUCCESS)
180 
181  item = FileCatalogItem("", "", uri);
182 
183  // get file transfered
184  return transferNextFile(item);
185  }
186 
189  {
190 
191  std::string pfn;
192 
193  int result = ft_->translateToLocalFilename(item.uri(), pfn);
194 
195  if (result != FileTransferStatus::SUCCESS) {
196  item.fileName("");
197  item.logicalFileName("");
198  item.skip();
200  }
201 
202  // successfully retrieved the file
203  std::string lfn = pfn;
204 
205  boost::trim(pfn);
206 
207  if (pfn.empty()) {
208  throw art::Exception(
210  "InputFileCatalog::retrieveNextFileFromCacheService()\n")
211  << "An empty string specified in parameter for input source.\n";
212  }
213 
214  if (isPhysical(pfn)) {
215  lfn.clear();
216  }
217 
218  item.fileName(pfn);
219  item.logicalFileName(lfn);
221  }
222 
223  void
225  {
226  if (!searchable_) {
228  "InputFileCatalog::rewind()\n")
229  << "A non-searchable catalog is not allowed to rewind!";
230  }
231  fileIdx_ = 0;
232  }
233 
234  void
236  {
237  // rewind to a previous file location in the catalog
238  // service is not rewinded. only usable when FileDeliveryService::
239  // areFilesPersistent() is true
240  if (!searchable_) {
242  "InputFileCatalog::rewindTo()\n")
243  << "A non-searchable catalog is not allowed to rewind!";
244  }
245 
246  if (index > maxIdx_) {
248  "InputFileCatalog::rewindTo()\n")
249  << "Index " << index << " is out of range!";
250  }
251 
252  fileIdx_ = index;
253  }
254 
255  void
257  {
258  if (fileIdx_ != indexEnd // there is a current file
259  && !currentFile().skipped() // not skipped
260  && !currentFile().consumed()) // not consumed
261  {
262  ci_->updateStatus(currentFile().uri(), FileDisposition::CONSUMED);
263  fileCatalogItems_[fileIdx_].consume();
264  }
265  }
266 
267 } // art
std::vector< std::string > fileSources_
size_t currentIndex() const
bool hasNextFile(int attempts=5)
static constexpr size_t indexEnd
FileCatalogStatus retrieveNextFileFromCacheOrService(FileCatalogItem &item)
bool retrieveNextFile(FileCatalogItem &item, int attempts, bool transferOnly=false)
bool getNextFile(int attempts=5)
ServiceHandle< FileTransfer > ft_
std::string const & logicalFileName() const
Definition: FileCatalog.h:30
ServiceHandle< CatalogInterface > ci_
FileCatalogStatus
FileCatalogItem const & currentFile() const
std::vector< FileCatalogItem > fileCatalogItems_
static bool isPhysical(std::string const &name)
Definition: FileCatalog.h:93
FileCatalogItem nextItem_
void rewindTo(size_t index)
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
HLT enums.
InputFileCatalog(fhicl::TableFragment< Config > const &config)
std::string const & fileName() const
Definition: FileCatalog.h:25
FileCatalogStatus transferNextFile(FileCatalogItem &item)
std::string const & uri() const
Definition: FileCatalog.h:35