LArSoft  v09_90_00
Liquid Argon Software toolkit - https://larsoft.org/
Principal.cc
Go to the documentation of this file.
2 // vim: set sw=2 expandtab :
3 
27 #include "cetlib/container_algorithms.h"
28 #include "cetlib/exempt_ptr.h"
29 #include "range/v3/view.hpp"
30 
31 #include <atomic>
32 #include <cassert>
33 #include <memory>
34 #include <string>
35 #include <utility>
36 #include <vector>
37 
38 using namespace cet;
39 using namespace std;
40 
41 namespace {
42  std::string const indent(2, ' ');
43 }
44 
45 namespace art {
46 
47  namespace {
48 
// Build the Group that will hold the product described by bd.
//
// The group type starts as normal. When the produced class name is an
// Assns instantiation, it becomes assns if the template argument selected
// by index 2 is "void", and assnsWithData otherwise. The new group is
// handed the reader, the description, and an invalid RangeSet.
49  unique_ptr<Group>
50  create_group(DelayedReader* reader, BranchDescription const& bd)
51  {
52  auto const& class_name = bd.producedClassName();
53  auto gt = Group::grouptype::normal;
54  if (is_assns(class_name)) {
55  if (name_of_template_arg(class_name, 2) == "void"s) {
56  gt = Group::grouptype::assns;
57  } else {
58  gt = Group::grouptype::assnsWithData;
59  }
60  }
61  return make_unique<Group>(
62  reader, bd, make_unique<RangeSet>(RangeSet::invalid()), gt);
63  }
64 
65  } // unnamed namespace
66 
// Constructor helper: calls fillGroup() for every product description in
// presentProducts. Does nothing when presentProducts is null.
67  void
68  Principal::ctor_create_groups(
69  cet::exempt_ptr<ProductTable const> presentProducts)
70  {
71  if (!presentProducts) {
72  return;
73  }
74  // Note: Dropped products are a problem. We should not create
75  // groups for them now because later we may open a secondary
76  // file which actually contains them and we want the
77  // secondary principal to have those groups. However some
78  // code expects to be able to find a group for dropped
79  // products, so getGroupTryAllFiles ignores groups for
80  // dropped products instead.
81  for (auto const& pd :
82  presentProducts->descriptions | ::ranges::views::values) {
// Each description must belong to the same branch type as this principal.
83  assert(pd.branchType() == branchType_);
84  fillGroup(pd);
85  }
86  }
87 
// Constructor helper: attaches the provenance records returned by
// delayedReader_->readProvenance() to the matching local groups.
// Records whose product ID has no local group are skipped.
// NOTE(review): one argument line of the old-format conversion call is not
// visible in this listing; confirm against the original file.
88  void
89  Principal::ctor_read_provenance()
90  {
91  for (auto&& provenance : delayedReader_->readProvenance()) {
92  auto g = getGroupLocal(provenance.productID());
93  if (g.get() == nullptr) {
94  continue;
95  }
// A known status means the record can be copied as-is.
96  if (provenance.productStatus() != productstatus::unknown()) {
97  g->setProductProvenance(make_unique<ProductProvenance>(provenance));
98  } else {
99  // We have an old format file, convert.
100  g->setProductProvenance(make_unique<ProductProvenance>(
101  provenance.productID(),
103  provenance.parentage().parents()));
104  }
105  }
106  }
107 
// Constructor helper: loads the process history identified by phid.
// An invalid phid leaves processHistory_ untouched. Otherwise a local
// ProcessHistory is passed to ProcessHistoryRegistry::get() and then
// swapped into processHistory_.
108  void
109  Principal::ctor_fetch_process_history(ProcessHistoryID const& phid)
110  {
111  if (!phid.isValid()) {
112  return;
113  }
114  ProcessHistory processHistory;
115  ProcessHistoryRegistry::get(phid, processHistory);
116  std::swap(processHistory_, processHistory);
117  }
118 
// Constructor. The visible lines initialize branchType_, presentProducts_
// (from the raw pointer held by presentProducts) and delayedReader_ (taking
// ownership of reader), then register this principal with the reader and
// create groups for the present products.
// NOTE(review): some initializer and body lines are not visible in this
// listing (e.g. how pc and hist are used); confirm against the original file.
119  Principal::Principal(BranchType branchType,
120  ProcessConfiguration const& pc,
121  cet::exempt_ptr<ProductTable const> presentProducts,
122  ProcessHistoryID const& hist,
123  std::unique_ptr<DelayedReader>&&
124  reader /* = std::make_unique<NoDelayedReader>() */)
125  : branchType_{branchType}
127  , presentProducts_{presentProducts.get()}
128  , delayedReader_{std::move(reader)}
129  {
// reader has been moved from; use the member from here on.
130  delayedReader_->setPrincipal(this);
131  ctor_create_groups(presentProducts);
134  }
135 
// Principal::fillGroup: registers a group for the product description pd.
//
// Holds groupMutex_ for the whole call. If a group with the same product ID
// already exists, the visible text builds two diagnostics: a reused process
// name (when the existing and new descriptions are combinable), otherwise a
// product-ID collision. When no group exists, a new one is created with
// create_group() and stored under pd.productID().
// NOTE(review): the lines that begin both diagnostic stream expressions are
// not visible in this listing; presumably each throws -- confirm.
136  void
138  {
139  std::lock_guard sentry{groupMutex_};
140  auto it = groups_.find(pd.productID());
141  if (it != std::cend(groups_)) {
142  // The 'combinable' call does not require that the processing
143  // history be the same, which is not what we are checking for here.
144  auto const& found_pd = it->second->productDescription();
145  if (combinable(found_pd, pd)) {
147  << "The process name " << pd.processName()
148  << " was previously used on these products.\n"
149  << "Please modify the configuration file to use a "
150  << "distinct process name.\n";
151  }
153  << "The product ID " << pd.productID() << " of the new product:\n"
154  << pd
155  << " collides with the product ID of the already-existing product:\n"
156  << found_pd
157  << "Please modify the instance name of the new product so as to avoid "
158  "the product ID collision.\n"
159  << "In addition, please notify artists@fnal.gov of this error.\n";
160  }
161 
162  groups_[pd.productID()] = create_group(delayedReader_.get(), pd);
163  }
164 
165  // FIXME: This breaks the purpose of the
166  // Principal::addToProcessHistory() compare_exchange_strong
167  // because of the temporal hole between when the history is
168  // changed and when the flag is set, this must be fixed!
169  void
171  {
173  }
174 
175  // Note: LArSoft uses this extensively to create a Ptr by hand.
// Principal::productGetter: returns the group located by
// getGroupTryAllFiles(pid) as an EDProductGetter, or nullptr when no group
// is found.
176  EDProductGetter const*
178  {
179  auto g = getGroupTryAllFiles(pid);
180  if (g.get() != nullptr) {
181  // Note: All produced products should be found.
182  return g.get();
183  }
184  return nullptr;
185  }
186 
187  // Note: LArSoft uses this extensively to create a Ptr by hand.
// Principal::provenance: builds a Provenance from the local group for pid.
// Only this principal's own groups are consulted (getGroupLocal), and the
// lookup result is passed on without a null check here.
188  Provenance
190  {
191  return Provenance{getGroupLocal(pid)};
192  }
193 
// Principal::getEDProductGetter_: forwards to productGetter(pid).
194  EDProductGetter const*
196  {
197  return productGetter(pid);
198  }
199 
// Principal::createGroupsForProducedProducts: records the produced-product
// table for this principal's branch type in producedProducts_ and calls
// fillGroup() for each of its descriptions. The table pointer is stored
// even when the table has no descriptions; in that case nothing else is done.
// NOTE(review): a statement following the "process history is expanded"
// comment is not visible in this listing.
200  void
202  ProductTables const& producedProducts)
203  {
204  auto const& produced = producedProducts.get(branchType_);
205  producedProducts_ = &produced;
206  if (produced.descriptions.empty()) {
207  return;
208  }
209  // The process history is expanded if there is a product that is
210  // produced in this process.
212  for (auto const& pd : produced.descriptions | ::ranges::views::values) {
213  assert(pd.branchType() == branchType_);
214  // Create a group for the produced product.
215  fillGroup(pd);
216  }
217  }
218 
219  void
221  {
223  }
224 
// Principal::readImmediate: calls resolveProductIfAvailable() on every group
// in groups_ while holding groupMutex_. The return value of each call is
// ignored.
225  void
227  {
228  // Read all data products and provenance immediately, if
229  // available. Used only by RootInputFile to implement the
230  // delayedRead*Products config options.
231  //
232  // Note: The input source lock will be held when this routine is called.
233  //
234  // MT-TODO: For right now ignore the delay reading option for
235  // product provenance. If we do the delay reading then we
236  // must use a lock to interlock all fetches of provenance
237  // because the delay read fills the pp_by_pid_ one entry
238  // at a time, and we do not want other threads to find
239  // the info only partly there.
240  std::lock_guard sentry{groupMutex_};
241  for (auto const& group : groups_ | ::ranges::views::values) {
242  group->resolveProductIfAvailable();
243  }
244  }
245 
// Principal::processHistory: returns a reference to processHistory_.
// No lock is taken here.
246  ProcessHistory const&
248  {
249  // MT note: We make no attempt to protect callers who use this
250  // call to get access to the iteration interface of the
251  // process history. See the threading notes there and
252  // here for the reasons why.
253  return processHistory_;
254  }
255 
// Principal::processConfiguration: returns a reference to
// processConfiguration_.
256  ProcessConfiguration const&
258  {
259  return processConfiguration_;
260  }
261 
// Principal::size: number of groups currently in groups_, read while
// holding groupMutex_.
262  size_t
264  {
265  std::lock_guard sentry{groupMutex_};
266  return groups_.size();
267  }
268 
// Principal::begin: returns groups_.begin().
// groupMutex_ is held only while the iterator is obtained; it is released
// when this function returns, so later use of the iterator is not covered.
271  {
272  std::lock_guard sentry{groupMutex_};
273  return groups_.begin();
274  }
275 
// Principal::cbegin: returns groups_.cbegin().
// groupMutex_ is held only while the iterator is obtained; it is released
// when this function returns, so later use of the iterator is not covered.
278  {
279  std::lock_guard sentry{groupMutex_};
280  return groups_.cbegin();
281  }
282 
// Principal::end: returns groups_.end().
// groupMutex_ is held only while the iterator is obtained; it is released
// when this function returns, so later use of the iterator is not covered.
285  {
286  std::lock_guard sentry{groupMutex_};
287  return groups_.end();
288  }
289 
// Principal::cend: returns groups_.cend().
// groupMutex_ is held only while the iterator is obtained; it is released
// when this function returns, so later use of the iterator is not covered.
292  {
293  std::lock_guard sentry{groupMutex_};
294  return groups_.cend();
295  }
296 
// Principal::branchToProductProvenance: returns the product provenance held
// by the local group for pid. The result is a default-constructed
// exempt_ptr when this principal has no group for pid.
297  cet::exempt_ptr<ProductProvenance const>
299  {
300  // Note: The input source lock will be held when this routine is called.
301  //
302  // MT-TODO: For right now ignore the delay reading option for
303  // product provenance. If we do the delay reading then we
304  // must use a lock to interlock all fetches of provenance
305  // because the delay read fills the pp_by_pid_ one entry
306  // at a time, and we do not want other threads to find
307  // the info only partly there.
308  cet::exempt_ptr<ProductProvenance const> ret;
309  auto g = getGroupLocal(pid);
310  if (g.get() != nullptr) {
311  ret = g->productProvenance();
312  }
313  return ret;
314  }
315 
316  // Note: threading: The solution chosen for the following
317  // problems is to convert groups_ from type:
318  //
319  // std::map<ProductID, std::unique_ptr<Group>>
320  //
321  // to type:
322  //
323  // tbb::concurrent_unordered_map<
324  // ProductID, std::unique_ptr<Group>>
325  //
326  // We get concurrent insertion and iteration, but not
327  // concurrent erasure (which is not a problem because we never
328  // remove groups). Note that tbb uses a value for the end()
329  // iterator for this class which is always valid for comparing
330  // against the result of an iteration or find (it is
331  // implemented as (<element-ptr>(nullptr), <internal-table>)).
332  //
333  // Note: threading: May be called from producer and filter
334  // module processing tasks! This requires us to protect
335  // groups_ against multiple threads attempting an insert at
336  // the same time.
337  //
338  // Note: threading: Also anyone using the iterators over
339  // groups_ (which are Principal::begin() and Principal::end())
340  // needs protection against having their iterators invalidated.
341  // Right now the only code doing this is:
342  //
343  // Principal::readImmediate() const
344  // Principal::getGroupTryAllFiles(ProductID const& pid) const
345  // Principal::removeCachedProduct(ProductID const pid) const
346  // Principal::findGroupsForProcess(...) const
347  // OutputModule::updateBranchChildren()
348  //
349  // Principal::readImmediate() is called just after principal
350  // creation with the input lock held. Module tasks on other
351  // streams could be doing puts which would invalidate the iterator
352  // if the data product being put does not exist in the input
353  // file and is being put for the first time so this is a
354  // group insertion.
355  //
356  // Principal::getGroupTryAllFiles(ProductID const& pid) const
357  // Used by Principal::getByProductID(ProductID const& pid) const
358  // Used by art::ProductRetriever<T>::get(ProductID const pid, Handle<T>&
359  // result) const. (easy user-facing api)
360  // Used by Principal::productGetter(ProductID const pid) const
361  // Used by (Run,SubRun,Event,Results)::productGetter (advanced
362  // user-facing api)
363  // Used by Principal::getForOutput(ProductID const pid, bool
364  // resolveProd) const
365  // Used by RootOutputFile to fetch products being written to disk.
366  // Used by FileDumperOutput_module.
367  // Used by ProvenanceCheckerOutput_module.
368  //
369  // These uses are find() and compare against end(). Problem is that
370  // end() may have moved by the time we do the compare with the find
371  // result. There is also use of the mpr and secondary files.
372  //
373  // Principal::removeCachedProduct(ProductID const pid) const
374  // Principal::findGroupsForProcess(...) const
375  //
376  // These uses are find() and compare against end(). Problem is that
377  // end() may have moved by the time we do the compare with the find
378  // result. There is also use of secondary principals.
379  //
380  // OutputModule::updateBranchChildren is called only for events
381  // and only after all module processing tasks which can put
382  // products into the event have finished running, so it does
383  // not need the protection.
384  //
385  // Used by the Run, SubRun, and EventPrincipal constructors
386  // if a product was produced.
387  //
388  // Used by RootOutput_module from write, writeSubRun, and
389  // writeRun if a branch was dropped by selectEvents processing
390  // for that kind of principal.
391  //
392  // Used by RootOutput_module from startEndFile if a branch was
393  // dropped by selectEvents processing for a results principal,
394  // or if a product was produced by a results principal (note
395  // that it has not actually gotten the chance to make the
396  // product, that happens right after this call is made).
397  //
398  // Note: threading: If the only uses were from the constructors
399  // we would have no problems, but the use from the root output
400  // module is bad because it could be running concurrently with
401  // other output modules and analyzers for this same principal.
402  // So we have to use a compare_exchange_strong on
403  // processHistoryModified_ so that only one task tries to do
404  // this. We also need to stall the other output and analyzer
405  // modules that call processHistory() while we are doing the
406  // update so that they get the updated result. For output and
407  // analyzer modules that have already fetched the process
408  // history pointer, we have to stall any attempt to access its
409  // internal process configuration list while we are updating it.
410  //
411  // FIXME: threading: We hand out processHistory_ through the
412  // processHistory() interface, which is in turn handed out by
413  // the ProductRetriever::processHistory() interface to any module
414  // task that wants it. This is a problem for output modules
415  // and analyzers if an output module decides to update the
416  // process history from startEndFile. We must stall users of
417  // the process history if we are updating it, both by stalling a
418  // fetch of processHistory_ once we start to update, and for
419  // those modules that have already fetched the processHistory_
420  // we must stall any attempt by them to access its internal
421  // process configuration list while we are changing it.
// Principal::addToProcessHistory: appends processConfiguration_ to
// processHistory_ and registers the resulting history in the
// ProcessHistoryRegistry under its id.
//
// The compare_exchange_strong on processHistoryModified_ lets only the first
// caller through; calls that find the flag already set do nothing.
// NOTE(review): the line that begins the duplicate-process-name diagnostic is
// not visible in this listing; presumably it throws -- confirm.
422  void
424  {
425  bool expected = false;
426  if (processHistoryModified_.compare_exchange_strong(expected, true)) {
427  // MT note: We have now locked out any other task trying to
428  // modify the process history. Now we have to block
429  // tasks that already have a pointer to the process
430  // history from accessing its internals while we update
431  // it. We do not protect the iteration interface, the
432  // begin(), end(), and size() are all separate calls
433  // and we cannot lock in each one because there is no
434  // way to automatically unlock.
435  std::lock_guard sentry{processHistory_.get_mutex()};
436  string const& processName = processConfiguration_.processName();
// Scan the existing history for an entry with the same process name.
437  for (auto const& val : processHistory_) {
438  if (processName == val.processName()) {
440  << "The process name " << processName
441  << " was previously used on these products.\n"
442  << "Please modify the configuration file to use a "
443  << "distinct process name.\n";
444  }
445  }
446  processHistory_.push_back(processConfiguration_);
447  // Optimization note: As of 0_9_0_pre3 For very simple Sources
448  // (e.g. EmptyEvent) this routine takes up nearly 50% of the
449  // time per event, and 96% of the time for this routine is spent
450  // in computing the ProcessHistory id which happens because we
451  // are reconstructing the ProcessHistory for each event. It
452  // would probably be better to move the ProcessHistory
453  // construction out to somewhere which persists for longer than
454  // one Event.
455  auto const phid = processHistory_.id();
456  ProcessHistoryRegistry::emplace(phid, processHistory_);
457  }
458  }
459 
// Principal::findGroups: walks the process history from newest to oldest
// and, for every process name that has an entry in the lookup pl, appends
// the matching groups to `groups` via findGroupsForProcess().
// Returns the total number of groups appended by this call.
// The process-history mutex is held for the duration of the walk.
460  std::size_t
462  ModuleContext const& mc,
463  SelectorBase const& sel,
464  std::vector<cet::exempt_ptr<Group>>& groups) const
465  {
466  // Loop over processes in reverse time order. Sometimes we want
467  // to stop after we find a process with matches so check for that
468  // at each step.
469  std::size_t found{};
470  // MT note: We must protect the process history iterators here
471  // against possible invalidation by output modules
472  // inserting a process history entry while we are
473  // iterating.
474  std::lock_guard sentry{processHistory_.get_mutex()};
475  // We must skip over duplicate entries of the same process
476  // configuration in the process history. This unfortunately
477  // happened with the SamplingInput source.
478  for (auto const& h :
479  ::ranges::views::reverse(processHistory_) | ::ranges::views::unique) {
480  if (auto it = pl.find(h.processName()); it != pl.end()) {
481  found += findGroupsForProcess(it->second, mc, sel, groups);
482  }
483  }
484  return found;
485  }
486 
// Principal::getBySelector: collects the candidate groups with
// findGroupsForProduct() and resolves them with resolve_unique_product().
// When the resolver returns no value, the result carries a ProductNotFound
// exception describing the requested C++ type and the selector; otherwise
// the resolved result is returned.
489  WrappedTypeID const& wrapped,
490  SelectorBase const& sel,
491  ProcessTag const& processTag) const
492  {
493  auto const groups = findGroupsForProduct(mc, wrapped, sel, processTag);
494  auto const result = resolve_unique_product(groups, wrapped);
495  if (!result.has_value()) {
// The exception is created here but not thrown; it is returned inside
// the GroupQueryResult.
496  auto whyFailed = std::make_shared<Exception>(errors::ProductNotFound);
497  *whyFailed << "Found zero products matching all selection criteria\n"
498  << indent << "C++ type: " << wrapped.product_type << '\n'
499  << sel.print(indent) << '\n';
500  return GroupQueryResult{whyFailed};
501  }
502  return *result;
503  }
504 
// Principal::getByLabel: builds a selector that requires the module label,
// the product instance name and the process name taken from processTag,
// then delegates to getBySelector().
507  WrappedTypeID const& wrapped,
508  string const& label,
509  string const& productInstanceName,
510  ProcessTag const& processTag) const
511  {
512  auto const& processName = processTag.name();
513  Selector const sel{ModuleLabelSelector{label} &&
514  ProductInstanceNameSelector{productInstanceName} &&
515  ProcessNameSelector{processName}};
516  return getBySelector(mc, wrapped, sel, processTag);
517  }
518 
// Principal::getInputTags: returns the input tag of every group found by
// findGroupsForProduct(), in the order the groups were found.
519  std::vector<InputTag>
521  WrappedTypeID const& wrapped,
522  SelectorBase const& sel,
523  ProcessTag const& processTag) const
524  {
525  std::vector<InputTag> tags;
526  auto const groups = findGroupsForProduct(mc, wrapped, sel, processTag);
527  cet::transform_all(groups, back_inserter(tags), [](auto const g) {
528  return g->productDescription().inputTag();
529  });
530  return tags;
531  }
532 
// Principal::getMany: finds all groups matching the request with
// findGroupsForProduct() and passes them, together with the wrapped product
// type, to resolve_products().
533  std::vector<GroupQueryResult>
535  WrappedTypeID const& wrapped,
536  SelectorBase const& sel,
537  ProcessTag const& processTag) const
538  {
539  auto const groups = findGroupsForProduct(mc, wrapped, sel, processTag);
540  return resolve_products(groups, wrapped.wrapped_product_type);
541  }
542 
// Principal::tryNextSecondaryFile: asks the delayed reader to read from the
// secondary file selected by nextSecondaryFileIdx_ and returns whatever the
// reader returns (the return type is deduced).
543  auto
545  {
546  return delayedReader_->readFromSecondaryFile(nextSecondaryFileIdx_);
547  }
548 
// Principal::getMatchingSequence: returns the groups from the first source
// that yields any match for `selector`. Sources are tried in this order:
// products of the current process (if processTag allows), the primary input
// file, each already-open secondary principal, then newly opened secondary
// files. When processTag does not allow an input-source search, only the
// current process is consulted. An empty vector means nothing matched.
// NOTE(review): the second line of the first `if` condition is not visible
// in this listing.
549  std::vector<cet::exempt_ptr<Group>>
551  SelectorBase const& selector,
552  ProcessTag const& processTag) const
553  {
554  std::vector<cet::exempt_ptr<Group>> groups;
555  // Find groups from current process
556  if (processTag.current_process_search_allowed() &&
558  if (findGroups(
559  producedProducts_.load()->viewLookup, mc, selector, groups) != 0) {
560  return groups;
561  }
562  }
563 
564  if (!processTag.input_source_search_allowed()) {
565  return groups;
566  }
567 
568  // Look through currently opened input files
569  if (groups.empty()) {
570  groups = matchingSequenceFromInputFile(mc, selector);
571  if (!groups.empty()) {
572  return groups;
573  }
574  for (auto const& sp : secondaryPrincipals_) {
575  groups = sp->matchingSequenceFromInputFile(mc, selector);
576  if (!groups.empty()) {
577  return groups;
578  }
579  }
580  }
581  // Open more secondary files if necessary
582  if (groups.empty()) {
// Each newly opened secondary principal is kept in secondaryPrincipals_.
583  while (auto sp = tryNextSecondaryFile()) {
584  auto& new_sp = secondaryPrincipals_.emplace_back(std::move(sp));
585  groups = new_sp->matchingSequenceFromInputFile(mc, selector);
586  if (!groups.empty()) {
587  return groups;
588  }
589  }
590  }
591  return groups;
592  }
593 
// Principal::matchingSequenceFromInputFile: returns the groups matching
// `selector` using the view lookup of presentProducts_. Returns an empty
// vector when presentProducts_ is null.
594  std::vector<cet::exempt_ptr<Group>>
596  SelectorBase const& selector) const
597  {
598  std::vector<cet::exempt_ptr<Group>> groups;
599  if (!presentProducts_.load()) {
600  return groups;
601  }
602  findGroups(presentProducts_.load()->viewLookup, mc, selector, groups);
603  return groups;
604  }
605 
// Principal::findGroupsFromInputFile: appends to `groups` the groups from
// presentProducts_ whose product type matches `wrapped` and which satisfy
// `selector`. The product lookup is keyed by the friendly class name of the
// product type. Returns the number of groups appended; 0 when
// presentProducts_ is null or the type has no lookup entry.
606  std::size_t
608  ModuleContext const& mc,
609  WrappedTypeID const& wrapped,
610  SelectorBase const& selector,
611  std::vector<cet::exempt_ptr<Group>>& groups) const
612  {
613  if (!presentProducts_.load()) {
614  return 0;
615  }
616  auto const& lookup = presentProducts_.load()->productLookup;
617  auto it = lookup.find(wrapped.product_type.friendlyClassName());
618  if (it == lookup.end()) {
619  return 0;
620  }
621  return findGroups(it->second, mc, selector, groups);
622  }
623 
// Principal::findGroupsForProduct: collects the groups for the product type
// in `wrapped` that satisfy `selector`.
//
// Matches from the current process (if processTag allows) and from the
// primary input file accumulate into the same result. Secondary principals,
// and then newly opened secondary files, are consulted only when those two
// sources produced nothing; the search stops at the first secondary source
// with a match.
// NOTE(review): the second line of the first `if` condition is not visible
// in this listing.
624  std::vector<cet::exempt_ptr<Group>>
626  WrappedTypeID const& wrapped,
627  SelectorBase const& selector,
628  ProcessTag const& processTag) const
629  {
630  std::vector<cet::exempt_ptr<Group>> results;
// Running count of matches from the current process and the primary input.
631  unsigned ret{};
632  // Find groups from current process
633  if (processTag.current_process_search_allowed() &&
635  auto const& lookup = producedProducts_.load()->productLookup;
636  auto it = lookup.find(wrapped.product_type.friendlyClassName());
637  if (it != lookup.end()) {
638  ret += findGroups(it->second, mc, selector, results);
639  }
640  }
641 
642  if (!processTag.input_source_search_allowed()) {
643  return results;
644  }
645 
646  // Look through currently opened input files
647  ret += findGroupsFromInputFile(mc, wrapped, selector, results);
648  if (ret) {
649  return results;
650  }
651  for (auto const& sp : secondaryPrincipals_) {
652  if (sp->findGroupsFromInputFile(mc, wrapped, selector, results)) {
653  return results;
654  }
655  }
656  // Open more secondary files if necessary
657  while (auto sp = tryNextSecondaryFile()) {
658  auto& new_sp = secondaryPrincipals_.emplace_back(std::move(sp));
659  if (new_sp->findGroupsFromInputFile(mc, wrapped, selector, results)) {
660  return results;
661  }
662  }
663  return results;
664  }
665 
// Principal::findGroupsForProcess: appends to `res` the local group of each
// product ID in vpid that passes all filters, and returns how many were
// appended. A product ID is skipped when this principal has no group for it,
// when its description is marked dropped, when it is a produced product not
// visible from the current trigger path, or when `sel` does not match it.
666  std::size_t
668  std::vector<ProductID> const& vpid,
669  ModuleContext const& mc,
670  SelectorBase const& sel,
671  std::vector<cet::exempt_ptr<Group>>& res) const
672  {
673  std::size_t found{}; // Horrible hack that should go away
674  for (auto const pid : vpid) {
675  auto group = getGroupLocal(pid);
676  if (!group) {
677  continue;
678  }
679  auto const& pd = group->productDescription();
680  if (pd.dropped()) {
681  continue;
682  }
683  // If we are processing a trigger path, the only visible
684  // produced products are those that originate from modules on
685  // the same path we're currently processing.
686  if (mc.onTriggerPath() && pd.produced() &&
687  !mc.onSamePathAs(pd.moduleLabel())) {
688  continue;
689  }
690  if (!sel.match(pd)) {
691  continue;
692  }
693  // Found a good match, save it.
694  res.emplace_back(group);
695  ++found;
696  }
697  return found;
698  }
699 
// Principal::seenRanges: returns a copy of rangeSet_.
700  RangeSet
702  {
703  return rangeSet_;
704  }
705 
// Principal::updateSeenRanges: replaces rangeSet_ with rs.
706  void
708  {
709  rangeSet_ = rs;
710  }
711 
// Principal::put: stores the product edp and its provenance pp in the local
// group for bd.productID(). edp and the group must both be non-null (checked
// by assert).
//
// Two branches are visible. The first replaces the group's product and
// provenance and stores the supplied rs. The second throws
// ProductRegistrationFailure if the group already holds a product, and
// otherwise stores the product with an invalid RangeSet (rs is not used).
// NOTE(review): the `if` condition that selects between the branches is not
// visible in this listing; the in-code note suggests the first branch is for
// run and subrun products -- confirm.
712  void
714  unique_ptr<ProductProvenance const>&& pp,
715  unique_ptr<EDProduct>&& edp,
716  unique_ptr<RangeSet>&& rs)
717  {
718  assert(edp);
720  // Note: We intentionally allow group and provenance replacement
721  // for run and subrun products.
722  auto group = getGroupLocal(bd.productID());
723  assert(group);
724  group->setProductAndProvenance(
725  std::move(pp), std::move(edp), std::move(rs));
726  } else {
727  auto group = getGroupLocal(bd.productID());
728  assert(group);
729  if (group->anyProduct() != nullptr) {
730  throw Exception(errors::ProductRegistrationFailure, "Principal::put:")
731  << "Problem found during put of " << branchType_
732  << " product: product already put for " << bd.branchName() << '\n';
733  }
734  group->setProductAndProvenance(
735  std::move(pp),
736  std::move(edp),
737  make_unique<RangeSet>(RangeSet::invalid()));
738  }
739  }
740 
741  // If resolveProd is true, we invoke the delayed reader now if no user
742  // module has ever fetched the product for this principal.
743  //
744  // Note: This attempts to resolve the product and converts the
745  // resulting group into an OutputHandle.
746  //
747  // MT note: Right now this is single-threaded. Be careful if this
748  // changes!!!
// Builds an OutputHandle for the product pid, searching all files.
//
// Returns OutputHandle::invalid() when no group is found. With resolveProd
// set, it also returns invalid when the product cannot be resolved, is null,
// or is not present. A group with neither product nor provenance yields a
// handle carrying only its range of validity; otherwise the handle carries
// the product, its description, its provenance and its range of validity.
750  Principal::getForOutput(ProductID const& pid, bool const resolveProd) const
751  {
752  // MT-FIXME: Uses of group!
753  auto g = getGroupTryAllFiles(pid);
754  if (g.get() == nullptr) {
755  return OutputHandle::invalid();
756  }
757  if (resolveProd) {
758  if (!g->resolveProductIfAvailable()) {
759  // Behavior is the same as if the group wasn't there.
760  return OutputHandle::invalid();
761  }
762  if (g->anyProduct() == nullptr) {
763  return OutputHandle::invalid();
764  }
765  if (!g->anyProduct()->isPresent()) {
766  return OutputHandle::invalid();
767  }
768  }
769  if (!g->anyProduct() && !g->productProvenance()) {
770  return OutputHandle{g->rangeOfValidity()};
771  }
772  return OutputHandle{g->anyProduct(),
773  &g->productDescription(),
774  g->productProvenance(),
775  g->rangeOfValidity()};
776  }
777 
// Principal::getProductDescription: returns the branch description for pid,
// or nullptr when none is found. Lookup order: the produced-products table
// (when the guarding condition holds and the table pointer is non-null), the
// present-products table, then each already-open secondary principal.
// No new secondary files are opened here.
// NOTE(review): the second line of the first `if` condition is not visible
// in this listing.
778  cet::exempt_ptr<BranchDescription const>
780  ProductID const pid,
781  bool const alwaysEnableLookupOfProducedProducts /*=false*/) const
782  {
783  // Find groups from current process
784  if (alwaysEnableLookupOfProducedProducts ||
786  if (producedProducts_.load() != nullptr) {
787  if (auto result = producedProducts_.load()->description(pid)) {
788  return result;
789  }
790  }
791  }
792  if (presentProducts_.load()) {
793  // Look through currently opened input files
794  if (auto result = presentProducts_.load()->description(pid)) {
795  return result;
796  }
797  }
798  for (auto const& sp : secondaryPrincipals_) {
799  if (auto result = sp->getProductDescription(pid)) {
800  return result;
801  }
802  }
803  return nullptr;
804  }
805 
// Principal::branchType: returns branchType_.
806  BranchType
808  {
809  return branchType_;
810  }
811 
// Principal::makeInserter: returns an engaged optional holding a
// ProductInserter constructed from this principal's branch type, this
// principal, and the module context mc.
812  std::optional<ProductInserter>
814  {
815  return std::make_optional<ProductInserter>(branchType_, *this, mc);
816  }
817 
// Principal::producedInProcess: true when the produced-products table has a
// description for pid whose produced() flag is set. Always false while
// enableLookupOfProducedProducts_ is false.
// NOTE(review): producedProducts_ is dereferenced without a null check;
// presumably it is always set before lookup is enabled -- confirm.
818  bool
820  {
821  if (!enableLookupOfProducedProducts_.load()) {
822  return false;
823  }
824  auto pd = producedProducts_.load()->description(pid);
825  return pd == nullptr ? false : pd->produced();
826  }
827 
// Principal::presentFromSource: true when the present-products table has a
// description for pid whose present() flag is set. False when
// presentProducts_ is null or has no description for pid.
828  bool
830  {
831  if (!presentProducts_.load()) {
832  return false;
833  }
834  auto pd = presentProducts_.load()->description(pid);
835  return pd == nullptr ? false : pd->present();
836  }
837 
// Principal::getByProductID: returns a GroupQueryResult wrapping the group
// found by getGroupTryAllFiles(pid). When no group is found, the result
// carries a ProductNotFound exception naming the branch type and product ID.
840  {
841  if (auto g = getGroupTryAllFiles(pid)) {
842  return GroupQueryResult{g};
843  }
// The exception is created here but not thrown; it is returned inside
// the GroupQueryResult.
844  auto whyFailed =
845  make_shared<Exception>(errors::ProductNotFound, "InvalidID");
846  *whyFailed << "Principal::getByProductID: no product with branch type: "
847  << branchType_ << " product id: " << pid << '\n';
848  return GroupQueryResult{whyFailed};
849  }
850 
// Principal::getGroupLocal: looks up pid in this principal's own groups_
// while holding groupMutex_. Returns a non-owning pointer to the group, or
// nullptr when there is none. Secondary principals are not consulted.
851  cet::exempt_ptr<Group>
853  {
854  std::lock_guard sentry{groupMutex_};
855  auto it = groups_.find(pid);
856  return it != groups_.cend() ? it->second.get() : nullptr;
857  }
858 
// Principal::getGroupTryAllFiles: finds the group for pid, searching beyond
// this principal if needed. Search order: this principal (when pid is
// produced in this process or present from the primary source), each
// already-open secondary principal, then newly opened secondary files.
// Returns nullptr when no source reports pid as present.
858  cet::exempt_ptr<Group>
860  {
861  // Look through current process and currently opened primary input file.
862  if (producedInProcess(pid) || presentFromSource(pid)) {
863  return getGroupLocal(pid);
864  }
865  // Look through secondary files
866  for (auto const& sp : secondaryPrincipals_) {
867  if (sp->presentFromSource(pid)) {
868  return sp->getGroupLocal(pid);
869  }
870  }
871  // Try new secondary files
// Each newly opened secondary principal is kept in secondaryPrincipals_.
872  while (auto sp = tryNextSecondaryFile()) {
873  auto& new_sp = secondaryPrincipals_.emplace_back(std::move(sp));
874  if (new_sp->presentFromSource(pid)) {
875  return new_sp->getGroupLocal(pid);
876  }
877  }
878  return nullptr;
879  }
881 
882 } // namespace art
std::optional< ProductInserter > makeInserter(ModuleContext const &mc)
Definition: Principal.cc:813
const_iterator cend() const
Definition: Principal.cc:291
std::atomic< bool > processHistoryModified_
Definition: Principal.h:232
size_t findGroups(ProcessLookup const &, ModuleContext const &, SelectorBase const &, std::vector< cet::exempt_ptr< Group >> &groups) const
Definition: Principal.cc:461
std::map< std::string, std::vector< ProductID >> ProcessLookup
Definition: type_aliases.h:17
std::atomic< bool > enableLookupOfProducedProducts_
Definition: Principal.h:238
decltype(auto) constexpr cend(T &&obj)
ADL-aware version of std::cend.
Definition: StdUtils.h:93
auto tryNextSecondaryFile() const
Definition: Principal.cc:544
GroupCollection groups_
Definition: Principal.h:246
const_iterator end() const
Definition: Principal.cc:284
std::vector< InputTag > getInputTags(ModuleContext const &mc, WrappedTypeID const &wrapped, SelectorBase const &, ProcessTag const &) const
Definition: Principal.cc:520
std::string friendlyClassName() const
Definition: TypeID.cc:61
ProcessHistory processHistory_
Definition: Principal.h:231
bool input_source_search_allowed() const
Definition: ProcessTag.cc:60
STL namespace.
std::recursive_mutex & get_mutex() const
auto & get(BranchType const bt)
Definition: ProductTables.h:49
constexpr ProductStatus dummyToPreventDoubleCount() noexcept
Definition: ProductStatus.h:25
void addToProcessHistory()
Definition: Principal.cc:423
std::optional< GroupQueryResult > resolve_unique_product(std::vector< cet::exempt_ptr< art::Group >> const &product_groups, art::WrappedTypeID const &wrapped)
Definition: Group.cc:387
size_t size() const
Definition: Principal.cc:263
std::vector< cet::exempt_ptr< Group > > matchingSequenceFromInputFile(ModuleContext const &, SelectorBase const &) const
Definition: Principal.cc:595
size_t findGroupsFromInputFile(ModuleContext const &, WrappedTypeID const &wrapped, SelectorBase const &, std::vector< cet::exempt_ptr< Group >> &results) const
Definition: Principal.cc:607
void updateSeenRanges(RangeSet const &rs)
Definition: Principal.cc:707
std::recursive_mutex groupMutex_
Definition: Principal.h:241
void readImmediate() const
Definition: Principal.cc:226
std::string const & processName() const noexcept
BranchType branchType_
Definition: Principal.h:230
ProductProvenance const * productProvenance() const
Definition: OutputHandle.cc:44
auto vector(Vector const &v)
Returns a manipulator which will print the specified array.
Definition: DumpUtils.h:289
GroupQueryResult getBySelector(ModuleContext const &mc, WrappedTypeID const &wrapped, SelectorBase const &, ProcessTag const &) const
Definition: Principal.cc:488
TypeID wrapped_product_type
Definition: WrappedTypeID.h:18
void createGroupsForProducedProducts(ProductTables const &producedProducts)
Definition: Principal.cc:201
RangeSet rangeSet_
Definition: Principal.h:266
std::string print(std::string const &indent) const
Definition: SelectorBase.h:47
bool presentFromSource(ProductID) const
Definition: Principal.cc:829
bool current_process_search_allowed() const
Definition: ProcessTag.cc:68
RangeSet seenRanges() const
Definition: Principal.cc:701
std::atomic< ProductTable const * > producedProducts_
Definition: Principal.h:237
OutputHandle getForOutput(ProductID const &, bool resolveProd) const
Definition: Principal.cc:750
cet::exempt_ptr< BranchDescription const > getProductDescription(ProductID const pid, bool const alwaysEnableLookupOfProducedProducts=false) const
Definition: Principal.cc:779
constexpr ProductStatus unknown() noexcept
Definition: ProductStatus.h:31
EDProductGetter const * productGetter(ProductID id) const
Definition: Principal.cc:177
decltype(auto) values(Coll &&coll)
Range-for loop helper iterating across the values of the specified collection.
std::string indent(std::size_t const i)
EDProductGetter const * getEDProductGetter_(ProductID const &) const override
Definition: Principal.cc:195
void put(BranchDescription const &, std::unique_ptr< ProductProvenance const > &&, std::unique_ptr< EDProduct > &&, std::unique_ptr< RangeSet > &&)
Definition: Principal.cc:713
string name_of_template_arg(string const &template_instance, size_t desired_arg)
Definition: TypeID.cc:121
bool isValid() const
Definition: Hash.h:122
std::vector< cet::exempt_ptr< Group > > getMatchingSequence(ModuleContext const &, SelectorBase const &, ProcessTag const &) const
Definition: Principal.cc:550
std::atomic< ProductTable const * > presentProducts_
Definition: Principal.h:236
void ctor_fetch_process_history(ProcessHistoryID const &)
Definition: Principal.cc:109
std::vector< cet::exempt_ptr< Group > > findGroupsForProduct(ModuleContext const &mc, WrappedTypeID const &wrapped, SelectorBase const &, ProcessTag const &) const
Definition: Principal.cc:625
void ctor_create_groups(cet::exempt_ptr< ProductTable const >)
Definition: Principal.cc:68
bool combinable(BranchDescription const &a, BranchDescription const &b)
ProcessHistory const & processHistory() const
Definition: Principal.cc:247
cet::exempt_ptr< ProductProvenance const > branchToProductProvenance(ProductID const &) const
Definition: Principal.cc:298
void enableLookupOfProducedProducts()
Definition: Principal.cc:220
ProcessConfiguration const & processConfiguration() const
Definition: Principal.cc:257
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
bool onSamePathAs(std::string const &module_label) const
Definition: ModuleContext.h:64
auto const & name() const
Definition: ProcessTag.h:23
cet::exempt_ptr< Group > getGroupTryAllFiles(ProductID const) const
Definition: Principal.cc:860
decltype(auto) get(T &&obj)
ADL-aware version of std::to_string.
Definition: StdUtils.h:120
TH2F * hist
Definition: plot.C:134
int nextSecondaryFileIdx_
Definition: Principal.h:264
ProcessConfiguration const & processConfiguration_
Definition: Principal.h:233
static RangeSet invalid()
Definition: RangeSet.cc:45
void ctor_read_provenance()
Definition: Principal.cc:89
cet::exempt_ptr< Group > getGroupLocal(ProductID const) const
Definition: Principal.cc:852
BranchType
Definition: BranchType.h:20
std::vector< GroupQueryResult > getMany(ModuleContext const &mc, WrappedTypeID const &wrapped, SelectorBase const &, ProcessTag const &) const
Definition: Principal.cc:534
bool onTriggerPath() const
Definition: ModuleContext.h:58
void swap(lar::deep_const_fwd_iterator_nested< CITER, INNERCONTEXTRACT > &a, lar::deep_const_fwd_iterator_nested< CITER, INNERCONTEXTRACT > &b)
Definition: MVAAlg.h:12
static OutputHandle invalid()
Definition: OutputHandle.h:44
void fillGroup(BranchDescription const &)
Definition: Principal.cc:137
bool match(BranchDescription const &p) const
Definition: SelectorBase.h:41
GroupQueryResult getByLabel(ModuleContext const &mc, WrappedTypeID const &wrapped, std::string const &label, std::string const &productInstanceName, ProcessTag const &processTag) const
Definition: Principal.cc:506
std::unique_ptr< DelayedReader > delayedReader_
Definition: Principal.h:250
void markProcessHistoryAsModified()
Definition: Principal.cc:170
std::string const & processName() const noexcept
GroupQueryResult getByProductID(ProductID const pid) const
Definition: Principal.cc:839
std::vector< std::unique_ptr< Principal > > secondaryPrincipals_
Definition: Principal.h:260
static auto emplace(value_type const &value)
RangeSet const & rangeOfValidity() const
Definition: OutputHandle.cc:56
GroupCollection::const_iterator const_iterator
Definition: Principal.h:52
std::string const & branchName() const noexcept
constexpr bool range_sets_supported(BranchType const bt)
bool producedInProcess(ProductID) const
Definition: Principal.cc:819
ProductID productID() const noexcept
const_iterator cbegin() const
Definition: Principal.cc:277
size_t findGroupsForProcess(std::vector< ProductID > const &vpid, ModuleContext const &mc, SelectorBase const &selector, std::vector< cet::exempt_ptr< Group >> &groups) const
Definition: Principal.cc:667
bool is_assns(std::string const &type_name)
Definition: TypeID.h:66
const_iterator begin() const
Definition: Principal.cc:270
BranchType branchType() const
Definition: Principal.cc:807
Provenance provenance(ProductID id) const
Definition: Principal.cc:189
std::vector< GroupQueryResult > resolve_products(std::vector< cet::exempt_ptr< art::Group >> const &groups, art::TypeID const &wrapped_type)
Definition: Group.cc:428