Sentinel基本使用与源码分析

2023-07-29,,

系列文章目录和关于我

一丶什么是Sentinel

Sentinel官网

Sentinel 是面向分布式、多语言异构化服务架构的流量治理组件,主要以流量为切入点,从流量路由流量控制流量整形熔断降级系统自适应过载保护热点流量防护等多个维度来帮助开发者保障微服务的稳定性。

流量整形:限制流出某一网络的某一连接的流量与突发,使这类报文以比较均匀的速度向外发送。流量整形通常使用缓冲区和令牌桶来完成,当报文的发送速度过快时,首先在缓冲区进行缓存,在令牌桶的控制下再均匀地发送这些被缓冲的报文

二丶主要功能

1.流量控制

任意时间到来的请求往往是随机不可控的,而系统的处理能力是有限的。我们需要根据系统的处理能力对流量进行控制

流量控制有以下几个角度:

资源的调用关系,例如资源的调用链路,资源和资源之间的关系;
运行指标,例如 QPS、线程池、系统负载等;
控制的效果,例如直接限流、冷启动、排队等。

2.熔断降级

当调用链路中某个资源出现不稳定,例如,表现为 timeout,异常比例升高的时候,则对这个资源的调用进行限制,并让请求快速失败,避免影响到其它的资源,最终产生雪崩的效果。

为何发生雪崩:如下图中服务D奄奄一息,其他服务对它具备依赖,对服务D发起调用的时候常常超时,RT很大,导致服务G线程都block在调用服务D的这一步中,导致服务G也奄奄一息,久而久之其他服务都处于不可服务的状态。

3.系统负载保护

当系统负载较高的时候,如果还持续让请求进入,可能会导致系统崩溃,无法响应。Sentinel 提供了对应的保护机制,让系统的入口流量和系统的负载达到一个平衡,保证系统在能力范围之内处理最多的请求。

三丶基本使用

1.sentinel依赖引入

<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>版本号</version>
</dependency>

2.初始化限流规则

 private static void initFlowQpsRule() {
//限流规则列表
List<FlowRule> rules = new ArrayList<FlowRule>();
//第一个规则
FlowRule rule1 = new FlowRule();
//rule1针对什么资源(资源:指你要对什么限流,一般是方法名称)
rule1.setResource(KEY);
// 设置限流阈值为20
rule1.setCount(20);
//限流策略:
// 0:线程数(那么就是20个线程并发访问的时候限流)
//1:qps:每秒访问资源的数量,超过20进行限流
rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
//将受来源限制的应用程序名称
rule1.setLimitApp("default");
rules.add(rule1);
FlowRuleManager.loadRules(rules);
}

3.限流使用

Entry entry = null;
try {
entry= SphU.entry("资源名称");
//执行正常业务逻辑
} catch (BlockException e) {
//执行被限流后的业务逻辑
} finally {
if (entry != null) {
entry.close();
}
}

4.控制台设置限流规则

5.控制台设置降级规则

四丶基本原理

1.基本概念

Entry

在Sentinel中,所有的资源都对应一个资源名称以及一个 Entry。

Entry负责记录当前调用的信息:

创建时间:用于统计rt
当前节点Node:当前上下文中资源的统计信息(Node是Sentinel中的一个接口,负责记录实时统计信息)。
来源Node:调用来源方信息。

Slot

每一个 Entry 创建的时候,同时也会创建一系列功能插槽(slot chain)。这些插槽有不同的职责。

2.原理图

五丶源码学习

Sentinel不仅支持单机流控,还支持集群流控制,个人认为在流量分配均匀的情况下,单机流控完全够用了,并且集群流控需要额外的资源来进行集群服务器信息同步,感觉用处不是很大。

0.Sentinel SPI

一个框架需要考虑到扩展性,实现扩展性的一个很好的方式就是SPI(Service Provider Interface)SPI可与实现将装配的控制权移到程序之外,实现使用方和提供方的解耦

Sentinel提供了SpiLoader方便进行SPI服务实现的加载,Sentinel中很多核心组件都依赖此类进行加载。常见使用如下

SpiLoader.of(xxx.class).loadInstanceListSorted()

0.1 of 方法创建SpiLoader实例

这种double check + synchronized实现线程安全的方式在Sentinel中非常常见。

0.2 SpiLoader对象加载服务提供者

SpiLoader会使用ClassLoader读取META-INF/services/服务提供者class全限定类名文件中的数据,并解析@Spi注解中的内容,决定是否单例,是否默认实现,以及顺序等内容,加载服务实现并返回结果。

1.FlowRuleManager 管理FlowRule

FlowRule: 表示对资源采取何种流控手段。
FlowRuleManager:提供FlowRule的存储,查询等功能。

FlowRuleManager内部使用map来存储管理资源和对应的流控规则(一个资源可存在多个流控规则)。

另外还提供PropertyListener来实现FlowRule变化后的回调。

2.SphU.entry 进行流控

2.1 Env 调用InitFunc#init

这里会使用Env中的static final单例CtSph对象进行流控规则,并且会进行InitFunc#init的调用

2.2 包装资源为StringResourceWrapper

ResourceWrapper是对资源的包装,存在两个实现,MethodResourceWrapper在调用SphU#entry(Method method)的时候使用到。

上面原理图中提到,Sentinel具备slot链条,在获取链条的时候,会根据ResourceWrapper从缓存map中获取,hash的规则是ResourceWrapper的名称,MethodResourceWrapper的名称是:方法定义类全限定名称:方法名称(参数类型全限定类名)

2.3 entryWithPriority 进行流控

2.3.1 获取Context

Context保存当前调用元数据,首先会通过ContextUtil从ThreadLocal中获取,如果不存在会new一个,并且设置到ThreadLocal中,随着Entry#close方法的时候会进行资源释放。

构建Context的时候,会设置其中的Node(名称为sentinel_default_context的EntranceNode),

Node在Sentinel中用来生成树状结构,描述方法的调用

2.3.2 lookProcessChain构建Slot执行链条

slot链条式sentinel实现流量统计,和限流的核心,获取slot链条ProcessorSlot的代码如下

从缓存map中获取

SPI获取SlotChainBuilder

这里是一个扩展点,我们可与实现基于配置中心的chainBuilder,实现slot链条配置化

DefaultSlotChainBuilder 构造ProcessorSlotChain

ProcessorSlotChain也是一个ProcessorSlot处理器插槽,内部使用net属性串联下一个AbstractLinkedProcessorSlot。

我们可扩展自己的ProcessorSlot,使用SPI机制轻松加入到ProcessorSlotChain中。

2.3.2 依次执行ProcessorSlot#entry

1.NodeSelectorSlot

NodeSelectorSlot 负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级

2.ClusterBuilderSlot

ClusterBuilderSlot会生成用于存储资源的统计信息以及调用者信息的ClusterNode,ClusterNode会负责存储该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据。

3.StatisticSlot

用于记录、统计不同纬度的 runtime 指标监控信息

统计请求通过数量使用了StatisticNode#addPassRequest方法

最终都是使用ArrayMetric进行记录, ArrayMetric 内部使用OccupiableBucketLeapArray或者BucketLeapArray进行计数,具体如何记录在后续章节中分析

4.AuthoritySlot

基于白名单黑名单逻辑的权限校验,默认情况是没有启用的。

5.SystemSlot

通过系统的状态,例如 load,cpu使用率等,来控制总的入口流量。qps使用的是滑动窗口算法进行统计,load,cpu使用率这些指标通过com.sun.management.OperatingSystemMXBean获取。

6.FlowSlot

根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制。

可看到,每一个配置的FlowRule,都会调用canPassCheck检查,只要存在任何一个不满足要求,都会抛出FlowException

这里会根据FlowRule#setControlBehavior的不同选择不同的TrafficShapingController进行校验,也可以通过FlowRule#setRater直接指定的实现。

DefaultController :默认流量整形控制器,超过任何规则的阈值后,新的请求就会立即拒绝,拒绝方式为抛出FlowException

这里代码逻辑较为简单,获取当前qps或者线程数,如果acquireCount加上当前qps大于count(阈值)那么返回false。

另外可以看到Sentinel提供了预占能力。

实现的难点在于怎么统计qps,统计线程数,这部分在后续章节中进行学习。

ThrottlingController :均速排队,严格控制请求通过的时间间隔,也即是让请求以均匀的速度通过,对应的是漏桶算法。下面是判断是否通过的代码:

private boolean checkPassUsingCachedMs(int acquireCount, double maxCountPerStat) {
//当前时间
long currentTime = TimeUtil.currentTimeMillis(); //statDurationMs = 1000ms 即1s(假设qps限制为20,maxCountPerStat=20)
//1000ms * 1(请求数量)/ maxCountPerStat = 产生1个令牌所需要耗费的时间
long costTime = Math.round(1.0d * statDurationMs * acquireCount / maxCountPerStat); // costTime + 上一个请求通过的时间 = 什么时间,此令牌可以产生
long expectedTime = costTime + latestPassedTime.get(); //如果当前时间小于 满足令牌要求的时间(expectedTime)
if (expectedTime <= currentTime) {
// Contention may exist here, but it's okay.
//设置最后通过请求的时间(latestPassedTime是AtomicLong)
latestPassedTime.set(currentTime);
return true;
} else {
// 计算等待的时间
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
//等待的时间超过了排队等待的阈值(默认情况下500ms)那么返回false
if (waitTime > maxQueueingTimeMs) {
return false;
} //上一次请求通过的时间,加上产生令牌需要的时间(addAndGet是自旋+cas操作)
long oldTime = latestPassedTime.addAndGet(costTime);
//等待时间
waitTime = oldTime - TimeUtil.currentTimeMillis();
//如果超过了等待阈值 那么cas减小时间,相当于回滚操作
if (waitTime > maxQueueingTimeMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
// sleep当前线程进行等待
if (waitTime > 0) {
sleepMs(waitTime);
}
return true;
}
}

其实这个代码也不是天衣无缝:

这段代码也有很妙的地方:

WarmUpRateLimiterController/WarmUpController: 预热/冷启动方式。当系统长期处理低水平的情况下,当流量突然增加时,直接把系统拉升到高水位可能瞬间把系统压垮。通过"冷启动",让通过的流量缓慢增加,在一定时间内逐渐增加到阈值的上限,给系统一个预热的时间,避免冷系统被压垮。

这部分源码设计到guava的预热算法,后续了解学习

7.DegradeSlot&DefaultCircuitBreakerSlot

二者都是熔断器插槽,并且短路原理一致,DegradeSlot在用户个资源指定DegradeRule(降级规则)的时候会根据DegradeRule构造出断路器CircuitBreaker,而DefaultCircuitBreakerSlot则是用户配置同一的短路规则,并没有给资源指定特定规则的时候,会使用默认规则生成CircuitBreaker。

DegradeRule分为三种:

RuleConstant.DEGRADE_GRADE_RT

根据rt来进行熔断,对应ResponseTimeCircuitBreaker(响应时间断路器)

RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO

根据异常比列进行熔断,对应ExceptionCircuitBreaker

RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT

根据错误进行熔断,对应ExceptionCircuitBreaker

统计rt,错误次数,错误率都是基于LeapArray(滑动窗口算法)进行统计。

在DegradeSlot&DefaultCircuitBreakerSlot的entry方法(限流操作会调用到此方法)此方法会轮流调用CircuitBreaker#tryPass进行短路校验。

这里都会使用AbstractCircuitBreaker#tryPass

CircuitBreaker具备三种状态:

Open,断路器打开,此时会判断是否大于接口恢复时间,如果大于那么修改为半开,让一个请求先试试水,如果成功那么从半开修改为开。
Close:断路器关闭了,此时所有请求都可以通过。并且根据指标(rt,错误率,错误次数)来决定是否将断路器修改为开
Half open,半开状态,此时除了试水的线程可以通过,其他线程都会抛出DegradeException,试水线程会根据是否调用异常,来决定修改为开还是关

可以看出断路器最重要的是维护这三种状态,并且状态的切换需要保证线程安全

如果断路器处于Open,首先会通过retryTimeoutArrived方法判断当前时间是否大于恢复时间点,如果不是,说明短路了返回false,DegradeSlot会抛出DegradeException,阻止调用。如果大于恢复时间点,会调用fromOpenToHalfOpen 尝试cas open->half_open,并且注册回调,此回调会在 entry.close()的时候被触发。

在DegradeSlot&DefaultCircuitBreakerSlot的exit方法中,会触发circuitBreaker#onRequestComplete(如果执行顺利没有出现BlockException异常的话)

这里会根据DegradeRule调用不同的CircuitBreaker

ExceptionCircuitBreaker

从Entry当中拿到执行的错误,如果具备错误,那么更新滑动窗口中的错误数

private void handleStateChangeWhenThresholdExceeded(Throwable error) {
//当前是开,那么什么也不做,降级slot在开的状态会判断当前时间和恢复时间,实现降级效果,so,这里不需要做什么
if (currentState.get() == State.OPEN) {
return;
} //半开
if (currentState.get() == State.HALF_OPEN) {
// 半开但是执行过程中无异常,那么调整为close,说明被调用方法接口已经稳定了,可以修改为close
if (error == null) {
fromHalfOpenToClose();
} else {
//如果具备异常,那么修改为开,并且更新接口恢复时间,实现熔断
fromHalfOpenToOpen(1.0d);
}
return;
} //开状态,滑动窗口统计错误数,和总数
List<SimpleErrorCounter> counters = stat.values();
long errCount = 0;
long totalCount = 0;
//统计错误数,和总数
for (SimpleErrorCounter counter : counters) {
errCount += counter.errorCount.sum();
totalCount += counter.totalCount.sum();
}
//小于 最小请求数(默认5)认为样本太少,直接啥也不做
if (totalCount < minRequestAmount) {
return;
}
//默认根据错误次数,curCount记录错误次数
double curCount = errCount;
//根据错误比例,curCount记录错误率
if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
// Use errorRatio
curCount = errCount * 1.0d / totalCount;
}
//错误比例 or 错误率超过了阈值,那么调整为开,并且更新接口恢复时间,实现熔断
if (curCount > threshold) {
transformToOpen(curCount);
}
}

ResponseTimeCircuitBreaker

和上面类似,只不过是根据rt来判断,而不是错误次数,错误率。同使用滑动窗口实现计数。

2.4 LeapArry是如何实现滑动窗口计数的

ArrayMetirc被StatisticSlot调用addPass方法

这里的data便是LeapArray的子类BucketLeapArray,或者OccupiableBucketLeapArray

2.4.1构造方法

从构造方法我们可以看出LeapArray的构成

可以看到数据的存储使用了AtomicReferenceArray,其中sampleCount = 2,intervalInMs = 1000,也就说默认只有两个窗口,时间跨度为1s。

2.4.2 获取当前时间对应的桶

这一段就是滑动窗口的精髓

public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
//根据当前时间,计算当前时间位于窗口数组中的下标
// 比如当前是 1600ms,窗口总大小为1000ms,一共两个格子,每一个格子大小为500ms
//1600ms 落在(1600/500)% 2 = 1
int idx = calculateTimeIdx(timeMillis);
// 计算当前时间 对应的起始 => 1600 - 1600%500 = 1500
// |0~500|500~1000|1000~1500|1500~2000| 1600位于1500~2000中所以起始为1500
long windowStart = calculateWindowStart(timeMillis); while (true) {
//获取当前时间对应的桶
WindowWrap<T> old = array.get(idx);
//如果桶为null
if (old == null) {
//创建新元素
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
//cas 设置到数组中
if (array.compareAndSet(idx, null, window)) {
return window;
} else {
//如果cas失败,说明在另外一个线程也是这个index,让当前线程yield放弃cpu
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
//旧桶的start和当前桶一样:意味着是同一秒
//比如600ms也是位于下标1,但是start 是500,就和当前1600ms的start 1500不同 意味着不是同一秒
return old;
} else if (windowStart > old.windowStart()) {
//比如600ms也是位于下标1,但是start 是500,
// 就和当前1600ms的start 1500,1500>500,意味着当前这个桶已经过期了
//上锁
if (updateLock.tryLock()) {
try {
//设置windowStart 并且清空旧桶
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
//上锁失败,那么
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
//当前start小于旧桶start
//这种情况不应该发生,因为时间时越来越来大的
//除非当前线程 一值分配不到时间片?
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}

这里有一个变量windowStart,就如同一个版本,标志了元素是否过期(旧元素windowStart小于当前windowStart说明旧元素过期),是否位于一个桶(windowStart相同说明位于同一个下标,比如1600,和1700,windowStart都是1500)

2.4.3 改变桶记录的值

上面看了是如何拿到当前时间对应下标的桶的,那么桶中记录的数据是如何变更的?

可以看到这里使用了LongAdder进行计数,因为一个桶可能存在多个线程并发更新,使用LongAdder实现热点数据分离,也减少缓存伪共享带来的开销。(JUC源码学习笔记4——原子类源码分析,CAS,Volatile内存屏障,缓存伪共享与UnSafe相关方法)

2.4.4 如何获取当前qps

上面我们直到了,通过的请求,根据时间对应了ArrayMetric滑动窗口数组中的一个元素,这个元素是MetricBucket类型,内部有一个LongAdder的数组,来记录pass,block(通过的请求,被拦截的请求)的数目,那么怎么根据这些数据获取当前qps昵?FlowSlot根据qps限流得使用这个数据呀!

我们看一下这里的pass方法

再看下values方法,入参就是当前时间,这个当前时间timeMills用来排除过期元素

拿通过次数自然是使用了LongAdder#sum方法,但是LongAdder#sum不具备瞬时一致性,也就说可能遍历到n位置,但是其他线程更改n-1位置,n-1的变化不会体现在sum的总和中。

这里有一段代码我觉得有点秒(个人理解,一点拙见不一定对)

就是这里统计pass会先执行data.currentWindow(),把当前时间对应的滑动窗口元素进行初始化。

比如执行data.currentWindow()当前时间点是1001ms,对应桶为null,那么这一步会初始化1001ms对应的桶元素。

后续values遍历的时间是1003ms,这时候就可以拿到1001ms初始化桶的引用返回,后续加入存在另外一个线程在1004ms进行更新,当前线程执行pass累加的时候就又更大概率统计到

为什么我说更大概率,因为如果不初始化的化,1004ms另外一个线程去初始化会new一个桶元素,这个过程是更耗时的,当前线程values遍历可能就没办法拿到1004ms这个线程初始化的桶了(有道理,但是不多doge)。

六丶总结

学到了啥:

学习到了SPI和责任链,这两大解耦+扩展利器。学习到了自旋+CAS的操作。

SPI由一方制定规范,比如SpringBoot的spring.factories,另一方进行扩展,实现服务使用方和提供方的解耦合,以及修改SPI文件内容进行扩展!
责任链简直无处不在,在众多框架中,平时工作也常常使用到,使用责任链,责任单一,并且通过改变链条实现扩展!
对熔断有了更深刻的理解,特别Open状态根据恢复时间判断+cas修改为半开,让一个线程试试水的操作,我大受震撼!
了解到Sentinel还有除了针对单个资源进行限流,还有系统负载限流的功能,这个有点厉害,我一开始还想这个该咋实现,看了SystemSlot,发现自己还是太狭隘了。

不足:

对Sentinel的Node理解不够,文档上说Sentine支持调用链路关系进行限流,这个功能挺牛的,但是我并没有详细阅读这部分源码。

对预热的实现没有深入理解,主要是Guava的预热模型,让二阳的我有点晕了,后续结合guava中的限流进行学习。

Sentinel基本使用与源码分析的相关教程结束。

《Sentinel基本使用与源码分析.doc》

下载本文的Word格式文档,以方便收藏与打印。