线程池

服务器接受大量请求时,使用线程池技术是非常合适的,它可以大大减少线程的创建和销毁次数,提高服务器的工作效率,实际在开发中,如果需要创建 5 个以上的线程,那么就可以使用线程池来管理

为什么要使用线程池

  • 反复创建线程开销大
  • 过多的线程会占用太多内存

使用线程池的好处

  • 加快响应速度
  • 合理利用 CPU 和内存
  • 统一管理

线程池构相关参数

参数名 含义
corePoolSize 核心线程数,线程池中保留的线程数,即使它们是空闲的也不会被终止,除非设置了 allowCoreThreadTimeOut
maximumPoolSize 最大线程数
keepAliveTime 保持存活时间,当线程数大于核心数时,这是多余的空闲线程在终止之前等待新任务的最大时间,如果设置了 allowCoreThreadTimeOut,则核心线程也会回收
unit keepAliveTime 的时间单位
workQueue 任务存储队列,用于在任务执行之前保存它们的队列,这个队列将只保存由 execute 方法提交的 Runnable 任务
threadFactory 线程工厂,执行程序创建新线程时要使用的工厂
handle 拒绝策略,当执行因达到线程边界和队列容量而阻塞时要使用的处理程序
workQueue
  • SynchronousQueue:不存储元素的阻塞队列,直接把任务交给线程池的线程来执行,如果没有可用的线程则会拒绝任务,使用此阻塞队列一般要求 maximumPoolSizes 为无界,避免线程拒绝执行操作(每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态)
  • LinkedBlockingQueue:基于链表结构的无界阻塞队列,按 FIFO 排序任务,默认大小为 Integer.MAX_VALUE,为了避免队列过大,一般需要自定义队列的大小,此阻塞队列内部分别使用了 takeLock 和 putLock 对并发进行控制,也就是说,添加和删除操作并不是互斥操作,可以同时进行,这样也就可以大大提高吞吐量
  • LinkedBlockingDeque:使用双向队列实现的双端阻塞队列,双端意味着可以像普通队列一样 FIFO,可以像栈一样 FILO
  • ArrayBlockingQueue:基于数组结构的有界阻塞队列,按 FIFO 排序任务
  • PriorityBlockingQueue:支持优先级排序的无界阻塞队列,优先级高的任务会被先执行,如果多个任务具有相同的优先级,就会按照插入的顺序执行
  • DelayQueue:使用优先级队列实现的无界阻塞队列,队列中每个元素都有过期时间,当从队列获取元素时,只有过期元素才会出队列,而队列头部的元素是过期最快的元素
  • DelayedWorkQueue:使用优先级队列实现的无界阻塞队列,它并没有像 DelayQueue 那样,将队列操作委托给 PriorityQueue,而是自己重新实现了一遍堆的核心操作(入队元素必须实现 RunnableScheduledFuture 接口)
  • LinkedTransferQueue:SynchronousQueue 和 LinkedBlockingQueue 的合体,性能比 LinkedBlockingQueue 更高(没有锁操作),比 SynchronousQueue能存储更多的元素,当 put 时,如果有等待的线程,就直接将元素交给等待者, 否则直接进入队列
threadFactory
  • DefaultThreadFactory(默认):创建的线程拥有相同优先级、非守护线程、有线程名称
  • PrivilegedThreadFactory:在 defaultThreadFactory 的基础上,可以让运行在这个线程中的任务拥有和这个线程相同的访问控制和 ClassLoader
handle
  • AbortPolicy:直接抛出异常,默认策略
  • CallerRunsPolicy:用调用者所在的线程来执行任务
  • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务
  • DiscardPolicy:直接丢弃任务

添加线程的规则

  • 如果线程小于 corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新线程来运行新任务
  • 如果线程等于(或大于)corePoolSize 但小于 maximumPoolSize,则将任务放入队列
  • 如果队列已满,并且线程数小于 maximumPoolSize,则创建一个新线程来运行任务
  • 如果队列已满,并且线程数大于或等于 maxPoolSize,则使用拒绝策略

增减线程的特点

  • 通过设置 corePoolSize 和 maxPoolSize 相同,就可以创建固定大小的线程池
  • 线程池希望保持较少的线程数,并且只有在负载变得很大时才增加它
  • 通过设置 maxPoolSize 为很高的值,例如 Integer.MAX_VALUE,可以允许线程池容纳任意数量的并发任务
  • 只有队列填满时才创建多于 corePoolSize 的线程,所以如果使用的是无界队列(例如 LinkedBlockingQueue)那么线程数就不会超过 corePoolSize

创建线程池方式

  • ThreadPoolExecutor:最原始的创建线程池的方式
  • newFixedThreadPool:创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待,由于传进去的 LinkedBlockingQueue 是没有容量上限的,所以当请求数越来越多,并且无法及时处理完毕的时候也就是请求堆积的时候,会容易造成占用大量的内存,可能会发生 OОM
  • newSingleThreadExecutor:创建单个线程数的线程池,即只创建唯一的工作者线程来执行任务,单工作线程最大的特点是可以保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的,和 newFixedThreadPool 的原理基本一样,只不过把线程数直接设置成了 1,所以这也会导致同样的问题,也就是当请求堆积的时候,可能会占用大量的内存
  • newCachedThreadPool:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程,这里的弊端在于第二个参数 maximumPoolSize 被设置为了 Integer.MAX_VALUE,这可能会创建数量非常多的线程甚至导致 OOM
  • newScheduledThreadPool:创建一个定长的线程池,而且支持定时的以及周期性的任务执行
  • newSingleThreadScheduledExecutor:创建一个单线程的可以执行延迟任务的线程池
  • newWorkStealingPool:创建一个具有抢占式操作的线程池

相关方法

  • execute:提交任务,交给线程池来执行
  • submit:提交任务,能够返回执行结果
  • getTaskCount:返回线程池已执行和未执行的任务总数
  • getCompletedTaskCount:已完成的任务数量
  • getPoolSize:线程池当前的线程数量
  • getActiveCount:线程池中正在执行任务的线程数量
  • shutdown:平滑的关闭 ExecutorService,当调用这个方法时,ExecutorService 会停止接受任何新的任务且等待已经提交的任务执行完成,当所有已经提交的任务执行完毕后将会关闭 ExecutorService
  • isShutdown:线程池是否关闭,当调用 shutdown() 方法后返回为true
  • isTerminated:判断线程池关闭后所有的任务是否都执行完了,所有提交的任务完成后返回为true
  • awaitTermination:两个参数,一个是 timeout 即超时时间,另一个是 unit 即时间单位,这个方法会使线程等待 timeout 时长,当超过 timeout 时间后,会监测 ExecutorService 是否已经关闭,若关闭则返回 true,否则返回 false
  • shutdownNow:强制关闭 ExecutorService,它将取消所有运行中的任务和在工作队列中等待的任务,这个方法返回一个 List 列表,列表中返回的是等待在工作队列中的任务

线程池状态

image.png

  • RUNNING:接受新任务并处理排队任务
  • SHUTDOWN:不接受新任务,但处理排队任务
  • STOP:不接受新任务,也不处理排队任务,并中断正在进行的任务
  • TIDYING:所有任务都已终止,workerCount 为 0 时,线程会转换到 TIDYING 状态,并将运行 terminate() 钩子方法
  • TERMINATED:terminate() 运行完成

线程池调优

  • CPU 密集任务:N + 1
  • IO 密集任务:2N
  • 混合型任务:N * U * (1 + WT/ST)

N:CPU 核心数
U:期望 CPU 利用率
WT:线程等待时间
ST:线程运行时间
使用 jvisualvm 分析,点击抽样器,总时间-自用时间=线程等待时间,自用时间=线程运行时间

参考:https://www.javacodegeeks.com/2012/03/threading-stories-about-robust-thread.html

1
2
3
4
5
6
7
8
9
10
11
Target queue memory usage (bytes): 100000
createTask() produced ink.ckx.consurrency.threadPool.AsynchronousTask which took 40 bytes in a queue
Formula: 100000 / 40
* Recommended queue capacity (bytes): 2500
Number of CPU: 16
Target utilization: 1
Elapsed time (nanos): 3000000000
Compute time (nanos): 2968750000
Wait time (nanos): 31250000
Formula: 16 * 1 * (1 + 31250000 / 2968750000)
* Optimal thread count: 16