Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-07-12 07:52:26

0001 // This file is part of the ACTS project.
0002 //
0003 // Copyright (C) 2016 CERN for the benefit of the ACTS project
0004 //
0005 // This Source Code Form is subject to the terms of the Mozilla Public
0006 // License, v. 2.0. If a copy of the MPL was not distributed with this
0007 // file, You can obtain one at https://mozilla.org/MPL/2.0/.
0008 
0009 #include "ActsExamples/Io/HepMC3/HepMC3Writer.hpp"
0010 
0011 #include "ActsExamples/Framework/ProcessCode.hpp"
0012 #include "ActsExamples/Utilities/Paths.hpp"
0013 
0014 #include <filesystem>
0015 #include <stdexcept>
0016 
0017 #include <HepMC3/Version.h>
0018 #include <HepMC3/WriterAscii.h>
0019 
0020 #if HEPMC3_VERSION_CODE == 3002007
0021 #include "./CompressedIO.h"
0022 #endif
0023 
0024 #ifdef HEPMC3_USE_COMPRESSION
0025 #include <HepMC3/WriterGZ.h>
0026 #endif
0027 
0028 #ifdef ACTS_HEPMC3_ROOT_SUPPORT
0029 #include <HepMC3/WriterRootTree.h>
0030 #endif
0031 
0032 #include <boost/algorithm/string/join.hpp>
0033 
0034 namespace ActsExamples {
0035 
0036 HepMC3Writer::HepMC3Writer(const Config& config, Acts::Logging::Level level)
0037     : WriterT(config.inputEvent, "HepMC3Writer", level),
0038       m_cfg(config),
0039       m_queueSemaphore{static_cast<long>(m_cfg.maxEventsPending + 1)} {
0040   if (m_cfg.outputPath.empty()) {
0041     throw std::invalid_argument("Missing output file path");
0042   }
0043 
0044   if (std::ranges::none_of(HepMC3Util::availableCompressionModes(),
0045                            [this](HepMC3Util::Compression c) {
0046                              return c == this->m_cfg.compression;
0047                            })) {
0048     std::stringstream ss;
0049     ss << "Unsupported compression mode: " << m_cfg.compression;
0050     throw std::invalid_argument(ss.str());
0051   }
0052 
0053   if (!m_cfg.perEvent) {
0054     auto absolute = std::filesystem::absolute(m_cfg.outputPath);
0055     if (std::filesystem::exists(absolute) &&
0056         std::filesystem::is_directory(absolute)) {
0057       throw std::invalid_argument("Output path is a directory: " +
0058                                   absolute.string());
0059     }
0060 
0061     if (!std::filesystem::exists(absolute.parent_path())) {
0062       throw std::invalid_argument("Directory to write into does not exist: " +
0063                                   absolute.parent_path().string());
0064     }
0065     // Create a single file writer
0066     m_writer = createWriter(m_cfg.outputPath);
0067   }
0068 
0069   if (m_cfg.perEvent &&
0070       HepMC3Util::formatFromFilename(m_cfg.outputPath.string()) ==
0071           HepMC3Util::Format::root) {
0072     ACTS_WARNING(
0073         "Per-event output is enabled and the output format is ROOT. This is "
0074         "likely not what you want");
0075   }
0076 }
0077 
0078 std::unique_ptr<HepMC3::Writer> HepMC3Writer::createWriter(
0079     const std::filesystem::path& target) {
0080   ACTS_DEBUG("Creating writer for: " << target);
0081 
0082   auto format = HepMC3Util::formatFromFilename(target.string());
0083 
0084   if (format == HepMC3Util::Format::root) {
0085     ACTS_DEBUG("~> Root");
0086     if (m_cfg.compression != HepMC3Util::Compression::none) {
0087       ACTS_ERROR("~~> Compression not supported for Root");
0088       throw std::invalid_argument("Compression not supported for Root");
0089     }
0090 #ifdef ACTS_HEPMC3_ROOT_SUPPORT
0091     return std::make_unique<HepMC3::WriterRootTree>(target);
0092 #else
0093     ACTS_ERROR("~~> Root support not enabled in HepMC3");
0094     throw std::runtime_error("Root support not enabled in HepMC3");
0095 #endif
0096   } else {
0097     std::filesystem::path path =
0098         target.string() +
0099         std::string{HepMC3Util::compressionExtension(m_cfg.compression)};
0100     ACTS_DEBUG("~> Ascii (=> " << path << ")");
0101 
0102     switch (m_cfg.compression) {
0103       case HepMC3Util::Compression::none:
0104         ACTS_DEBUG("~~> uncompressed");
0105         return std::make_unique<HepMC3::WriterAscii>(path);
0106 #ifdef HEPMC3_USE_COMPRESSION
0107       case HepMC3Util::Compression::zlib:
0108         ACTS_DEBUG("~~> GZ");
0109         return std::make_unique<
0110             HepMC3::WriterGZ<HepMC3::WriterAscii, HepMC3::Compression::z>>(
0111             path);
0112       case HepMC3Util::Compression::lzma:
0113         ACTS_DEBUG("~~> LZMA");
0114         return std::make_unique<
0115             HepMC3::WriterGZ<HepMC3::WriterAscii, HepMC3::Compression::lzma>>(
0116             path);
0117       case HepMC3Util::Compression::bzip2:
0118         ACTS_DEBUG("~~> BZ2");
0119         return std::make_unique<
0120             HepMC3::WriterGZ<HepMC3::WriterAscii, HepMC3::Compression::bz2>>(
0121             path);
0122       case HepMC3Util::Compression::zstd:
0123         ACTS_DEBUG("~~> ZSTD");
0124         return std::make_unique<
0125             HepMC3::WriterGZ<HepMC3::WriterAscii, HepMC3::Compression::zstd>>(
0126             path);
0127 #endif
0128       default:
0129         throw std::invalid_argument{"Unknown compression value"};
0130     }
0131   }
0132 }
0133 
// Destructor, defaulted out of line in this translation unit.
// NOTE(review): presumably kept out of the header so that the writer member
// is destroyed where the HepMC3 writer types are complete -- confirm against
// the class declaration.
0134 HepMC3Writer::~HepMC3Writer() = default;
0135 
0136 ProcessCode HepMC3Writer::beginEvent(std::size_t threadId) {
0137   ACTS_VERBOSE("Begin event, next_event=" << m_nextEvent);
0138   if (!m_cfg.writeEventsInOrder) {
0139     // Nothing to do if we don't write in order
0140     return ProcessCode::SUCCESS;
0141   }
0142 
0143   ACTS_DEBUG("thread=" << threadId << ", next_event=" << m_nextEvent
0144                        << " waiting for semaphore");
0145   m_waiting++;
0146   std::chrono::seconds timeout{10};
0147   if (!m_queueSemaphore.try_acquire_for(timeout)) {
0148     ACTS_ERROR("thread=" << threadId << ", next_event=" << m_nextEvent
0149                          << " failed to acquire semaphore after "
0150                          << timeout.count() << "s");
0151     return ProcessCode::ABORT;
0152   }
0153   m_waiting--;
0154   ACTS_DEBUG("thread=" << threadId << ", next_event=" << m_nextEvent
0155                        << " have semaphore");
0156 
0157   return ProcessCode::SUCCESS;
0158 }
0159 
0160 ProcessCode HepMC3Writer::writeT(
0161     const AlgorithmContext& ctx,
0162     const std::shared_ptr<HepMC3::GenEvent>& event) {
0163   ACTS_VERBOSE("Write: event_nr=" << ctx.eventNumber
0164                                   << ", thread=" << ctx.threadId);
0165 
0166   if (m_cfg.perEvent) {
0167     std::filesystem::path perEventFile =
0168         perEventFilepath(m_cfg.outputPath.parent_path(),
0169                          m_cfg.outputPath.filename().string(), ctx.eventNumber);
0170 
0171     ACTS_VERBOSE("Writing per-event file " << perEventFile);
0172     auto writer = createWriter(perEventFile);
0173 
0174     writer->write_event(*event);
0175     auto result = ProcessCode::SUCCESS;
0176     if (writer->failed()) {
0177       ACTS_ERROR("Failed to write event number: " << ctx.eventNumber);
0178       result = ProcessCode::ABORT;
0179     }
0180     writer->close();
0181     return result;
0182   }
0183 
0184   if (!m_cfg.writeEventsInOrder) {
0185     std::scoped_lock lock{m_mutex};
0186     // Unconditionally write events in whatever order they come in
0187     m_writer->write_event(*event);
0188     if (m_writer->failed()) {
0189       ACTS_ERROR("Failed to write event number: " << ctx.eventNumber);
0190       return ProcessCode::ABORT;
0191     }
0192     return ProcessCode::SUCCESS;
0193   }
0194 
0195   std::scoped_lock lock{m_mutex};
0196 
0197   auto printQueue = [&]() {
0198     ACTS_VERBOSE("queue=[" << [&]() {
0199       std::vector<std::string> numbers;
0200       numbers.reserve(m_eventQueue.size());
0201       std::ranges::transform(
0202           m_eventQueue, std::back_inserter(numbers),
0203           [](const auto& pair) { return std::to_string(pair.first); });
0204 
0205       return boost::algorithm::join(numbers, ", ");
0206     }() << "]");
0207   };
0208 
0209   if (ctx.eventNumber == m_nextEvent) {
0210     ACTS_DEBUG("event_nr=" << ctx.eventNumber
0211                            << " is the next event -> writing");
0212 
0213     // write
0214     m_writer->write_event(*event);
0215     if (m_writer->failed()) {
0216       ACTS_ERROR("Failed to write event number: " << ctx.eventNumber);
0217       return ProcessCode::ABORT;
0218     }
0219     m_nextEvent++;
0220 
0221     std::size_t nWritten = 1;
0222 
0223     printQueue();
0224 
0225     while (!m_eventQueue.empty() &&
0226            m_eventQueue.front().first == static_cast<long long>(m_nextEvent)) {
0227       auto [nextEventNumber, nextEvent] = std::move(m_eventQueue.front());
0228       ACTS_VERBOSE("Writing event number: " << nextEventNumber);
0229       m_eventQueue.erase(m_eventQueue.begin());
0230 
0231       m_writer->write_event(*nextEvent);
0232       if (m_writer->failed()) {
0233         ACTS_ERROR("Failed to write event number: " << nextEventNumber);
0234         return ProcessCode::ABORT;
0235       }
0236       m_nextEvent++;
0237       nWritten++;
0238     }
0239 
0240     ACTS_VERBOSE("Wrote " << nWritten << " events, next_event=" << m_nextEvent
0241                           << ", thread=" << ctx.threadId);
0242     m_queueSemaphore.release(nWritten);
0243     ACTS_VERBOSE("thread=" << ctx.threadId << ", released n=" << nWritten
0244                            << ", waiting=" << m_waiting);
0245 
0246   } else {
0247     ACTS_DEBUG("event_nr=" << ctx.eventNumber
0248                            << " is not the next event -> queueing");
0249 
0250     ACTS_VERBOSE(
0251         "Finding insert location for event number: " << ctx.eventNumber);
0252     auto it = std::ranges::upper_bound(m_eventQueue, ctx.eventNumber, {},
0253                                        [](const auto& v) { return v.first; });
0254     if (it == m_eventQueue.end()) {
0255       ACTS_VERBOSE("Insert location for " << ctx.eventNumber
0256                                           << " is at the end of the queue");
0257     } else {
0258       ACTS_VERBOSE("Insert location for "
0259                    << ctx.eventNumber
0260                    << " is before event number: " << it->first);
0261     }
0262 
0263     m_eventQueue.emplace(it, ctx.eventNumber, event);
0264     printQueue();
0265     m_maxEventQueueSize = std::max(m_maxEventQueueSize, m_eventQueue.size());
0266   }
0267 
0268   return ProcessCode::SUCCESS;
0269 }
0270 
0271 ProcessCode HepMC3Writer::finalize() {
0272   ACTS_VERBOSE("Finalizing HepMC3Writer");
0273   if (m_writer) {
0274     m_writer->close();
0275   }
0276   ACTS_DEBUG("max_queue_size=" << m_maxEventQueueSize
0277                                << " limit=" << m_cfg.maxEventsPending);
0278   return ProcessCode::SUCCESS;
0279 }
0280 
0281 }  // namespace ActsExamples