File indexing completed on 2025-04-26 09:01:12
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 #pragma once
0013
0014
0015
0016
0017 #include <Gaudi/Algorithm.h>
0018 #include <GaudiKernel/IHiveWhiteBoard.h>
0019
0020 #ifdef GAUDI_USE_CUDA
0021 # include <Gaudi/CUDAAsynchronousAlgHelper.cuh>
0022 #endif
0023
0024 #include <atomic>
0025 #include <boost/fiber/all.hpp>
0026 #include <boost/unordered/unordered_flat_set.hpp>
0027 #include <chrono>
0028 #include <fmt/format.h>
0029
0030 namespace Gaudi {
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
/// Wrap a suspension-point statement inside an AsynchronousAlgorithm member:
/// execute @p stmt, then re-select this fiber's event-store slot via
/// restoreAfterSuspend(); on restore failure the ENCLOSING function returns
/// StatusCode::FAILURE (note the early return hidden inside the macro).
#define ACCALG_AWAIT( stmt )                                                                                           \
  stmt;                                                                                                                \
  if ( restoreAfterSuspend().isFailure() ) return StatusCode::FAILURE;
0047
0048 class GAUDI_API AsynchronousAlgorithm : virtual public Gaudi::Algorithm {
0049 protected:
0050
0051 boost::fibers::fiber_specific_ptr<std::size_t> s_currentSlot{};
0052
0053
0054 virtual StatusCode restoreAfterSuspend() const {
0055 if ( !whiteboard()->selectStore( *s_currentSlot ).isSuccess() ) {
0056 msg() << MSG::ERROR << "Resetting slot from fiber_specific_ptr failed" << endmsg;
0057 return StatusCode::FAILURE;
0058 }
0059 return StatusCode::SUCCESS;
0060 }
0061
0062 public:
0063 StatusCode sysInitialize() override {
0064 setAsynchronous( true );
0065 msg() << MSG::DEBUG << "Starting sysInitialize for AsynchronousAlgorithm" << endmsg;
0066 return Gaudi::Algorithm::sysInitialize();
0067 }
0068
0069 StatusCode sysExecute( const EventContext& ctx ) override {
0070 msg() << MSG::DEBUG << "Starting sysExecute for AsynchronousAlgorithm on slot " << ctx.slot()
0071 << "with s_currentSlot = " << fmt::to_string( fmt::ptr( s_currentSlot.get() ) ) << endmsg;
0072 if ( s_currentSlot.get() == nullptr ) {
0073 s_currentSlot.reset( new std::size_t( ctx.slot() ) );
0074 } else if ( *s_currentSlot != ctx.slot() ) {
0075 error() << "Current slot is " << ctx.slot() << " but s_currentSlot exists and is " << *s_currentSlot << endmsg;
0076 return StatusCode::FAILURE;
0077 }
0078 return Gaudi::Algorithm::sysExecute( ctx );
0079 }
0080
0081
0082 StatusCode yield() const {
0083 boost::this_fiber::yield();
0084 return restoreAfterSuspend();
0085 }
0086
0087
0088 template <typename Clock, typename Duration>
0089 StatusCode sleep_until( std::chrono::time_point<Clock, Duration> const& sleep_time ) const {
0090 boost::this_fiber::sleep_until( sleep_time );
0091 return restoreAfterSuspend();
0092 }
0093
0094
0095 template <typename Rep, typename Period>
0096 StatusCode sleep_for( std::chrono::duration<Rep, Period> const& dur ) const {
0097 boost::this_fiber::sleep_for( dur );
0098 return restoreAfterSuspend();
0099 }
0100
0101 #ifdef GAUDI_USE_CUDA
0102
0103 StatusCode cuda_stream_await( cudaStream_t cudaStream ) const {
0104 CUDA_CHECK( Gaudi::CUDA::cuda_stream_await( cudaStream ) );
0105 return restoreAfterSuspend();
0106 }
0107
0108
0109 void print_cuda_error( std::string msg_ ) const {
0110 msg() << MSG::ERROR << msg_ << endmsg;
0111 throw GaudiException( msg_, "CUDA_EXCEPTION", StatusCode::FAILURE );
0112 }
0113 #endif
0114 };
0115
0116 #ifdef GAUDI_USE_CUDA
0117 namespace CUDA {
0118 using namespace std::chrono_literals;
0119
0120 class CUDAStream {
0121 private:
0122 cudaStream_t stream;
0123 const Gaudi::AsynchronousAlgorithm* parent;
0124 int nth_stream = 0;
0125 boost::unordered_flat_set<void*> allocations{};
0126
0127 public:
0128 CUDAStream( const Gaudi::AsynchronousAlgorithm* parent, std::string file = __FILE__, int line = __LINE__ );
0129
0130 operator cudaStream_t() { return stream; }
0131
0132 template <typename T>
0133 T* malloc( std::size_t len ) {
0134 void* devPtr = nullptr;
0135 cudaError_t err = cudaSuccess;
0136 if constexpr ( !std::is_same_v<T, void> ) { len *= sizeof( T ); }
0137 const auto starttime = std::chrono::steady_clock::now();
0138 do {
0139 err = cudaMallocAsync( &devPtr, len, stream );
0140 if ( err == cudaErrorMemoryAllocation ) {
0141 StatusCode sc = parent->sleep_for( 10ms );
0142 if ( sc.isFailure() ) { parent->print_cuda_error( "Yield error" ); }
0143 }
0144 } while ( err == cudaErrorMemoryAllocation );
0145 const double waittime =
0146 std::chrono::duration_cast<std::chrono::microseconds>( std::chrono::steady_clock::now() - starttime )
0147 .count() /
0148 1e6;
0149 if ( waittime >= 0.01 ) {
0150 fmt::print( "Waited {} to allocate {} of GPU memory\n", SI( waittime, "s" ), SI( len, "B" ) );
0151 }
0152 allocations.insert( devPtr );
0153 return static_cast<T*>( devPtr );
0154 }
0155
0156 template <typename T>
0157 void free( T* d_ptr ) {
0158 auto iter = allocations.find( d_ptr );
0159 if ( iter == allocations.end() ) {
0160 parent->print_cuda_error( "Called stream.free on an allocation not created by this stream" );
0161 }
0162 cudaFreeAsync( static_cast<void*>( d_ptr ), stream );
0163 allocations.erase( iter );
0164 }
0165
0166 ~CUDAStream();
0167 };
0168 }
0169 #endif
0170 }