大纲
1.FlowSlot根据流控规则对请求进行限流
2.FlowSlot实现流控规则的快速失败效果的原理
3.FlowSlot实现流控规则中排队等待效果的原理
4.FlowSlot实现流控规则中Warm Up效果的原理
1.FlowSlot根据流控规则对请求进行限流
(1)流控规则FlowRule的配置Demo
(2)注册流控监听器和加载流控规则
(3)FlowSlot根据流控规则对请求进行限流
(1)流控规则FlowRule的配置Demo
从下图可知,流控规则包含以下属性:
一.规则id、资源名、针对来源
这三个属性是所有规则共有的属性,会分别封装到AbstractRule的id、resource、limitApp字段中,各个具体的规则子类都会继承AbstractRule类。
二.阈值类型
阈值类型包括QPS和并发线程数两个选项,对应FlowRule中的字段为grade。
三.单机阈值
单机阈值也就是限流阈值。无论是基于QPS还是并发线程数,都要设置限流阈值,对应FlowRule中的字段为count。
四.是否集群
是否集群是一个boolean类型的字段,对应FlowRule中的字段为clusterMode。true表示开启集群模式,false表示单机模式;
五.流控模式
流控模式有三种:直接模式、关联模式和链路模式,对应FlowRule中的字段为strategy。
六.关联资源
当流控模式选择为关联时,此值含义是关联资源,当流控模式选择为链路时,此值含义是入口资源。所以仅当流控模式选择关联和链路时,才对应FlowRule中的字段为refResource。
七.流控效果
流控效果有三种:快速失败、Warm Up、排队等待。还有一个选项未在页面上显示,即Warm Up和排队等待的结合体。也就是Warm Up + 排队等待,对应FlowRule中的字段为controlBehavior。
八.流控控制器
由于流控有多种模式,而每种模式都会对应一个模式流量整形控制器。所以流控规则FlowRule中会有一个字段TrafficShapingController,用来实现不同流控模式下的不同流控效果。
九.预热时长
此选项仅在流控效果选择Warm Up时出现,表示Warm Up的时长,对应FlowRule中的字段为warmUpPeriodSec。
十.超时时间
此选项仅在流控效果选择排队等待时出现,表示超出流控阈值后,排队等待多久才抛出异常,对应FlowRule中的字段为maxQueueingTimeMs。
public abstract class AbstractRule implements Rule {//rule id. 规则idprivate Long id;//Resource name. 资源名称private String resource;//针对来源,默认是default//多个来源使用逗号隔开,比如黑名单规则,限制userId是1和3的访问,那么就设置limitApp为"1,3"//Application name that will be limited by origin.//The default limitApp is {@code default}, which means allowing all origin apps.//For authority rules, multiple origin name can be separated with comma (',').private String limitApp;...
}//规则id、资源名(resource)、针对来源(limitApp),这三个字段在父类AbstractRule里
//Each flow rule is mainly composed of three factors: grade, strategy and controlBehavior:
//The grade represents the threshold type of flow control (by QPS or thread count).
//The strategy represents the strategy based on invocation relation.
//The controlBehavior represents the QPS shaping behavior (actions on incoming request when QPS exceeds the threshold.
public class FlowRule extends AbstractRule {public FlowRule() {super();setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);}public FlowRule(String resourceName) {super();setResource(resourceName);setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);}//The threshold type of flow control (0: thread count, 1: QPS).//阈值类型:1代表QPS;0代表并发线程数private int grade = RuleConstant.FLOW_GRADE_QPS;//Flow control threshold count.//单机阈值:也就是限流数private double count;//Flow control strategy based on invocation chain.//流控模式:0代表直接;1代表关联;2代表链路//RuleConstant#STRATEGY_DIRECT for direct flow control (by origin);//RuleConstant#STRATEGY_RELATE for relevant flow control (with relevant resource);//RuleConstant#STRATEGY_CHAIN for chain flow control (by entrance resource).private int strategy = RuleConstant.STRATEGY_DIRECT;//关联资源,当流控模式选择为关联时,此值含义是关联资源,当流控模式选择为链路时,此值含义是入口资源//Reference resource in flow control with relevant resource or context.private String refResource;//Rate limiter control behavior.//0. default(reject directly), 1. warm up, 2. rate limiter, 3. warm up + rate limiter//流控效果:0代表快速失败, 1代表Warm Up, 2代表排队等待, 3代表Warm Up + 排队等待private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;//预热时长:只有当流控效果选择为Warm Up时才会出现private int warmUpPeriodSec = 10;//Max queueing time in rate limiter behavior.//超时时间:只有当流控效果选择排队等待时才会出现private int maxQueueingTimeMs = 500;//是否集群:默认false表示单机private boolean clusterMode;//Flow rule config for cluster mode.//集群配置private ClusterFlowConfig clusterConfig;//The traffic shaping (throttling) controller.//流量整形控制器:实现[流控效果]的四种不同模式private TrafficShapingController controller;...
}public class FlowQpsDemo {private static final String KEY = "abc";private static AtomicInteger pass = new AtomicInteger();private static AtomicInteger block = new AtomicInteger();private static AtomicInteger total = new AtomicInteger();private static volatile boolean stop = false;private static final int threadCount = 32;private static int seconds = 60 + 40;public static void main(String[] args) throws Exception {//初始化QPS的流控规则initFlowQpsRule();//启动线程定时输出信息tick();//first make the system run on a very low condition//模拟QPS为32时的访问场景simulateTraffic();System.out.println("===== begin to do flow control");System.out.println("only 20 requests per second can pass");}private static void initFlowQpsRule() {List<FlowRule> rules = new ArrayList<FlowRule>();FlowRule rule1 = new FlowRule();rule1.setResource(KEY);//设置QPS的限制为20rule1.setCount(20);rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);rule1.setLimitApp("default");rules.add(rule1);//加载流控规则FlowRuleManager.loadRules(rules);}private static void simulateTraffic() {for (int i = 0; i < threadCount; i++) {Thread t = new Thread(new RunTask());t.setName("simulate-traffic-Task");t.start();}}private static void tick() {Thread timer = new Thread(new TimerTask());timer.setName("sentinel-timer-task");timer.start();}static class TimerTask implements Runnable {@Overridepublic void run() {long start = System.currentTimeMillis();System.out.println("begin to statistic!!!");long oldTotal = 0;long oldPass = 0;long oldBlock = 0;while (!stop) {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {}long globalTotal = total.get();long oneSecondTotal = globalTotal - oldTotal;oldTotal = globalTotal;long globalPass = pass.get();long oneSecondPass = globalPass - oldPass;oldPass = globalPass;long globalBlock = block.get();long oneSecondBlock = globalBlock - oldBlock;oldBlock = globalBlock;System.out.println(seconds + " send qps is: " + oneSecondTotal);System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal + ", pass:" + oneSecondPass + ", block:" + oneSecondBlock);if (seconds-- <= 0) {stop = true;}}long cost = System.currentTimeMillis() - start;System.out.println("time cost: " + cost + " ms");System.out.println("total:" + total.get() + ", pass:" + pass.get() + ", block:" + block.get());System.exit(0);}}static class RunTask implements Runnable {@Overridepublic void run() {while (!stop) {Entry entry = null;try {//调用entry()方法开始规则验证entry = SphU.entry(KEY);//token acquired, means passpass.addAndGet(1);} catch (BlockException e1) {block.incrementAndGet();} catch (Exception e2) {//biz exception} finally {total.incrementAndGet();if (entry != null) {//完成规则验证调用exit()方法entry.exit();}}Random random2 = new Random();try {TimeUnit.MILLISECONDS.sleep(random2.nextInt(50));} catch (InterruptedException e) {//ignore}}}}
}
(2)注册流控监听器和加载流控规则
一.Sentinel监听器模式的处理流程
Sentinel监听器模式会包含三大角色:
角色一:监听器PropertyListener
角色二:监听器管理器SentinelProperty
角色三:规则管理器RuleManager
首先,规则管理器RuleManager在初始化时,会调用监听器管理器SentinelProperty的addListener()方法将监听器PropertyListener注册到监听器管理器SentinelProperty中。
然后,使用方使用具体的规则时,可以通过调用规则管理器RuleManager的loadRules()方法加载规则。加载规则时会调用监听器管理器SentinelProperty的updateValue()方法通知每一个监听器PropertyListener,也就是通过监听器PropertyListener的configUpdate()方法把规则加载到规则管理器RuleManager的本地中。
二.注册流控监听器和加载流控规则
需要注意的是:加载流控规则时会调用FlowRuleUtil的generateRater()方法,根据不同的流控效果选择不同的流量整形控制器TrafficShapingController。
//One resources can have multiple rules.
//And these rules take effects in the following order:
//requests from specified caller
//no specified caller
public class FlowRuleManager {//维护每个资源的流控规则列表,key是资源名称,value是资源对应的规则private static volatile Map<String, List<FlowRule>> flowRules = new HashMap<>();//饿汉式单例模式实例化流控规则的监听器对象private static final FlowPropertyListener LISTENER = new FlowPropertyListener();//监听器对象的管理器private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>();...static {//将流控规则监听器注册到监听器管理器中currentProperty.addListener(LISTENER);startMetricTimerListener();}//Load FlowRules, former rules will be replaced.//加载流控规则public static void loadRules(List<FlowRule> rules) {//通知监听器管理器中的每一个监听器,规则已发生变化,需要重新加载规则配置//其实就是更新FlowRuleManager规则管理器中的流控规则列表flowRulescurrentProperty.updateValue(rules);}private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {@Overridepublic synchronized void configUpdate(List<FlowRule> value) {Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);if (rules != null) {flowRules = rules;}RecordLog.info("[FlowRuleManager] Flow rules received: {}", rules);}@Overridepublic synchronized void configLoad(List<FlowRule> conf) {Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(conf);if (rules != null) {flowRules = rules;}RecordLog.info("[FlowRuleManager] Flow rules loaded: {}", rules);}}...
}public final class FlowRuleUtil {private static final Function<FlowRule, String> extractResource = new Function<FlowRule, String>() {@Overridepublic String apply(FlowRule rule) {return rule.getResource();}};...//Build the flow rule map from raw list of flow rules, grouping by resource name.public static Map<String, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list) {return buildFlowRuleMap(list, null);}//Build the flow rule map from raw list of flow rules, grouping by resource name.public static Map<String, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list, Predicate<FlowRule> filter) {return buildFlowRuleMap(list, filter, true);}//Build the flow rule map from raw list of flow rules, grouping by resource name.public static Map<String, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list, Predicate<FlowRule> filter, boolean shouldSort) {return buildFlowRuleMap(list, extractResource, filter, shouldSort);}//Build the flow rule map from raw list of flow rules, grouping by provided group function.public static <K> Map<K, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list, Function<FlowRule, K> groupFunction, Predicate<FlowRule> filter, boolean shouldSort) {Map<K, List<FlowRule>> newRuleMap = new ConcurrentHashMap<>();if (list == null || list.isEmpty()) {return newRuleMap;}Map<K, Set<FlowRule>> tmpMap = new ConcurrentHashMap<>();for (FlowRule rule : list) {...//获取[流控效果]流量整形控制器TrafficShapingControllerTrafficShapingController rater = generateRater(rule);rule.setRater(rater);//获取资源名K key = groupFunction.apply(rule);if (key == null) {continue;}//获取资源名对应的流控规则列表Set<FlowRule> flowRules = tmpMap.get(key);//将规则放到Map里,和当前资源绑定if (flowRules == null) {//Use hash set here to remove duplicate rules.flowRules = new HashSet<>();tmpMap.put(key, flowRules);}flowRules.add(rule);}Comparator<FlowRule> comparator = new FlowRuleComparator();for (Entry<K, Set<FlowRule>> entries : tmpMap.entrySet()) {List<FlowRule> rules = new ArrayList<>(entries.getValue());if (shouldSort) {//Sort the rules.Collections.sort(rules, comparator);}newRuleMap.put(entries.getKey(), rules);}return newRuleMap;}private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {//判断只有当阈值类型为QPS时才生效if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {//根据流控效果选择不同的流量整形控制器TrafficShapingControllerswitch (rule.getControlBehavior()) {case RuleConstant.CONTROL_BEHAVIOR_WARM_UP://Warm Up预热模式——冷启动模式return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(), ColdFactorProperty.coldFactor);case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER://排队等待模式return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER://Warm Up + 排队等待模式return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(), rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);case RuleConstant.CONTROL_BEHAVIOR_DEFAULT://快速失败模式——Default默认模式default://Default mode or unknown mode: default traffic shaping controller (fast-reject).}}//默认模式:快速失败用的是DefaultControllerreturn new DefaultController(rule.getCount(), rule.getGrade());}...
}
(3)FlowSlot根据流控规则对请求进行限流
FlowSlot中处理限流的核心方法其实是FlowRuleChecker.checkFlow()。该方法首先会从FlowRuleManager.flowRules中获取资源对应的流控规则列表,然后再遍历规则列表并调用canPassCheck()方法验证当前请求是否命中规则。如果命中规则,则抛出异常。如果没命中规则,则允许请求通过。
FlowRuleChecker的canPassCheck()方法会判断规则是否是集群模式。如果流控规则是集群模式,则调用passClusterCheck()方法。如果流控规则不是集群模式,则调用passLocalCheck()方法。
在FlowRuleChecker的passLocalCheck()方法中,首先会根据流控规则选择合适的Node作为限流计算的依据,然后再通过TrafficShapingController的canPass()方法判断是否放行请求。
其中选择合适的Node作为限流计算的依据时,会调用selectNodeByRequesterAndStrategy()方法根据流控规则的针对来源、流控模式和当前请求的来源,从下面三个节点中选择一个作为合适的Node:
节点一:上下文中的来源节点OriginNode
节点二:当前请求的集群节点ClusterNode
节点三:默认节点DefaultNode
注意:limitApp是流控规则里的字段,代表此规则生效时的针对来源。而origin是Context里的字段,代表当前请求的来源是什么。假设limitApp配置的是shop,某个请求的origin也是shop,则规则生效。如果limitApp采取默认值default,则代表全部origin都生效。
@Spi(order = Constants.ORDER_FLOW_SLOT)
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {private final FlowRuleChecker checker;private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {@Overridepublic Collection<FlowRule> apply(String resource) {//Flow rule map should not be null.Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();return flowRules.get(resource);}};public FlowSlot() {this(new FlowRuleChecker());}FlowSlot(FlowRuleChecker checker) {AssertUtil.notNull(checker, "flow checker should not be null");this.checker = checker;}@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {//检查流控规则,count默认是1checkFlow(resourceWrapper, context, node, count, prioritized);fireEntry(context, resourceWrapper, node, count, prioritized, args);}void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {//调用规则检查器FlowRuleChecker的checkFlow()方法进行检查,count默认是1checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);}@Overridepublic void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {fireExit(context, resourceWrapper, count, args);}
}//Rule checker for flow control rules.
public class FlowRuleChecker {public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {if (ruleProvider == null || resource == null) {return;}//从Map中获取resource资源对应的流控规则列表Collection<FlowRule> rules = ruleProvider.apply(resource.getName());if (rules != null) {//循环遍历每一个流控规则for (FlowRule rule : rules) {//调用canPassCheck方法进行流控规则验证,判断此次请求是否命中针对resource资源配置的流控规则//传入的参数count默认是1if (!canPassCheck(rule, context, node, count, prioritized)) {throw new FlowException(rule.getLimitApp(), rule);}}}}public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount) {return canPassCheck(rule, context, node, acquireCount, false);}public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {//参数校验String limitApp = rule.getLimitApp();if (limitApp == null) {return true;}//如果是集群模式,则执行passClusterCheck()方法if (rule.isClusterMode()) {return passClusterCheck(rule, context, node, acquireCount, prioritized);}//如果是单机模式,则执行passLocalCheck()方法,acquireCount默认是1return passLocalCheck(rule, context, node, acquireCount, prioritized);}private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {//选择Node作为限流计算的依据Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);if (selectedNode == null) {return true;}//先通过FlowRule.getRater()方法获取流控规则对应的流量整形控制器//然后调用TrafficShapingController.canPass()方法对请求进行检查,acquireCount默认是1return rule.getRater().canPass(selectedNode, acquireCount, prioritized);}//选择Node作为限流计算的依据static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {//The limit app should not be empty.//获取流控规则中配置的"针对来源",默认是defaultString limitApp = rule.getLimitApp();//获取流控规则中配置的"流控模式",0代表直接,1代表关联,2代表链路int strategy = rule.getStrategy();//从context对象中获取当前请求的来源String origin = context.getOrigin();//情形一:当流控规则的针对来源(limitApp)与当前请求的来源(origin)相同时//这种情况表示该限流规则针对特定来源进行限流//如果配置了针对app1进行限流,那么app2就不会生效,这就是针对特定来源进行限流if (limitApp.equals(origin) && filterOrigin(origin)) {//如果流控规则中配置的"流控模式"是直接(RuleConstant.STRATEGY_DIRECT),则返回上下文中的Origin Node//因为这种模式要求根据调用方的情况进行限流,而Origin Node包含了调用方的统计信息,所以选择Origin Node作为限流计算的依据if (strategy == RuleConstant.STRATEGY_DIRECT) {//Matches limit origin, return origin statistic node.return context.getOriginNode();}//如果流控规则中配置的"流控模式"是关联、链路(RuleConstant.STRATEGY_RELATE或RuleConstant.STRATEGY_CHAIN),则调用selectReferenceNode()方法//此方法会判断://如果"流控模式"是关联(RuleConstant.STRATEGY_RELATE),则返回关联资源的ClusterNode//如果"流控模式"是链路(RuleConstant.STRATEGY_CHAIN),则返回DefaultNodereturn selectReferenceNode(rule, context, node);}//情况二:当流控规则的针对来源(limitApp)是默认值(RuleConstant.LIMIT_APP_DEFAULT)时//这种情况表示该流控规则对所有来源都生效else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {//如果流控规则中配置的"流控模式"是直接(RuleConstant.STRATEGY_DIRECT),则返回当前请求的集群节点ClusterNode//因为此时要求的是根据被调用资源的情况进行限流,而集群节点包含了被调用资源的统计信息,所以选择集群节点作为限流计算的依据if (strategy == RuleConstant.STRATEGY_DIRECT) {//Return the cluster node.return node.getClusterNode();}//如果流控规则中配置的"流控模式"是关联、链路(RuleConstant.STRATEGY_RELATE或RuleConstant.STRATEGY_CHAIN),则调用selectReferenceNode()方法//此方法会判断://如果"流控模式"是关联(RuleConstant.STRATEGY_RELATE),则返回关联资源的ClusterNode//如果"流控模式"是链路(RuleConstant.STRATEGY_CHAIN),则返回DefaultNodereturn selectReferenceNode(rule, context, node);}//情况三:当流控规则的针对来源(limitApp)是其他(RuleConstant.LIMIT_APP_OTHER),且当前请求的来源(origin)与流控规则的资源名(rule.getResource())不同时//这种情况表示该流控规则针对除默认来源以外的其他来源进行限流,可实现个性化限流//比如可以对app1进行个性化限流,对其他所有app进行整体限流else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp) && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {//如果流控规则中配置的"流控模式"是直接(RuleConstant.STRATEGY_DIRECT),则返回上下文中的来源节点(Origin Node)//因为这种"流控模式"要求根据调用方的情况进行限流,而来源节点包含了调用方的统计信息,所以选择来源节点作为限流计算的依据if (strategy == RuleConstant.STRATEGY_DIRECT) {return context.getOriginNode();}//如果流控规则中配置的"流控模式"是关联、链路(RuleConstant.STRATEGY_RELATE或RuleConstant.STRATEGY_CHAIN),则调用selectReferenceNode()方法//此方法会判断://如果"流控模式"是关联(RuleConstant.STRATEGY_RELATE),则返回关联资源的ClusterNode//如果"流控模式"是链路(RuleConstant.STRATEGY_CHAIN),则返回DefaultNodereturn selectReferenceNode(rule, context, node);}return null;}//如果流控规则中配置的"流控模式"是关联(RuleConstant.STRATEGY_RELATE),则返回关联资源的ClusterNode//如果流控规则中配置的"流控模式"是链路(RuleConstant.STRATEGY_CHAIN),则返回DefaultNodestatic Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {String refResource = rule.getRefResource();int strategy = rule.getStrategy();if (StringUtil.isEmpty(refResource)) {return null;}if (strategy == RuleConstant.STRATEGY_RELATE) {//关联资源的ClusterNodereturn ClusterBuilderSlot.getClusterNode(refResource);}if (strategy == RuleConstant.STRATEGY_CHAIN) {if (!refResource.equals(context.getName())) {return null;}return node;}//No node.return null;}private static boolean filterOrigin(String origin) {// Origin cannot be `default` or `other`.return !RuleConstant.LIMIT_APP_DEFAULT.equals(origin) && !RuleConstant.LIMIT_APP_OTHER.equals(origin);}...
}
2.FlowSlot实现流控规则的快速失败效果的原理
(1)流控效果为快速失败时对应的流量整形控制器
(2)流量整形控制器DefaultController执行分析
(3)流控模式中的关联模式和链路模式说明
(1)流控效果为快速失败时对应的流量整形控制器
流控效果有:快速失败、Warm Up、排队等待。
调用FlowRuleManager的loadRules()方法加载流控规则时,会触发执行FlowPropertyListener的configUpdate()方法,该方法又会调用FlowRuleUtil的generateRater()方法。在FlowRuleUtil的generateRater()方法中,便会根据不同的流控效果选择不同的流量整形控制器。
其中当流控规则的流控效果为快速失败时,对应的流量整形控制器为TrafficShapingController的子类DefaultController。
public class FlowRuleManager {//维护每个资源的流控规则列表,key是资源名称,value是资源对应的规则private static volatile Map<String, List<FlowRule>> flowRules = new HashMap<>();//饿汉式单例模式实例化流控规则的监听器对象private static final FlowPropertyListener LISTENER = new FlowPropertyListener();//监听器对象的管理器private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>();...static {//将流控规则监听器注册到监听器管理器中currentProperty.addListener(LISTENER);startMetricTimerListener();}//Load FlowRules, former rules will be replaced.//加载流控规则public static void loadRules(List<FlowRule> rules) {//通知监听器管理器中的每一个监听器,规则已发生变化,需要重新加载规则配置//其实就是更新FlowRuleManager规则管理器中的流控规则列表flowRulescurrentProperty.updateValue(rules);}private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {@Overridepublic synchronized void configUpdate(List<FlowRule> value) {Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);if (rules != null) {flowRules = rules;}RecordLog.info("[FlowRuleManager] Flow rules received: {}", rules);}@Overridepublic synchronized void configLoad(List<FlowRule> conf) {Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(conf);if (rules != null) {flowRules = rules;}RecordLog.info("[FlowRuleManager] Flow rules loaded: {}", rules);}}...
}public final class FlowRuleUtil {private static final Function<FlowRule, String> extractResource = new Function<FlowRule, String>() {@Overridepublic String apply(FlowRule rule) {return rule.getResource();}};...//Build the flow rule map from raw list of flow rules, grouping by provided group function.public static <K> Map<K, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list, Function<FlowRule, K> groupFunction, Predicate<FlowRule> filter, boolean shouldSort) {Map<K, List<FlowRule>> newRuleMap = new ConcurrentHashMap<>();if (list == null || list.isEmpty()) {return newRuleMap;}Map<K, Set<FlowRule>> tmpMap = new ConcurrentHashMap<>();for (FlowRule rule : list) {...//获取[流控效果]流量整形控制器TrafficShapingControllerTrafficShapingController rater = generateRater(rule);rule.setRater(rater);//获取资源名K key = groupFunction.apply(rule);if (key == null) {continue;}//获取资源名对应的流控规则列表Set<FlowRule> flowRules = tmpMap.get(key);//将规则放到Map里,和当前资源绑定if (flowRules == null) {// Use hash set here to remove duplicate rules.flowRules = new HashSet<>();tmpMap.put(key, flowRules);}flowRules.add(rule);}Comparator<FlowRule> comparator = new FlowRuleComparator();for (Entry<K, Set<FlowRule>> entries : tmpMap.entrySet()) {List<FlowRule> rules = new ArrayList<>(entries.getValue());if (shouldSort) {//Sort the rules.Collections.sort(rules, comparator);}newRuleMap.put(entries.getKey(), rules);}return newRuleMap;}private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {//判断只有当阈值类型为QPS时才生效if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {//根据流控效果选择不同的流量整形控制器TrafficShapingControllerswitch (rule.getControlBehavior()) {case RuleConstant.CONTROL_BEHAVIOR_WARM_UP://Warm Up预热模式——冷启动模式return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(), ColdFactorProperty.coldFactor);case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER://排队等待模式return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER://Warm Up + 排队等待模式return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(), rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);case RuleConstant.CONTROL_BEHAVIOR_DEFAULT://快速失败模式——Default默认模式default:// Default mode or unknown mode: default traffic shaping controller (fast-reject).}}//默认模式:快速失败用的是DefaultControllerreturn new DefaultController(rule.getCount(), rule.getGrade());}...
}
当FlowSlot的entry()方法对请求进行流控规则验证时,会调用规则检查器FlowRuleChecker的checkFlow()方法进行检查,最终会通过FlowRule的getRater()方法获取流控规则对应的流量整形控制器,然后调用TrafficShapingController的canPass()方法对请求进行检查。当流控效果为快速失败时,具体调用的其实就是DefaultController的canPass()方法。
@Spi(order = Constants.ORDER_FLOW_SLOT)
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {private final FlowRuleChecker checker;private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {@Overridepublic Collection<FlowRule> apply(String resource) {//Flow rule map should not be null.Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();return flowRules.get(resource);}};...@Overridepublic void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {//检查流控规则 checkFlow(resourceWrapper, context, node, count, prioritized);fireEntry(context, resourceWrapper, node, count, prioritized, args);}void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {//调用规则检查器FlowRuleChecker的checkFlow()方法进行检查checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);}@Overridepublic void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {fireExit(context, resourceWrapper, count, args);}
}//Rule checker for flow control rules.
public class FlowRuleChecker {public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {if (ruleProvider == null || resource == null) {return;}//从Map中获取resource资源对应的流控规则列表Collection<FlowRule> rules = ruleProvider.apply(resource.getName());if (rules != null) {//循环遍历每一个流控规则for (FlowRule rule : rules) {//调用canPassCheck方法进行流控规则验证,判断此次请求是否命中针对resource资源配置的流控规则if (!canPassCheck(rule, context, node, count, prioritized)) {throw new FlowException(rule.getLimitApp(), rule);}}}}public boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount) {return canPassCheck(rule, context, node, acquireCount, false);}public boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {//参数校验String limitApp = rule.getLimitApp();if (limitApp == null) {return true;}//如果是集群模式,则执行passClusterCheck()方法if (rule.isClusterMode()) {return passClusterCheck(rule, context, node, acquireCount, prioritized);}//如果是单机模式,则执行passLocalCheck()方法return passLocalCheck(rule, context, node, acquireCount, prioritized);}private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {//选择Node作为限流计算的依据Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);if (selectedNode == null) {return true;}//先通过FlowRule.getRater()方法获取流控规则对应的流量整形控制器//然后调用TrafficShapingController.canPass()方法对请求进行检查return rule.getRater().canPass(selectedNode, acquireCount, prioritized);}...
}//Default throttling controller (immediately reject strategy).
public class DefaultController implements TrafficShapingController {private static final int DEFAULT_AVG_USED_TOKENS = 0;private double count;private int grade;public DefaultController(double count, int grade) {this.count = count;this.grade = grade;}@Overridepublic boolean canPass(Node node, int acquireCount) {return canPass(node, acquireCount, false);}@Overridepublic boolean canPass(Node node, int acquireCount, boolean prioritized) {//获取当前请求数int curCount = avgUsedTokens(node);//如果当前请求数 + 1超出阈值,那么返回失败if (curCount + acquireCount > count) {//进行优先级逻辑处理if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {long currentTime;long waitInMs;currentTime = TimeUtil.currentTimeMillis();waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {node.addWaitingRequest(currentTime + waitInMs, acquireCount);node.addOccupiedPass(acquireCount);sleep(waitInMs);//PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.throw new PriorityWaitException(waitInMs);}}return false;}//如果当前请求数+1没有超出阈值,则返回成功return true;}private int avgUsedTokens(Node node) {if (node == null) {return DEFAULT_AVG_USED_TOKENS;}return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());}private void sleep(long timeMillis) {try {Thread.sleep(timeMillis);} catch (InterruptedException e) {//Ignore.}}
}
(2)流量整形控制器DefaultController执行分析
流控效果为快速失败时,会调用DefaultController的canPass()方法。
问题一:其中的入参Node是如何选择的
Node的选择主要依赖于三个参数:FlowRule的limitApp、FlowRule的strategy、Context的origin,这三个参数分别表示流控规则的针对来源、流控模式和当前请求的来源。
情形一:如果limitApp和origin相等并且limitApp不是默认值default
此时的流控规则便是针对特定调用方的
如果流控模式为直接,那么选择的Node是OriginNode
如果流控模式为关联,那么选择的Node是关联资源的ClusterNode
如果流控模式为链路,那么选择的Node是DefaultNode
情形二:limitApp值为默认的default
如果流控模式为直接,那么选择的Node是当前资源的ClusterNode
如果流控模式为关联,那么选择的Node是关联资源的ClusterNode
如果流控模式为链路,那么选择的Node是DefaultNode
情形三:limitApp值为other且origin与FlowRule.getResource()不同
如果流控模式为直接,那么选择的Node是OriginNode
如果流控模式为关联,那么选择的Node是关联资源的ClusterNode
如果流控模式为链路,那么选择的Node是DefaultNode
问题二:如何判断阈值类型是QPS还是线程
FlowRule规则类里有个名为grade的字段,代表着阈值类型。所以在初始化DefaultController时就会传入流控规则FlowRule的grade值,这样在DefaultController的avgUsedTokens()方法中,就可以根据grade字段的值判断出阈值类型是QPS还是线程数了。
问题三:其中的入参prioritized的作用是什么
DefaultController的canPass()的入参prioritized表示是否对请求设置优先级。当prioritized为true时,表示该请求具有较高优先级的请求。当prioritized为false时,表示该请求是普通请求。而高优先级的请求需要尽量保证其可以通过限流检查。不过一般情况下,prioritized的值默认为false,除非手动指定为true,毕竟限流的目的是不想让超出的流量通过。
(3)流控模式中的关联模式和链路模式说明
一.关联模式
关联流控模式中,可以将两个资源进行关联。当某个资源的流量超限时,可以触发其他资源的流控规则。
比如用户下单购物时会涉及下单资源和支付资源,如果支付资源达到流控阈值,那么应该要同时禁止下单,也就是通过支付资源来关联到下单资源。
注意:如果采取关联模式,那么设置的QPS阈值是被关联者的,而非关联者的。在如下图示中配置了QPS的阈值为3,这是针对testPay资源设置的,而不是针对testOrder资源设置的。也就是testOrder被流控的时机就是当testPay的QPS达到3的时候,3并不是testOrder所访问的次数,而是testPay这个接口被访问的次数。
二.链路模式
一个资源可能会被多个业务链路调用,不同的业务链路需要进行不同的流控,这时就可以使用链路模式。
下图便为testTrace资源创建了一条链路模式的流控规则,规则为QPS限制到1 ,且链路入口资源为/trace/test2。
这意味着:/trace/test1链路可以随便访问testTrace资源,不受任何限制。/trace/test2链路访问testTrace资源时会限制QPS为1,超出限制被流控。