Back to home page

EIC code displayed by LXR

 
 

    


Warning, file /acts/Examples/Io/HepMC3/src/HepMC3Writer.cpp was not indexed or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).

0001 // This file is part of the ACTS project.
0002 //
0003 // Copyright (C) 2016 CERN for the benefit of the ACTS project
0004 //
0005 // This Source Code Form is subject to the terms of the Mozilla Public
0006 // License, v. 2.0. If a copy of the MPL was not distributed with this
0007 // file, You can obtain one at https://mozilla.org/MPL/2.0/.
0008 
0009 #include "ActsExamples/Io/HepMC3/HepMC3Writer.hpp"
0010 
0011 #include "ActsExamples/Framework/ProcessCode.hpp"
0012 #include "ActsExamples/Io/HepMC3/HepMC3Metadata.hpp"
0013 #include "ActsExamples/Utilities/Paths.hpp"
0014 
0015 #include <filesystem>
0016 #include <stdexcept>
0017 
0018 #include <HepMC3/Version.h>
0019 #include <HepMC3/WriterAscii.h>
0020 #include <boost/algorithm/string/join.hpp>
0021 
0022 namespace ActsExamples {
0023 
0024 HepMC3Writer::HepMC3Writer(const Config& config, Acts::Logging::Level level)
0025     : WriterT(config.inputEvent, "HepMC3Writer", level),
0026       m_cfg(config),
0027       m_queueSemaphore{static_cast<long>(m_cfg.maxEventsPending + 1)} {
0028   if (m_cfg.outputPath.empty()) {
0029     throw std::invalid_argument("Missing output file path");
0030   }
0031 
0032   // Resolve compression from config and/or path
0033   auto [compression, basePath] = resolveCompression(m_cfg.outputPath);
0034   m_compression = compression;
0035 
0036   // Validate the resolved compression is supported
0037   if (std::ranges::none_of(HepMC3Util::availableCompressionModes(),
0038                            [this](HepMC3Util::Compression c) {
0039                              return c == this->m_compression;
0040                            })) {
0041     std::stringstream ss;
0042     ss << "Unsupported compression mode: " << m_compression;
0043     throw std::invalid_argument(ss.str());
0044   }
0045 
0046   // Compute the actual output path with compression extension
0047   auto format = HepMC3Util::formatFromFilename(basePath);
0048   if (format == HepMC3Util::Format::root) {
0049     m_actualOutputPath = basePath;
0050   } else {
0051     m_actualOutputPath =
0052         basePath.string() +
0053         std::string{HepMC3Util::compressionExtension(m_compression)};
0054   }
0055 
0056   // Validate output path
0057   auto absolute = std::filesystem::absolute(m_actualOutputPath);
0058   if (std::filesystem::exists(absolute) &&
0059       std::filesystem::is_directory(absolute)) {
0060     throw std::invalid_argument("Output path is a directory: " +
0061                                 absolute.string());
0062   }
0063 
0064   if (!std::filesystem::exists(absolute.parent_path())) {
0065     throw std::invalid_argument("Directory to write into does not exist: " +
0066                                 absolute.parent_path().string());
0067   }
0068 
0069   // Create the writer
0070   m_writer =
0071       HepMC3Util::createWriter(m_actualOutputPath, format, m_compression);
0072 }
0073 
// Defaulted destructor, defined out-of-line in this source file.
// NOTE(review): presumably kept here so that member types only need to be
// complete in this translation unit -- confirm against the header.
0074 HepMC3Writer::~HepMC3Writer() = default;
0075 
0076 std::pair<HepMC3Util::Compression, std::filesystem::path>
0077 HepMC3Writer::resolveCompression(const std::filesystem::path& path) const {
0078   using Compression = HepMC3Util::Compression;
0079 
0080   // Deduce compression from the path
0081   Compression pathCompression = HepMC3Util::compressionFromFilename(path);
0082 
0083   // Get the base path without compression extension
0084   std::filesystem::path basePath = path;
0085   std::string_view ext = HepMC3Util::compressionExtension(pathCompression);
0086   if (!ext.empty() && path.string().ends_with(ext)) {
0087     // Remove the compression extension
0088     basePath = path.string().substr(0, path.string().size() - ext.size());
0089   }
0090 
0091   // Resolve the compression mode
0092   Compression resolvedCompression = Compression::none;
0093 
0094   if (m_cfg.compression.has_value()) {
0095     // Compression explicitly set in config
0096     resolvedCompression = m_cfg.compression.value();
0097 
0098     // Check consistency if path also specifies compression
0099     if (pathCompression != Compression::none &&
0100         pathCompression != resolvedCompression) {
0101       std::stringstream ss;
0102       ss << "Compression mismatch: config specifies " << resolvedCompression
0103          << ", but path '" << path << "' implies " << pathCompression;
0104       throw std::invalid_argument(ss.str());
0105     }
0106   } else {
0107     // Deduce compression from path
0108     resolvedCompression = pathCompression;
0109   }
0110 
0111   return {resolvedCompression, basePath};
0112 }
0113 
0114 ProcessCode HepMC3Writer::beginEvent(std::size_t threadId) {
0115   ACTS_VERBOSE("Begin event, next_event=" << m_nextEvent);
0116   if (!m_cfg.writeEventsInOrder) {
0117     // Nothing to do if we don't write in order
0118     return ProcessCode::SUCCESS;
0119   }
0120 
0121   ACTS_DEBUG("thread=" << threadId << ", next_event=" << m_nextEvent
0122                        << " waiting for semaphore");
0123   m_waiting++;
0124   std::chrono::seconds timeout{10};
0125   if (!m_queueSemaphore.try_acquire_for(timeout)) {
0126     ACTS_ERROR("thread=" << threadId << ", next_event=" << m_nextEvent
0127                          << " failed to acquire semaphore after "
0128                          << timeout.count() << "s");
0129     return ProcessCode::ABORT;
0130   }
0131   m_waiting--;
0132   ACTS_DEBUG("thread=" << threadId << ", next_event=" << m_nextEvent
0133                        << " have semaphore");
0134 
0135   return ProcessCode::SUCCESS;
0136 }
0137 
0138 ProcessCode HepMC3Writer::writeT(
0139     const AlgorithmContext& ctx,
0140     const std::shared_ptr<HepMC3::GenEvent>& event) {
0141   ACTS_VERBOSE("Write: event_nr=" << ctx.eventNumber
0142                                   << ", thread=" << ctx.threadId);
0143 
0144   if (!m_cfg.writeEventsInOrder) {
0145     std::scoped_lock lock{m_mutex};
0146     // Unconditionally write events in whatever order they come in
0147     m_writer->write_event(*event);
0148     if (m_writer->failed()) {
0149       ACTS_ERROR("Failed to write event number: " << ctx.eventNumber);
0150       return ProcessCode::ABORT;
0151     }
0152     m_eventsWritten++;
0153     return ProcessCode::SUCCESS;
0154   }
0155 
0156   std::scoped_lock lock{m_mutex};
0157 
0158   auto printQueue = [&]() {
0159     ACTS_VERBOSE("queue=[" << [&]() {
0160       std::vector<std::string> numbers;
0161       numbers.reserve(m_eventQueue.size());
0162       std::ranges::transform(
0163           m_eventQueue, std::back_inserter(numbers),
0164           [](const auto& pair) { return std::to_string(pair.first); });
0165 
0166       return boost::algorithm::join(numbers, ", ");
0167     }() << "]");
0168   };
0169 
0170   if (ctx.eventNumber == m_nextEvent) {
0171     ACTS_DEBUG("event_nr=" << ctx.eventNumber
0172                            << " is the next event -> writing");
0173 
0174     // write
0175     m_writer->write_event(*event);
0176     if (m_writer->failed()) {
0177       ACTS_ERROR("Failed to write event number: " << ctx.eventNumber);
0178       return ProcessCode::ABORT;
0179     }
0180     m_nextEvent++;
0181 
0182     std::size_t nWritten = 1;
0183 
0184     printQueue();
0185 
0186     while (!m_eventQueue.empty() &&
0187            m_eventQueue.front().first == static_cast<long long>(m_nextEvent)) {
0188       auto [nextEventNumber, nextEvent] = std::move(m_eventQueue.front());
0189       ACTS_VERBOSE("Writing event number: " << nextEventNumber);
0190       m_eventQueue.erase(m_eventQueue.begin());
0191 
0192       m_writer->write_event(*nextEvent);
0193       if (m_writer->failed()) {
0194         ACTS_ERROR("Failed to write event number: " << nextEventNumber);
0195         return ProcessCode::ABORT;
0196       }
0197       m_nextEvent++;
0198       nWritten++;
0199     }
0200 
0201     m_eventsWritten += nWritten;
0202 
0203     ACTS_VERBOSE("Wrote " << nWritten << " events, next_event=" << m_nextEvent
0204                           << ", thread=" << ctx.threadId);
0205     m_queueSemaphore.release(nWritten);
0206     ACTS_VERBOSE("thread=" << ctx.threadId << ", released n=" << nWritten
0207                            << ", waiting=" << m_waiting);
0208 
0209   } else {
0210     ACTS_DEBUG("event_nr=" << ctx.eventNumber
0211                            << " is not the next event -> queueing");
0212 
0213     ACTS_VERBOSE(
0214         "Finding insert location for event number: " << ctx.eventNumber);
0215     auto it = std::ranges::upper_bound(m_eventQueue, ctx.eventNumber, {},
0216                                        [](const auto& v) { return v.first; });
0217     if (it == m_eventQueue.end()) {
0218       ACTS_VERBOSE("Insert location for " << ctx.eventNumber
0219                                           << " is at the end of the queue");
0220     } else {
0221       ACTS_VERBOSE("Insert location for "
0222                    << ctx.eventNumber
0223                    << " is before event number: " << it->first);
0224     }
0225 
0226     m_eventQueue.emplace(it, ctx.eventNumber, event);
0227     printQueue();
0228     m_maxEventQueueSize = std::max(m_maxEventQueueSize, m_eventQueue.size());
0229   }
0230 
0231   return ProcessCode::SUCCESS;
0232 }
0233 
0234 ProcessCode HepMC3Writer::finalize() {
0235   ACTS_VERBOSE("Finalizing HepMC3Writer");
0236   if (m_writer) {
0237     m_writer->close();
0238   }
0239   ACTS_DEBUG("max_queue_size=" << m_maxEventQueueSize
0240                                << " limit=" << m_cfg.maxEventsPending);
0241 
0242   // Write sidecar metadata file with event count
0243   ACTS_DEBUG("Writing sidecar metadata for " << m_actualOutputPath << " with "
0244                                              << m_eventsWritten << " events");
0245   if (!HepMC3Metadata::writeSidecar(
0246           m_actualOutputPath,
0247           HepMC3Metadata::HepMC3Metadata{.numEvents = m_eventsWritten},
0248           logger())) {
0249     ACTS_WARNING("Failed to write sidecar metadata file for "
0250                  << m_actualOutputPath);
0251   }
0252 
0253   return ProcessCode::SUCCESS;
0254 }
0255 
0256 }  // namespace ActsExamples