LArSoft  v09_90_00
Liquid Argon Software toolkit - https://larsoft.org/
TimeTracker_service.cc
Go to the documentation of this file.
1 // vim: set sw=2 expandtab :
2 
14 #include "boost/format.hpp"
17 #include "cetlib/HorizontalRule.h"
18 #include "cetlib/sqlite/Connection.h"
19 #include "cetlib/sqlite/Ntuple.h"
20 #include "cetlib/sqlite/helpers.h"
21 #include "cetlib/sqlite/statistics.h"
22 #include "fhiclcpp/types/Atom.h"
23 #include "fhiclcpp/types/Name.h"
24 #include "fhiclcpp/types/Table.h"
26 #include "tbb/concurrent_unordered_map.h"
27 
28 #include <algorithm>
29 #include <cassert>
30 #include <chrono>
31 #include <iomanip>
32 #include <memory>
33 #include <sstream>
34 #include <string>
35 #include <vector>
36 
37 using namespace std;
38 using namespace cet;
39 using namespace hep::concurrency;
40 
41 using chrono::steady_clock;
42 
43 namespace art {
44 
45  namespace {
46 
47  using ConcurrentKey = std::pair<ScheduleID, std::string>;
48  struct ConcurrentKeyHasher {
49  size_t
50  operator()(ConcurrentKey const& key) const
51  {
52  static std::hash<ScheduleID> schedule_hasher{};
53  static std::hash<std::string> string_hasher{};
54  // A better hash will be desirable if this becomes a bottleneck.
55  return schedule_hasher(key.first) ^ string_hasher(key.second);
56  }
57  };
58 
59  auto
60  key(ScheduleID const sid)
61  {
62  return ConcurrentKey{sid, {}};
63  }
64  auto
65  key(ModuleContext const& mc)
66  {
67  return ConcurrentKey{mc.scheduleID(), mc.moduleLabel()};
68  }
69 
// Current monotonic time used for all TimeTracker measurements.
// A plain function replaces the former `bind(&steady_clock::now)` object:
// same call syntax (`now()`), no dependency on <functional>, no dynamically
// initialized namespace-scope object.
std::chrono::steady_clock::time_point
now() noexcept
{
  return std::chrono::steady_clock::now();
}
71 
72  struct Statistics {
73  explicit Statistics() = default;
74 
75  explicit Statistics(string const& p,
76  string const& label,
77  string const& type,
78  sqlite3* const db,
79  string const& table,
80  string const& column)
81  : path{p}
82  , mod_label{label}
83  , mod_type{type}
84  , min{sqlite::min(db, table, column)}
85  , mean{sqlite::mean(db, table, column)}
86  , max{sqlite::max(db, table, column)}
87  , median{sqlite::median(db, table, column)}
88  , rms{sqlite::rms(db, table, column)}
89  , n{sqlite::nrows(db, table)}
90  {}
91 
92  string path{};
93  string mod_label{};
94  string mod_type{};
95  double min{-1.};
96  double mean{-1.};
97  double max{-1.};
98  double median{-1.};
99  double rms{-1.};
100  unsigned n{0u};
101  };
102 
103  ostream&
104  operator<<(ostream& os, Statistics const& info)
105  {
106  string label{info.path};
107  if (!info.mod_label.empty()) {
108  label += ':' + info.mod_label;
109  }
110  if (!info.mod_type.empty()) {
111  label += ':' + info.mod_type;
112  }
113  os << label << " " << boost::format(" %=12g ") % info.min
114  << boost::format(" %=12g ") % info.mean
115  << boost::format(" %=12g ") % info.max
116  << boost::format(" %=12g ") % info.median
117  << boost::format(" %=12g ") % info.rms
118  << boost::format(" %=10d ") % info.n;
119  return os;
120  }
121 
122  } // unnamed namespace
123 
// TimeTracker service. It times source reads (TimeSource table), event
// processing (TimeEvent) and module execution (TimeModule) with
// steady_clock, inserts the times into SQLite tables, and, when
// printSummary is true, logs summary statistics from postEndJob().
124  class TimeTracker {
125  public:
126  static constexpr bool service_handle_allowed{false};
127 
128  struct Config {
// printSummary (default true): whether postEndJob() logs the summary table.
129  fhicl::Atom<bool> printSummary{fhicl::Name{"printSummary"}, true};
// dbOutput.filename is passed to the connection factory for db_ (see the
// constructor); dbOutput.overwrite is handed to the three table objects.
130  struct DBoutput {
131  fhicl::Atom<string> filename{fhicl::Name{"filename"}, ""};
132  fhicl::Atom<bool> overwrite{fhicl::Name{"overwrite"}, false};
133  };
134  fhicl::Table<DBoutput> dbOutput{fhicl::Name{"dbOutput"}};
135  };
// NOTE(review): listing line 136 is missing from this copy; it presumably
// declares the `Parameters` alias used below — TODO confirm.
137  explicit TimeTracker(Parameters const&, ActivityRegistry&);
138 
139  private:
// NOTE(review): listing lines 140-141 are missing. Judging by the
// PerScheduleData type named in the tooltips and the `};` below, they
// presumably open `struct PerScheduleData` and declare its `eventID` member
// (read as d.eventID in the handlers) — TODO confirm.
// eventStart: start of the current source read / event processing.
// moduleStart: start of the current module (or output write) call.
142  steady_clock::time_point eventStart;
143  steady_clock::time_point moduleStart;
144  };
145  template <unsigned SIZE>
146  using name_array = cet::sqlite::name_array<SIZE>;
// Row layouts: (Run, SubRun, Event, ...) followed by the time in seconds.
147  using timeSource_t =
148  cet::sqlite::Ntuple<uint32_t, uint32_t, uint32_t, string, double>;
149  using timeEvent_t =
150  cet::sqlite::Ntuple<uint32_t, uint32_t, uint32_t, double>;
151  using timeModule_t = cet::sqlite::
152  Ntuple<uint32_t, uint32_t, uint32_t, string, string, string, double>;
153 
// ActivityRegistry callbacks (registered in the constructor).
154  void postSourceConstruction(ModuleDescription const&);
155  void postEndJob();
156  void preEventReading(ScheduleContext);
157  void postEventReading(Event const&, ScheduleContext);
158  void preEventProcessing(Event const&, ScheduleContext);
159  void postEventProcessing(Event const&, ScheduleContext);
160  void startTime(ModuleContext const& mc);
161  void recordTime(ModuleContext const& mc, string const& suffix);
// Summary helpers used by postEndJob().
162  void logToDestination_(Statistics const& evt,
163  vector<Statistics> const& modules);
164  bool anyTableFull_() const;
165 
// Per-(schedule, module label) timing state; an empty label is the
// schedule-level entry. NOTE(review): listing lines 167 and 169 are missing;
// per the tooltips, the mapped type is PerScheduleData and the member is
// named data_ — TODO confirm.
166  tbb::concurrent_unordered_map<ConcurrentKey,
168  ConcurrentKeyHasher>
170  bool const printSummary_;
171  unique_ptr<cet::sqlite::Connection> const db_;
172  bool const overwriteContents_;
// Source module name, set in postSourceConstruction().
173  string sourceType_{};
// NOTE(review): listing lines 174-179 are missing. Per the tooltips and the
// constructor, they presumably declare the three column-name arrays and the
// timeSourceTable_/timeEventTable_/timeModuleTable_ members. Those tables are
// constructed from *db_ and overwriteContents_, so they must stay declared
// after db_ and overwriteContents_.
180  };
181 
// Reads the configuration, opens the SQLite connection and the TimeSource,
// TimeEvent and TimeModule tables, and registers the timing callbacks.
182  TimeTracker::TimeTracker(Parameters const& config, ActivityRegistry& areg)
183  : printSummary_{config().printSummary()}
// NOTE(review): listing line 184 (start of the db_ initializer) is missing;
// the visible tail obtains the connection through a `get(filename)` call on
// some object — TODO confirm against the original file.
185  -> get(config().dbOutput().filename())}
186  , overwriteContents_{config().dbOutput().overwrite()}
187  , timeSourceColumnNames_{{"Run", "SubRun", "Event", "Source", "Time"}}
188  , timeEventColumnNames_{{"Run", "SubRun", "Event", "Time"}}
189  , timeModuleColumnNames_{{"Run",
190  "SubRun",
191  "Event",
192  "Path",
193  "ModuleLabel",
194  "ModuleType",
195  "Time"}}
// NOTE(review): listing lines 196, 198-199, 202, 204 and 206 are missing.
// They presumably complete the timeSourceTable_/timeEventTable_/
// timeModuleTable_ initializers (connection, table name, column names,
// overwriteContents_) — TODO confirm.
197  "TimeSource",
200  , timeEventTable_{*db_,
201  "TimeEvent",
203  overwriteContents_}
205  "TimeModule",
207  overwriteContents_}
208  {
// NOTE(review): listing line 210 (second argument of this watch call) is
// missing; presumably &TimeTracker::postSourceConstruction — TODO confirm.
209  areg.sPostSourceConstruction.watch(this,
211  areg.sPostEndJob.watch(this, &TimeTracker::postEndJob);
212  // Event reading
213  areg.sPreSourceEvent.watch(this, &TimeTracker::preEventReading);
214  areg.sPostSourceEvent.watch(this, &TimeTracker::postEventReading);
215  // Event execution
216  areg.sPreProcessEvent.watch(this, &TimeTracker::preEventProcessing);
217  areg.sPostProcessEvent.watch(this, &TimeTracker::postEventProcessing);
218  // Module execution
// Both module calls and output-module writes are timed by the same
// startTime/recordTime pair. Writes get the "(write)" suffix on the module
// type, which postEndJob() relies on (LIKE '%(write)').
219  areg.sPreModule.watch(this, &TimeTracker::startTime);
220  areg.sPostModule.watch(
221  [this](auto const& mc) { this->recordTime(mc, ""s); });
222  areg.sPreWriteEvent.watch(this, &TimeTracker::startTime);
223  areg.sPostWriteEvent.watch(
224  [this](auto const& mc) { this->recordTime(mc, "(write)"s); });
225  }
226 
227  void
229  {
230  timeSourceTable_.flush();
231  timeEventTable_.flush();
232  timeModuleTable_.flush();
233  if (!printSummary_) {
234  return;
235  }
236  using namespace cet::sqlite;
237  query_result<size_t> rEvents;
238  rEvents << select("count(*)").from(*db_, timeEventTable_.name());
239  query_result<size_t> rModules;
240  rModules << select("count(*)").from(*db_, timeModuleTable_.name());
241  auto const nEventRows = unique_value(rEvents);
242  auto const nModuleRows = unique_value(rModules);
243  if ((nEventRows == 0) && (nModuleRows == 0)) {
244  logToDestination_(Statistics{}, vector<Statistics>{});
245  return;
246  }
247  if (nEventRows == 0 && nModuleRows != 0) {
248  string const errMsg{
249  "Malformed TimeTracker database. The TimeEvent table is empty, but\n"
250  "the TimeModule table is not. This can happen if an exception has\n"
251  "been thrown from a module while processing the first event. Any\n"
252  "saved database file is suspect and should not be used."};
253  mf::LogAbsolute("TimeTracker") << errMsg;
254  return;
255  }
256  // Gather statistics for full Event
257  // -- Unfortunately, this is not a simple query since the (e.g.)
258  // 'RootOutput(write)' times and the source time are not
259  // recorded in the TimeEvent rows. They must be added in.
260  string const fullEventTime_ddl =
261  "CREATE TABLE temp.fullEventTime AS "
262  "SELECT Run,Subrun,Event,SUM(Time) AS FullEventTime FROM ("
263  " SELECT Run,Subrun,Event,Time FROM TimeEvent"
264  " UNION"
265  " SELECT Run,Subrun,Event,Time FROM TimeModule WHERE ModuleType "
266  "LIKE '%(write)'"
267  " UNION"
268  " SELECT Run,Subrun,Event,Time FROM TimeSource"
269  ") GROUP BY Run,Subrun,Event";
270  using namespace cet::sqlite;
271  exec(*db_, fullEventTime_ddl);
272  Statistics const evtStats{
273  "Full event", "", "", *db_, "temp.fullEventTime", "FullEventTime"};
274  drop_table(*db_, "temp.fullEventTime");
275  query_result<string, string, string> r;
276  r << select_distinct("Path", "ModuleLabel", "ModuleType")
277  .from(*db_, timeModuleTable_.name());
278  vector<Statistics> modStats;
279  modStats.emplace_back(
280  "source", sourceType_ + "(read)", "", *db_, "TimeSource", "Time");
281  for (auto const& row : r) {
282  auto const& [path, mod_label, mod_type] = row;
283  create_table_as("temp.tmpModTable",
284  select("*")
285  .from(*db_, "TimeModule")
286  .where("Path='"s + path + "'"s + " AND ModuleLabel='"s +
287  mod_label + "'"s + " AND ModuleType='"s +
288  mod_type + "'"s));
289  modStats.emplace_back(
290  path, mod_label, mod_type, *db_, "temp.tmpModTable", "Time");
291  drop_table(*db_, "temp.tmpModTable");
292  }
293  if (anyTableFull_()) {
294  ostringstream msgOss;
295  HorizontalRule const rule{40};
296  msgOss << rule('=');
297  msgOss << '\n'
298  << "The SQLite database connected to the TimeTracker exceeded the "
299  "available resources.\n"
300  << "No timing information summary is available.\n"
301  << "The database will contain an incomplete record of this job's "
302  "timing information.\n";
303  msgOss << rule('=');
304  mf::LogAbsolute("TimeTracker") << msgOss.str();
305  }
306 
307  logToDestination_(evtStats, modStats);
308  }
309 
310  void
312  {
313  sourceType_ = md.moduleName();
314  }
315 
316  void
318  {
319  auto& d = data_[key(sc.id())];
320  d.eventID = EventID::invalidEvent();
321  d.eventStart = now();
322  }
323 
324  void
326  {
327  auto& d = data_[key(sc.id())];
328  d.eventID = e.id();
329  auto const t = chrono::duration<double>{now() - d.eventStart}.count();
330  timeSourceTable_.insert(
331  d.eventID.run(), d.eventID.subRun(), d.eventID.event(), sourceType_, t);
332  }
333 
334  void
335  TimeTracker::preEventProcessing(Event const& e [[maybe_unused]],
336  ScheduleContext const sc)
337  {
338  auto& d = data_[key(sc.id())];
339  assert(d.eventID == e.id());
340  d.eventStart = now();
341  }
342 
343  void
345  {
346  auto const& d = data_[key(sc.id())];
347  auto const t = chrono::duration<double>{now() - d.eventStart}.count();
348  timeEventTable_.insert(
349  d.eventID.run(), d.eventID.subRun(), d.eventID.event(), t);
350  }
351 
352  void
354  {
355  data_[key(mc)].eventID = data_[key(mc.scheduleID())].eventID;
356  data_[key(mc)].moduleStart = now();
357  }
358 
359  void
360  TimeTracker::recordTime(ModuleContext const& mc, string const& suffix)
361  {
362  auto const& d = data_[key(mc)];
363  auto const t = chrono::duration<double>{now() - d.moduleStart}.count();
364  timeModuleTable_.insert(d.eventID.run(),
365  d.eventID.subRun(),
366  d.eventID.event(),
367  mc.pathName(),
368  mc.moduleLabel(),
369  mc.moduleName() + suffix,
370  t);
371  }
372 
373  void
375  vector<Statistics> const& modules)
376  {
377  size_t width{30};
378  auto identifier_size = [](Statistics const& s) {
379  return s.path.size() + s.mod_label.size() + s.mod_type.size() +
380  2; // Don't forget the two ':'s.
381  };
382  cet::for_all(modules, [&identifier_size, &width](auto const& mod) {
383  width = max(width, identifier_size(mod));
384  });
385  ostringstream msgOss;
386  HorizontalRule const rule{width + 4 + 5 * 14 + 12};
387  msgOss << '\n'
388  << rule('=') << '\n'
389  << std::setw(width + 2) << std::left << "TimeTracker printout (sec)"
390  << boost::format(" %=12s ") % "Min"
391  << boost::format(" %=12s ") % "Avg"
392  << boost::format(" %=12s ") % "Max"
393  << boost::format(" %=12s ") % "Median"
394  << boost::format(" %=12s ") % "RMS"
395  << boost::format(" %=10s ") % "nEvts" << '\n';
396  msgOss << rule('=') << '\n';
397  if (evt.n == 0u) {
398  msgOss << "[ No processed events ]\n";
399  } else {
400  // N.B. setw(width) applies to the first field in
401  // ostream& operator<<(ostream&, Statistics const&).
402  msgOss << setw(width) << evt << '\n' << rule('-') << '\n';
403  for (auto const& mod : modules) {
404  msgOss << setw(width) << mod << '\n';
405  }
406  }
407  msgOss << rule('=');
408  mf::LogAbsolute("TimeTracker") << msgOss.str();
409  }
410 
411  bool
413  {
414  return timeSourceTable_.full() || timeEventTable_.full() ||
415  timeModuleTable_.full();
416  }
417 
418 } // namespace art
419 
TRandom r
Definition: spectrum.C:23
void logToDestination_(Statistics const &evt, vector< Statistics > const &modules)
void preEventReading(ScheduleContext)
tbb::concurrent_unordered_map< ConcurrentKey, PerScheduleData, ConcurrentKeyHasher > data_
auto const & pathName() const
Definition: ModuleContext.h:33
cet::sqlite::Ntuple< uint32_t, uint32_t, uint32_t, string, double > timeSource_t
void preEventProcessing(Event const &, ScheduleContext)
auto scheduleID() const
Definition: ModuleContext.h:28
STL namespace.
name_array< 4u > const timeEventColumnNames_
cet::sqlite::Ntuple< uint32_t, uint32_t, uint32_t, double > timeEvent_t
void postEventProcessing(Event const &, ScheduleContext)
void postSourceConstruction(ModuleDescription const &)
cet::sqlite::name_array< SIZE > name_array
std::string const & moduleName() const
auto const & moduleName() const
Definition: ModuleContext.h:48
void postEventReading(Event const &, ScheduleContext)
timeModule_t timeModuleTable_
#define DECLARE_ART_SERVICE(svc, scope)
Float_t d
Definition: plot.C:235
unique_ptr< cet::sqlite::Connection > const db_
name_array< 7u > const timeModuleColumnNames_
timeSource_t timeSourceTable_
steady_clock::time_point moduleStart
void recordTime(ModuleContext const &mc, string const &suffix)
name_array< 5u > const timeSourceColumnNames_
Stream & operator<<(Stream &&out, CallInfo_t const &info)
Helper operator to insert a call information in a stream with default options.
Definition: DebugUtils.h:389
#define DEFINE_ART_SERVICE(svc)
constexpr auto const & left(const_AssnsIter< L, R, D, Dir > const &a, const_AssnsIter< L, R, D, Dir > const &b)
Definition: AssnsIter.h:94
double mean(const std::vector< short > &wf, size_t start, size_t nsample)
Definition: UtilFunc.cxx:13
void startTime(ModuleContext const &mc)
bool anyTableFull_() const
Float_t sc
Definition: plot.C:23
auto const & moduleLabel() const
Definition: ModuleContext.h:43
static constexpr EventID invalidEvent() noexcept
Definition: EventID.h:202
Definition: MVAAlg.h:12
Char_t n[5]
TCEvent evt
Definition: DataStructs.cxx:8
Float_t e
Definition: plot.C:35
MaybeLogger_< ELseverityLevel::ELsev_severe, true > LogAbsolute
EventID id() const
Definition: Event.cc:23
cet::sqlite::Ntuple< uint32_t, uint32_t, uint32_t, string, string, string, double > timeModule_t