Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-04-26 09:01:12

0001 
0002 /***********************************************************************************\
0003 * (c) Copyright 2023-2024 CERN for the benefit of the LHCb and ATLAS collaborations *
0004 *                                                                                   *
0005 * This software is distributed under the terms of the Apache version 2 licence,     *
0006 * copied verbatim in the file "LICENSE".                                            *
0007 *                                                                                   *
0008 * In applying this licence, CERN does not waive the privileges and immunities       *
0009 * granted to it by virtue of its status as an Intergovernmental Organization        *
0010 * or submit itself to any jurisdiction.                                             *
0011 \***********************************************************************************/
0012 #pragma once
0013 // ============================================================================
0014 // Include files
0015 // ============================================================================
0016 // Gaudi
0017 #include <Gaudi/Algorithm.h>
0018 #include <GaudiKernel/IHiveWhiteBoard.h>
0019 // Gaudi CUDA
0020 #ifdef GAUDI_USE_CUDA
0021 #  include <Gaudi/CUDAAsynchronousAlgHelper.cuh>
0022 #endif
0023 // Others
0024 #include <atomic>
0025 #include <boost/fiber/all.hpp>
0026 #include <boost/unordered/unordered_flat_set.hpp>
0027 #include <chrono>
0028 #include <fmt/format.h>
0029 
0030 namespace Gaudi {
0031   /** Base class for asynchronous algorithms.
0032    *
0033    *  Augments Gaudi::Algorithm by saving and restoring current slot whenever
0034    *  fiber is suspended and resumed. This requires using the member functions for
0035    *  suspending instead of the boost::fiber functions directly.
0036    *
0037    *  @author Beojan Stanislaus
0038    *  @date 2023
0039    */
0040 
/// Macro for case where we don't yet have a subclass / member function for a given
/// type of waiting.
///
/// Executes `stmt` (a suspension point) and then restores the current event
/// slot, returning FAILURE from the enclosing function if that restore fails.
/// Wrapped in do { ... } while ( false ) so the macro expands to a single
/// statement: it is safe inside an un-braced if/else and the trailing
/// semicolon at the call site is consumed.
#define ACCALG_AWAIT( stmt )                                                                                           \
  do {                                                                                                                 \
    stmt;                                                                                                              \
    if ( restoreAfterSuspend().isFailure() ) return StatusCode::FAILURE;                                               \
  } while ( false )
0047 
0048   class GAUDI_API AsynchronousAlgorithm : virtual public Gaudi::Algorithm {
0049   protected:
0050     /// Contains current slot
0051     boost::fibers::fiber_specific_ptr<std::size_t> s_currentSlot{};
0052 
0053     /// Restore after suspend
0054     virtual StatusCode restoreAfterSuspend() const {
0055       if ( !whiteboard()->selectStore( *s_currentSlot ).isSuccess() ) {
0056         msg() << MSG::ERROR << "Resetting slot from fiber_specific_ptr failed" << endmsg;
0057         return StatusCode::FAILURE;
0058       }
0059       return StatusCode::SUCCESS;
0060     }
0061 
0062   public:
0063     StatusCode sysInitialize() override {
0064       setAsynchronous( true );
0065       msg() << MSG::DEBUG << "Starting sysInitialize for AsynchronousAlgorithm" << endmsg;
0066       return Gaudi::Algorithm::sysInitialize();
0067     }
0068 
0069     StatusCode sysExecute( const EventContext& ctx ) override {
0070       msg() << MSG::DEBUG << "Starting sysExecute for AsynchronousAlgorithm on slot " << ctx.slot()
0071             << "with s_currentSlot = " << fmt::to_string( fmt::ptr( s_currentSlot.get() ) ) << endmsg;
0072       if ( s_currentSlot.get() == nullptr ) {
0073         s_currentSlot.reset( new std::size_t( ctx.slot() ) );
0074       } else if ( *s_currentSlot != ctx.slot() ) {
0075         error() << "Current slot is " << ctx.slot() << " but s_currentSlot exists and is " << *s_currentSlot << endmsg;
0076         return StatusCode::FAILURE;
0077       }
0078       return Gaudi::Algorithm::sysExecute( ctx );
0079     }
0080 
0081     /// Forwards to boost::this_fiber::yield
0082     StatusCode yield() const {
0083       boost::this_fiber::yield();
0084       return restoreAfterSuspend();
0085     }
0086 
0087     /// Forwards to boost::this_fiber::sleep_until
0088     template <typename Clock, typename Duration>
0089     StatusCode sleep_until( std::chrono::time_point<Clock, Duration> const& sleep_time ) const {
0090       boost::this_fiber::sleep_until( sleep_time );
0091       return restoreAfterSuspend();
0092     }
0093 
0094     /// Forwards to boost::this_fiber::sleep_for
0095     template <typename Rep, typename Period>
0096     StatusCode sleep_for( std::chrono::duration<Rep, Period> const& dur ) const {
0097       boost::this_fiber::sleep_for( dur );
0098       return restoreAfterSuspend();
0099     }
0100 
0101 #ifdef GAUDI_USE_CUDA
0102     /// Wrapper for CUDA stream await
0103     StatusCode cuda_stream_await( cudaStream_t cudaStream ) const {
0104       CUDA_CHECK( Gaudi::CUDA::cuda_stream_await( cudaStream ) );
0105       return restoreAfterSuspend();
0106     }
0107 
0108     /// Helper to allow other classes to print error messages
0109     void print_cuda_error( std::string msg_ ) const {
0110       msg() << MSG::ERROR << msg_ << endmsg;
0111       throw GaudiException( msg_, "CUDA_EXCEPTION", StatusCode::FAILURE );
0112     }
0113 #endif
0114   };
0115 
0116 #ifdef GAUDI_USE_CUDA
0117   namespace CUDA {
0118     using namespace std::chrono_literals;
0119 
0120     class CUDAStream {
0121     private:
0122       cudaStream_t                        stream;
0123       const Gaudi::AsynchronousAlgorithm* parent;
0124       int                                 nth_stream = 0;
0125       boost::unordered_flat_set<void*>    allocations{};
0126 
0127     public:
0128       CUDAStream( const Gaudi::AsynchronousAlgorithm* parent, std::string file = __FILE__, int line = __LINE__ );
0129 
0130       operator cudaStream_t() { return stream; }
0131 
0132       template <typename T>
0133       T* malloc( std::size_t len ) {
0134         void*       devPtr = nullptr;
0135         cudaError_t err    = cudaSuccess;
0136         if constexpr ( !std::is_same_v<T, void> ) { len *= sizeof( T ); }
0137         const auto starttime = std::chrono::steady_clock::now();
0138         do {
0139           err = cudaMallocAsync( &devPtr, len, stream );
0140           if ( err == cudaErrorMemoryAllocation ) {
0141             StatusCode sc = parent->sleep_for( 10ms );
0142             if ( sc.isFailure() ) { parent->print_cuda_error( "Yield error" ); }
0143           }
0144         } while ( err == cudaErrorMemoryAllocation );
0145         const double waittime =
0146             std::chrono::duration_cast<std::chrono::microseconds>( std::chrono::steady_clock::now() - starttime )
0147                 .count() /
0148             1e6;
0149         if ( waittime >= 0.01 ) {
0150           fmt::print( "Waited {} to allocate {} of GPU memory\n", SI( waittime, "s" ), SI( len, "B" ) );
0151         }
0152         allocations.insert( devPtr );
0153         return static_cast<T*>( devPtr );
0154       }
0155 
0156       template <typename T>
0157       void free( T* d_ptr ) {
0158         auto iter = allocations.find( d_ptr );
0159         if ( iter == allocations.end() ) {
0160           parent->print_cuda_error( "Called stream.free on an allocation not created by this stream" );
0161         }
0162         cudaFreeAsync( static_cast<void*>( d_ptr ), stream );
0163         allocations.erase( iter );
0164       }
0165 
0166       ~CUDAStream();
0167     };
0168   } // namespace CUDA
0169 #endif
0170 } // namespace Gaudi