JUC(六)堵塞队列与线程池

2023-07-11,,

堵塞队列

简介

def:在多线程中实现高效、安全的数据传输,主要是通过一个共享的队列,使得数据能够从一端输入,从另一端输出
当队列是空的,取数据的线程就会被堵塞,直到其他线程往空的队列中添加数据
当队列是满的,放数据的线程就会被堵塞,直到其他线程移除数据
使用堵塞队列能够实现简化线程之间的协作,使用生产者消费者模型而不必实现线程间的同步和协作

堵塞队列的分类

1 ArrayBlockingQueue:由数组结构组成的有界堵塞队列
2 LinkedBlockingQueue:由链表结构组成的有界堵塞队列

大小默认值为Integer.MAX_VALUE

3 DelayQueue:使用优先级队列实现的延迟无界的堵塞队列
4 PriorityBlockingQueue:支持优先级排序的无界堵塞队列
5 SynchronousQueue:不存储元素的堵塞队列,也即是单个元素的队列
6 LinkedTransferQueue:由链表结构组成的无界堵塞队列
7 LinkedTransferDeque:由链表结构组成的双向堵塞队列

堵塞队列的常用方法

Thread Pool线程池

简介

线程池是一种线程使用模式,线程池维护者多个线程,可以避免在执行短时间任务创建和销毁线程的代价。
线程池能够保证内核的充分利用,防止过分调度
线程池的主要工作是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建之后启动这些任务,当任务的数量超过线程数量时,就会将任务放入队列排队等候,等其他线程执行完毕,再从队列中取出任务执行

线程池的特点

降低资源消耗:重复利用自己创建的线程减少创建、销毁的资源消耗
提高响应速度:任务到达的时候,不需要等待线程创建就能直接执行
提高可管理型

线程池的使用

Executors.newFilexedThreadPool(int nThreads),创建指定线程数量的线程池

ExecutorService threadPool1 = Executors.newFixedThreadPool(5);
try {
for (int i = 0; i < 10; i++) {
threadPool1.execute(() -> {
System.out.println(Thread.currentThread().getName() + " is working...");
});
}
} catch (Exception e) { } finally {
threadPool1.shutdown();
}

pool-1-thread-3 is working...

pool-1-thread-3 is working...

pool-1-thread-3 is working...

pool-1-thread-3 is working...

pool-1-thread-3 is working...

pool-1-thread-3 is working...

pool-1-thread-5 is working...

pool-1-thread-4 is working...

pool-1-thread-1 is working...

pool-1-thread-2 is working...

Process finished with exit code 0

只会使用线程池中创建的五个线程去执行任务

Executors.newWorkStealingPool(),创建只含有一个线程的线程池

public static void main(String[] args) {
ExecutorService threadPool1 = Executors.newSingleThreadExecutor();
try {
for (int i = 0; i < 10; i++) {
threadPool1.execute(() -> {
System.out.println(Thread.currentThread().getName() + " is working...");
});
}
} catch (Exception e) { } finally {
threadPool1.shutdown();
} }

pool-1-thread-1 is working...

pool-1-thread-1 is working...

pool-1-thread-1 is working...

pool-1-thread-1 is working...

pool-1-thread-1 is working...

pool-1-thread-1 is working...

pool-1-thread-1 is working...

pool-1-thread-1 is working...

pool-1-thread-1 is working...

pool-1-thread-1 is working...

Process finished with exit code 0

Executors.newCachedThreadPool(),创建可扩容的线程池

public static void main(String[] args) {
ExecutorService threadPool1 = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 10; i++) {
threadPool1.execute(() -> {
System.out.println(Thread.currentThread().getName() + " is working...");
});
}
} catch (Exception e) { } finally {
threadPool1.shutdown();
}
}

pool-1-thread-3 is working...

pool-1-thread-9 is working...

pool-1-thread-4 is working...

pool-1-thread-1 is working...

pool-1-thread-6 is working...

pool-1-thread-7 is working...

pool-1-thread-5 is working...

pool-1-thread-8 is working...

pool-1-thread-2 is working...

pool-1-thread-10 is working...

三种线程池本质都是使用的:

new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>())

线程池的七个参数

    public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
}
    corePoolSize:核心常驻线程的数量
    maximumPoolSize:线程池最大线程数量
    keepAliveTime:非核心线程的存活时间
    unit:存活时间的单位
    workQueue:任务数量超过常驻线程的堵塞队列
    threadFactory:线程工厂
    handler:线程池拒接策略,即任务数量超过线程池最大线程数量的处理策略

线程池的工作流程和拒绝策略

工作流程

主线程首先通过线程池的execute()方法创建线程
线程数量达到常驻线程的数量,任务就会被放入堵塞队列中
如果堵塞队列也满了,主线程又通过execute()方法创建线程的时候,线程池就会为此任务创建线程并分配(而不是堵塞队列?这里很疑惑)
堵塞队列满了,线程池也满了的时候,主线程再次执行execute()方法,线程池就会去执行拒绝策略

拒绝策略
    AbortPolicy(默认):直接抛出RejectExecutionException异常
    CallerRunsPolicy:将任务回退给调用者,以降低新任务的流量
    DisgardOldestPolicy:抛弃队列中等待最久的任务,而将当前任务添加到队列,尝试再次提交
    DiscardPolicy:装死

自定义线程池

为什么要自定义线程池

    FixedThreadPool和SingleThreadPool允许的请求队列的大小为Integer.MAX_VALUE,容易造成请求堆积,从而OOM
    CachedThreadPool则是允许创建的最大线程数为Integer.MAX_VALUE,会创建大量的线程,从而OOM
public static void main(String[] args) {
var threadPool = new ThreadPoolExecutor(2,
5,
5,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " is working.");
});
}
}
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.hikaru.juc.threadPool.ThreadPoolDemo$$Lambda$14/0x00000008010031f0@7e6cbb7a rejected from java.util.concurrent.ThreadPoolExecutor@4769b07b[Running, pool size = 5, active threads = 5, queued tasks = 3, completed tasks = 0]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2081)
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:841)
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1376)
at com.hikaru.juc.threadPool.ThreadPoolDemo.main(ThreadPoolDemo.java:15)
pool-1-thread-4 is working.
pool-1-thread-4 is working.
pool-1-thread-4 is working.
pool-1-thread-4 is working.
pool-1-thread-1 is working.
pool-1-thread-5 is working.
pool-1-thread-3 is working.
pool-1-thread-2 is working.

由于超过了最大线程数5 + 堵塞队列 3,触发了拒绝策略抛出了异常

JUC(六)堵塞队列与线程池的相关教程结束。

《JUC(六)堵塞队列与线程池.doc》

下载本文的Word格式文档,以方便收藏与打印。