Semaphore信号量源码解析

2022-12-07,,,

介绍

Semaphore是什么

Semaphore可以称为信号量,这个原本是操作系统中的概念,是一种线程同步方法,配合PV操作实现线程之间的同步功能。信号量可以表示操作系统中某种资源的个数,因此可以用来控制同时访问该资源的最大线程数,以保证资源的合理使用

Java的JUC对Semaphore做了具体的实现,其功能和操作系统中的信号量基本一致,也是一种线程同步并发工具

使用场景

Semaphore常用于某些有限资源的并发使用场景,即限流

场景一

数据库连接池对于同时连接的线程数有限制,当连接数达到限制后,接下来的线程必须等待前面的线程释放连接才可以获得数据库连接

场景二

医院叫号,放出的号数是固定的,不然医院窗口来不及处理。因此只有取到号才能去门诊,没取到号的只能在外面等待放号

Semaphore常用方法介绍

构造函数

Semaphore(int permits):创建一个许可数为permitsSemaphore
Semaphore(int permits, boolean fair):创建一个许可数为permitsSemaphore,可以选择公平模式或非公平模式

获取许可

void acquire() throws InterruptedException:获取一个许可,响应中断,获取不到则阻塞等待
void acquire(int permits) throws InterruptedException:获取指定数量的许可,响应中断,获取不到则阻塞等待
void acquireUninterruptibly():获取一个许可,忽略中断,获取不到则阻塞等待
void acquireUninterruptibly(int permits):获取指定数量的许可,忽略中断,获取不到则阻塞等待
int drainPermits():获取当前所有可用的许可,并返回获得的许可数

释放许可

void release():释放一个许可
void release(int permits):释放指定数量的许可

尝试获取许可

boolean tryAcquire():尝试获取一个许可,如果获取失败不会被阻塞,而是返回false。成功则返回true
boolean tryAcquire(int permits):尝试获取指定数量的许可,如果获取失败不会被阻塞,而是返回false。成功则返回true
boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException:尝试获取一个许可,如果获取失败则会等待指定时间,如果超时还未获得,则返回false。获取成功则返回true。在等待过程中如果被中断,则会立即抛出中断异常
boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException:尝试获取指定数量的许可,如果获取失败则会等待指定时间,如果超时还未获得,则返回false。获取成功则返回true。在等待过程中如果被中断,则会立即抛出中断异常

其他方法

boolean isFair():判断信号量是否是公平模式
int availablePermits():返回当前可用的许可数
boolean hasQueuedThreads():判断是否有线程正在等待获取许可
int getQueueLength():返回当前等待获取许可的线程的估计值。该值并不是一个确定值,因为在执行该函数时,线程数可能已经发生了变化

Semaphore使用示例

针对“使用场景”的场景二,假设医院最多接待2个人,如果接待满了,那么所有人都必须在大厅等待(不能“忙等”)

代码如下:

无论如何,医院内最多有5个病人同时接诊。这也说明了Semaphore可以控制某种资源最多同时被指定数量的线程使用

作者:酒冽        出处:https://www.cnblogs.com/frankiedyz/p/15674098.html

版权:本文版权归作者和博客园共有

转载:欢迎转载,但未经作者同意,必须保留此段声明;必须在文章中给出原文连接;否则必究法律责任

构造函数

Semaphore有两个构造函数,都是需要设置许可总数,额外的另一个参数是用来控制公平模式or非公平模式的,如果不设置(默认)或设为false,则是非公平模式。如果设置true,则是公平模式

两种构造函数的源码如下:

private final Sync sync;

public Semaphore(int permits) {
sync = new NonfairSync(permits);
} public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

公平模式和非公平模式的区别,就是在于sync域是FairSync类还是NonfairSync类,这两种子类分别对应这两种模式。而permits参数代表许可的个数,作为这两个类的构造函数参数传入,源码如下:

FairSync(int permits) {
super(permits);
} NonfairSync(int permits) {
super(permits);
}

这两个类的构造函数都是进一步调用父类Sync的构造函数:

Sync(int permits) {
setState(permits);
}

从这里就可以明白,许可个数就是state的值。这里为AQS的state域赋了初值,为permits

重要结论:在Semaphore的语境中,AQS的state就表示许可的个数!对于Semaphore的任何获取、释放许可操作,本质上都是state的增减操作

作者:酒冽        出处:https://www.cnblogs.com/frankiedyz/p/15674098.html

版权:本文版权归作者和博客园共有

转载:欢迎转载,但未经作者同意,必须保留此段声明;必须在文章中给出原文连接;否则必究法律责任

获取许可

响应中断的获取

acquire响应中断的获取许可方法,有两个重载版本。如果获取成功,则返回;如果许可不够而导致获取失败,则会进入AQS的同步队列阻塞等待。在整个过程中如果被中断,则会抛出中断异常

acquire(int)

首先来看看可以获取指定数量许可的acquire方法,其源码如下:

public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}

该方法会先检查permits是否合法(非负数),再将后续的执行过程委托给Sync类的父类AQS的acquireSharedInterruptibly方法来执行,其源码如下:

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

该方法响应中断,首先检查中断状态,如果已经被中断则会抛出中断异常。接下来调用钩子方法tryAcquireShared。如果返回值小于0,说明获取许可失败,会调用doAcquireSharedInterruptibly进入同步队列阻塞等待,等待过程中响应中断

tryAcquireSharedSync的两个子类实现,分别对应公平模式、非公平模式下的获取。这里由于许可是共享资源,所以使用到的AQS的钩子方法都是针对共享资源的获取、释放的。这也很合理,因为许可是可以被多个线程同时持有的,所以Semaphore中的许可是一种共享资源!

接下来分别看一看公平模式和非公平模式下,tryAcquireShared的实现方式是怎样的

公平模式

FairSync类实现的tryAcquireShared方法如下:

protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors()) // 如果发现有线程在等待获取许可,就选择谦让
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining))
// 如果获取许可数大于当前已有的许可数,获取失败,且返回值小于0
// 如果CAS失败,则重新循环直到获取成功
// 如果CAS成功,说明获取成功,返回剩余的许可数
return remaining;
}
}

总体上是一个for循环,这是为了应对CAS失败的情况。首先判断是否有线程在等待获取许可,如果有就选择谦让,这里体现了公平性

接下来判断获取之后剩余的许可数是否合法,如果小于0,说明没有足够的许可,获取失败,返回值小于0;如果大于0且CAS修改state成功,说明获取许可成功,返回剩余的许可数

这里需要说明一下tryAcquireShared的返回值含义,这个其实在《全网最详细的ReentrantReadWriteLock源码剖析(万字长文)》也有讲解过:

负数:获取失败,线程会进入同步队列阻塞等待
0:获取成功,但是后续以共享模式获取的线程都不可能获取成功
正数:获取成功,且后续以共享模式获取的线程也可能获取成功

非公平模式

NonfairSync类实现的tryAcquireShared方法如下:

protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}

实际上委托给了父类SyncnonfairTryAcquireShared方法来执行:

final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

该方法是Sync类中的一个final方法,禁止子类重写。逻辑上和公平模式的tryAcquireShared基本一致,只是没有调用hasQueuedPredecessors,即使有其他线程在等待获取许可,也不会谦让,而是直接CAS竞争。这里体现了非公平性

acquire()

这个不带参数的acquire方法,默认获取一个许可:

public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1); // 默认获取一个许可
}

acquire(int)基本上是一样的,只是这里获取数固定为1。在公平模式和非公平模式下的获取方式也不相同。这里不再解释

忽略中断的获取

acquireUninterruptibly是忽略中断的获取许可方法,也有两个重载版本。如果获取成功,则返回;如果许可不够而导致获取失败,则会进入AQS的同步队列阻塞等待。在整个过程中如果被中断,不会抛出中断异常,只会记录中断状态

acquireUninterruptibly(int)

首先来看看可以获取指定数量许可的acquireUninterruptibly方法:

public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}

该方法会先检查permits是否合法(非负数),再将后续的执行过程委托给Sync类的父类AQS的acquireShared方法来执行,其源码如下:

public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

这里同样会调用子类实现的tryAcquireShared方法,对于公平模式和非公平模式下的获取方式略有不同,在上面都已经分析过,这里不重复解释了

如果tryAcquireShared获取成功,则直接返回;如果获取失败,则调用doAcquireShared方法,进入同步队列阻塞等待,在等待过程中忽略中断,只记录中断状态<

和响应中断的acquire方法相比,唯一的区别就在于如果获取失败,acquire调用的是doAcquireSharedInterruptibly,响应中断;而则这里的acquireUninterruptibly调用的是doAcquireShared,忽略中断

acquireUninterruptibly()

这个不带参数的acquireUninterruptibly方法,默认获取一个许可:

public void acquireUninterruptibly() {
sync.acquireShared(1); // 默认获取一个许可
}

acquireUninterruptibly(int)基本上是一样的,只是这里获取数固定为1。在公平模式和非公平模式下的获取方式也不相同。这里也不再解释

获取剩余所有许可

这个是Semaphore中比较特殊的一个获取资源的方式,它提供了drainPermits方法,可以直接获取当前剩余的所有许可(资源),并返回获得的个数。其源码如下:

public int drainPermits() {
return sync.drainPermits();
}

该方法实际上委托给了Sync类的drainPermits方法来执行:

final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}

该方法是一个final方法,禁止子类重写。整体上是一个for循环,为了应对CAS失败的情况

首先通过AQS的getState方法获取当前可用的许可数,再一次性全部获取,并返回获得的许可数,很简单~

释放许可

Semaphore提供了release作为释放许可的方法,和获取许可一样,release也有两个重载版本。但是,释放许可和获取有两点不同

释放许可的方法都是忽略异常的,没有响应异常的版本
对于公平模式和非公平模式来说,释放许可的方式都是一样的,因此在Sync类这一层就提供了统一的实现

release(int)

首先来看看释放指定数量许可的release方法:

public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}

该方法会先检查permits是否合法(非负数),再将后续的执行过程委托给Sync类的父类AQS的releaseShared方法来执行,其源码如下:

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

releaseShared会先调用子类重写的tryReleaseShared方法,在公平模式和非公平模式下的释放许可逻辑是一致的,因此在Sync类就对其进行了统一的实现,而没有下放到子类中实现。Sync类的tryReleaseShared方法如下:

protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}

这也是一个final方法,禁止子类重写。整体上是一个for循环,为了应对CAS失败的情况

循环体内对AQS的state进行修改,不过需要避免释放许可后导致state溢出,否则会抛出错误

使用Semaphore的一点注意事项

relsease(int)的逻辑中,并没有发现对释放的数量有所检查,即一个线程可以释放任意数量的许可,而不管它真正拥有多少许可

比如:一个线程可以释放100个许可,即使它没有获得任何许可,这样必然会导致程序错误

因此,使用Semaphore必须遵守“释放许可的数量一定不能超过当前持有许可的数量”这一规定,即使Semaphore不会对其进行检查!

源码中的注释也指出了这一点:

There is no requirement that a thread that releases a permit must have acquired that permit by calling acquire. Correct usage of a semaphore is established by programming convention in the application.

release()

这个版本的release默认释放一个许可:

public void release() {
sync.releaseShared(1);
}

其他和release(int)一致,这里不再解释,不过使用release()也必须注意遵守上面的规定,Semaphore也不会主动进行检查

作者:酒冽        出处:https://www.cnblogs.com/frankiedyz/p/15674098.html

版权:本文版权归作者和博客园共有

转载:欢迎转载,但未经作者同意,必须保留此段声明;必须在文章中给出原文连接;否则必究法律责任

尝试获取许可

Semaphore虽然提供了阻塞版本的获取方式,也提供了四个版本的尝试获取方式,包括两种:一种是非计时等待版本,一种是计时等待版本

非计时等待

tryAcquire(int)

public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}

该方法会先检查permits是否合法(非负数),再将后续的执行过程委托给Sync类的nonfairTryAcquireShared方法来执行。此方法就是非公平版本的尝试获取许可方式,即使当前Semaphore使用的是公平策略。如果返回值不小于0,说明获取成功,返回true;否则获取失败,返回false。不管成功与否,都会立即返回,不会阻塞等待

tryAcquire()

public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}

这个就是默认获取一个许可的tryAcquire版本,跟上面基本一样,不解释

计时等待

tryAcquire(int, long, TimeUnit)

public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

该方法会先检查permits是否合法(非负数),再将后续的执行过程委托给Sync类的父类AQS的tryAcquireSharedNanos方法来执行。该方法会尝试获取许可,如果获取成功,则立即返回;如果获取不到,则阻塞一段时间。如果等待过程中被中断,则会抛出中断异常

tryAcquire(long, TimeUnit)

public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

这个就是默认获取一个许可的计时等待tryAcquire版本,跟上面基本一样,不解释

作者:酒冽        出处:https://www.cnblogs.com/frankiedyz/p/15674098.html

版权:本文版权归作者和博客园共有

转载:欢迎转载,但未经作者同意,必须保留此段声明;必须在文章中给出原文连接;否则必究法律责任

其他方法

Semaphore还提供了一些方法以获取信号量的状态,比如:

当前信号量使用的公平策略
当前可获取的许可数量
是否有线程正在等待获取许可
因为获取许可而被阻塞的线程数

下面一一来看

isFair

public boolean isFair() {
return sync instanceof FairSync;
}

如果Semaphoresync域是FariSync类对象,则说明使用的是公平策略,返回true,否则返回false

availablePermits

public int availablePermits() {
return sync.getPermits();
} final int getPermits() {
return getState();
}

本质上调用的就是AQS的getState方法,返回state的值,而state就代表了当前可获取的许可数量

hasQueuedThreads

public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
} // AQS.hasQueuedThreads
public final boolean hasQueuedThreads() {
return head != tail;
}

本质上调用的是AQS的hasQueuedThreads方法,判断同步队列的headtail是否相同,如果相同,则说明队列为空,没有线程正在等待;否则说明有线程正在等待

getQueueLength

public final int getQueueLength() {
return sync.getQueueLength();
} // AQS.getQueueLength
public final int getQueueLength() {
int n = 0;
for (Node p = tail; p != null; p = p.prev) {
if (p.thread != null)
++n;
}
return n;
}

这个方法实际调用的是AQS.getQueueLength方法。此方法会对同步队列进行一个反向全遍历,返回当前队列长度的估计值。该值并不是一个确定值,因为在执行该函数时,其中的线程数可能已经发生了变化

Semaphore信号量源码解析的相关教程结束。

《Semaphore信号量源码解析.doc》

下载本文的Word格式文档,以方便收藏与打印。