File indexing completed on 2025-07-12 07:52:26
0001
0002
0003
0004
0005
0006
0007
0008
0009 #include "ActsExamples/Io/HepMC3/HepMC3Writer.hpp"
0010
0011 #include "ActsExamples/Framework/ProcessCode.hpp"
0012 #include "ActsExamples/Utilities/Paths.hpp"
0013
0014 #include <filesystem>
0015 #include <stdexcept>
0016
0017 #include <HepMC3/Version.h>
0018 #include <HepMC3/WriterAscii.h>
0019
0020 #if HEPMC3_VERSION_CODE == 3002007
0021 #include "./CompressedIO.h"
0022 #endif
0023
0024 #ifdef HEPMC3_USE_COMPRESSION
0025 #include <HepMC3/WriterGZ.h>
0026 #endif
0027
0028 #ifdef ACTS_HEPMC3_ROOT_SUPPORT
0029 #include <HepMC3/WriterRootTree.h>
0030 #endif
0031
0032 #include <boost/algorithm/string/join.hpp>
0033
0034 namespace ActsExamples {
0035
0036 HepMC3Writer::HepMC3Writer(const Config& config, Acts::Logging::Level level)
0037 : WriterT(config.inputEvent, "HepMC3Writer", level),
0038 m_cfg(config),
0039 m_queueSemaphore{static_cast<long>(m_cfg.maxEventsPending + 1)} {
0040 if (m_cfg.outputPath.empty()) {
0041 throw std::invalid_argument("Missing output file path");
0042 }
0043
0044 if (std::ranges::none_of(HepMC3Util::availableCompressionModes(),
0045 [this](HepMC3Util::Compression c) {
0046 return c == this->m_cfg.compression;
0047 })) {
0048 std::stringstream ss;
0049 ss << "Unsupported compression mode: " << m_cfg.compression;
0050 throw std::invalid_argument(ss.str());
0051 }
0052
0053 if (!m_cfg.perEvent) {
0054 auto absolute = std::filesystem::absolute(m_cfg.outputPath);
0055 if (std::filesystem::exists(absolute) &&
0056 std::filesystem::is_directory(absolute)) {
0057 throw std::invalid_argument("Output path is a directory: " +
0058 absolute.string());
0059 }
0060
0061 if (!std::filesystem::exists(absolute.parent_path())) {
0062 throw std::invalid_argument("Directory to write into does not exist: " +
0063 absolute.parent_path().string());
0064 }
0065
0066 m_writer = createWriter(m_cfg.outputPath);
0067 }
0068
0069 if (m_cfg.perEvent &&
0070 HepMC3Util::formatFromFilename(m_cfg.outputPath.string()) ==
0071 HepMC3Util::Format::root) {
0072 ACTS_WARNING(
0073 "Per-event output is enabled and the output format is ROOT. This is "
0074 "likely not what you want");
0075 }
0076 }
0077
0078 std::unique_ptr<HepMC3::Writer> HepMC3Writer::createWriter(
0079 const std::filesystem::path& target) {
0080 ACTS_DEBUG("Creating writer for: " << target);
0081
0082 auto format = HepMC3Util::formatFromFilename(target.string());
0083
0084 if (format == HepMC3Util::Format::root) {
0085 ACTS_DEBUG("~> Root");
0086 if (m_cfg.compression != HepMC3Util::Compression::none) {
0087 ACTS_ERROR("~~> Compression not supported for Root");
0088 throw std::invalid_argument("Compression not supported for Root");
0089 }
0090 #ifdef ACTS_HEPMC3_ROOT_SUPPORT
0091 return std::make_unique<HepMC3::WriterRootTree>(target);
0092 #else
0093 ACTS_ERROR("~~> Root support not enabled in HepMC3");
0094 throw std::runtime_error("Root support not enabled in HepMC3");
0095 #endif
0096 } else {
0097 std::filesystem::path path =
0098 target.string() +
0099 std::string{HepMC3Util::compressionExtension(m_cfg.compression)};
0100 ACTS_DEBUG("~> Ascii (=> " << path << ")");
0101
0102 switch (m_cfg.compression) {
0103 case HepMC3Util::Compression::none:
0104 ACTS_DEBUG("~~> uncompressed");
0105 return std::make_unique<HepMC3::WriterAscii>(path);
0106 #ifdef HEPMC3_USE_COMPRESSION
0107 case HepMC3Util::Compression::zlib:
0108 ACTS_DEBUG("~~> GZ");
0109 return std::make_unique<
0110 HepMC3::WriterGZ<HepMC3::WriterAscii, HepMC3::Compression::z>>(
0111 path);
0112 case HepMC3Util::Compression::lzma:
0113 ACTS_DEBUG("~~> LZMA");
0114 return std::make_unique<
0115 HepMC3::WriterGZ<HepMC3::WriterAscii, HepMC3::Compression::lzma>>(
0116 path);
0117 case HepMC3Util::Compression::bzip2:
0118 ACTS_DEBUG("~~> BZ2");
0119 return std::make_unique<
0120 HepMC3::WriterGZ<HepMC3::WriterAscii, HepMC3::Compression::bz2>>(
0121 path);
0122 case HepMC3Util::Compression::zstd:
0123 ACTS_DEBUG("~~> ZSTD");
0124 return std::make_unique<
0125 HepMC3::WriterGZ<HepMC3::WriterAscii, HepMC3::Compression::zstd>>(
0126 path);
0127 #endif
0128 default:
0129 throw std::invalid_argument{"Unknown compression value"};
0130 }
0131 }
0132 }
0133
// Defaulted destructor, defined out of line in this translation unit; no
// custom teardown logic is performed here.
// NOTE(review): presumably kept out of line so that members such as the
// HepMC3 writer pointer are destroyed where their types are complete -
// confirm against the header.
0134 HepMC3Writer::~HepMC3Writer() = default;
0135
0136 ProcessCode HepMC3Writer::beginEvent(std::size_t threadId) {
0137 ACTS_VERBOSE("Begin event, next_event=" << m_nextEvent);
0138 if (!m_cfg.writeEventsInOrder) {
0139
0140 return ProcessCode::SUCCESS;
0141 }
0142
0143 ACTS_DEBUG("thread=" << threadId << ", next_event=" << m_nextEvent
0144 << " waiting for semaphore");
0145 m_waiting++;
0146 std::chrono::seconds timeout{10};
0147 if (!m_queueSemaphore.try_acquire_for(timeout)) {
0148 ACTS_ERROR("thread=" << threadId << ", next_event=" << m_nextEvent
0149 << " failed to acquire semaphore after "
0150 << timeout.count() << "s");
0151 return ProcessCode::ABORT;
0152 }
0153 m_waiting--;
0154 ACTS_DEBUG("thread=" << threadId << ", next_event=" << m_nextEvent
0155 << " have semaphore");
0156
0157 return ProcessCode::SUCCESS;
0158 }
0159
0160 ProcessCode HepMC3Writer::writeT(
0161 const AlgorithmContext& ctx,
0162 const std::shared_ptr<HepMC3::GenEvent>& event) {
0163 ACTS_VERBOSE("Write: event_nr=" << ctx.eventNumber
0164 << ", thread=" << ctx.threadId);
0165
0166 if (m_cfg.perEvent) {
0167 std::filesystem::path perEventFile =
0168 perEventFilepath(m_cfg.outputPath.parent_path(),
0169 m_cfg.outputPath.filename().string(), ctx.eventNumber);
0170
0171 ACTS_VERBOSE("Writing per-event file " << perEventFile);
0172 auto writer = createWriter(perEventFile);
0173
0174 writer->write_event(*event);
0175 auto result = ProcessCode::SUCCESS;
0176 if (writer->failed()) {
0177 ACTS_ERROR("Failed to write event number: " << ctx.eventNumber);
0178 result = ProcessCode::ABORT;
0179 }
0180 writer->close();
0181 return result;
0182 }
0183
0184 if (!m_cfg.writeEventsInOrder) {
0185 std::scoped_lock lock{m_mutex};
0186
0187 m_writer->write_event(*event);
0188 if (m_writer->failed()) {
0189 ACTS_ERROR("Failed to write event number: " << ctx.eventNumber);
0190 return ProcessCode::ABORT;
0191 }
0192 return ProcessCode::SUCCESS;
0193 }
0194
0195 std::scoped_lock lock{m_mutex};
0196
0197 auto printQueue = [&]() {
0198 ACTS_VERBOSE("queue=[" << [&]() {
0199 std::vector<std::string> numbers;
0200 numbers.reserve(m_eventQueue.size());
0201 std::ranges::transform(
0202 m_eventQueue, std::back_inserter(numbers),
0203 [](const auto& pair) { return std::to_string(pair.first); });
0204
0205 return boost::algorithm::join(numbers, ", ");
0206 }() << "]");
0207 };
0208
0209 if (ctx.eventNumber == m_nextEvent) {
0210 ACTS_DEBUG("event_nr=" << ctx.eventNumber
0211 << " is the next event -> writing");
0212
0213
0214 m_writer->write_event(*event);
0215 if (m_writer->failed()) {
0216 ACTS_ERROR("Failed to write event number: " << ctx.eventNumber);
0217 return ProcessCode::ABORT;
0218 }
0219 m_nextEvent++;
0220
0221 std::size_t nWritten = 1;
0222
0223 printQueue();
0224
0225 while (!m_eventQueue.empty() &&
0226 m_eventQueue.front().first == static_cast<long long>(m_nextEvent)) {
0227 auto [nextEventNumber, nextEvent] = std::move(m_eventQueue.front());
0228 ACTS_VERBOSE("Writing event number: " << nextEventNumber);
0229 m_eventQueue.erase(m_eventQueue.begin());
0230
0231 m_writer->write_event(*nextEvent);
0232 if (m_writer->failed()) {
0233 ACTS_ERROR("Failed to write event number: " << nextEventNumber);
0234 return ProcessCode::ABORT;
0235 }
0236 m_nextEvent++;
0237 nWritten++;
0238 }
0239
0240 ACTS_VERBOSE("Wrote " << nWritten << " events, next_event=" << m_nextEvent
0241 << ", thread=" << ctx.threadId);
0242 m_queueSemaphore.release(nWritten);
0243 ACTS_VERBOSE("thread=" << ctx.threadId << ", released n=" << nWritten
0244 << ", waiting=" << m_waiting);
0245
0246 } else {
0247 ACTS_DEBUG("event_nr=" << ctx.eventNumber
0248 << " is not the next event -> queueing");
0249
0250 ACTS_VERBOSE(
0251 "Finding insert location for event number: " << ctx.eventNumber);
0252 auto it = std::ranges::upper_bound(m_eventQueue, ctx.eventNumber, {},
0253 [](const auto& v) { return v.first; });
0254 if (it == m_eventQueue.end()) {
0255 ACTS_VERBOSE("Insert location for " << ctx.eventNumber
0256 << " is at the end of the queue");
0257 } else {
0258 ACTS_VERBOSE("Insert location for "
0259 << ctx.eventNumber
0260 << " is before event number: " << it->first);
0261 }
0262
0263 m_eventQueue.emplace(it, ctx.eventNumber, event);
0264 printQueue();
0265 m_maxEventQueueSize = std::max(m_maxEventQueueSize, m_eventQueue.size());
0266 }
0267
0268 return ProcessCode::SUCCESS;
0269 }
0270
0271 ProcessCode HepMC3Writer::finalize() {
0272 ACTS_VERBOSE("Finalizing HepMC3Writer");
0273 if (m_writer) {
0274 m_writer->close();
0275 }
0276 ACTS_DEBUG("max_queue_size=" << m_maxEventQueueSize
0277 << " limit=" << m_cfg.maxEventsPending);
0278 return ProcessCode::SUCCESS;
0279 }
0280
0281 }