Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-17 08:35:01

0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one
0003  * or more contributor license agreements. See the NOTICE file
0004  * distributed with this work for additional information
0005  * regarding copyright ownership. The ASF licenses this file
0006  * to you under the Apache License, Version 2.0 (the
0007  * "License"); you may not use this file except in compliance
0008  * with the License. You may obtain a copy of the License at
0009  *
0010  *   http://www.apache.org/licenses/LICENSE-2.0
0011  *
0012  * Unless required by applicable law or agreed to in writing,
0013  * software distributed under the License is distributed on an
0014  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0015  * KIND, either express or implied. See the License for the
0016  * specific language governing permissions and limitations
0017  * under the License.
0018  */
0019 
0020 #ifndef _THRIFT_CONCURRENCY_THREADMANAGER_H_
0021 #define _THRIFT_CONCURRENCY_THREADMANAGER_H_ 1
0022 
0023 #include <functional>
0024 #include <memory>
0025 #include <thrift/concurrency/ThreadFactory.h>
0026 
0027 namespace apache {
0028 namespace thrift {
0029 namespace concurrency {
0030 
0031 /**
0032  * Thread Pool Manager and related classes
0033  * (forward declaration only; ThreadManager is defined further down in this header)
0034  * @version $Id:$
0035  */
0036 class ThreadManager;
0037 
0038 /**
0039  * ThreadManager class (abstract interface; all non-static operations are pure virtual)
0040  *
0041  * This class manages a pool of threads. It uses a ThreadFactory to create
0042  * threads. It never actually creates or destroys worker threads, rather
0043  * it maintains statistics on number of idle threads, number of active threads,
0044  * task backlog, and average wait and service times and informs the PoolPolicy
0045  * object bound to instances of this manager of interesting transitions. It is
0046  * then up to the PoolPolicy object to decide if the thread pool size needs to be
0047  * adjusted and call this object's addWorker and removeWorker methods to make
0048  * changes.
0049  *
0050  * This design allows different policy implementations to use this code to
0051  * handle basic worker thread management and worker task execution and focus on
0052  * policy issues. The simplest policy, StaticPolicy, does nothing other than
0053  * create a fixed number of threads.
0054  */
0055 class ThreadManager {
0056 
0057 protected:
0058   ThreadManager() = default; // protected: not directly constructible by client code
0059 
0060 public:
0061   typedef std::function<void(std::shared_ptr<Runnable>)> ExpireCallback;
0062   ///< Callback type used by setExpireCallback(); it receives the Runnable of the expired task.
0063   virtual ~ThreadManager() = default;
0064 
0065   /**
0066    * Starts the thread manager. Verifies all attributes have been properly
0067    * initialized, then allocates necessary resources to begin operation.
0068    */
0069   virtual void start() = 0;
0070 
0071   /**
0072    * Stops the thread manager. Aborts all remaining unprocessed tasks, shuts
0073    * down all created worker threads, and releases all allocated resources.
0074    * This method blocks for all worker threads to complete, thus it can
0075    * potentially block forever if a worker thread is running a task that
0076    * won't terminate.
0077    *
0078    * Worker threads will be joined depending on the threadFactory's detached
0079    * disposition.
0080    */
0081   virtual void stop() = 0;
0082   /** States that state() can report. */
0083   enum STATE { UNINITIALIZED, STARTING, STARTED, JOINING, STOPPING, STOPPED };
0084   /** \returns the current STATE of this manager */
0085   virtual STATE state() const = 0;
0086 
0087   /**
0088    * \returns the current thread factory
0089    */
0090   virtual std::shared_ptr<ThreadFactory> threadFactory() const = 0;
0091 
0092   /**
0093    * Set the thread factory.
0094    * \throws InvalidArgumentException if the new thread factory has a different
0095    *                                  detached disposition than the one replacing it
0096    */
0097   virtual void threadFactory(std::shared_ptr<ThreadFactory> value) = 0;
0098 
0099   /**
0100    * Adds \p value worker thread(s); one is added when no argument is given.
0101    */
0102   virtual void addWorker(size_t value = 1) = 0;
0103 
0104   /**
0105    * Removes worker thread(s).
0106    * Threads are joined if the thread factory detached disposition allows it.
0107    * Blocks until the number of worker threads reaches the new limit.
0108    * \param[in]  value  the number to remove
0109    * \throws InvalidArgumentException if the value is greater than the number
0110    *                                  of workers
0111    */
0112   virtual void removeWorker(size_t value = 1) = 0;
0113 
0114   /**
0115    * Gets the current number of idle worker threads.
0116    */
0117   virtual size_t idleWorkerCount() const = 0;
0118 
0119   /**
0120    * Gets the current number of total worker threads.
0121    */
0122   virtual size_t workerCount() const = 0;
0123 
0124   /**
0125    * Gets the current number of pending tasks.
0126    */
0127   virtual size_t pendingTaskCount() const = 0;
0128 
0129   /**
0130    * Gets the current number of pending and executing tasks.
0131    */
0132   virtual size_t totalTaskCount() const = 0;
0133 
0134   /**
0135    * Gets the maximum pending task count. 0 indicates no maximum.
0136    */
0137   virtual size_t pendingTaskCountMax() const = 0;
0138 
0139   /**
0140    * Gets the number of tasks which have been expired without being run
0141    * since start() was called.
0142    */
0143   virtual size_t expiredTaskCount() const = 0;
0144 
0145   /**
0146    * Adds a task to be executed at some time in the future by a worker thread.
0147    *
0148    * This method will block if pendingTaskCountMax() is not zero and pendingTaskCount()
0149    * is greater than or equal to pendingTaskCountMax().  If this method is called in the
0150    * context of a ThreadManager worker thread it will throw a
0151    * TooManyPendingTasksException
0152    *
0153    * @param task  The task to queue for execution
0154    *
0155    * @param timeout Time to wait in milliseconds to add a task when a pending-task-count
0156    * is specified. Specific cases:
0157    * timeout = 0  : Wait forever to queue task.
0158    * timeout = -1 : Return immediately if pending task count exceeds specified max
0159    * @param expiration when nonzero, the number of milliseconds the task is valid
0160    * to be run; if exceeded, the task will be dropped off the queue and not run.
0161    *
0162    * @throws TooManyPendingTasksException Pending task count exceeds max pending task count
0163    */
0164   virtual void add(std::shared_ptr<Runnable> task,
0165                    int64_t timeout = 0LL,
0166                    int64_t expiration = 0LL) = 0;
0167 
0168   /**
0169    * Removes a pending task.
0170    */
0171   virtual void remove(std::shared_ptr<Runnable> task) = 0;
0172 
0173   /**
0174    * Remove the next pending task which would be run.
0175    *
0176    * @return the task removed.
0177    */
0178   virtual std::shared_ptr<Runnable> removeNextPending() = 0;
0179 
0180   /**
0181    * Remove tasks from front of task queue that have expired.
0182    */
0183   virtual void removeExpiredTasks() = 0;
0184 
0185   /**
0186    * Set a callback to be called when a task is expired and not run.
0187    *
0188    * @param expireCallback a function called with the shared_ptr<Runnable> for
0189    * the expired task.
0190    */
0191   virtual void setExpireCallback(ExpireCallback expireCallback) = 0;
0192   /** Static factory returning a shared_ptr to a ThreadManager; it takes no configuration arguments. */
0193   static std::shared_ptr<ThreadManager> newThreadManager();
0194 
0195   /**
0196    * Creates a simple thread manager that uses count number of worker threads and has
0197    * a pendingTaskCountMax maximum pending tasks. The default, 0, specifies no limit
0198    * on pending tasks.
0199    */
0200   static std::shared_ptr<ThreadManager> newSimpleThreadManager(size_t count = 4,
0201                                                                  size_t pendingTaskCountMax = 0);
0202   // Nested types below are only forward-declared in this header.
0203   class Task;
0204 
0205   class Worker;
0206 
0207   class Impl;
0208 };
0209 }
0210 }
0211 } // apache::thrift::concurrency
0212 
0213 #endif // #ifndef _THRIFT_CONCURRENCY_THREADMANAGER_H_