Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:26:53

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 #pragma once
0018 
0019 #include <string_view>
0020 
0021 #include "arrow/acero/exec_plan.h"
0022 #include "arrow/acero/task_util.h"
0023 #include "arrow/acero/util.h"
0024 #include "arrow/compute/exec.h"
0025 #include "arrow/io/interfaces.h"
0026 #include "arrow/util/async_util.h"
0027 #include "arrow/util/type_fwd.h"
0028 
0029 namespace arrow {
0030 
0031 using compute::default_exec_context;
0032 using io::IOContext;
0033 
0034 namespace acero {
0035 
0036 class ARROW_ACERO_EXPORT QueryContext {
0037  public:
0038   QueryContext(QueryOptions opts = {},
0039                ExecContext exec_context = *default_exec_context());
0040 
0041   Status Init(arrow::util::AsyncTaskScheduler* scheduler);
0042 
0043   const ::arrow::internal::CpuInfo* cpu_info() const;
0044   int64_t hardware_flags() const;
0045   const QueryOptions& options() const { return options_; }
0046   MemoryPool* memory_pool() const { return exec_context_.memory_pool(); }
0047   ::arrow::internal::Executor* executor() const { return exec_context_.executor(); }
0048   ExecContext* exec_context() { return &exec_context_; }
0049   IOContext* io_context() { return &io_context_; }
0050   TaskScheduler* scheduler() { return task_scheduler_.get(); }
0051   arrow::util::AsyncTaskScheduler* async_scheduler() { return async_scheduler_; }
0052 
0053   size_t GetThreadIndex();
0054   size_t max_concurrency() const;
0055 
0056   /// \brief Start an external task
0057   ///
0058   /// This should be avoided if possible.  It is kept in for now for legacy
0059   /// purposes.  This should be called before the external task is started.  If
0060   /// a valid future is returned then it should be marked complete when the
0061   /// external task has finished.
0062   ///
0063   /// \param name A name to give the task for traceability and debugging
0064   ///
0065   /// \return an invalid future if the plan has already ended, otherwise this
0066   ///         returns a future that must be completed when the external task
0067   ///         finishes.
0068   Result<Future<>> BeginExternalTask(std::string_view name);
0069 
0070   /// \brief Add a single function as a task to the query's task group
0071   ///        on the compute threadpool.
0072   ///
0073   /// \param fn The task to run. Takes no arguments and returns a Status.
0074   /// \param name A name to give the task for traceability and debugging
0075   void ScheduleTask(std::function<Status()> fn, std::string_view name);
0076   /// \brief Add a single function as a task to the query's task group
0077   ///        on the compute threadpool.
0078   ///
0079   /// \param fn The task to run. Takes the thread index and returns a Status.
0080   /// \param name A name to give the task for traceability and debugging
0081   void ScheduleTask(std::function<Status(size_t)> fn, std::string_view name);
0082   /// \brief Add a single function as a task to the query's task group on
0083   ///        the IO thread pool
0084   ///
0085   /// \param fn The task to run. Returns a status.
0086   /// \param name A name to give the task for traceability and debugging
0087   void ScheduleIOTask(std::function<Status()> fn, std::string_view name);
0088 
0089   // Register/Start TaskGroup is a way of performing a "Parallel For" pattern:
0090   // - The task function takes the thread index and the index of the task
0091   // - The on_finished function takes the thread index
0092   // Returns an integer ID that will be used to reference the task group in
0093   // StartTaskGroup. At runtime, call StartTaskGroup with the ID and the number of times
0094   // you'd like the task to be executed. The need to register a task group before use will
0095   // be removed after we rewrite the scheduler.
0096   /// \brief Register a "parallel for" task group with the scheduler
0097   ///
0098   /// \param task The function implementing the task. Takes the thread_index and
0099   ///             the task index.
0100   /// \param on_finished The function that gets run once all tasks have been completed.
0101   /// Takes the thread_index.
0102   ///
0103   /// Must be called inside of ExecNode::Init.
0104   int RegisterTaskGroup(std::function<Status(size_t, int64_t)> task,
0105                         std::function<Status(size_t)> on_finished);
0106 
0107   /// \brief Start the task group with the specified ID. This can only
0108   ///        be called once per task_group_id.
0109   ///
0110   /// \param task_group_id The ID  of the task group to run
0111   /// \param num_tasks The number of times to run the task
0112   Status StartTaskGroup(int task_group_id, int64_t num_tasks);
0113 
0114   // This is an RAII class for keeping track of in-flight file IO. Useful for getting
0115   // an estimate of memory use, and how much memory we expect to be freed soon.
0116   // Returned by ReportTempFileIO.
0117   struct [[nodiscard]] TempFileIOMark {
0118     QueryContext* ctx_;
0119     size_t bytes_;
0120 
0121     TempFileIOMark(QueryContext* ctx, size_t bytes) : ctx_(ctx), bytes_(bytes) {
0122       ctx_->in_flight_bytes_to_disk_.fetch_add(bytes_, std::memory_order_acquire);
0123     }
0124 
0125     ARROW_DISALLOW_COPY_AND_ASSIGN(TempFileIOMark);
0126 
0127     ~TempFileIOMark() {
0128       ctx_->in_flight_bytes_to_disk_.fetch_sub(bytes_, std::memory_order_release);
0129     }
0130   };
0131 
0132   TempFileIOMark ReportTempFileIO(size_t bytes) { return {this, bytes}; }
0133 
0134   size_t GetCurrentTempFileIO() { return in_flight_bytes_to_disk_.load(); }
0135 
0136  private:
0137   QueryOptions options_;
0138   // To be replaced with Acero-specific context once scheduler is done and
0139   // we don't need ExecContext for kernels
0140   ExecContext exec_context_;
0141   IOContext io_context_;
0142 
0143   arrow::util::AsyncTaskScheduler* async_scheduler_ = NULLPTR;
0144   std::unique_ptr<TaskScheduler> task_scheduler_ = TaskScheduler::Make();
0145 
0146   ThreadIndexer thread_indexer_;
0147 
0148   std::atomic<size_t> in_flight_bytes_to_disk_{0};
0149 };
0150 }  // namespace acero
0151 }  // namespace arrow