File indexing completed on 2026-04-09 07:49:44
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051 #include "SPM.hh"
0052 #include <thrust/device_ptr.h>
0053 #include <thrust/sort.h>
0054 #include <thrust/reduce.h>
0055 #include <thrust/merge.h>
0056 #include <thrust/count.h>
0057 #include <thrust/copy.h>
0058 #include <thrust/execution_policy.h>
0059 #include <fstream>
0060 #include <vector>
0061 #include <cassert>
0062
0063
0064 using namespace thrust::placeholders;
0065
0066 template<typename T>
0067 SPM_future<T> SPM::merge_partial_select_async(
0068 const T* d_in,
0069 size_t num_in,
0070 unsigned select_flagmask,
0071 float time_window,
0072 cudaStream_t stream )
0073 {
0074 SPM_future<T> result ;
0075
0076 if (num_in == 0 )
0077 {
0078 result.count = 0;
0079 result.ptr = nullptr;
0080 }
0081 else
0082 {
0083 merge_partial_select<T>(
0084 d_in,
0085 num_in,
0086 &result.ptr,
0087 &result.count,
0088 select_flagmask,
0089 time_window,
0090 stream );
0091 }
0092
0093 cudaEventCreateWithFlags(&result.ready, cudaEventDisableTiming);
0094 cudaEventRecord(result.ready, stream);
0095
0096
0097 return result ;
0098 }
0099
0100
0101 template SPM_future<sphotonlite> SPM::merge_partial_select_async( const sphotonlite* d_in, size_t num_in, unsigned select_flagmask, float time_window, cudaStream_t stream );
0102 template SPM_future<sphoton> SPM::merge_partial_select_async( const sphoton* d_in, size_t num_in, unsigned select_flagmask, float time_window, cudaStream_t stream );
0103
0104
0105
0106
0107
0108
0109
0110
0111
0112
0113
0114
0115
0116
0117
0118
0119
0120
0121
0122
0123
0124
0125
0126
0127
0128
0129
0130
0131
0132
0133
0134
0135 template<typename T> void SPM::merge_partial_select(
0136 const T* d_in,
0137 size_t num_in,
0138 T** d_out,
0139 size_t* num_out,
0140 unsigned select_flagmask,
0141 float time_window,
0142 cudaStream_t stream )
0143 {
0144 using select_pred = typename T::select_pred;
0145 using reduce_op = typename T::reduce_op;
0146 using key_functor = typename T::key_functor;
0147
0148
0149 printf("[SPM::merge_partial_select num_in %d select_flagmask %d time_window %7.3f \n", num_in, select_flagmask, time_window );
0150
0151 if (num_in == 0) { *d_out = nullptr; if (num_out) *num_out = 0; return; }
0152
0153 auto policy = thrust::cuda::par_nosync.on(stream);
0154 auto in = thrust::device_ptr<const T>(d_in);
0155
0156
0157
0158
0159 T* d_selected = nullptr;
0160 size_t num_selected = 0 ;
0161
0162 bool apply_selection = select_flagmask != ALREADY_HITMASK_SELECTED ;
0163 if( apply_selection )
0164 {
0165 select_pred selector{select_flagmask} ;
0166 num_selected = thrust::count_if(policy, in, in + num_in, selector);
0167
0168 if (num_selected == 0)
0169 {
0170 *d_out = nullptr;
0171 if (num_out) *num_out = 0;
0172 return;
0173 }
0174
0175 cudaMallocAsync(&d_selected, num_selected * sizeof(T), stream);
0176 thrust::copy_if(policy, in, in + num_in, thrust::device_ptr<T>(d_selected), selector);
0177 }
0178 else
0179 {
0180 d_selected = (T*)d_in ;
0181 num_selected = num_in ;
0182 }
0183
0184
0185
0186 if (time_window == NOMERGE_TIME_WINDOW)
0187 {
0188 *d_out = d_selected;
0189 if (num_out) *num_out = num_selected ;
0190 return;
0191 }
0192
0193
0194
0195
0196 uint64_t* d_keys = nullptr;
0197 cudaMallocAsync(&d_keys, num_selected * sizeof(uint64_t), stream);
0198
0199
0200
0201
0202 auto selected = thrust::device_ptr<const T>(d_selected);
0203 auto keys_it = thrust::make_transform_iterator(selected, key_functor{time_window});
0204 thrust::copy( policy, keys_it , keys_it + num_selected, thrust::device_ptr<uint64_t>(d_keys));
0205
0206
0207
0208
0209
0210 thrust::sort_by_key(policy,
0211 thrust::device_ptr<uint64_t>(d_keys),
0212 thrust::device_ptr<uint64_t>(d_keys + num_selected),
0213 thrust::device_ptr<T>(d_selected));
0214
0215 cudaFreeAsync(d_keys, stream);
0216
0217
0218
0219
0220
0221 uint64_t* d_out_key = nullptr;
0222 T* d_out_val = nullptr;
0223 cudaMallocAsync(&d_out_key, num_selected * sizeof(uint64_t), stream);
0224 cudaMallocAsync(&d_out_val, num_selected * sizeof(T), stream);
0225
0226
0227
0228 auto d_out_key_begin = thrust::device_ptr<uint64_t>(d_out_key);
0229
0230 auto ends = thrust::reduce_by_key(policy,
0231 thrust::device_ptr<uint64_t>(d_keys),
0232 thrust::device_ptr<uint64_t>(d_keys + num_selected),
0233 thrust::device_ptr<T>(d_selected),
0234 d_out_key_begin,
0235 thrust::device_ptr<T>(d_out_val),
0236 thrust::equal_to<uint64_t>{},
0237 reduce_op{});
0238
0239
0240 if(apply_selection )
0241 {
0242 cudaFreeAsync(d_selected, stream);
0243
0244 }
0245 else
0246 {
0247 assert( d_selected == d_in && num_selected == num_in );
0248
0249 }
0250
0251
0252
0253 cudaError_t sync_err = cudaStreamSynchronize(stream);
0254 if (sync_err != cudaSuccess) {
0255 fprintf(stderr, "cudaStreamSynchronize failed: %s\n", cudaGetErrorString(sync_err));
0256 return ;
0257 }
0258
0259
0260 size_t merged = ends.first.get() - d_out_key ;
0261
0262
0263 cudaFreeAsync(d_out_key, stream);
0264
0265
0266 T* d_final = nullptr;
0267 if (merged > 0) {
0268 cudaMallocAsync(&d_final, merged * sizeof(T), stream);
0269 cudaMemcpyAsync(d_final, d_out_val, merged * sizeof(T),
0270 cudaMemcpyDeviceToDevice, stream);
0271 }
0272
0273
0274
0275 cudaFreeAsync(d_out_val, stream);
0276
0277
0278
0279
0280 *d_out = d_final;
0281 if(num_out) *num_out = merged;
0282
0283
0284
0285 float select_frac = float(num_selected)/float(num_in);
0286 float merge_frac = float(merged)/float(num_selected) ;
0287 printf("]SPM::merge_partial_select select_flagmask %d time_window %7.3f in %d selected %d merged %d selected/in %7.3f merged/selected %7.3f \n",
0288 select_flagmask, time_window, num_in, num_selected, merged, select_frac, merge_frac );
0289
0290 }
0291
0292
0293 template void SPM::merge_partial_select<sphotonlite>( const sphotonlite* d_in, size_t num_in, sphotonlite** d_out, size_t* num_out, unsigned select_flagmask, float time_window, cudaStream_t stream );
0294 template void SPM::merge_partial_select<sphoton>( const sphoton* d_in, size_t num_in, sphoton** d_out, size_t* num_out, unsigned select_flagmask, float time_window, cudaStream_t stream );
0295
0296
0297 template<typename T>
0298 int SPM::copy_device_to_host_async( T* h, T* d, size_t num_items, cudaStream_t stream )
0299 {
0300 if( d == nullptr ) std::cerr
0301 << "SPM::copy_device_to_host_async"
0302 << " ERROR : device pointer is null "
0303 << std::endl
0304 ;
0305
0306 if( d == nullptr ) return 1 ;
0307
0308 size_t size = num_items*sizeof(T) ;
0309
0310 cudaMemcpyAsync(h, d, size, cudaMemcpyDeviceToHost, stream);
0311
0312 return 0 ;
0313 }
0314
0315 template int SPM::copy_device_to_host_async<sphotonlite>( sphotonlite* h, sphotonlite* d, size_t num_items, cudaStream_t stream );
0316 template int SPM::copy_device_to_host_async<sphoton>( sphoton* h, sphoton* d, size_t num_items, cudaStream_t stream );
0317
0318
0319 void SPM::free( void* d_ptr )
0320 {
0321 cudaFree(d_ptr);
0322 }
0323
0324 void SPM::free_async(void* d_ptr, cudaStream_t stream)
0325 {
0326 if (d_ptr) cudaFreeAsync(d_ptr, stream);
0327 }
0328
0329