目标
通过源码加深对k8s scheduler的了解
源码阅读环境准备
源码在kubernetes
仓库中, 如何调试scheduler在我的另一篇文章《kubeadm搭建k8s源码阅读环境》里面有演示。在本篇文章中就不再赘述了。
调试命令如下
dlv --headless --listen=:8005 --api-version=2 --accept-multiclient exec /root/kubernetes/_output/bin/kube-scheduler -- --authentication-kubeconfig=/etc/kubernetes/scheduler.conf --authorization-kubeconfig=/etc/kubernetes/scheduler.conf --bind-address=127.0.0.1 --kubeconfig=/etc/kubernetes/scheduler.conf --leader-elect=false
源码阅读
代码入口是cmd/kube-scheduler/scheduler.go
与其他组件一样的套路,启动informer,然后开启控制循环。
# 启动informer
startInformersAndWaitForSync(ctx)# scheduler控制循环入口
sched.Run(ctx)
informer
工厂的创建逻辑在前面的config
创建处,属于模版代码。
最终startInformersAndWaitForSync
里面会有以下两个factory
,默认情况下,DynInformerFactory什么也不会做。
# 监听所有namespace ,其中状态 !=Succeeded && != Failed 的pod的变化
cc.InformerFactory.Start(ctx.Done())# 默认informer为空,并不会启动什么
cc.DynInformerFactory.Start(ctx.Done())
informer
启动后,sched.Run
方法中就要开始控制循环处理Pod的调度了。
sched.SchedulingQueue.Run(logger)
中会运行两个goroutinue
来处理调度失败的pod
。我们先看sched.ScheduleOne
里面正常pod
的调度逻辑。
ScheduleOne
源码位置: pkg/scheduler/schedule_one.go : 65行
ScheduleOne是pod
调度的入口,其各个阶段的流程大致如下
下面是ScheduleOne函数的简化后代码
# 获取待调度的pod
podInfo, err := sched.NextPod(logger)# 获取调度策略
fwk, err := sched.frameworkForPod(pod)# 计算得到该pod分配的节点
scheduleResult, assumedPodInfo, status := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, start, podsToActivate)# 绑定node 和 pod
go func(){status := sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, start, podsToActivate)
}
scheduleResult的类型如下
type ScheduleResult struct {// 被选中的节点SuggestedHost string// 调度器评估了多少个节点EvaluatedNodes int// 适合运行该pod的节点数FeasibleNodes int// 不知道干嘛的nominatingInfo *framework.NominatingInfo
}
因此sched.schedulingCycle
就是我们要找的调度逻辑所在的函数。schedulingCycle
对所有node进行评估,然后选出最合适的节点后,再起一个goroutinue
执行sched.bindingCycle
将调度结果更新回api server
, 最终kubelet
监听到pod
的变化启动pod
。
schedulingCycle(主逻辑)
源码位置: pkg/scheduler/schedule_one.go : 138行
在schedulingCycle方法中,一开始就已经筛选出一个Node来运行pod. 但是在返回结果前,还做了一些额外检查,如下
# 运行调度算法,得到调度结果
scheduleResult, err := sched.SchedulePod(ctx, fwk, state, pod)# 跟新分配的node信息到缓存的pod中
err = sched.assume(logger, assumedPod, scheduleResult.SuggestedHost)# reserve插件用于预占资源,防止资源冲突
# 默认只有一个插件,当pod有使用pvc时才会用上,没有pvc,默认是不做任何处理
# 默认插件 pkg/scheduler/framework/plugins/volumebinding
if sts := fwk.RunReservePluginsReserve(ctx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {...略}# premit插件用于决定是否允许调度
# 可用于任务限流、资源依赖检查,例如GPU资源等
# 默认没有配置premit插件
runPermitStatus := fwk.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)# 返回调度结果
return scheduleResult, assumedPodInfo, nil
sched.SchedulePod
才是我们本篇文章要关注的主要调度逻辑。下面是摘取出来的关键代码
源文件位置是pkg/scheduler/schedule_one.go: 410
# 更新节点快照信息
sched.Cache.UpdateSnapshot(klog.FromContext(ctx), sched.nodeInfoSnapshot)# 筛选符合调度条件的节点
feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)# 节点评分
priorityList, err := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasibleNodes)# 根据评分对可用节点进行排序并挑选评分最高的那个节点
host, _, err := selectHost(priorityList, numberOfHighestScoredNodesToReport)#
return ScheduleResult{SuggestedHost: host,EvaluatedNodes: len(feasibleNodes) + diagnosis.NodeToStatus.Len(),FeasibleNodes: len(feasibleNodes),}, err
看上面的代码的时候,可能会出现以下的疑问
更新快照
sched.Cache.UpdateSnapshot(klog.FromContext(ctx), sched.nodeInfoSnapshot)
可以看到,调度器中有两个东西,一个是sched.Cache
,一个是sched.nodeInfoSnapshot
。
Cache
是调度器的核心数据结构之一,用于存储集群中所有节点和 Pod 的最新状态。它提供了高效的读写操作,以支持调度器在调度过程中快速访问和更新节点和 Pod 的信息。
调度pod的本质是给pod找个好节点,节点信息来自Cache
. 如果没有nodeInfoSnapshot
,调度的时候就要直接访问Cache
获取节点的信息。但是节点在调度的过程中,Cache中的数据是有可能会变化的,例如新的节点加入或者现有的节点宕机了。
数据会变化,调度的时候,就需要考虑以及处理节点和 Pod 状态的变化了, 调度的逻辑就会变得很复杂。
通过在调度周期开始时创建 nodeInfoSnapshot
,可以确保在整个调度周期内使用一致的节点信息视图,避免数据不一致的问题。使调度器在调度周期内不需要处理节点和 Pod 状态的变化,调度器的代码更加简洁和易于维护。
如果当前调度的pod没有合适的节点,但是在调度时,新加入的节点能够满足当前pod,该pod会在后续再次调度的时候会分配到该新的节点。因为每次在调度pod的时候,都会先更新一次nodeInfoSnapshot
.
筛选节点
feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
筛选的步骤如下
# 从快照中获取所有node
allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List()# 调用预过滤插件的 prefilter()函数 ,如果返回的状态不等于 Success 则终止该次调度
preRes, s, unscheduledPlugins := fwk.RunPreFilterPlugins(ctx, state, pod)
if !s.IsSuccess() { return ..... }# 默认是所有的node都参与后续的过滤
nodes := allNodes
# 如果预校验中有返回节点名称,则后续参与过滤的节点只考虑预校验返回的节点
if !preRes.AllNodes() {nodes = make([]*framework.NodeInfo, 0, len(preRes.NodeNames))for nodeName := range preRes.NodeNames {# 判断是否在节点快照中if nodeInfo, err := sched.nodeInfoSnapshot.Get(nodeName); err == nil {nodes = append(nodes, nodeInfo)}
}# 调用过滤插件对node进行过滤
feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, &diagnosis, nodes)
看这部分代码的时候,我的疑问是,这些插件是从哪来的?从这个问题出发,需要了解一下k8s里面几个结构体或者接口: Scheme
、Profile
、Scheduler
、framework.Framework
、frameworkImpl
否则代码看起来会很难受。
从代码fwk.RunPreFilterPlugins(xxx....)
可以知道我们是通过fwk
来调用插件的,所有的插件都在fwk
对象中,所以我们要了解fwk
是怎么来的。fwk
是frameworkImpl
结构体的实例, 它实现了framework.Framework
接口。
fwk对象是在ScheduleOne
函数刚开始的地方获取的,每次pod
的调度都会取一次
fwk, err := sched.frameworkForPod(pod)
看frameworkForPod
函数的实现如下
fwk
存放在sched
对象的一个Map中的,key是从pod中获取,说明不同的pod可以有不同的fwk
,所以才会在每个pod
调度的时候去重复获取这个fwk
对象。fwk代表了一种具体的调度策略的实现,因为在调度的时候,是fwk
决定哪个pod
在哪个node
上运行的。
fwk
可以配置具体使用哪些插件,从而可以定制具体的调度策略。那么scheduler
是干嘛的?对于扩展性有要求的软件设计中,都会有一层抽象是用来控制流程,然后流程具体执行细节由下一层的对象来实现。在这里,scheduler
负责控制流程,Framework
是细节的规范,像Java的接口,frameworkImpl
是具体的实现,是接口的实现。frameworkImpl
负责实现细节,而在frameworkImpl
中为了灵活扩展,又搞了一层插件抽象,想要实现自己的调度逻辑,只需要实现插件接口然后修改配置文件即可。
回到最初的问题,插件存在fwk
对象里,fwk
对象是sched
对象的,所以要找到答案还得继续看sched对象是如何创建fwk。
由于链路比较长,截取了关键的一些代码并添加了注释说明,水平有限,只能搞个流水账来记录了
# pkg/scheduler/apis/config/scheme/scheme.go
# 该函数调用会往Scheme中注册一个设置默认值的函数
func init() {AddToScheme(Scheme)
}# latest.Default() 返回设置了默认值的调度器配置对象
cfg, err := latest.Default()
# 最后就在 cfg 赋值给了 cc.ComponentConfig.Profiles
opts.ComponentConfig = cfg
c, err := opts.Config(ctx)
cc := c.Complete()# Setup函数中创建sched对象,前面得到的cc.ComponentConfig.Profiles作为参数
sched, err := scheduler.New(ctx,...省略了好几个参数,scheduler.WithProfiles(cc.ComponentConfig.Profiles...),# NewMap函数中,会创建fwk对象并保存到scheduler持有的map中
# m就是sched中用来存放fwk对象的那个Map,这个p就是我们要找的fwk对象
# cfgs就是前面的cc.ComponentConfig.Profiles
for _, cfg := range cfgs {p, err := newProfile(ctx, cfg, r, recorderFact, opts...)m[cfg.SchedulerName] = p
}# 在newProfile中,调用了NewFramework创建fwk对象并返回
return frameworkruntime.NewFramework(ctx, r, &cfg, opts...)# 在NewFramework中,调用下面的函数将插件分类,也就是放入fwk的不同插件的数组中
# 等待调度的时候调用
if len(profile.Plugins.MultiPoint.Enabled) > 0 {if err := f.expandMultiPointPlugins(logger, profile); err != nil {return nil, err}
}
总的来说就是插件是在生成默认配置文件的时候就已经确定了有哪些插件,只是等到创建fwk对象的时候才分类,最后在pod的调度循环中,被调用。
节点评分
评分的流程就是这么朴实无华,先调用一遍PreScorePlugins插件,然后调一遍ScorePlugins。
# 调用所有PreScorePlugins
preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes)# 调用所有ScorePlugins
nodesScores, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)
nodeScores就是评分插件输出的结果,是一个数组[]NodePluginScores
不难想象出,一个node对应一个NodePluginScores. 一个插件对一个节点进行一次评分得到一个PluginScore对象,多个插件评分后得到一个数组。插件评分完后,计算得到最终的评分TotalScore.
下面以经典插件PodTopologySpread来看看PreScore以及Score这两个阶段是怎么做的。PodTopologySpread插件的说明可以参考下面的文章
https://cloud.tencent.com/developer/article/1631990
TopologySpreadConstraint有哪些字段以及字段的作用,可以看官方文档
https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.32/#topologyspreadconstraint-v1-core
PreScore函数简化后如下
# 获取所有节点
allNodes, err := pl.sharedLister.NodeInfos().List()# 初始化用于存储数据的对象
state := &preScoreState{IgnoredNodes: sets.New[string](),
}
# 初始化函数会统计有多少有带有label的节点,
err = pl.initPreScoreState(state, pod, filteredNodes, requireAllTopologies)# 遍历被选中的node
for _, node := range filteredNodes {# 一个pod上可以有多个拓扑约束 topologySpreadConstraintsfor i, constraint := range s.Constraints {# 对应yaml文件上的 topologyKey对应的值value := node.Node().Labels[constraint.TopologyKey]# value对应 区域lable tpCount := state.TopologyValueToPodCounts[i][value]# tpCount 的值是 0 , 如果为nil表示该节点没有区域label,所以不用计算pod数量if tpCount == nil {continue}# 从node中获取pod的信息,统计符合seletor规则的pod的个数count := countPodsMatchSelector(nodeInfo.Pods, c.Selector, pod.Namespace)# 更新tpCount的值atomic.AddInt64(tpCount, int64(count))}
}# 保存结果
cycleState.Write(preScoreStateKey, state)
可以看到在PreScore阶段,统计了各个区域的pod的数量,并将结果保存起来,等待Score阶段计算分数。
下面是Score函数的调用
代码位置pkg/scheduler/framework/plugins/podtopologyspread/scoring.go :195
在f.runScorePlugin
中会调用插件的Score
函数。Score函数是对单个node的评分进行计算
计算的逻辑很简单,取出在PreScroe阶段得到pod计数、权重,然后加上yaml文件中的MaxSkew作为参数传入函数 scoreForCount
中计算
func scoreForCount(cnt int64, maxSkew int32, tpWeight float64) float64 {return float64(cnt)*tpWeight + float64(maxSkew-1)
}
scoreForCount
计算某个拓扑域的调度分数,因为每个节点的tpWeight 固定的,会让 已有 Pod 多的拓扑域(cnt大)得分更高,这看起来像是鼓励 Pod 聚集,但又会因为MaxSkew有限制,当pod的数量 - 其他节点 > MaxSkew时,该节点不会再分配到Pod,从而保证了不会过于倾斜。
这里说tpWeight是固定的是因为计算权重的函数如下:
func topologyNormalizingWeight(size int) float64 {return math.Log(float64(size + 2))
}
其计算用的入参size值在计算权重时候是固定的,且是所有节点共用的。
结尾
本来想写完失败pod的调度的,但是最近没啥状态,就不写了。还是多搞点操作的文章好点,至少要用的时候能抄抄。。。