![]() |
|
|||
File indexing completed on 2025-08-28 08:26:53
0001 // Licensed to the Apache Software Foundation (ASF) under one 0002 // or more contributor license agreements. See the NOTICE file 0003 // distributed with this work for additional information 0004 // regarding copyright ownership. The ASF licenses this file 0005 // to you under the Apache License, Version 2.0 (the 0006 // "License"); you may not use this file except in compliance 0007 // with the License. You may obtain a copy of the License at 0008 // 0009 // http://www.apache.org/licenses/LICENSE-2.0 0010 // 0011 // Unless required by applicable law or agreed to in writing, 0012 // software distributed under the License is distributed on an 0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 0014 // KIND, either express or implied. See the License for the 0015 // specific language governing permissions and limitations 0016 // under the License. 0017 #pragma once 0018 0019 #include <string_view> 0020 0021 #include "arrow/acero/exec_plan.h" 0022 #include "arrow/acero/task_util.h" 0023 #include "arrow/acero/util.h" 0024 #include "arrow/compute/exec.h" 0025 #include "arrow/io/interfaces.h" 0026 #include "arrow/util/async_util.h" 0027 #include "arrow/util/type_fwd.h" 0028 0029 namespace arrow { 0030 0031 using compute::default_exec_context; 0032 using io::IOContext; 0033 0034 namespace acero { 0035 0036 class ARROW_ACERO_EXPORT QueryContext { 0037 public: 0038 QueryContext(QueryOptions opts = {}, 0039 ExecContext exec_context = *default_exec_context()); 0040 0041 Status Init(arrow::util::AsyncTaskScheduler* scheduler); 0042 0043 const ::arrow::internal::CpuInfo* cpu_info() const; 0044 int64_t hardware_flags() const; 0045 const QueryOptions& options() const { return options_; } 0046 MemoryPool* memory_pool() const { return exec_context_.memory_pool(); } 0047 ::arrow::internal::Executor* executor() const { return exec_context_.executor(); } 0048 ExecContext* exec_context() { return &exec_context_; } 0049 IOContext* io_context() { return &io_context_; } 0050 TaskScheduler* scheduler() { return task_scheduler_.get(); } 0051 arrow::util::AsyncTaskScheduler* async_scheduler() { return async_scheduler_; } 0052 0053 size_t GetThreadIndex(); 0054 size_t max_concurrency() const; 0055 0056 /// \brief Start an external task 0057 /// 0058 /// This should be avoided if possible. It is kept in for now for legacy 0059 /// purposes. This should be called before the external task is started. If 0060 /// a valid future is returned then it should be marked complete when the 0061 /// external task has finished. 0062 /// 0063 /// \param name A name to give the task for traceability and debugging 0064 /// 0065 /// \return an invalid future if the plan has already ended, otherwise this 0066 /// returns a future that must be completed when the external task 0067 /// finishes. 0068 Result<Future<>> BeginExternalTask(std::string_view name); 0069 0070 /// \brief Add a single function as a task to the query's task group 0071 /// on the compute threadpool. 0072 /// 0073 /// \param fn The task to run. Takes no arguments and returns a Status. 0074 /// \param name A name to give the task for traceability and debugging 0075 void ScheduleTask(std::function<Status()> fn, std::string_view name); 0076 /// \brief Add a single function as a task to the query's task group 0077 /// on the compute threadpool. 0078 /// 0079 /// \param fn The task to run. Takes the thread index and returns a Status. 0080 /// \param name A name to give the task for traceability and debugging 0081 void ScheduleTask(std::function<Status(size_t)> fn, std::string_view name); 0082 /// \brief Add a single function as a task to the query's task group on 0083 /// the IO thread pool 0084 /// 0085 /// \param fn The task to run. Returns a status. 0086 /// \param name A name to give the task for traceability and debugging 0087 void ScheduleIOTask(std::function<Status()> fn, std::string_view name); 0088 0089 // Register/Start TaskGroup is a way of performing a "Parallel For" pattern: 0090 // - The task function takes the thread index and the index of the task 0091 // - The on_finished function takes the thread index 0092 // Returns an integer ID that will be used to reference the task group in 0093 // StartTaskGroup. At runtime, call StartTaskGroup with the ID and the number of times 0094 // you'd like the task to be executed. The need to register a task group before use will 0095 // be removed after we rewrite the scheduler. 0096 /// \brief Register a "parallel for" task group with the scheduler 0097 /// 0098 /// \param task The function implementing the task. Takes the thread_index and 0099 /// the task index. 0100 /// \param on_finished The function that gets run once all tasks have been completed. 0101 /// Takes the thread_index. 0102 /// 0103 /// Must be called inside of ExecNode::Init. 0104 int RegisterTaskGroup(std::function<Status(size_t, int64_t)> task, 0105 std::function<Status(size_t)> on_finished); 0106 0107 /// \brief Start the task group with the specified ID. This can only 0108 /// be called once per task_group_id. 0109 /// 0110 /// \param task_group_id The ID of the task group to run 0111 /// \param num_tasks The number of times to run the task 0112 Status StartTaskGroup(int task_group_id, int64_t num_tasks); 0113 0114 // This is an RAII class for keeping track of in-flight file IO. Useful for getting 0115 // an estimate of memory use, and how much memory we expect to be freed soon. 0116 // Returned by ReportTempFileIO. 0117 struct [[nodiscard]] TempFileIOMark { 0118 QueryContext* ctx_; 0119 size_t bytes_; 0120 0121 TempFileIOMark(QueryContext* ctx, size_t bytes) : ctx_(ctx), bytes_(bytes) { 0122 ctx_->in_flight_bytes_to_disk_.fetch_add(bytes_, std::memory_order_acquire); 0123 } 0124 0125 ARROW_DISALLOW_COPY_AND_ASSIGN(TempFileIOMark); 0126 0127 ~TempFileIOMark() { 0128 ctx_->in_flight_bytes_to_disk_.fetch_sub(bytes_, std::memory_order_release); 0129 } 0130 }; 0131 0132 TempFileIOMark ReportTempFileIO(size_t bytes) { return {this, bytes}; } 0133 0134 size_t GetCurrentTempFileIO() { return in_flight_bytes_to_disk_.load(); } 0135 0136 private: 0137 QueryOptions options_; 0138 // To be replaced with Acero-specific context once scheduler is done and 0139 // we don't need ExecContext for kernels 0140 ExecContext exec_context_; 0141 IOContext io_context_; 0142 0143 arrow::util::AsyncTaskScheduler* async_scheduler_ = NULLPTR; 0144 std::unique_ptr<TaskScheduler> task_scheduler_ = TaskScheduler::Make(); 0145 0146 ThreadIndexer thread_indexer_; 0147 0148 std::atomic<size_t> in_flight_bytes_to_disk_{0}; 0149 }; 0150 } // namespace acero 0151 } // namespace arrow
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |
![]() ![]() |