Java 线程池 Java并发之线程池详解 小奥 2024-02-28 2024-02-28 Java并发之线程池详解 一、线程池概述 1.1 什么是线程池 程序运行的本质就是占用系统的资源,优化资源的使用从而衍生出来的技术就是池化技术。
线程池其实就是一种多线程的处理方式 ,处理过程中可以将任务添加到队列中,然后创建线程处理这些任务。
线程池的优点:
降低资源消耗。 线程池通过复用利用已经创建的线程来降低线程的创建和销毁造成的消耗。
提高响应速度。 当有新的任务到达时,如果线程池中有空闲状态的线程,任务可以不需要等待线程的创建就可以立即执行。
提高线程的可管理性。 线程是一种稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以对线程进行统一的分配和管理。
1.2 线程池的执行过程 线程池的处理流程如下:
线程池判断核心线程池里的线程是否都在执行任务。
如果不是,则创建一个新的工作线程来执行任务。
如果核心线程池里的线程都在执行任务,则进入下一个流程。
线程池判断工作队列是否已满。
如果工作队列没有满,则将新提交的任务存储在这个工作队列里面。
如果工作队列满了,则进入下个流程。
线程池判断线程池的线程是否都处于工作状态。
如果没有,则创建一个新的工作线程来执行任务。
如果已经满了,则交饱和策略(拒绝策略)来处理这个任务。
二、 自定义线程池 2.1 阻塞队列 public class BlockingQueue <T> { private Deque<T> queue = new ArrayDeque <>(); private ReentrantLock lock = new ReentrantLock (); private Condition fullWaitSet = lock.newCondition(); private Condition emptyWaitSet = lock.newCondition(); private int cap; public BlockingQueue (int cap) { this .cap = cap; } public T take () { lock.lock(); try { while (queue.isEmpty()) { try { emptyWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; } finally { lock.unlock(); } } public T poll (long timeout, TimeUnit timeUnit) { lock.lock(); try { long l = timeUnit.toMillis(timeout); while (queue.isEmpty()) { try { if (l <= 0 ) return null ; l = emptyWaitSet.awaitNanos(l); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; } finally { lock.unlock(); } } public void put (T ele) { lock.lock(); try { while (queue.size() == cap) { try { fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.addLast(ele); emptyWaitSet.signal(); } finally { lock.unlock(); } } public boolean offer (T ele, long time, TimeUnit unit) { lock.lock(); try { long l = unit.toNanos(time); while (queue.size() == cap) { try { if (l <= 0 ) { return false ; } l = fullWaitSet.awaitNanos(l); } catch (InterruptedException e) { e.printStackTrace(); } } queue.addLast(ele); emptyWaitSet.signal(); return true ; } finally { lock.unlock(); } } public int size () { lock.lock(); try { return queue.size(); } finally { lock.unlock(); } } public void tryPut (RejectPolicy<T> rejectPolicy, T task) { lock.lock(); try { if (queue.size() == cap) { rejectPolicy.reject(this , task); } else { queue.addLast(task); emptyWaitSet.signal(); } } finally { lock.unlock(); } } }
2.2 自定义线程池 public class ThreadPool { private BlockingQueue<Runnable> taskQueue; private HashSet<Worker> workers = new HashSet (); private int coreSize; private long timeout; private TimeUnit unit; private RejectPolicy<Runnable> rejectPolicy; public ThreadPool (int coreSize, long timeout, TimeUnit unit, int queueCap, RejectPolicy<Runnable> rejectPolicy) { this .coreSize = coreSize; this .timeout = timeout; this .unit = unit; this .taskQueue = new BlockingQueue <>(queueCap); this .rejectPolicy = rejectPolicy; } public void execute (Runnable task) { synchronized (workers) { if (workers.size() < coreSize) { Worker worker = new Worker (task); System.out.println("新增worker对象:" + worker + ", 任务对象: " + task); workers.add(worker); worker.start(); } else { System.out.println("加入任务队列:" + task); taskQueue.put(task); taskQueue.tryPut(rejectPolicy, task); } } } class Worker extends Thread { private Runnable task; public Worker (Runnable task) { this .task = task; } @Override public void run () { while (task != null || (task = taskQueue.poll(timeout, unit)) != null ) { try { System.out.println("正在执行:" + task); task.run(); } catch (Exception e) { e.printStackTrace(); } finally { task = null ; } } synchronized (workers) { System.out.println("worker被移除:" + this ); workers.remove(this ); } } } }
2.3 拒绝策略实现 使用策略模式:
@FunctionalInterface public interface RejectPolicy <T> { void reject (BlockingQueue<T> queue, T task) ; }
2.4 测试 public class Main { public static void main (String[] args) { ThreadPool threadPool = new ThreadPool (1 , 1000 , TimeUnit.MILLISECONDS, 1 , (queue, task) -> { task.run(); }); for (int i = 0 ; i < 3 ; i++) { int j = i; threadPool.execute(() -> { try { Thread.sleep(1000L ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " - " + j); }); } } }
三、ThreadPoolExecutor 3.1 线程状态 ThreadPoolExecutor使用int的高3位来表示线程池状态,低29位表示线程数量。
private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
状态名
高3位
接收新任务
处理阻塞队列任务
说明
RUNNING
111
Y
Y
SHUTDOWN
000
N
Y
不会接收新任务,但会处理阻塞队列中剩余任务
STOP
001
N
N
会中断正在执行的任务,并抛弃阻塞队列任务
TIDYING
010
-
-
任务全执行完毕,活动线程为0即将进入终结
TERMINATED
011
-
-
终结状态
注意,从数字上比较,TERMINATED
>TIDYING
>STOP
>SHUTDOWN
>RUNNING
。
为什么不使用两个整数来表示线程池状态和线程数量呢?
因为目的是将线程池状态和线程个数合二为一,就可以使用一次CAS原子操作进行赋值了。 这些信息存储在原子变量ctl中。
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) private static int ctlOf (int rs, int wc) { return rs | wc; }
(2) 构造方法 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
七大参数:
corePoolSize
:核心线程数。
maximumPoolSize
:最大线程数。
keepAliveTime
:当线程数大于核心时,这是多余的空闲线程将在终止前等待新任务的最大时间。
unit
:时间单位。
workQueue
:在执行任务之前用来保存任务的队列(阻塞队列)。
threadFactory
:线程工厂。
RejectedExecutionHandler
:拒绝策略。
① 核心参数解释
corePoolSize线程池核心线程数大小
线程池中会维护一个最小的线程数量,即使这些线程处理空闲状态,他们也不会被销毁,除非设置了allowCoreThreadTimeOut
。这里的最小线程数量即是corePoolSize
。任务提交到线程池后,首先会检查当前线程数是否达到了corePoolSize
,如果没有达到的话,则会创建一个新线程来处理这个任务。
maximumPoolSize 线程池最大线程数量
当前线程数达到corePoolSize
后,如果继续有任务被提交到线程池,会将任务缓存到工作队列中。如果队列也已满,则会去创建一个新线程来出来处理。线程池不会无限制的去创建新线程,它会有一个最大线程数量的限制,这个数量即由maximunPoolSize
指定。
keepAliveTime 空闲线程存活时间
一个线程如果处于空闲状态,并且当前的线程数量大于corePoolSize
,那么在指定时间后,这个空闲线程会被销毁,这里的指定时间由keepAliveTime
来设定。
unit 空闲线程存活时间单位
unit
空闲线程存活时间单位。
workQueue 工作队列
新任务被提交后,会先进入到此工作队列中,任务调度时再从队列中取出任务。
几种阻塞队列:
ArrayBlockingQueue
:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先 进先出)原则对元素进行排序。
LinkedBlockingQueue
:一个基于链表结构的阻塞队列,此队列按 FIFO 排序元 素,吞吐量通常要高于 ArrayBlockingQueue
。静态工厂方法 Executors.newFixedThreadPool()
使用了这个队列。
SynchronousQueue
:一个不存储元素的阻塞队列。每个插入操作必须等到另一个 线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 Linked-BlockingQueue
,静态工厂方法 Executors.newCachedThreadPool
使用了这 个队列。
PriorityBlockingQueue
:一个具有优先级的无限阻塞队列。
threadFactory 线程工厂
创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon线程等等。
handler 拒绝策略
当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,这时就会采用拒绝策略的方式来处理这些任务。
② 拒绝策略源码 四种拒绝策略:
AbortPolicy:中止策略。 默认的拒绝策略,抛出未检查的异常RejectedExecutionException
。调用者可以通过捕获这个异常,来实现自己的处理逻辑。
DiscardPolicy:丢弃策略。 当新提交的任务无法保存到队列中等待执行时,该策略会丢弃该任务。
DiscardOldestPolicy:丢弃最旧策略。 当新提交的任务无法保存到队列中执行时,则会丢弃最先进入队列的任务,然后尝试提交新的任务。
CallerRunsPolicy:调用者运行策略。 该策略实现了一种调节机制,既不会抛弃任务,也不会抛出异常,而是将任务回退到调用者(调用线程池执行任务的主线程)。
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException ("Task " + r.toString() + " rejected from " + e.toString()); } }
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { } }
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy () { } public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
(3) 使用 ① 创建线程池 通过ThreadPoolExecutor
构造方法创建一个线程池:
new ThreadPoolExecutor (corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);
参数在上面已经介绍过了。
② 提交任务 可以使用两个方法向线程池提交任务,分别为**execute()和 submit()**方法。
execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。
executor.execute(new Runnable () { @Override public void run () { } });
submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过 future 的 get()方法来获取 返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
Future<Object> future = executor.submit(task); try { Object o = future.get(); } catch (InterruptedException e) { } catch (ExecutionException e) { } finally { executor.shutdown(); }
③ 关闭线程池 可以通过调用线程池的 shutdown
或 shutdownNow
方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程,所以无法 响应中断的任务可能永远无法终止 。
shutdownNow
首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而 shutdown
只是将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。
只要调用了这两个关闭方法中的任意一个,isShutdown 方法就会返回 true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用 isTerminaed 方法会返回 true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用 shutdown 方法来关闭线程池,如果任务不一定要执行完,则可以调用 shutdownNow 方 法。
(4) 源码分析 ① 成员变量 private final HashSet<Worker> workers = new HashSet <Worker>(); private final ReentrantLock mainLock = new ReentrantLock ();private final Condition termination = mainLock.newCondition();private volatile int corePoolSize;private volatile int maximumPoolSize;private volatile long keepAliveTime;private final BlockingQueue<Runnable> workQueue;private volatile ThreadFactory threadFactory;private volatile RejectedExecutionHandler handler;private int largestPoolSize; private long completedTaskCount; private volatile boolean allowCoreThreadTimeOut;
② 内部类Worker private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L ; final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1 ); this .firstTask = firstTask; this .thread = getThreadFactory().newThread(this ); } public void run () { runWorker(this ); } protected boolean isHeldExclusively () { return getState() != 0 ; } protected boolean tryAcquire (int unused) { if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } protected boolean tryRelease (int unused) { setExclusiveOwnerThread(null ); setState(0 ); return true ; } public void lock () { acquire(1 ); } public boolean tryLock () { return tryAcquire(1 ); } public void unlock () { release(1 ); } public boolean isLocked () { return isHeldExclusively(); } void interruptIfStarted () { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
public Thread newThread (Runnable r) { Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet()); try { if (t.isDaemon() != daemon) { t.setDaemon(daemon); } if (t.getPriority() != priority) { t.setPriority(priority); } } catch (Exception ignored) { } return t; }
③ submit() AbstractExecutorService#submit()
:提交任务,把 Runnable 或 Callable 任务封装成 FutureTask 执行 ,可以通过方法返回的任务对象,调用 get 阻塞获取任务执行的结果或者异常。
public Future<?> submit(Runnable task) { if (task == null ) throw new NullPointerException (); RunnableFuture<Void> ftask = newTaskFor(task, null ); execute(ftask); return ftask; } public <T> Future<T> submit (Runnable task, T result) { if (task == null ) throw new NullPointerException (); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; }
protected <T> RunnableFuture<T> newTaskFor (Runnable runnable, T value) { return new FutureTask <T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor (Callable<T> callable) { return new FutureTask <T>(callable); }
④ execute() execute():执行任务,但是没有返回值,没办法获取任务执行结果 ,出现异常会直接抛出任务执行时的异常。根据线程池中的线程数,选择添加任务时的处理方式。
ThreadPoolExecutor执行execute方法分下面四种情况:
如果当前运行的线程少于corePoolSize,则创建新的线程来执行任务(注意,执行这一步骤需要获取全局锁)。
如果运行的线程数大于等于corePoolSize,则将任务加入BlockingQueue。
如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
如果创建新线程使当前运行的线程超过maximumPoolSize,任务将会被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
public void execute (Runnable command) { if (command == null ) throw new NullPointerException (); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
⑤ 添加线程方法 prestartAllCoreThreads():提前预热 ,创建所有的核心线程。
public int prestartAllCoreThreads () { int n = 0 ; while (addWorker(null , true )) ++n; return n; }
addWorker():添加线程到线程池 ,返回 true 表示创建 Worker 成功,且线程启动。首先判断线程池是否允许添加线程,允许就让线程数量 + 1,然后去创建 Worker 加入线程池。
注意:SHUTDOWN 状态也能添加线程,但是要求新加的 Woker 没有 firstTask,而且当前 queue 不为空,所以创建一个线程来帮助线程池执行队列中的任务。
private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker (firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException (); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
addWorkerFailed():清理任务
private void addWorkerFailed (Worker w) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { if (w != null ) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } }
⑥ 运行方法 runWorker() Worker#run:Worker 实现了 Runnable 接口,当线程启动时,会调用 Worker 的 run() 方法。
public void run () { runWorker(this ); }
runWorker():线程启动就要执行任务,会一直 while 循环获取任务并执行。
final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error (x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } }
getTask():获取任务 ,线程空闲时间超过 keepAliveTime 就会被回收,判断的依据是当前线程阻塞获取任务超过保活时间 ,方法返回 null 就代表当前线程要被回收了,返回到 runWorker 执行线程退出逻辑。线程池具有担保机制,对于 RUNNING 状态下的超时回收,要保证线程池中最少有一个线程运行,或者任务阻塞队列已经是空。
private Runnable getTask () { boolean timedOut = false ; for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null ; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null ; continue ; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ; } catch (InterruptedException retry) { timedOut = false ; } } }
processWorkerExit():线程退出线程池 ,也有担保机制,保证队列中的任务被执行
private void processWorkerExit (Worker w, boolean completedAbruptly) { if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1 ; if (workerCountOf(c) >= min) return ; } addWorker(null , false ); } }
⑦ 停止方法 shutdown() shutdown():停止线程池
public void shutdown () { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); }
interruptIdleWorkers():shutdown 方法会中断所有空闲线程 ,根据是否可以获取 AQS 独占锁判断是否处于工作状态。线程之所以空闲是因为阻塞队列没有任务,不会中断正在运行的线程,所以 shutdown 方法会让所有的任务执行完毕。
private void interruptIdleWorkers () { interruptIdleWorkers(false ); } private void interruptIdleWorkers (boolean onlyOne) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break ; } } finally { mainLock.unlock(); } }
shutdownNow() shutdownNow():直接关闭线程池,不会等待任务执行完成。
public List<Runnable> shutdownNow () { List<Runnable> tasks; final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
tryTerminate():设置为 TERMINATED 状态 if either (SHUTDOWN and pool and queue empty) or (STOP and pool empty)。
final void tryTerminate () { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return ; if (workerCountOf(c) != 0 ) { interruptIdleWorkers(ONLY_ONE); return ; } final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0 ))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0 )); termination.signalAll(); } return ; } } finally { mainLock.unlock(); } } }
(5) 合理配置线程池 要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。
任务的性质:CPU 密集型任务、IO 密集型任务和混合型任务。
任务的优先级:高、中和低。
任务的执行时间:长、中和短。
任务的依赖性:是否依赖其他系统资源,如数据库连接。
性质不同的任务可以用不同规模的线程池分开处理。CPU 密集型任务应配置尽可能 小的线程,如配置 Ncpu+1 个线程的线程池。由于 IO 密集型任务线程并不是一直在执行 任务,则应配置尽可能多的线程,如 2*Ncpu。混合型的任务,如果可以拆分,将其拆分 成一个 CPU 密集型任务和一个 IO 密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差 太大,则没必要进行分解。可以通过 Runtime.getRuntime().availableProcessors()方法获得当前设备的 CPU 个数。
优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。它可以让优 先级高的任务先执行。
执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行。
依赖数据库连接池的任务,因为线程提交 SQL 后需要等待数据库返回结果,等待的 时间越长,则 CPU 空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用 CPU。
(6) 线程池的监控 如果在系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时, 可以根据线程池的使用状况快速定位问题。
可以通过线程池提供的参数进行监控, 在监控线程池的时候可以使用以下属性。
taskCount
:线程池需要执行的任务数量
completedTaskCount
:线程池在运行过程中已经完成的任务数量,小于或者等于taskCount。
largestPoolSize
:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。
getPoolSize
:线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销毁,所以这个大小只增不减。
getActiveCount
:获取活动的线程数。
通过扩展线程池进行监控。 可以通过继承线程池来自定义线程池,重写线程池的 beforeExecute、afterExecute 和 terminated 方法,也可以在任务执行前、执行后和线程池关闭前执行一些代码来进行监控。例如,监控任务的平均执行时间、最大执行时间和最 小执行时间等。