File indexing completed on 2025-01-18 09:29:55
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 #ifndef BOOST_COMPUTE_ALGORITHM_DETAIL_REDUCE_BY_KEY_WITH_SCAN_HPP
0012 #define BOOST_COMPUTE_ALGORITHM_DETAIL_REDUCE_BY_KEY_WITH_SCAN_HPP
0013
0014 #include <algorithm>
0015 #include <iterator>
0016
0017 #include <boost/compute/command_queue.hpp>
0018 #include <boost/compute/functional.hpp>
0019 #include <boost/compute/algorithm/inclusive_scan.hpp>
0020 #include <boost/compute/container/vector.hpp>
0021 #include <boost/compute/container/detail/scalar.hpp>
0022 #include <boost/compute/detail/meta_kernel.hpp>
0023 #include <boost/compute/detail/iterator_range_size.hpp>
0024 #include <boost/compute/detail/read_write_single_value.hpp>
0025 #include <boost/compute/type_traits.hpp>
0026 #include <boost/compute/utility/program_cache.hpp>
0027
0028 namespace boost {
0029 namespace compute {
0030 namespace detail {
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051
0052 template<class InputKeyIterator, class BinaryPredicate>
0053 inline void generate_uint_keys(InputKeyIterator keys_first,
0054 size_t number_of_keys,
0055 BinaryPredicate predicate,
0056 vector<uint_>::iterator new_keys_first,
0057 size_t preferred_work_group_size,
0058 command_queue &queue)
0059 {
0060 typedef typename
0061 std::iterator_traits<InputKeyIterator>::value_type key_type;
0062
0063 detail::meta_kernel k("reduce_by_key_new_key_flags");
0064 k.add_set_arg<const uint_>("count", uint_(number_of_keys));
0065
0066 k <<
0067 k.decl<const uint_>("gid") << " = get_global_id(0);\n" <<
0068 k.decl<uint_>("value") << " = 0;\n" <<
0069 "if(gid >= count){\n return;\n}\n" <<
0070 "if(gid > 0){ \n" <<
0071 k.decl<key_type>("key") << " = " <<
0072 keys_first[k.var<const uint_>("gid")] << ";\n" <<
0073 k.decl<key_type>("previous_key") << " = " <<
0074 keys_first[k.var<const uint_>("gid - 1")] << ";\n" <<
0075 " value = " << predicate(k.var<key_type>("previous_key"),
0076 k.var<key_type>("key")) <<
0077 " ? 0 : 1;\n" <<
0078 "}\n else {\n" <<
0079 " value = 0;\n" <<
0080 "}\n" <<
0081 new_keys_first[k.var<const uint_>("gid")] << " = value;\n";
0082
0083 const context &context = queue.get_context();
0084 kernel kernel = k.compile(context);
0085
0086 size_t work_group_size = preferred_work_group_size;
0087 size_t work_groups_no = static_cast<size_t>(
0088 std::ceil(float(number_of_keys) / work_group_size)
0089 );
0090
0091 queue.enqueue_1d_range_kernel(kernel,
0092 0,
0093 work_groups_no * work_group_size,
0094 work_group_size);
0095
0096 inclusive_scan(new_keys_first, new_keys_first + number_of_keys,
0097 new_keys_first, queue);
0098 }
0099
0100
0101
0102
0103
0104 template<class InputValueIterator, class OutputValueIterator, class BinaryFunction>
0105 inline void carry_outs(vector<uint_>::iterator keys_first,
0106 InputValueIterator values_first,
0107 size_t count,
0108 vector<uint_>::iterator carry_out_keys_first,
0109 OutputValueIterator carry_out_values_first,
0110 BinaryFunction function,
0111 size_t work_group_size,
0112 command_queue &queue)
0113 {
0114 typedef typename
0115 std::iterator_traits<OutputValueIterator>::value_type value_out_type;
0116
0117 detail::meta_kernel k("reduce_by_key_with_scan_carry_outs");
0118 k.add_set_arg<const uint_>("count", uint_(count));
0119 size_t local_keys_arg = k.add_arg<uint_ *>(memory_object::local_memory, "lkeys");
0120 size_t local_vals_arg = k.add_arg<value_out_type *>(memory_object::local_memory, "lvals");
0121
0122 k <<
0123 k.decl<const uint_>("gid") << " = get_global_id(0);\n" <<
0124 k.decl<const uint_>("wg_size") << " = get_local_size(0);\n" <<
0125 k.decl<const uint_>("lid") << " = get_local_id(0);\n" <<
0126 k.decl<const uint_>("group_id") << " = get_group_id(0);\n" <<
0127
0128 k.decl<uint_>("key") << ";\n" <<
0129 k.decl<value_out_type>("value") << ";\n" <<
0130 "if(gid < count){\n" <<
0131 k.var<uint_>("key") << " = " <<
0132 keys_first[k.var<const uint_>("gid")] << ";\n" <<
0133 k.var<value_out_type>("value") << " = " <<
0134 values_first[k.var<const uint_>("gid")] << ";\n" <<
0135 "lkeys[lid] = key;\n" <<
0136 "lvals[lid] = value;\n" <<
0137 "}\n" <<
0138
0139
0140
0141 k.decl<value_out_type>("result") << " = value;\n" <<
0142 k.decl<uint_>("other_key") << ";\n" <<
0143 k.decl<value_out_type>("other_value") << ";\n" <<
0144
0145 "for(" << k.decl<uint_>("offset") << " = 1; " <<
0146 "offset < wg_size; offset *= 2){\n"
0147 " barrier(CLK_LOCAL_MEM_FENCE);\n" <<
0148 " if(lid >= offset){\n"
0149 " other_key = lkeys[lid - offset];\n" <<
0150 " if(other_key == key){\n" <<
0151 " other_value = lvals[lid - offset];\n" <<
0152 " result = " << function(k.var<value_out_type>("result"),
0153 k.var<value_out_type>("other_value")) << ";\n" <<
0154 " }\n" <<
0155 " }\n" <<
0156 " barrier(CLK_LOCAL_MEM_FENCE);\n" <<
0157 " lvals[lid] = result;\n" <<
0158 "}\n" <<
0159
0160
0161 "if(lid == (wg_size - 1)){\n" <<
0162 carry_out_keys_first[k.var<const uint_>("group_id")] << " = key;\n" <<
0163 carry_out_values_first[k.var<const uint_>("group_id")] << " = result;\n" <<
0164 "}\n";
0165
0166 size_t work_groups_no = static_cast<size_t>(
0167 std::ceil(float(count) / work_group_size)
0168 );
0169
0170 const context &context = queue.get_context();
0171 kernel kernel = k.compile(context);
0172 kernel.set_arg(local_keys_arg, local_buffer<uint_>(work_group_size));
0173 kernel.set_arg(local_vals_arg, local_buffer<value_out_type>(work_group_size));
0174
0175 queue.enqueue_1d_range_kernel(kernel,
0176 0,
0177 work_groups_no * work_group_size,
0178 work_group_size);
0179 }
0180
0181
0182
0183 template<class OutputValueIterator, class BinaryFunction>
0184 inline void carry_ins(vector<uint_>::iterator carry_out_keys_first,
0185 OutputValueIterator carry_out_values_first,
0186 OutputValueIterator carry_in_values_first,
0187 size_t carry_out_size,
0188 BinaryFunction function,
0189 size_t work_group_size,
0190 command_queue &queue)
0191 {
0192 typedef typename
0193 std::iterator_traits<OutputValueIterator>::value_type value_out_type;
0194
0195 uint_ values_pre_work_item = static_cast<uint_>(
0196 std::ceil(float(carry_out_size) / work_group_size)
0197 );
0198
0199 detail::meta_kernel k("reduce_by_key_with_scan_carry_ins");
0200 k.add_set_arg<const uint_>("carry_out_size", uint_(carry_out_size));
0201 k.add_set_arg<const uint_>("values_per_work_item", values_pre_work_item);
0202 size_t local_keys_arg = k.add_arg<uint_ *>(memory_object::local_memory, "lkeys");
0203 size_t local_vals_arg = k.add_arg<value_out_type *>(memory_object::local_memory, "lvals");
0204
0205 k <<
0206 k.decl<uint_>("id") << " = get_global_id(0) * values_per_work_item;\n" <<
0207 k.decl<uint_>("idx") << " = id;\n" <<
0208 k.decl<const uint_>("wg_size") << " = get_local_size(0);\n" <<
0209 k.decl<const uint_>("lid") << " = get_local_id(0);\n" <<
0210 k.decl<const uint_>("group_id") << " = get_group_id(0);\n" <<
0211
0212 k.decl<uint_>("key") << ";\n" <<
0213 k.decl<value_out_type>("value") << ";\n" <<
0214 k.decl<uint_>("previous_key") << ";\n" <<
0215 k.decl<value_out_type>("result") << ";\n" <<
0216
0217 "if(id < carry_out_size){\n" <<
0218 k.var<uint_>("previous_key") << " = " <<
0219 carry_out_keys_first[k.var<const uint_>("id")] << ";\n" <<
0220 k.var<value_out_type>("result") << " = " <<
0221 carry_out_values_first[k.var<const uint_>("id")] << ";\n" <<
0222 carry_in_values_first[k.var<const uint_>("id")] << " = result;\n" <<
0223 "}\n" <<
0224
0225 k.decl<const uint_>("end") << " = (id + values_per_work_item) <= carry_out_size" <<
0226 " ? (values_per_work_item + id) : carry_out_size;\n" <<
0227
0228 "for(idx = idx + 1; idx < end; idx += 1){\n" <<
0229 " key = " << carry_out_keys_first[k.var<const uint_>("idx")] << ";\n" <<
0230 " value = " << carry_out_values_first[k.var<const uint_>("idx")] << ";\n" <<
0231 " if(previous_key == key){\n" <<
0232 " result = " << function(k.var<value_out_type>("result"),
0233 k.var<value_out_type>("value")) << ";\n" <<
0234 " }\n else { \n" <<
0235 " result = value;\n"
0236 " }\n" <<
0237 " " << carry_in_values_first[k.var<const uint_>("idx")] << " = result;\n" <<
0238 " previous_key = key;\n"
0239 "}\n" <<
0240
0241
0242 "lkeys[lid] = previous_key;\n" <<
0243 "lvals[lid] = result;\n" <<
0244
0245
0246 "for(" << k.decl<uint_>("offset") << " = 1; " <<
0247 "offset < wg_size; offset *= 2){\n"
0248 " barrier(CLK_LOCAL_MEM_FENCE);\n" <<
0249 " if(lid >= offset){\n"
0250 " key = lkeys[lid - offset];\n" <<
0251 " if(previous_key == key){\n" <<
0252 " value = lvals[lid - offset];\n" <<
0253 " result = " << function(k.var<value_out_type>("result"),
0254 k.var<value_out_type>("value")) << ";\n" <<
0255 " }\n" <<
0256 " }\n" <<
0257 " barrier(CLK_LOCAL_MEM_FENCE);\n" <<
0258 " lvals[lid] = result;\n" <<
0259 "}\n" <<
0260 "barrier(CLK_LOCAL_MEM_FENCE);\n" <<
0261
0262 "if(lid > 0){\n" <<
0263
0264 " previous_key = lkeys[lid - 1];\n" <<
0265 " result = lvals[lid - 1];\n" <<
0266 "}\n" <<
0267
0268
0269 "for(idx = id; idx < id + values_per_work_item; idx += 1){\n" <<
0270
0271 " barrier( CLK_GLOBAL_MEM_FENCE );\n" <<
0272 " if(lid > 0 && idx < carry_out_size) {\n"
0273 " key = " << carry_out_keys_first[k.var<const uint_>("idx")] << ";\n" <<
0274 " value = " << carry_in_values_first[k.var<const uint_>("idx")] << ";\n" <<
0275 " if(previous_key == key){\n" <<
0276 " value = " << function(k.var<value_out_type>("result"),
0277 k.var<value_out_type>("value")) << ";\n" <<
0278 " }\n" <<
0279 " " << carry_in_values_first[k.var<const uint_>("idx")] << " = value;\n" <<
0280 " }\n" <<
0281 "}\n";
0282
0283
0284 const context &context = queue.get_context();
0285 kernel kernel = k.compile(context);
0286 kernel.set_arg(local_keys_arg, local_buffer<uint_>(work_group_size));
0287 kernel.set_arg(local_vals_arg, local_buffer<value_out_type>(work_group_size));
0288
0289 queue.enqueue_1d_range_kernel(kernel,
0290 0,
0291 work_group_size,
0292 work_group_size);
0293 }
0294
0295
0296
0297
0298
0299
0300
0301 template<class InputKeyIterator, class InputValueIterator,
0302 class OutputKeyIterator, class OutputValueIterator,
0303 class BinaryFunction>
0304 inline void final_reduction(InputKeyIterator keys_first,
0305 InputValueIterator values_first,
0306 OutputKeyIterator keys_result,
0307 OutputValueIterator values_result,
0308 size_t count,
0309 BinaryFunction function,
0310 vector<uint_>::iterator new_keys_first,
0311 vector<uint_>::iterator carry_in_keys_first,
0312 OutputValueIterator carry_in_values_first,
0313 size_t carry_in_size,
0314 size_t work_group_size,
0315 command_queue &queue)
0316 {
0317 typedef typename
0318 std::iterator_traits<OutputValueIterator>::value_type value_out_type;
0319
0320 detail::meta_kernel k("reduce_by_key_with_scan_final_reduction");
0321 k.add_set_arg<const uint_>("count", uint_(count));
0322 size_t local_keys_arg = k.add_arg<uint_ *>(memory_object::local_memory, "lkeys");
0323 size_t local_vals_arg = k.add_arg<value_out_type *>(memory_object::local_memory, "lvals");
0324
0325 k <<
0326 k.decl<const uint_>("gid") << " = get_global_id(0);\n" <<
0327 k.decl<const uint_>("wg_size") << " = get_local_size(0);\n" <<
0328 k.decl<const uint_>("lid") << " = get_local_id(0);\n" <<
0329 k.decl<const uint_>("group_id") << " = get_group_id(0);\n" <<
0330
0331 k.decl<uint_>("key") << ";\n" <<
0332 k.decl<value_out_type>("value") << ";\n"
0333
0334 "if(gid < count){\n" <<
0335 k.var<uint_>("key") << " = " <<
0336 new_keys_first[k.var<const uint_>("gid")] << ";\n" <<
0337 k.var<value_out_type>("value") << " = " <<
0338 values_first[k.var<const uint_>("gid")] << ";\n" <<
0339 "lkeys[lid] = key;\n" <<
0340 "lvals[lid] = value;\n" <<
0341 "}\n" <<
0342
0343
0344 k.decl<value_out_type>("result") << " = value;\n" <<
0345 k.decl<uint_>("other_key") << ";\n" <<
0346 k.decl<value_out_type>("other_value") << ";\n" <<
0347
0348 "for(" << k.decl<uint_>("offset") << " = 1; " <<
0349 "offset < wg_size ; offset *= 2){\n"
0350 " barrier(CLK_LOCAL_MEM_FENCE);\n" <<
0351 " if(lid >= offset) {\n" <<
0352 " other_key = lkeys[lid - offset];\n" <<
0353 " if(other_key == key){\n" <<
0354 " other_value = lvals[lid - offset];\n" <<
0355 " result = " << function(k.var<value_out_type>("result"),
0356 k.var<value_out_type>("other_value")) << ";\n" <<
0357 " }\n" <<
0358 " }\n" <<
0359 " barrier(CLK_LOCAL_MEM_FENCE);\n" <<
0360 " lvals[lid] = result;\n" <<
0361 "}\n" <<
0362
0363 "if(gid >= count) {\n return;\n};\n" <<
0364
0365 k.decl<const bool>("save") << " = (gid < (count - 1)) ?"
0366 << new_keys_first[k.var<const uint_>("gid + 1")] << " != key" <<
0367 ": true;\n" <<
0368
0369
0370 k.decl<uint_>("carry_in_key") << ";\n" <<
0371 "if(group_id > 0 && save) {\n" <<
0372 " carry_in_key = " << carry_in_keys_first[k.var<const uint_>("group_id - 1")] << ";\n" <<
0373 " if(key == carry_in_key){\n" <<
0374 " other_value = " << carry_in_values_first[k.var<const uint_>("group_id - 1")] << ";\n" <<
0375 " result = " << function(k.var<value_out_type>("result"),
0376 k.var<value_out_type>("other_value")) << ";\n" <<
0377 " }\n" <<
0378 "}\n" <<
0379
0380
0381 "if(save){\n" <<
0382 keys_result[k.var<uint_>("key")] << " = " << keys_first[k.var<const uint_>("gid")] << ";\n" <<
0383 values_result[k.var<uint_>("key")] << " = result;\n" <<
0384 "}\n"
0385 ;
0386
0387 size_t work_groups_no = static_cast<size_t>(
0388 std::ceil(float(count) / work_group_size)
0389 );
0390
0391 const context &context = queue.get_context();
0392 kernel kernel = k.compile(context);
0393 kernel.set_arg(local_keys_arg, local_buffer<uint_>(work_group_size));
0394 kernel.set_arg(local_vals_arg, local_buffer<value_out_type>(work_group_size));
0395
0396 queue.enqueue_1d_range_kernel(kernel,
0397 0,
0398 work_groups_no * work_group_size,
0399 work_group_size);
0400 }
0401
0402
0403
0404 template<class KeyType, class ValueType>
0405 inline size_t get_work_group_size(const device& device)
0406 {
0407 std::string cache_key = std::string("__boost_reduce_by_key_with_scan")
0408 + "k_" + type_name<KeyType>() + "_v_" + type_name<ValueType>();
0409
0410
0411 boost::shared_ptr<parameter_cache> parameters =
0412 detail::parameter_cache::get_global_cache(device);
0413
0414 return (std::max)(
0415 static_cast<size_t>(parameters->get(cache_key, "wgsize", 256)),
0416 static_cast<size_t>(device.get_info<CL_DEVICE_MAX_WORK_GROUP_SIZE>())
0417 );
0418 }
0419
0420
0421
0422
0423
0424
0425
0426
0427
0428
0429 template<class InputKeyIterator, class InputValueIterator,
0430 class OutputKeyIterator, class OutputValueIterator,
0431 class BinaryFunction, class BinaryPredicate>
0432 inline size_t reduce_by_key_with_scan(InputKeyIterator keys_first,
0433 InputKeyIterator keys_last,
0434 InputValueIterator values_first,
0435 OutputKeyIterator keys_result,
0436 OutputValueIterator values_result,
0437 BinaryFunction function,
0438 BinaryPredicate predicate,
0439 command_queue &queue)
0440 {
0441 typedef typename
0442 std::iterator_traits<InputValueIterator>::value_type value_type;
0443 typedef typename
0444 std::iterator_traits<InputKeyIterator>::value_type key_type;
0445 typedef typename
0446 std::iterator_traits<OutputValueIterator>::value_type value_out_type;
0447
0448 const context &context = queue.get_context();
0449 size_t count = detail::iterator_range_size(keys_first, keys_last);
0450
0451 if(count == 0){
0452 return size_t(0);
0453 }
0454
0455 const device &device = queue.get_device();
0456 size_t work_group_size = get_work_group_size<value_type, key_type>(device);
0457
0458
0459
0460
0461 vector<uint_> new_keys(count, context);
0462 vector<uint_>::iterator new_keys_first = new_keys.begin();
0463 generate_uint_keys(keys_first, count, predicate, new_keys_first,
0464 work_group_size, queue);
0465
0466
0467 const size_t carry_out_size = static_cast<size_t>(
0468 std::ceil(float(count) / work_group_size)
0469 );
0470 vector<uint_> carry_out_keys(carry_out_size, context);
0471 vector<value_out_type> carry_out_values(carry_out_size, context);
0472 carry_outs(new_keys_first, values_first, count, carry_out_keys.begin(),
0473 carry_out_values.begin(), function, work_group_size, queue);
0474
0475 vector<value_out_type> carry_in_values(carry_out_size, context);
0476 carry_ins(carry_out_keys.begin(), carry_out_values.begin(),
0477 carry_in_values.begin(), carry_out_size, function, work_group_size,
0478 queue);
0479
0480 final_reduction(keys_first, values_first, keys_result, values_result,
0481 count, function, new_keys_first, carry_out_keys.begin(),
0482 carry_in_values.begin(), carry_out_size, work_group_size,
0483 queue);
0484
0485 const size_t result = read_single_value<uint_>(new_keys.get_buffer(),
0486 count - 1, queue);
0487 return result + 1;
0488 }
0489
0490
0491
0492
0493 template<class InputKeyIterator, class InputValueIterator,
0494 class OutputKeyIterator, class OutputValueIterator>
0495 bool reduce_by_key_with_scan_requirements_met(InputKeyIterator keys_first,
0496 InputValueIterator values_first,
0497 OutputKeyIterator keys_result,
0498 OutputValueIterator values_result,
0499 const size_t count,
0500 command_queue &queue)
0501 {
0502 typedef typename
0503 std::iterator_traits<InputValueIterator>::value_type value_type;
0504 typedef typename
0505 std::iterator_traits<InputKeyIterator>::value_type key_type;
0506 typedef typename
0507 std::iterator_traits<OutputValueIterator>::value_type value_out_type;
0508
0509 (void) keys_first;
0510 (void) values_first;
0511 (void) keys_result;
0512 (void) values_result;
0513
0514 const device &device = queue.get_device();
0515
0516 if(device.get_info<CL_DEVICE_LOCAL_MEM_TYPE>() != CL_LOCAL)
0517 {
0518 return false;
0519 }
0520
0521
0522 const size_t local_mem_size = device.get_info<CL_DEVICE_LOCAL_MEM_SIZE>();
0523
0524
0525 size_t work_group_size = get_work_group_size<key_type, value_type>(device);
0526
0527
0528 size_t required_local_mem_size = 0;
0529
0530 required_local_mem_size += sizeof(uint_) * work_group_size;
0531
0532 required_local_mem_size += sizeof(value_out_type) * work_group_size;
0533
0534 return (required_local_mem_size <= local_mem_size);
0535 }
0536
0537 }
0538 }
0539 }
0540
0541 #endif