Warning, file /acts/Examples/Io/HepMC3/src/HepMC3Writer.cpp was not indexed
or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).
0001
0002
0003
0004
0005
0006
0007
0008
0009 #include "ActsExamples/Io/HepMC3/HepMC3Writer.hpp"
0010
0011 #include "ActsExamples/Framework/ProcessCode.hpp"
0012 #include "ActsExamples/Io/HepMC3/HepMC3Metadata.hpp"
0013 #include "ActsExamples/Utilities/Paths.hpp"
0014
0015 #include <filesystem>
0016 #include <stdexcept>
0017
0018 #include <HepMC3/Version.h>
0019 #include <HepMC3/WriterAscii.h>
0020 #include <boost/algorithm/string/join.hpp>
0021
0022 namespace ActsExamples {
0023
0024 HepMC3Writer::HepMC3Writer(const Config& config, Acts::Logging::Level level)
0025 : WriterT(config.inputEvent, "HepMC3Writer", level),
0026 m_cfg(config),
0027 m_queueSemaphore{static_cast<long>(m_cfg.maxEventsPending + 1)} {
0028 if (m_cfg.outputPath.empty()) {
0029 throw std::invalid_argument("Missing output file path");
0030 }
0031
0032
0033 auto [compression, basePath] = resolveCompression(m_cfg.outputPath);
0034 m_compression = compression;
0035
0036
0037 if (std::ranges::none_of(HepMC3Util::availableCompressionModes(),
0038 [this](HepMC3Util::Compression c) {
0039 return c == this->m_compression;
0040 })) {
0041 std::stringstream ss;
0042 ss << "Unsupported compression mode: " << m_compression;
0043 throw std::invalid_argument(ss.str());
0044 }
0045
0046
0047 auto format = HepMC3Util::formatFromFilename(basePath);
0048 if (format == HepMC3Util::Format::root) {
0049 m_actualOutputPath = basePath;
0050 } else {
0051 m_actualOutputPath =
0052 basePath.string() +
0053 std::string{HepMC3Util::compressionExtension(m_compression)};
0054 }
0055
0056
0057 auto absolute = std::filesystem::absolute(m_actualOutputPath);
0058 if (std::filesystem::exists(absolute) &&
0059 std::filesystem::is_directory(absolute)) {
0060 throw std::invalid_argument("Output path is a directory: " +
0061 absolute.string());
0062 }
0063
0064 if (!std::filesystem::exists(absolute.parent_path())) {
0065 throw std::invalid_argument("Directory to write into does not exist: " +
0066 absolute.parent_path().string());
0067 }
0068
0069
0070 m_writer =
0071 HepMC3Util::createWriter(m_actualOutputPath, format, m_compression);
0072 }
0073
0074 HepMC3Writer::~HepMC3Writer() = default;
0075
0076 std::pair<HepMC3Util::Compression, std::filesystem::path>
0077 HepMC3Writer::resolveCompression(const std::filesystem::path& path) const {
0078 using Compression = HepMC3Util::Compression;
0079
0080
0081 Compression pathCompression = HepMC3Util::compressionFromFilename(path);
0082
0083
0084 std::filesystem::path basePath = path;
0085 std::string_view ext = HepMC3Util::compressionExtension(pathCompression);
0086 if (!ext.empty() && path.string().ends_with(ext)) {
0087
0088 basePath = path.string().substr(0, path.string().size() - ext.size());
0089 }
0090
0091
0092 Compression resolvedCompression = Compression::none;
0093
0094 if (m_cfg.compression.has_value()) {
0095
0096 resolvedCompression = m_cfg.compression.value();
0097
0098
0099 if (pathCompression != Compression::none &&
0100 pathCompression != resolvedCompression) {
0101 std::stringstream ss;
0102 ss << "Compression mismatch: config specifies " << resolvedCompression
0103 << ", but path '" << path << "' implies " << pathCompression;
0104 throw std::invalid_argument(ss.str());
0105 }
0106 } else {
0107
0108 resolvedCompression = pathCompression;
0109 }
0110
0111 return {resolvedCompression, basePath};
0112 }
0113
0114 ProcessCode HepMC3Writer::beginEvent(std::size_t threadId) {
0115 ACTS_VERBOSE("Begin event, next_event=" << m_nextEvent);
0116 if (!m_cfg.writeEventsInOrder) {
0117
0118 return ProcessCode::SUCCESS;
0119 }
0120
0121 ACTS_DEBUG("thread=" << threadId << ", next_event=" << m_nextEvent
0122 << " waiting for semaphore");
0123 m_waiting++;
0124 std::chrono::seconds timeout{10};
0125 if (!m_queueSemaphore.try_acquire_for(timeout)) {
0126 ACTS_ERROR("thread=" << threadId << ", next_event=" << m_nextEvent
0127 << " failed to acquire semaphore after "
0128 << timeout.count() << "s");
0129 return ProcessCode::ABORT;
0130 }
0131 m_waiting--;
0132 ACTS_DEBUG("thread=" << threadId << ", next_event=" << m_nextEvent
0133 << " have semaphore");
0134
0135 return ProcessCode::SUCCESS;
0136 }
0137
0138 ProcessCode HepMC3Writer::writeT(
0139 const AlgorithmContext& ctx,
0140 const std::shared_ptr<HepMC3::GenEvent>& event) {
0141 ACTS_VERBOSE("Write: event_nr=" << ctx.eventNumber
0142 << ", thread=" << ctx.threadId);
0143
0144 if (!m_cfg.writeEventsInOrder) {
0145 std::scoped_lock lock{m_mutex};
0146
0147 m_writer->write_event(*event);
0148 if (m_writer->failed()) {
0149 ACTS_ERROR("Failed to write event number: " << ctx.eventNumber);
0150 return ProcessCode::ABORT;
0151 }
0152 m_eventsWritten++;
0153 return ProcessCode::SUCCESS;
0154 }
0155
0156 std::scoped_lock lock{m_mutex};
0157
0158 auto printQueue = [&]() {
0159 ACTS_VERBOSE("queue=[" << [&]() {
0160 std::vector<std::string> numbers;
0161 numbers.reserve(m_eventQueue.size());
0162 std::ranges::transform(
0163 m_eventQueue, std::back_inserter(numbers),
0164 [](const auto& pair) { return std::to_string(pair.first); });
0165
0166 return boost::algorithm::join(numbers, ", ");
0167 }() << "]");
0168 };
0169
0170 if (ctx.eventNumber == m_nextEvent) {
0171 ACTS_DEBUG("event_nr=" << ctx.eventNumber
0172 << " is the next event -> writing");
0173
0174
0175 m_writer->write_event(*event);
0176 if (m_writer->failed()) {
0177 ACTS_ERROR("Failed to write event number: " << ctx.eventNumber);
0178 return ProcessCode::ABORT;
0179 }
0180 m_nextEvent++;
0181
0182 std::size_t nWritten = 1;
0183
0184 printQueue();
0185
0186 while (!m_eventQueue.empty() &&
0187 m_eventQueue.front().first == static_cast<long long>(m_nextEvent)) {
0188 auto [nextEventNumber, nextEvent] = std::move(m_eventQueue.front());
0189 ACTS_VERBOSE("Writing event number: " << nextEventNumber);
0190 m_eventQueue.erase(m_eventQueue.begin());
0191
0192 m_writer->write_event(*nextEvent);
0193 if (m_writer->failed()) {
0194 ACTS_ERROR("Failed to write event number: " << nextEventNumber);
0195 return ProcessCode::ABORT;
0196 }
0197 m_nextEvent++;
0198 nWritten++;
0199 }
0200
0201 m_eventsWritten += nWritten;
0202
0203 ACTS_VERBOSE("Wrote " << nWritten << " events, next_event=" << m_nextEvent
0204 << ", thread=" << ctx.threadId);
0205 m_queueSemaphore.release(nWritten);
0206 ACTS_VERBOSE("thread=" << ctx.threadId << ", released n=" << nWritten
0207 << ", waiting=" << m_waiting);
0208
0209 } else {
0210 ACTS_DEBUG("event_nr=" << ctx.eventNumber
0211 << " is not the next event -> queueing");
0212
0213 ACTS_VERBOSE(
0214 "Finding insert location for event number: " << ctx.eventNumber);
0215 auto it = std::ranges::upper_bound(m_eventQueue, ctx.eventNumber, {},
0216 [](const auto& v) { return v.first; });
0217 if (it == m_eventQueue.end()) {
0218 ACTS_VERBOSE("Insert location for " << ctx.eventNumber
0219 << " is at the end of the queue");
0220 } else {
0221 ACTS_VERBOSE("Insert location for "
0222 << ctx.eventNumber
0223 << " is before event number: " << it->first);
0224 }
0225
0226 m_eventQueue.emplace(it, ctx.eventNumber, event);
0227 printQueue();
0228 m_maxEventQueueSize = std::max(m_maxEventQueueSize, m_eventQueue.size());
0229 }
0230
0231 return ProcessCode::SUCCESS;
0232 }
0233
0234 ProcessCode HepMC3Writer::finalize() {
0235 ACTS_VERBOSE("Finalizing HepMC3Writer");
0236 if (m_writer) {
0237 m_writer->close();
0238 }
0239 ACTS_DEBUG("max_queue_size=" << m_maxEventQueueSize
0240 << " limit=" << m_cfg.maxEventsPending);
0241
0242
0243 ACTS_DEBUG("Writing sidecar metadata for " << m_actualOutputPath << " with "
0244 << m_eventsWritten << " events");
0245 if (!HepMC3Metadata::writeSidecar(
0246 m_actualOutputPath,
0247 HepMC3Metadata::HepMC3Metadata{.numEvents = m_eventsWritten},
0248 logger())) {
0249 ACTS_WARNING("Failed to write sidecar metadata file for "
0250 << m_actualOutputPath);
0251 }
0252
0253 return ProcessCode::SUCCESS;
0254 }
0255
0256 }