欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 美食 > Sentinel源码—2.Context和处理链的初始化二

Sentinel源码—2.Context和处理链的初始化二

2025/4/21 2:06:16 来源:https://blog.csdn.net/mjunz/article/details/147259284  浏览:    关键词:Sentinel源码—2.Context和处理链的初始化二

大纲

1.Sentinel底层的核心概念

2.Sentinel中Context的设计思想与源码实现

3.Java SPI机制的引入

4.Java SPI机制在Sentinel处理链中的应用

5.Sentinel默认处理链ProcessorSlot的构建

4.Java SPI机制在Sentinel处理链中的应用

(1)初始化Entry会初始化处理链

(2)初始化处理链的设计分析

(3)Sentinel初始化处理链的完整流程

(4)Sentinel初始化处理任链之加载SPI文件

(5)Sentinel初始化处理链之实例化Class

(1)初始化Entry会初始化处理链

初始化Entry时会调用两个与处理链相关的核心方法:一是调用CtSph的lookProcessChain()方法初始化处理链,二是调用ProcessorSlot的entry()方法执行处理链节点的逻辑。

public class CtSph implements Sph {...//Record statistics and perform rule checking for the given resource.//@param name the unique name for the protected resource//@param type the traffic type (inbound, outbound or internal).//This is used to mark whether it can be blocked when the system is unstable, only inbound traffic could be blocked by SystemRule//@param count the amount of calls within the invocation (e.g. batchCount=2 means request for 2 tokens)//@param args args for parameter flow control or customized slots@Overridepublic Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {//StringResourceWrapper是ResourceWrapper的子类,且StringResourceWrapper的构造方法默认了资源类型为COMMONStringResourceWrapper resource = new StringResourceWrapper(name, type);return entry(resource, count, args);}//Do all {@link Rule}s checking about the resource.public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {//调用CtSph.entryWithPriority()方法,执行如下处理://初始化Context -> 将Context与线程绑定 -> 初始化Entry -> 将Context和ResourceWrapper放入Entry中return entryWithPriority(resourceWrapper, count, false, args);}private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException {//从当前线程中获取ContextContext context = ContextUtil.getContext();if (context instanceof NullContext) {return new CtEntry(resourceWrapper, null, context);}//如果没获取到Contextif (context == null) {//Using default context.//创建一个名为sentinel_default_context的Context,并且与当前线程绑定context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);}//Global switch is close, no rule checking will do.if (!Constants.ON) {return new CtEntry(resourceWrapper, null, context);}//调用CtSph.lookProcessChain()方法初始化处理链(处理器插槽链条)ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);if (chain == null) {return new CtEntry(resourceWrapper, null, context);}//创建出一个Entry对象,将处理链(处理器插槽链条)、Context与Entry绑定//其中会将Entry的三个基础属性(封装在resourceWrapper里)以及当前Entry所属的Context作为参数传入CtEntry的构造方法Entry e = new CtEntry(resourceWrapper, chain, context);try {//处理链(处理器插槽链条)入口,负责采集数据,规则验证//调用DefaultProcessorSlotChain.entry()方法执行处理链每个节点的逻辑(数据采集+规则验证)chain.entry(context, resourceWrapper, null, count, prioritized, args);} catch (BlockException e1) {//规则验证失败,比如:被流控、被熔断降级、触发黑白名单等e.exit(count, args);throw e1;} catch (Throwable e1) {RecordLog.info("Sentinel unexpected exception", e1);}return e;}...private final static class InternalContextUtil extends ContextUtil {static Context internalEnter(String name) {//调用ContextUtil.trueEnter()方法创建一个Context对象return trueEnter(name, "");}static Context internalEnter(String name, String origin) {return trueEnter(name, origin);}}
}

(2)初始化处理链的设计分析

一.初始化处理链需要加锁确保线程安全

由于每个线程在执行CtSph的entry()方法创建一个Entry对象时,都需要首先调用CtSph的lookProcessChain()方法获取一个处理链,然后再根据这个处理链去初始化一个Entry对象,所以多个同样的Entry对象会共用一个处理链对象。

需要注意:即便是多个线程访问同样的资源(ResourceWrapper对象的属性一样),多个线程也会对应多个Entry对象(Entry对象之间的基本属性一样),多个线程也会对应多个Context对象(使用ThreadLocal存放Context对象),从而多个Entry对象会对应各自的Context对象。多个Entry对象会共用一个处理链对象(使用HashMap来缓存处理链对象),多个Context对象会共用一个Node对象(使用HashMap来缓存Node对象),多个Entry对象会共用一个Node对象(使用HashMap来缓存Node对象)。

因此当出现多线程并发创建多个Entry对象时,CtSph的lookProcessChain()方法自然就会被多线程并发调用。所以在初始化处理链时,需要考虑线程安全和性能问题。

为了确保线程安全,可以采用加锁的方式来初始化处理链,然后将处理链缓存到HashMap中来提高性能,如下伪代码所示:

//缓存处理链,key为资源,value为处理链
//多个线程创建多个Entry对象时,也会创建多个ResourceWrapper对象
//但这些ResourceWrapper对象,在同一个资源下,其属性是一样的
private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap = new HashMap<ResourceWrapper, ProcessorSlotChain>();//加锁synchronized
synchronized ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {ProcessorSlotChain chain = chainMap.get(resourceWrapper);if (chain == null) {//构建(初始化)处理链(处理器插槽链条)chain = SlotChainProvider.newSlotChain();//将处理链(处理器插槽链条)放到缓存中chainMap.put(resourceWrapper, chain);}return chain;
}

二.初始化处理链通过SPI机制动态管理节点

初始化处理链时可使用List硬编码添加每个节点。

public class DefaultSlotChainBuilder implements SlotChainBuilder {@Overridepublic ProcessorSlotChain build() {//硬编码ProcessorSlotChain chain = new DefaultProcessorSlotChain();chain.addLast(new NodeSelectorSlot());chain.addLast(new ClusterBuilderSlot());chain.addLast(new StatisticSlot());chain.addLast(new AuthoritySlot());chain.addLast(new SystemSlot());chain.addLast(new FlowSlot());chain.addLast(new DegradeSlot());return chain;}
}

但硬编码的缺点是无法动态增减ProcessorSlot。比如不需要授权AuthoritySlot,硬编码时就不好删除了。比如内置的几个ProcessorSlot不符合业务需求,硬编码难以实现自定义。因此,需要使用SPI机制来解决硬编码带来的问题。也就是在配置文件中注册多个实现类,然后通过迭代器的方式逐个加载。

具体就是在META-INF/services目录下创建一个接口文件,然后将需要注册的实现类的全类名每行一个写入该文件中。框架启动时依次读取该文件中的实现类,并按照文件中注册的顺序加载。如果业务系统也创建了META-INF/services目录以及一个接口文件,那么业务系统的实现类并不会覆盖框架内置的实现类,而是叠加起来使用。

利用SPI机制,可以实现责任链模式的可插拔式扩展,处理链(处理器插槽链条)的每个节点都可以动态添加、删除、替换。

三.使用SPI机制的两大步骤

步骤一:创建META-INF/services目录,然后在该目录下创建一个文件名为如下的文件,文件内容为定义了处理链由哪些节点组成的类的类名全路径。

com.alibaba.csp.sentinel.slotchain.SlotChainBuilder

步骤二:通过ServiceLoader初始化处理链节点。

public static void main(String[] args){ServiceLoader<SlotChainBuilder> serviceLoader = ServiceLoader.load(SlotChainBuilder.class);...
}

四.初始化处理链的设计要点总结

要点一:初始化处理链时加锁

要点二:使用HashMap缓存处理链

要点三:使用SPI机制初始化处理链

(3)Sentinel初始化处理链的完整流程

一.将处理链存储在全局缓存 + 使用锁初始化处理链

如果对CtSph的lookProcessChain()方法加锁,则锁的粒度过大。所以可以在操作缓存处理链的HashMap时才加synchronized锁,而且在操作缓存处理链的HashMap时,使用了Double Check + 写时复制。

二.利用SPI机制完成处理链的初始化

为了增加扩展性,Sentinel初始化处理链时使用了SPI机制两次。第一次使用SPI机制,是为了可以自定义处理链的节点编排。第二次使用SPI机制,是为了可以自定义处理链各节点的具体逻辑。

public class CtSph implements Sph {//Same resource will share the same ProcessorSlotChain}, no matter in which Context.//Same resource is that ResourceWrapper#equals(Object).private static volatile Map<ResourceWrapper, ProcessorSlotChain> chainMap = new HashMap<ResourceWrapper, ProcessorSlotChain>();private static final Object LOCK = new Object();...//Get ProcessorSlotChain of the resource.//new ProcessorSlotChain will be created if the resource doesn't relate one.//Same resource will share the same ProcessorSlotChain globally, no matter in which Context.//Same resource is that ResourceWrapper#equals(Object).//Note that total ProcessorSlot count must not exceed Constants.MAX_SLOT_CHAIN_SIZE, otherwise null will return.ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {ProcessorSlotChain chain = chainMap.get(resourceWrapper);if (chain == null) {//操作chainMap时才加锁synchronized (LOCK) {chain = chainMap.get(resourceWrapper);if (chain == null) {//Double Check//Entry size limit.if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {return null;}//初始化处理链(处理器插槽链条)chain = SlotChainProvider.newSlotChain();//写时复制Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(chainMap.size() + 1);newMap.putAll(chainMap);newMap.put(resourceWrapper, chain);chainMap = newMap;}}}return chain;}...
}//A provider for creating slot chains via resolved slot chain builder SPI.
public final class SlotChainProvider {private static volatile SlotChainBuilder slotChainBuilder = null;//The load and pick process is not thread-safe,//but it's okay since the method should be only invoked via CtSph.lookProcessChain() under lock.public static ProcessorSlotChain newSlotChain() {//如果存在,则直接返回if (slotChainBuilder != null) {return slotChainBuilder.build();}//Resolve the slot chain builder SPI.//第一次使用SPI: 通过SPI机制初始化SlotChainBuilderslotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();if (slotChainBuilder == null) {//Should not go through here.RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");slotChainBuilder = new DefaultSlotChainBuilder();} else {RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}", slotChainBuilder.getClass().getCanonicalName());}return slotChainBuilder.build();}private SlotChainProvider() {}
}public final class SpiLoader<S> {//Cache the SpiLoader instances, key: classname of Service, value: SpiLoader instanceprivate static final ConcurrentHashMap<String, SpiLoader> SPI_LOADER_MAP = new ConcurrentHashMap<>();//Cache the classes of Providerprivate final List<Class<? extends S>> classList = Collections.synchronizedList(new ArrayList<Class<? extends S>>());//Cache the sorted classes of Providerprivate final List<Class<? extends S>> sortedClassList = Collections.synchronizedList(new ArrayList<Class<? extends S>>());...//Create SpiLoader instance via Service class Cached by className, and load from cache firstpublic static <T> SpiLoader<T> of(Class<T> service) {AssertUtil.notNull(service, "SPI class cannot be null");AssertUtil.isTrue(service.isInterface() || Modifier.isAbstract(service.getModifiers()), "SPI class[" + service.getName() + "] must be interface or abstract class");String className = service.getName();SpiLoader<T> spiLoader = SPI_LOADER_MAP.get(className);if (spiLoader == null) {synchronized (SpiLoader.class) {spiLoader = SPI_LOADER_MAP.get(className);if (spiLoader == null) {//Double CheckSPI_LOADER_MAP.putIfAbsent(className, new SpiLoader<>(service));spiLoader = SPI_LOADER_MAP.get(className);}}}return spiLoader;}//Load the first-found Provider instance,if not found, return default Provider instancepublic S loadFirstInstanceOrDefault() {//SPI机制加载Class,然后将加载的Class放到classList数组里load();//循环遍历,根据classList里的Class来初始化对应的实例for (Class<? extends S> clazz : classList) {if (defaultClass == null || clazz != defaultClass) {return createInstance(clazz);}}//初始化默认的DefaultSlotChainBuilderreturn loadDefaultInstance();}//Load all Provider instances of the specified Service, sorted by order value in class's {@link Spi} annotationpublic List<S> loadInstanceListSorted() {//比如读取com.alibaba.csp.sentinel.slotchain.ProcessorSlot文件里的Class名字//然后根据这些Class名字加载Class//接着将Class放到sortedClassList集合中load();//实例化sortedClassList集合里的每个Classreturn createInstanceList(sortedClassList);}...
}//Builder for a default {@link ProcessorSlotChain}.
@Spi(isDefault = true)
public class DefaultSlotChainBuilder implements SlotChainBuilder {@Overridepublic ProcessorSlotChain build() {ProcessorSlotChain chain = new DefaultProcessorSlotChain();//通过SPI机制加载责任链的节点ProcessorSlot实现类//然后按照@Spi注解的order属性进行排序并进行实例化//最后将ProcessorSlot实例放到sortedSlotList中List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();//遍历已排好序的ProcessorSlot集合for (ProcessorSlot slot : sortedSlotList) {//安全检查,防止业务系统也写了一个SPI文件,但没按规定继承AbstractLinkedProcessorSlotif (!(slot instanceof AbstractLinkedProcessorSlot)) {RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");continue;}//调用DefaultProcessorSlotChain.addLast()方法构建单向链表//将责任链的节点ProcessorSlot实例放入DefaultProcessorSlotChain中chain.addLast((AbstractLinkedProcessorSlot<?>) slot);}//返回单向链表return chain;}
}

第一次使用SPI机制是初始化SlotChainBuilder,在com.alibaba.csp.sentinel.slotchain.SlotChainBuilder文件中,框架默认的是com.alibaba.csp.sentinel.slots.DefaultSlotChainBuilder。

在SpiLoader的loadFirstInstanceOrDefault()方法中,根据"if (defaultClass == null || clazz != defaultClass)"可知,系统接入Sentinel时可以自定义SPI接口文件来替换DefaultSlotChainBuilder。

比如想删除AuthoritySlot,那么可以在META-INF/services目录下,创建文件名如下,文件值为自定义接口的实现类全路径,如下:

文件名:com.alibaba.csp.sentinel.slotchain.SlotChainBuilder
文件值:com.example.MyCustomSlotChainBuilder

然后在MyCustomSlotChainBuilder中就可以自定义一套处理链的规则。

public class MyCustomSlotChainBuilder implements SlotChainBuilder {@Overridepublic ProcessorSlotChain build() {ProcessorSlotChain chain = new DefaultProcessorSlotChain();chain.addLast(new NodeSelectorSlot());chain.addLast(new ClusterBuilderSlot());chain.addLast(new StatisticSlot());chain.addLast(new FlowSlot());return chain;}
}

第二次使用SPI机制是执行DefaultSlotChainBuilder的build()方法初始化处理链,其中的核心代码是SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted()。所调用的方法会读取com.alibaba.csp.sentinel.slotchain.ProcessorSlot文件内容,然后根据文件内容加载Class,接着将Class放到sortedClassList集合中,最后实例化sortedClassList集合中的Class并返回ProcessorSlot实例列表。

注意:ProcessorSlot实现类是有序的,即处理链的节点是有序的。比如ClusterBuilderSlot的前一个节点必须是NodeSelectorSlot,StatisticSlot的前一个节点必须是ClusterBuilderSlot等。

(4)Sentinel初始化处理链之加载SPI文件

Sentinel初始化处理链时会先后调用如下两个方法,这两个方法都会调用SpiLoader的load()方法来加载SPI文件。

SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault()
SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted()

SpiLoader的load()方法的执行过程就是加载SPI文件的过程。但是由于SPI文件可能会有很多个,比如负责管控整个处理链的Builder对应的SPI文件:

com.alibaba.csp.sentinel.slotchain.SlotChainBuilder

以及处理链节点的ProcessorSlot对应的SPI文件:

com.alibaba.csp.sentinel.slotchain.ProcessorSlot

虽然它们都隶属于META-INF/services下,但其文件名是不一样的。所以按理应在load()方法添加一个参数如:load(String fileName)。但是Sentinel却没这么做,因为Sentinel使用配置去替代参数。

具体做法就是:先在SpiLoader内定义常量SPI_FILE_PREFIX = "META-INF/services/"。有了SPI文件夹后,还需接口全路径名为名称的文件,就可读取文件了。

于是SpiLoader提供了一个静态的of()方法,来指定要加载那一类接口。也就是SpiLoader的of()方法会返回一个指定加载某种接口的SpiLoader实例。

然后在执行指定加载某一类接口的SpiLoader实例的load()方法时,就能将指定要加载的接口的名称拼接上SPI_FILE_PREFIX文件夹前缀,得到一份完整的文件路径,接着就可通过IO流读取文件里的内容。

从文件中获取到指定加载的某一类接口的实现类类名之后,就可以通过Class.forName() + 线程上下文类加载器去加载对应的实现类,加载到的实现类会放入到classList和sortedClassList两个列表中。

注意:Sentinel的SPI机制没有使用JDK内置的ServiceLoader,而是自己实现。因为Sentinel的SPI有一些定制化逻辑,比如@Spi注解的order属性可以指定实例化类时的顺序。但本质上和JDK内置的ServiceLoader一致,只是多了个性化的逻辑。

public final class SpiLoader<S> {//Default path for the folder of Provider configuration file//SPI文件夹路径private static final String SPI_FILE_PREFIX = "META-INF/services/";//Cache the SpiLoader instances, key: classname of Service, value: SpiLoader instance//每个接口Class对应的SpiLoader,只需new一次即可,new完就可以将其缓存起来private static final ConcurrentHashMap<String, SpiLoader> SPI_LOADER_MAP = new ConcurrentHashMap<>();//Cache the classes of Providerprivate final List<Class<? extends S>> classList = Collections.synchronizedList(new ArrayList<Class<? extends S>>());//Cache the sorted classes of Providerprivate final List<Class<? extends S>> sortedClassList = Collections.synchronizedList(new ArrayList<Class<? extends S>>());//The Service class, must be interface or abstract class//指定要使用SPI进行加载的实现类的接口的Class,比如SlotChainBuilder.class、ProcessorSlot.classprivate Class<S> service;...private SpiLoader(Class<S> service) {this.service = service;}//Create SpiLoader instance via Service class//Cached by className, and load from cache first//@param service Service class 指定要使用SPI加载的实现类的接口的Classpublic static <T> SpiLoader<T> of(Class<T> service) {//判断是不是nullAssertUtil.notNull(service, "SPI class cannot be null");//判断是不是interface类型AssertUtil.isTrue(service.isInterface() || Modifier.isAbstract(service.getModifiers()), "SPI class[" + service.getName() + "] must be interface or abstract class");//获取接口的全路径名,判断缓存里是否已经存在String className = service.getName();SpiLoader<T> spiLoader = SPI_LOADER_MAP.get(className);//缓存里没有,则使用Double Check + 锁机制去初始化SpiLoader,然后将初始化好的SpiLoader实例放到缓存if (spiLoader == null) {synchronized (SpiLoader.class) {spiLoader = SPI_LOADER_MAP.get(className);if (spiLoader == null) {//new SpiLoader<>(service)初始化SpiLoader实例SPI_LOADER_MAP.putIfAbsent(className, new SpiLoader<>(service));spiLoader = SPI_LOADER_MAP.get(className);}}}//返回SpiLoader实例return spiLoader;}//Load the first-found Provider instance,if not found, return default Provider instancepublic S loadFirstInstanceOrDefault() {//SPI机制加载Class,然后将加载的Class放到classList数组里load();//循环遍历,根据classList里的Class来初始化对应的实例for (Class<? extends S> clazz : classList) {if (defaultClass == null || clazz != defaultClass) {return createInstance(clazz);}}//初始化默认的DefaultSlotChainBuilderreturn loadDefaultInstance();}//Load all Provider instances of the specified Service, sorted by order value in class's {@link Spi} annotationpublic List<S> loadInstanceListSorted() {//比如读取com.alibaba.csp.sentinel.slotchain.ProcessorSlot文件里的Class名字//然后根据这些Class名字加载Class//接着将Class放到sortedClassList集合中load();//实例化sortedClassList集合里的每个Classreturn createInstanceList(sortedClassList);}//Load the Provider class from Provider configuration filepublic void load() {...//IO流读取文件内容while (urls.hasMoreElements()) {...clazz = (Class<S>) Class.forName(line, false, classLoader);//加入到集合中classList.add(clazz);...}...//生成新的按照order排序的集合sortedClassList.addAll(classList);//进行排序Collections.sort(sortedClassList, new Comparator<Class<? extends S>>() {@Overridepublic int compare(Class<? extends S> o1, Class<? extends S> o2) {//获取Spi注解Spi spi1 = o1.getAnnotation(Spi.class);//获取Spi注解的order属性int order1 = spi1 == null ? 0 : spi1.order();Spi spi2 = o2.getAnnotation(Spi.class);int order2 = spi2 == null ? 0 : spi2.order();return Integer.compare(order1, order2);}});}...
}

(5)Sentinel初始化处理链之实例化Class

执行完SpiLoader的load()方法加载好指定的实现类的Class后,就会调用SpiLoader的createInstance()方法或createInstanceList()方法实例化Class。

SpiLoader放入createInstance(clazz)方法内部会判断是否是单例,如果是单例则放到缓存当中,如果不是单例则每次都通过clazz.newInstance()方法创建一个新的对象。

public final class SpiLoader<S> {//Cache the singleton instance of Provider, key: classname of Provider, value: Provider instanceprivate final ConcurrentHashMap<String, S> singletonMap = new ConcurrentHashMap<>();...//Create Provider instance listprivate List<S> createInstanceList(List<Class<? extends S>> clazzList) {if (clazzList == null || clazzList.size() == 0) {return Collections.emptyList();}List<S> instances = new ArrayList<>(clazzList.size());for (Class<? extends S> clazz : clazzList) {S instance = createInstance(clazz);instances.add(instance);}return instances;}//Create Provider instanceprivate S createInstance(Class<? extends S> clazz) {Spi spi = clazz.getAnnotation(Spi.class);boolean singleton = true;if (spi != null) {singleton = spi.isSingleton();}return createInstance(clazz, singleton);}//Create Provider instanceprivate S createInstance(Class<? extends S> clazz, boolean singleton) {S instance = null;try {if (singleton) {instance = singletonMap.get(clazz.getName());if (instance == null) {synchronized (this) {instance = singletonMap.get(clazz.getName());if (instance == null) {instance = service.cast(clazz.newInstance());singletonMap.put(clazz.getName(), instance);}}}} else {instance = service.cast(clazz.newInstance());}} catch (Throwable e) {fail(clazz.getName() + " could not be instantiated");}return instance;}...
}

(6)总结

一.初始化处理链时需要加锁和使用缓存

初始化一个Entry对象时需要根据一个处理链对象进行初始化,并发请求同一接口时的多个同样的Entry对象会共用一个处理链对象,所以初始化处理链的过程中需要加锁来保证线程安全性,初始化完处理链后需要将其放到全局缓存里来提高性能。

二.初始化处理链时分两步使用SPI

首先初始化Builder,负责管控处理链整体即编排处理链节点。Sentinel要求Builder只能存在一个,而且是外部系统优先原则。可以在com.alibaba.csp.sentinel.slotchain.SlotChainBuilder文件中,指定替代默认的DefaultSlotChainBuilder的自定义接口实现类。

然后通过Builder初始化完整的处理链,这里也是通过SPI机制进行初始化。因此可以进行扩展,比如外部系统自定义ProcessorSlot添加到处理链。注意ProcessorSlot是有顺序的,如果顺序没指定正确,则可能造成异常。ProcessorSlot的顺序可以通过@Spi注解的order属性设置。为了防止出现bug,建议直接将自定义的ProcessorSlot放到最后。

三.Sentinel通过配置的方式来替代传入参数

SpiLoader的of()方法会返回一个指定加载某种实现类的SpiLoader实例,这样就可以使用类似SpiLoader.of().load()的方式通过SPI进行加载。

四.Sentinel没有采用JDK内置的ServiceLoader来实现SPI机制

Sentinel单独写一套SPI的实现逻辑,核心原因是需要支持个性化的配置。比如@Spi注解支持order排序属性以及isSingleton是否单例属性。如果是单例,则每个类全局只能实例化一次(通过Map缓存实现)。如果不是单例,则每次都new一个新的对象。

五.Sentinel的这套SPI机制可以当作工具类拷贝到业务系统中

因为没有其他外部依赖,它被单独放到一个包里。

5.Sentinel默认处理链ProcessorSlot的构建

(1)Sentinel默认处理链的节点

(2)Sentinel默认处理链的构建

(1)Sentinel默认处理链的节点

Sentinel中的ProcessorSlot是流量控制处理过程中的处理器,每个ProcessorSlot处理器子类负责处理特定的任务。Sentinel默认的处理链会基于SPI配置在sentinel-core模块的如下文件中:

# Sentinel-1.8.6/sentinel-core/src/main/resources/META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot
# Sentinel default ProcessorSlots
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
com.alibaba.csp.sentinel.slots.logger.LogSlot
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot

一共8个Slot,每个Slot都是一个过滤器,各自承担不同的职责。这些Slot整体可以划分为两类:指标数据采集的Slot和规则验证的Slot。其中规则验证的Slot包括:授权验证、流控验证、熔断降级验证。

每个Slot都有自己在整个处理链(处理器插槽链条)中的顺序,具体的顺序(也就是实例化顺序)并不是根据SPI文件的编写顺序来确定的,而是基于修饰每个Slot的@Spi注解来确定的。

@Spi注解有一个order属性用于设置顺序。SpiLoader的load()方法在读取SPI文件时,会按order属性对Slot进行排序,并将排好序的ProcessorSlot实现类放入sortedClassList中。这样后续就可以遍历sortedClassList,按照顺序实例化这些Slot的实现类。

一.NodeSelectorSlot

负责创建和维护资源调用树(资源调用关系),同时为资源访问对象Entry关联对应的统计节点DefaultNode。这样不仅可实现对资源调用链路的监控,还能统计每个资源的调用信息。

二.ClusterBuilderSlot

负责根据资源的统计信息,计算集群维度的统计数据。集群维度统计数据是从资源维度的统计数据中整合得到的,这些统计数据用于实现资源在集群中的流量控制、熔断降级等功能。

三.LogSlot

负责记录请求异常时的日志,可用于故障排查。

四.StatisticsSlot

负责统计资源的调用数据,如成功调用次数、异常次数、响应时间等。这些数据可用于分析资源的性能,或驱动其他Slot(如限流降级Slot)运行。

五.AuthoritySlot

负责进行授权控制,根据资源的授权规则来判断是否允许请求进行访问。如果请求不被允许访问,AuthoritySlot将抛出AuthorityException异常。

六.SystemSlot

负责进行系统保护,根据系统保护规则(如CPU使用率、负载等)判断请求是否需要被限制。如果需要被限制,SystemSlot将抛出SystemException异常。

七.FlowSlot

负责进行流量控制,根据资源的流量控制规则(如QPS限制等)来判断请求是否需要被限流。如果需要被限流,FlowSlot将抛出一个FlowException异常。

八.DegradeSlot

负责进行熔断降级,根据资源的熔断降级规则(如异常比例等)来判断请求是否需要被降级。如果需要被降级,DegradeSlot将抛出一个DegradeException异常。

(2)Sentinel默认处理链的构建

处理链(处理器插槽链条)ProcessorSlotChain其实就是一条责任链,由于责任链是典型的单向链表结构,所以每个Slot必须要有一个next属性,用于指向下一个节点。

为了实现单向链表结构:可以利用已排好序的Slot列表以及每个Slot都有的next引用的特性。只需遍历已排序的Slot列表,让每个Slot的next引用指向下一个Slot即可。这是一个非常简单的单向链表操作,可将这个过程封装到DefaultProcessorSlotChain类中的addLast()方法中。

//Builder for a default {@link ProcessorSlotChain}.
@Spi(isDefault = true)
public class DefaultSlotChainBuilder implements SlotChainBuilder {@Overridepublic ProcessorSlotChain build() {//创建一个DefaultProcessorSlotChain对象实例ProcessorSlotChain chain = new DefaultProcessorSlotChain();//通过SPI机制加载责任链的节点ProcessorSlot实现类//然后按照@Spi注解的order属性进行排序并进行实例化//最后将ProcessorSlot实例放到sortedSlotList中List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();//遍历已排好序的ProcessorSlot列表for (ProcessorSlot slot : sortedSlotList) {//安全检查,防止业务系统也写了一个SPI文件,但没按规定继承AbstractLinkedProcessorSlotif (!(slot instanceof AbstractLinkedProcessorSlot)) {RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");continue;}//调用DefaultProcessorSlotChain.addLast()方法构建单向链表//将责任链的节点ProcessorSlot实例放入DefaultProcessorSlotChain中chain.addLast((AbstractLinkedProcessorSlot<?>) slot);}//返回单向链表return chain;}
}public class DefaultProcessorSlotChain extends ProcessorSlotChain {AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable {super.fireEntry(context, resourceWrapper, t, count, prioritized, args);}@Overridepublic void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {super.fireExit(context, resourceWrapper, count, args);}};AbstractLinkedProcessorSlot<?> end = first;@Overridepublic void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {protocolProcessor.setNext(first.getNext());first.setNext(protocolProcessor);if (end == first) {end = protocolProcessor;}}@Overridepublic void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {end.setNext(protocolProcessor);end = protocolProcessor;}@Overridepublic void setNext(AbstractLinkedProcessorSlot<?> next) {addLast(next);}@Overridepublic AbstractLinkedProcessorSlot<?> getNext() {return first.getNext();}@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable {first.transformEntry(context, resourceWrapper, t, count, prioritized, args);}@Overridepublic void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {first.exit(context, resourceWrapper, count, args);}
}public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {private AbstractLinkedProcessorSlot<?> next = null;@Overridepublic void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {if (next != null) {next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);}}@SuppressWarnings("unchecked")void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args) throws Throwable {T t = (T)o;entry(context, resourceWrapper, t, count, prioritized, args);}@Overridepublic void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {if (next != null) {next.exit(context, resourceWrapper, count, args);}}public AbstractLinkedProcessorSlot<?> getNext() {return next;}public void setNext(AbstractLinkedProcessorSlot<?> next) {this.next = next;}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com