20 #include "hep_concurrency/WaitingTask.h" 22 #include "range/v3/view.hpp" 25 #include <type_traits> 38 views::filter([](
auto const& worker) {
return worker.isUnique(); });
54 for (
auto const& worker :
56 assert(sid == worker->scheduleID());
57 if (
auto owp = std::dynamic_pointer_cast<OutputWorker>(worker)) {
62 outputCallbacks.registerCallback(
70 worker.beginJob(resources);
85 error <<
"cet::exception caught in Schedule::endJob\n" 90 error <<
"Standard library exception caught in Schedule::endJob\n" 95 error <<
"Unknown exception caught in Schedule::endJob\n";
105 ow->selectProducts(tables);
113 worker.respondToOpenInputFile(fb);
121 worker.respondToCloseInputFile(fb);
129 worker.respondToOpenOutputFiles(fb);
137 worker.respondToCloseOutputFiles(fb);
145 return ow->fileIsOpen();
167 ow->setRunAuxiliaryRangeSetID(rangeSet);
194 ow->setSubRunAuxiliaryRangeSetID(rs);
202 ow->writeSubRun(srp);
226 <<
"an exception occurred during current event processing\n" 231 <<
"an exception occurred during current event processing\n";
240 WaitingTaskPtr
const finalizeEventTask,
242 : endPathExec_{endPathExec}
243 , finalizeEventTask_{finalizeEventTask}
250 auto const scheduleID = endPathExec_->sc_.id();
257 rethrow_exception(ex);
261 tmp <<
"an exception occurred during current event processing\n" <<
e;
264 <<
"end path processing terminate because of EXCEPTION";
270 <<
"end path processing terminate because of EXCEPTION";
275 endPathExec_->endPathInfo_.incrementPassedEventCount();
293 auto const sid =
sc_.
id();
299 make_waiting_task<PathsDoneTask>(
this, finalizeEventTask,
taskGroup_);
320 ow->writeEvent(ep, pc);
322 auto const& eid = ep.
eventID();
325 <<
"eid: " << eid.run() <<
", " << eid.subRun() <<
", " << eid.event();
351 if (!ow->fileIsOpen()) {
372 outputWorkersToOpen_.clear();
389 ow->setFileStatus(ofs);
398 if (atBoundary < ow->fileGranularity()) {
403 if (ow->requestsToCloseFile()) {
413 ow->incrementInputFileNumber();
#define TDEBUG_BEGIN_TASK_SI(LEVEL, SI)
EndPathExecutor *const endPathExec_
std::vector< Path > & paths()
bool outputsToClose() const
void seedRunRangeSet(RangeSetHandler const &)
#define TDEBUG_END_TASK_SI(LEVEL, SI)
PathsDoneTask(EndPathExecutor *const endPathExec, WaitingTaskPtr const finalizeEventTask, GlobalTaskGroup &taskGroup)
void writeSubRun(SubRunPrincipal &srp)
GlobalTaskGroup & taskGroup_
MaybeLogger_< ELseverityLevel::ELsev_error, false > LogError
void respondToOpenInputFile(FileBlock const &fb)
void respondToOpenOutputFiles(FileBlock const &fb)
std::map< std::string, std::shared_ptr< Worker > > & workers()
void selectProducts(ProductTables const &)
void operator()(exception_ptr const ex)
void closeAllOutputFiles()
void recordOutputClosureRequests(Granularity)
void respondToCloseOutputFiles(FileBlock const &fb)
void openSomeOutputFiles(FileBlock const &fb)
void setOutputFileStatus(OutputFileStatus)
decltype(auto) values(Coll &&coll)
Range-for loop helper iterating across the values of the specified collection.
std::atomic< OutputFileStatus > fileStatus_
std::set< OutputWorker * > outputWorkersToClose_
EventID const & eventID() const
void seedSubRunRangeSet(RangeSetHandler const &)
bool someOutputsOpen() const
bool isLastInSubRun() const
ScheduleContext const sc_
#define TDEBUG_FUNC_SI(LEVEL, SI)
std::unique_ptr< RangeSetHandler > subRunRangeSetHandler_
static auto end_path_spec()
void writeRun(RunPrincipal &rp)
void beginJob(detail::SharedResources const &resources)
bool outputsToOpen() const
void setRunAuxiliaryRangeSetID(RangeSet const &rs)
void setSubRunAuxiliaryRangeSetID(RangeSet const &rs)
void closeSomeOutputFiles()
std::set< OutputWorker * > outputWorkersToOpen_
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
#define TDEBUG_END_FUNC_SI(LEVEL, SI)
WaitingTaskPtr const finalizeEventTask_
void respondToCloseInputFile(FileBlock const &fb)
void may_run(hep::concurrency::WaitingTaskPtr task, std::exception_ptr ex_ptr={})
void writeEvent(EventPrincipal &)
void incrementPassedEventCount()
ActionTable const & actionTable_
#define TDEBUG_BEGIN_FUNC_SI(LEVEL, SI)
RangeSetHandler * clone() const
void process(Transition, Principal &)
GlobalTaskGroup & taskGroup_
void incrementInputFileNumber()
void process_event(hep::concurrency::WaitingTaskPtr finalizeEventTask, EventPrincipal &)
std::vector< OutputWorker * > outputWorkers_
std::unique_ptr< RangeSetHandler > runRangeSetHandler_
void incrementTotalEventCount()
cet::coded_exception< error, detail::translate > exception