File indexing completed on 2025-01-18 10:01:14
0001
0002
0003
0004
0005
0006 #ifndef HEPMC3_READERMT_H
0007 #define HEPMC3_READERMT_H
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 #include <set>
0018 #include <string>
0019 #include <fstream>
0020 #include <istream>
0021 #include <iterator>
0022 #include <thread>
0023 #include "HepMC3/Reader.h"
0024 #include "HepMC3/GenEvent.h"
0025 namespace HepMC3 {
0026 template <class T, size_t m_number_of_threads> class ReaderMT : public Reader
0027 {
0028 private:
0029 bool m_go_try_cache;
0030 std::vector< std::shared_ptr<T> > m_readers;
0031 std::vector< std::pair<GenEvent, bool> > m_events;
0032 std::vector< std::thread > m_threads;
0033
0034 static void read_function(std::pair<GenEvent,bool>& e, std::shared_ptr<T> r)
0035 {
0036 e.second = r->read_event(e.first);
0037 r->skip(m_number_of_threads-1);
0038 if (r->failed()) r->close();
0039 }
0040 public:
0041
0042 ReaderMT(const std::string& filename): m_go_try_cache(true) {
0043 m_events.reserve(m_number_of_threads);
0044 m_readers.reserve(m_number_of_threads);
0045 m_threads.reserve(m_number_of_threads);
0046 for (size_t i = 0; i < m_number_of_threads; ++i) {
0047 m_readers.push_back(std::make_shared<T>(filename));
0048 m_readers.back()->skip(m_number_of_threads-1-i);
0049 }
0050 }
0051
0052 ~ReaderMT() {
0053 m_readers.clear();
0054 m_events.clear();
0055 m_threads.clear();
0056 }
0057
0058 bool skip(const int) override {
0059 return false;
0060 }
0061
0062 bool read_event(GenEvent& evt) override {
0063 if ( !m_events.empty() ) {
0064 evt = m_events.back().first;
0065 m_events.pop_back();
0066 return true;
0067 }
0068 m_events.clear();
0069 m_threads.clear();
0070 m_go_try_cache = true;
0071 m_threads.reserve(m_number_of_threads);
0072 m_events.reserve(m_number_of_threads);
0073 for (size_t i = 0; i < m_number_of_threads; ++i) {
0074 m_events.push_back(std::pair<GenEvent, bool>(GenEvent(Units::GEV,Units::MM), true));
0075 m_threads.push_back(std::thread(read_function, std::ref(m_events.at(i)), m_readers.at(i)));
0076 }
0077 for (auto& th : m_threads) {
0078 th.join();
0079 }
0080 m_threads.clear();
0081
0082 m_events.erase(std::remove_if(m_events.begin(), m_events.end(),[](std::pair<GenEvent, bool>& x) {
0083 return !x.second;
0084 }), m_events.end());
0085
0086 if (m_events.empty()) {
0087 m_go_try_cache = false;
0088 return false;
0089 }
0090 evt = m_events.back().first;
0091 m_events.pop_back();
0092 return true;
0093 }
0094
0095 bool failed() override {
0096 for (auto& reader: m_readers) if (reader && !reader->failed()) return false;
0097 if ( !m_events.empty() ) return false;
0098 if ( m_go_try_cache ) return false;
0099 return true;
0100 }
0101
0102 void close() override {
0103 for (auto& reader: m_readers) if (reader) reader->close();
0104 }
0105 };
0106 }
0107 #endif