Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:49:44

0001 /**
0002 SPM.cu
0003 =======
0004 
0005 
0006 SPM::merge_partial_select_async
0007     Uses merge_partial_select and non-blockingly returns SPM_future<T>
0008 
0009 SPM::merge_partial_select
0010     Flagmask select hits from input photons and merge the hits by (id,timebucket)
0011     with incremented counts. Obviously this requires the photons and hits to
0012     simultaneously fit into VRAM.
0013 
0014 SPM::copy_device_to_host_async
0015 
0016 SPM::free
0017 
0018 SPM::free_async
0019 
0020 
0021 
0022 Background on async CUDA
0023 --------------------------
0024 
0025 "Safe, Seamless, And Scalable Integration Of Asynchronous GPU Streams In PETSc"
0026     https://arxiv.org/pdf/2306.17801
0027 
0028 "CUDA C/C++ Streams and Concurrency, Steve Rennich, NVIDIA"
0029     https://developer.download.nvidia.com/CUDA/training/StreamsAndConcurrencyWebinar.pdf
0030 
0031 
0032 par_nosync is quite recent (Feb 9, 2022)
0033 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
0034 
0035 https://github.com/NVIDIA/thrust/blob/main/examples/cuda/explicit_cuda_stream.cu
    par_nosync does not suppress synchronization when it is required for
    correctness, e.g. when num_selected must be returned to the host.
0038 
0039 https://github.com/NVIDIA/thrust/issues/1515
0040 https://github.com/petsc/petsc/commit/0fa675732414ab06118e41f905207ba3ccea9c4e
0041 
0042 
0043 https://github.com/NVIDIA/thrust/discussions/1616
0044     Feb 9, 2022
0045     Thrust 1.16.0 provides a new “nosync” hint
0046 
0047 
0048 
0049 **/
0050 
#include "SPM.hh"

#include <thrust/device_ptr.h>
#include <thrust/sort.h>
#include <thrust/reduce.h>
#include <thrust/merge.h>
#include <thrust/count.h>
#include <thrust/copy.h>
#include <thrust/execution_policy.h>

#include <cassert>
#include <cstdio>
#include <fstream>
#include <iostream>
#include <vector>
0062 
0063 
0064 using namespace thrust::placeholders;
0065 
/**
SPM::merge_partial_select_async
--------------------------------

Non-blocking wrapper around SPM::merge_partial_select : enqueues the
select+merge work onto *stream* and returns an SPM_future<T> carrying the
device pointer, the count and a "ready" event recorded after the last
enqueued operation, so callers can wait on result.ready.

For num_in:0 no work is enqueued but the event is still created and
recorded so the future can be waited on unconditionally.

CUDA API return codes for the event calls are now checked and reported
rather than silently ignored.
**/

template<typename T>
SPM_future<T> SPM::merge_partial_select_async(
    const T*          d_in,
    size_t            num_in,
    unsigned select_flagmask,
    float        time_window,
    cudaStream_t      stream )
{
    SPM_future<T> result  ;

    if (num_in == 0 )
    {
        result.count = 0;
        result.ptr   = nullptr;
    }
    else
    {
        merge_partial_select<T>(
                d_in,
                num_in,
                &result.ptr,
                &result.count,
                select_flagmask,
                time_window,
                stream );
    }

    // Record event immediately after last operation : check the API calls,
    // previously a failed event create would be silently recorded anyway
    cudaError_t err = cudaEventCreateWithFlags(&result.ready, cudaEventDisableTiming);
    if (err != cudaSuccess)
    {
        fprintf(stderr, "SPM::merge_partial_select_async cudaEventCreateWithFlags failed: %s\n", cudaGetErrorString(err));
    }
    else
    {
        err = cudaEventRecord(result.ready, stream);
        if (err != cudaSuccess)
        {
            fprintf(stderr, "SPM::merge_partial_select_async cudaEventRecord failed: %s\n", cudaGetErrorString(err));
        }
    }

    return result ;
}
0099 
0100 
// explicit template instantiations : the two photon payload types supported by the async API
template SPM_future<sphotonlite> SPM::merge_partial_select_async( const sphotonlite* d_in, size_t num_in, unsigned select_flagmask, float time_window, cudaStream_t stream );
template SPM_future<sphoton>     SPM::merge_partial_select_async( const sphoton*     d_in, size_t num_in, unsigned select_flagmask, float time_window, cudaStream_t stream );
0103 
0104 
0105 /**
0106 SPM::merge_partial_select
0107 -------------------------
0108 
0109 Flagmask select hits from input photons and merge the hits by (id,timebucket)
0110 with incremented counts. Obviously this requires the photons and hits to
0111 simultaneously fit into VRAM.
0112 
0113 0. apply selection using count_if, allocate, copy_if
0114 1. special case time_window:0.f to just return selected without merging
0115 2. allocate `(uint64_t*)d_keys`
0116 3. populate d_keys using T::key_functor and d_selected using copy_n
0117 4. sort_by_key arranging d_selected hits with same (id, timebucket) together
0118 5. allocate d_out_key d_out_val with space for num_in [HMM, COULD DOUBLE PASS TO REDUCE MEMORY PERHAPS?]
0119 6. reduce_by_key merging contiguous equal (id,timebucket) hits
0120 7. get number of merged hits
0121 8. allocate d_final to fit merged hits, d2d copy d_final from d_out_val
0122 9. free temporary buffers
0123 10. set d_out and num_out
0124 
0125 final merge special case
0126 ~~~~~~~~~~~~~~~~~~~~~~~~~~
0127 
0128 With the final merge of the concatenated per-launch select+merge results
0129 the selection has been done already and hence does not need to be repeated.
0130 As d_selected is then the same as d_in the free is skipped as d_in belongs
0131 to the caller.
0132 
0133 **/
0134 
template<typename T> void SPM::merge_partial_select(
    const T*          d_in,
    size_t            num_in,
    T**               d_out,
    size_t*          num_out,
    unsigned select_flagmask,
    float        time_window,
    cudaStream_t      stream )
{
    using select_pred   = typename T::select_pred;
    using reduce_op     = typename T::reduce_op;
    using key_functor   = typename T::key_functor;

    // %zu for size_t and %u for unsigned : the previous %d conversions were
    // undefined behavior on LP64 platforms
    printf("[SPM::merge_partial_select num_in %zu select_flagmask %u time_window %7.3f \n", num_in, select_flagmask, time_window );

    if (num_in == 0) { *d_out = nullptr; if (num_out) *num_out = 0; return; }

    auto policy = thrust::cuda::par_nosync.on(stream);
    auto in = thrust::device_ptr<const T>(d_in);


    // 0. apply selection using count_if, allocate, copy_if

    T* d_selected = nullptr;
    size_t num_selected = 0 ;

    bool apply_selection = select_flagmask != ALREADY_HITMASK_SELECTED ;
    if( apply_selection )
    {
        select_pred selector{select_flagmask} ;
        num_selected = thrust::count_if(policy, in, in + num_in, selector);  // syncs internally : count needed on host

        if (num_selected == 0)
        {
            *d_out = nullptr;
            if (num_out) *num_out = 0;
            return;
        }

        cudaMallocAsync(&d_selected, num_selected * sizeof(T), stream);
        thrust::copy_if(policy, in, in + num_in, thrust::device_ptr<T>(d_selected), selector);
    }
    else
    {
         // final merge special case : selection already applied, d_in belongs to the caller
         d_selected = const_cast<T*>(d_in) ;
         num_selected = num_in ;
    }


    // 1. special case time_window:0.f to just return selected without merging
    if (time_window == NOMERGE_TIME_WINDOW)
    {
        *d_out = d_selected;
        if (num_out) *num_out = num_selected ;
        return;
    }


    // 2. allocate `(uint64_t*)d_keys`

    uint64_t* d_keys = nullptr;
    cudaMallocAsync(&d_keys, num_selected * sizeof(uint64_t),    stream);


    // 3. populate d_keys using key_functor via a transform_iterator and copy
    auto selected = thrust::device_ptr<const T>(d_selected);
    auto keys_it = thrust::make_transform_iterator(selected, key_functor{time_window});
    thrust::copy(  policy, keys_it , keys_it + num_selected, thrust::device_ptr<uint64_t>(d_keys));


    // 4. sort_by_key arranging d_selected hits with same (id, timebucket) to be contiguous

    thrust::sort_by_key(policy,
                        thrust::device_ptr<uint64_t>(d_keys),
                        thrust::device_ptr<uint64_t>(d_keys + num_selected),
                        thrust::device_ptr<T>(d_selected));

    // NB d_keys is NOT freed here : it is read again by the reduce_by_key below.
    // Freeing it at this point (as previously done) is a stream-ordered
    // use-after-free, since cudaFreeAsync makes the allocation invalid for
    // all work enqueued on the stream after the free.


    // 5. allocate d_out_key d_out_val with space for num_selected

    uint64_t*    d_out_key = nullptr;
    T*           d_out_val = nullptr;
    cudaMallocAsync(&d_out_key, num_selected * sizeof(uint64_t),   stream);
    cudaMallocAsync(&d_out_val, num_selected * sizeof(T),          stream);

    // 6. reduce_by_key merging contiguous equal (id,timebucket) hits

    auto d_out_key_begin = thrust::device_ptr<uint64_t>(d_out_key);

    auto ends = thrust::reduce_by_key(policy,
                thrust::device_ptr<uint64_t>(d_keys),
                thrust::device_ptr<uint64_t>(d_keys + num_selected),
                thrust::device_ptr<T>(d_selected),
                d_out_key_begin,                      // output keys
                thrust::device_ptr<T>(d_out_val),
                thrust::equal_to<uint64_t>{},
                reduce_op{});

    cudaFreeAsync(d_keys, stream);   // enqueued after the last use of d_keys : now safe


    if(apply_selection )
    {
        cudaFreeAsync(d_selected, stream);
        // omitting this caused leak steps of 0.9GB in the whopper 8.25 billion test
    }
    else
    {
        assert( d_selected == d_in && num_selected == num_in );
        // for apply_selection:false d_selected is same as d_in which belongs to caller
    }


    // Synchronize the stream here to ensure reduce_by_key results are ready for host access
    cudaError_t sync_err = cudaStreamSynchronize(stream);
    if (sync_err != cudaSuccess) {
        fprintf(stderr, "cudaStreamSynchronize failed: %s\n", cudaGetErrorString(sync_err));
        // release the temporaries and leave outputs in a defined state,
        // previously this path leaked d_out_key/d_out_val and left *d_out unset
        cudaFreeAsync(d_out_key, stream);
        cudaFreeAsync(d_out_val, stream);
        *d_out = nullptr;
        if (num_out) *num_out = 0;
        return ;
    }

    // 7. get number of merged hits
    size_t merged = ends.first.get() - d_out_key ;
    // (Proceed to step 8 as-is; the sync ensures d_out_val is also ready for the subsequent cudaMemcpyAsync)

    cudaFreeAsync(d_out_key, stream);

    // 8. allocate d_final to fit merged hits, d2d copy d_final from d_out_val
    T* d_final = nullptr;
    if (merged > 0) {
        cudaMallocAsync(&d_final, merged * sizeof(T), stream);
        cudaMemcpyAsync(d_final, d_out_val, merged * sizeof(T),
                        cudaMemcpyDeviceToDevice, stream);
    }

    // 9. free temporary buffers

    cudaFreeAsync(d_out_val, stream);


    // 10. set d_out and num_out

    *d_out = d_final;
    if(num_out) *num_out = merged;

    float select_frac = float(num_selected)/float(num_in);
    float merge_frac = float(merged)/float(num_selected) ;
    printf("]SPM::merge_partial_select select_flagmask %u time_window %7.3f in %zu selected %zu merged %zu selected/in %7.3f merged/selected %7.3f \n",
                select_flagmask, time_window, num_in, num_selected, merged, select_frac, merge_frac );

}
// sed -n '/^template<typename T> void SPM::merge_partial_select/,/^}/p' ~/o/sysrap/SPM.cu | pbcopy
0291 // sed -n '/^template<typename T> void SPM::merge_partial_select/,/^}/p' ~/o/sysrap/SPM.cu | pbcopy
0292 
// explicit template instantiations : the two photon payload types supported by merge_partial_select
template void SPM::merge_partial_select<sphotonlite>( const sphotonlite* d_in, size_t num_in, sphotonlite** d_out, size_t* num_out, unsigned select_flagmask, float time_window, cudaStream_t stream );
template void SPM::merge_partial_select<sphoton>(     const sphoton*     d_in, size_t num_in, sphoton**     d_out, size_t* num_out, unsigned select_flagmask, float time_window, cudaStream_t stream );
0295 
0296 
/**
SPM::copy_device_to_host_async
-------------------------------

Enqueues an async device-to-host copy of num_items items of T onto *stream*.
The host buffer h must remain valid (and should be pinned for true async
behavior) until the stream is synchronized.

Returns 0 on success, 1 on error (null device pointer, or cudaMemcpyAsync
failure — the latter was previously unchecked).
**/

template<typename T>
int SPM::copy_device_to_host_async( T* h, T* d,  size_t num_items, cudaStream_t stream )
{
    if( d == nullptr )
    {
        std::cerr
            << "SPM::copy_device_to_host_async"
            << " ERROR : device pointer is null "
            << std::endl
            ;
        return 1 ;
    }

    if( num_items == 0 ) return 0 ;   // nothing to copy

    size_t size = num_items*sizeof(T) ;

    cudaError_t err = cudaMemcpyAsync(h, d, size, cudaMemcpyDeviceToHost, stream);
    if( err != cudaSuccess )
    {
        std::cerr
            << "SPM::copy_device_to_host_async"
            << " ERROR : cudaMemcpyAsync failed : " << cudaGetErrorString(err)
            << std::endl
            ;
        return 1 ;
    }

    return 0 ;
}
0314 
// explicit template instantiations : the two photon payload types supported by copy_device_to_host_async
template int SPM::copy_device_to_host_async<sphotonlite>( sphotonlite* h, sphotonlite* d,  size_t num_items, cudaStream_t stream );
template int SPM::copy_device_to_host_async<sphoton>(     sphoton* h,     sphoton* d,      size_t num_items, cudaStream_t stream );
0317 
0318 
/**
SPM::free
----------

Blocking free of a device allocation. The cudaFree return code is now
checked and reported rather than silently discarded, so a sticky earlier
CUDA error or an invalid pointer is surfaced.
**/

void SPM::free( void* d_ptr )   // static
{
    cudaError_t err = cudaFree(d_ptr);
    if (err != cudaSuccess)
    {
        fprintf(stderr, "SPM::free cudaFree failed: %s\n", cudaGetErrorString(err));
    }
}
0323 
/**
SPM::free_async
----------------

Stream-ordered free of a device allocation : a no-op for a null pointer.
**/

void SPM::free_async(void* d_ptr, cudaStream_t stream)  // static
{
    if (d_ptr == nullptr) return ;
    cudaFreeAsync(d_ptr, stream);
}
0328 
0329