堵塞队列
简介
def:在多线程中实现高效、安全的数据传输,主要是通过一个共享的队列,使得数据能够从一端输入,从另一端输出
当队列是空的,取数据的线程就会被堵塞,直到其他线程往空的队列中添加数据
当队列是满的,放数据的线程就会被堵塞,直到其他线程移除数据
使用堵塞队列能够实现简化线程之间的协作,使用生产者消费者模型而不必实现线程间的同步和协作
堵塞队列的分类
1 ArrayBlockingQueue:由数组结构组成的有界堵塞队列
2 LinkedBlockingQueue:由链表结构组成的有界堵塞队列
大小默认值为Integer.MAX_VALUE
3 DelayQueue:使用优先级队列实现的延迟无界的堵塞队列
4 PriorityBlockingQueue:支持优先级排序的无界堵塞队列
5 SynchronousQueue:不存储元素的堵塞队列,也即是单个元素的队列
6 LinkedTransferQueue:由链表结构组成的无界堵塞队列
7 LinkedTransferDeque:由链表结构组成的双向堵塞队列
堵塞队列的常用方法
Thread Pool线程池
简介
线程池是一种线程使用模式,线程池维护者多个线程,可以避免在执行短时间任务创建和销毁线程的代价。
线程池能够保证内核的充分利用,防止过分调度
线程池的主要工作是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建之后启动这些任务,当任务的数量超过线程数量时,就会将任务放入队列排队等候,等其他线程执行完毕,再从队列中取出任务执行
线程池的特点
降低资源消耗:重复利用自己创建的线程减少创建、销毁的资源消耗
提高响应速度:任务到达的时候,不需要等待线程创建就能直接执行
提高可管理型
线程池的使用
Executors.newFilexedThreadPool(int nThreads),创建指定线程数量的线程池
ExecutorService threadPool1 = Executors.newFixedThreadPool(5);
try {
for (int i = 0; i < 10; i++) {
threadPool1.execute(() -> {
System.out.println(Thread.currentThread().getName() + " is working...");
});
}
} catch (Exception e) {
} finally {
threadPool1.shutdown();
}
pool-1-thread-3 is working...
pool-1-thread-3 is working...
pool-1-thread-3 is working...
pool-1-thread-3 is working...
pool-1-thread-3 is working...
pool-1-thread-3 is working...
pool-1-thread-5 is working...
pool-1-thread-4 is working...
pool-1-thread-1 is working...
pool-1-thread-2 is working...
Process finished with exit code 0
只会使用线程池中创建的五个线程去执行任务
Executors.newWorkStealingPool(),创建只含有一个线程的线程池
public static void main(String[] args) {
ExecutorService threadPool1 = Executors.newSingleThreadExecutor();
try {
for (int i = 0; i < 10; i++) {
threadPool1.execute(() -> {
System.out.println(Thread.currentThread().getName() + " is working...");
});
}
} catch (Exception e) {
} finally {
threadPool1.shutdown();
}
}
pool-1-thread-1 is working...
pool-1-thread-1 is working...
pool-1-thread-1 is working...
pool-1-thread-1 is working...
pool-1-thread-1 is working...
pool-1-thread-1 is working...
pool-1-thread-1 is working...
pool-1-thread-1 is working...
pool-1-thread-1 is working...
pool-1-thread-1 is working...
Process finished with exit code 0
Executors.newCachedThreadPool(),创建可扩容的线程池
public static void main(String[] args) {
ExecutorService threadPool1 = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 10; i++) {
threadPool1.execute(() -> {
System.out.println(Thread.currentThread().getName() + " is working...");
});
}
} catch (Exception e) {
} finally {
threadPool1.shutdown();
}
}
pool-1-thread-3 is working...
pool-1-thread-9 is working...
pool-1-thread-4 is working...
pool-1-thread-1 is working...
pool-1-thread-6 is working...
pool-1-thread-7 is working...
pool-1-thread-5 is working...
pool-1-thread-8 is working...
pool-1-thread-2 is working...
pool-1-thread-10 is working...
三种线程池本质都是使用的:
new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>())
线程池的七个参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
}
- corePoolSize:核心常驻线程的数量
maximumPoolSize:线程池最大线程数量
keepAliveTime:非核心线程的存活时间
unit:存活时间的单位
workQueue:任务数量超过常驻线程的堵塞队列
threadFactory:线程工厂
handler:线程池拒接策略,即任务数量超过线程池最大线程数量的处理策略
线程池的工作流程和拒绝策略
工作流程
主线程首先通过线程池的execute()方法创建线程
线程数量达到常驻线程的数量,任务就会被放入堵塞队列中
如果堵塞队列也满了,主线程又通过execute()方法创建线程的时候,线程池就会为此任务创建线程并分配(而不是堵塞队列?这里很疑惑)
当堵塞队列满了,线程池也满了的时候,主线程再次执行execute()方法,线程池就会去执行拒绝策略
拒绝策略
- AbortPolicy(默认):直接抛出RejectExecutionException异常
CallerRunsPolicy:将任务回退给调用者,以降低新任务的流量
DisgardOldestPolicy:抛弃队列中等待最久的任务,而将当前任务添加到队列,尝试再次提交
DiscardPolicy:装死
自定义线程池
为什么要自定义线程池
- FixedThreadPool和SingleThreadPool允许的请求队列的大小为Integer.MAX_VALUE,容易造成请求堆积,从而OOM
CachedThreadPool则是允许创建的最大线程数为Integer.MAX_VALUE,会创建大量的线程,从而OOM
public static void main(String[] args) {
var threadPool = new ThreadPoolExecutor(2,
5,
5,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " is working.");
});
}
}
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.hikaru.juc.threadPool.ThreadPoolDemo$$Lambda$14/0x00000008010031f0@7e6cbb7a rejected from java.util.concurrent.ThreadPoolExecutor@4769b07b[Running, pool size = 5, active threads = 5, queued tasks = 3, completed tasks = 0]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2081)
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:841)
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1376)
at com.hikaru.juc.threadPool.ThreadPoolDemo.main(ThreadPoolDemo.java:15)
pool-1-thread-4 is working.
pool-1-thread-4 is working.
pool-1-thread-4 is working.
pool-1-thread-4 is working.
pool-1-thread-1 is working.
pool-1-thread-5 is working.
pool-1-thread-3 is working.
pool-1-thread-2 is working.
由于超过了最大线程数5 + 堵塞队列 3,触发了拒绝策略抛出了异常