Kubernetes 学习(九)Kubernetes 源码阅读之正式篇------核心组件之 Scheduler

2023-07-11,,

0. 前言

继续上一篇博客阅读 Kubernetes 源码,参照《k8s 源码阅读》首先学习 Kubernetes 的一些核心组件,首先是 kube-scheduler
本文严重参考原文:《k8s 源码阅读》之 2.2 章节:scheduler,加入部分自己阅读的体会作为自己的阅读笔记
感谢《k8s 源码阅读》的作者们辛苦编写教材,在此郑重表示感谢,望大家多多支持!~

1. 整体设计

1.1 概述

官网描述:

The Kubernetes scheduler runs as a process alongside the other master components such as the API server.
Its interface to the API server is to watch for Pods with an empty PodSpec.NodeName, and for each Pod,
it posts a binding indicating where the Pod should be scheduled.
Scheduler 是相对独立的一个组件,主动访问 API server,寻找等待调度的 Pod(PodSpec.NodeName 为空)
然后通过一系列调度算法寻找哪个 Node 适合跑这个 Pod
然后将这个 Pod 和 Node 的绑定关系发给 API server,即整个调度流程

1.2 源码层次

cmd/kube-scheduler/scheduler.go:main() 函数入口位置,在 Scheduler 过程开始前的一系列初始化工作
pkg/scheduler/scheduler.go:调度框架整体逻辑,抽象调用各个算法的 Interface
pkg/scheduler/core/generic_scheduler.go:计算哪些 Node适合跑哪些 Pod 的具体算法

1.3 调度流程

通过一系列的 Predicates(预选) 过滤掉不能运行 Pod 的 Node

比如一个 Pod 需要 500M 的内存,有些节点剩余内存只有 100M 了,就会被剔除
通过一系列的 Priorities(优选)给剩下的 Node 排一个分数,寻找能够运行 Pod 的 Node 中最合适的一个
得分最高的一个 Node 胜出,获得了运行 Pod 的资格
若是上述预选过程中没有找到任何一个合适的 Node,则进入抢占模式,移除部分低优先级的 Pod,再选择 Node

1.4 Predicates 和 Priorities 策略

Predicates 是用于过滤不合适 Node的策略
Priorities 是用于计算 Node 分数的策略(作用在通过 Predicates 过滤的 Node上)
Kubernetes 默认内建了一些 Predicates 和 Priorities 策略,代码分别在:
pkg/scheduler/algorithm/predicates/predicates.go
pkg/scheduler/algorithm/priorities/

1.5 调度策略的修改

默认调度策略是通过 defaultPredicates() 和 defaultPriorities() 函数定义的
代码在 pkg/scheduler/algorithmprovider/defaults/defaults.go
可以通过命令行 flag --policy-config-file 来覆盖默认行为
可以修改 pkg/scheduler/algorithm/predicates/predicates.go/pkg/scheduler/algorithm/priorities/,然后注册到 defaultPredicates() 和 defaultPriorities() 来实现
或者通过配置文件:

{
"kind" : "Policy",
"apiVersion" : "v1",
"predicates" : [
{"name" : "PodFitsHostPorts"},
{"name" : "PodFitsResources"},
{"name" : "NoDiskConflict"},
{"name" : "NoVolumeZoneConflict"},
{"name" : "MatchNodeSelector"},
{"name" : "HostName"}
],
"priorities" : [
{"name" : "LeastRequestedPriority", "weight" : },
{"name" : "BalancedResourceAllocation", "weight" : },
{"name" : "ServiceSpreadingPriority", "weight" : },
{"name" : "EqualPriority", "weight" : }
],
"hardPodAffinitySymmetricWeight" : ,
"alwaysCheckAllPredicates" : false
}

2. 启动前逻辑

Scheduler 可以分为三层,第一层是调度器启动前的逻辑,包括:命令行参数解析、参数校验、调度器初始化等一系列逻辑

2.1 Cobra

2.1.1 简介

Cobra 既是一个创建强大的现代化命令行程序的库,又是一个用于生成应用和命令行文件的程序。有很多流行的 Golang 项目用了 Cobra(Kubernetes 和 Docker)

2.1.2 具体使用

Go 1.11 版本以上先暂时关闭 go modules,安装 Cobra:go get -u github.com/spf13/cobra/cobra

若是网卡,可能导致安装不成功,可以先下载报错的依赖包,再重新 go get 安装
安装成功后会看到 $GOPATH/
进入 $GOPATH/src 目录,初始化项目:cobra init myapp --pkg-name myapp

$ cobra init myapp --pkg-name myapp
Your Cobra applicaton is ready at
/home/ao/go/src/myapp
$ ls myapp
cmd LICENSE main.go
$ pwd
/home/ao/go/src

本地可以看到一个 main.go 和一个 cmd 目录
查看 cmd/root.go:

package cmd

import (
"fmt"
"os"
"github.com/spf13/cobra" homedir "github.com/mitchellh/go-homedir"
"github.com/spf13/viper" ) var cfgFile string // rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{
Use: "myapp",
Short: "A brief description of your application",
Long: `A longer description that spans multiple lines and likely contains
examples and usage of using your application. For example: Cobra is a CLI library for Go that empowers applications.
This application is a tool to generate the needed files
to quickly create a Cobra application.`,
// Uncomment the following line if your bare application
// has an action associated with it:
// Run: func(cmd *cobra.Command, args []string) { },
}

查看 main.go:

package main

import "myapp/cmd"

func main() {
cmd.Execute()
}

main.go 里 import了 myapp/cmd(也就是 root.go 文件)

在 Execute 里面调用了 rootCmd.Execute() 方法
rootCmd 是*cobra.Command 类型
添加新的命令:cobra add version

$ cobra add version
version created at /home/ao/go/src/myapp%

查看 cmd/version.go:

init() 函数里面调用 rootCmd.AddCommand(versionCmd),子命令就是 version

package cmd

import (
"fmt" "github.com/spf13/cobra"
) // versionCmd represents the version command
var versionCmd = &cobra.Command{
Use: "version",
Short: "A brief description of your command",
Long: `A longer description that spans multiple lines and likely contains examples
and usage of using your command. For example: Cobra is a CLI library for Go that empowers applications.
This application is a tool to generate the needed files
to quickly create a Cobra application.`,
Run: func(cmd *cobra.Command, args []string) {
fmt.Println("version called")
},
} func init() {
rootCmd.AddCommand(versionCmd)
......
}

添加多级子命令:

$ cobra add server
server created at /home/ao/go/src/myapp%
$ cobra add create -p serverCmd
create created at /home/ao/go/src/myapp%

查看 cmd/create.go 文件:

init() 函数调用 serverCmd.AddCommand(createCmd),表示 create 为子命令

package cmd

import (
"fmt" "github.com/spf13/cobra"
) // createCmd represents the create command
var createCmd = &cobra.Command{
Use: "create",
Short: "A brief description of your command",
Long: `A longer description that spans multiple lines and likely contains examples
and usage of using your command. For example: Cobra is a CLI library for Go that empowers applications.
This application is a tool to generate the needed files
to quickly create a Cobra application.`,
Run: func(cmd *cobra.Command, args []string) {
fmt.Println("create called")
},
} func init() {
serverCmd.AddCommand(createCmd)
......
}

2.2 Scheduler 的 main 函数

下面正式进入 Kubernetes 的 Scheduler 源码阅读
查看 cmd/kube-scheduler/scheduler.go,删除非主干代码:

func main() {
command := app.NewSchedulerCommand()
if err := command.Execute(); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit()
}
}

查看定义 NewSchedulerCommand 的 cmd/kube-scheduler/app/server.go,删除非主干代码:

/ NewSchedulerCommand creates a *cobra.Command object with default parameters
func NewSchedulerCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "kube-scheduler",
Long: `The Kubernetes scheduler is a policy-rich, topology-aware,
workload-specific function that significantly impacts availability, performance,
and capacity. The scheduler needs to take into account individual and collective
resource requirements, quality of service requirements, hardware/software/policy
constraints, affinity and anti-affinity specifications, data locality, inter-workload
interference, deadlines, and so on. Workload-specific requirements will be exposed
through the API as necessary.`,
Run: func(cmd *cobra.Command, args []string) {
if err := runCommand(cmd, args, opts); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit()
}
},
}
return cmd
}

Schduler 启动时调用了 runCommand(cmd, args, opts),查看 cmd/kube-scheduler/app/server.go 关于 runCommand 定义,删除非主干代码:

// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error {
c, err := opts.Config()
stopCh := make(chan struct{})
// Get the completed config
cc := c.Complete()
return Run(cc, stopCh)
}

处理配置问题后调用了一个 Run() 函数,Run() 的作用是基于给定的配置启动 Scheduler,它只会在出错时或者 channel stopCh 被关闭时才退出,查看 cmd/kube-scheduler/app/server.go 关于 Run 定义,删除非主干代码:

// Run executes the scheduler based on the given configuration. It only return on error or when stopCh is closed.
func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
// Create the scheduler.
sched, err := scheduler.New(cc.Client,
cc.InformerFactory.Core().V1().Nodes(),
stopCh,
scheduler.WithName(cc.ComponentConfig.SchedulerName)) // Prepare a reusable runCommand function.
run := func(ctx context.Context) {
sched.Run()
<-ctx.Done()
} ctx, cancel := context.WithCancel(context.TODO())
defer cancel() go func() {
select {
case <-stopCh:
cancel()
case <-ctx.Done():
}
}() // Leader election is disabled, so runCommand inline until done.
run(ctx)
return fmt.Errorf("finished without leader elect")
}

最终是要跑 sched.Run() 这个方法来启动 Scheduler,sched.Run() 方法就是 Scheduler 框架真正运行的逻辑
上述有一个 sched 变量,linux 里经常会看到一些软件叫 ***d,d 也就是 daemon,守护进程的意思,也就是一直跑在后台的一个程序
这里的 sched 也就是指 “Scheduler Daemon“
sched 的其实是 *Scheduler 类型,定义在 pkg/scheduler/scheduler.go
Scheduler 监控新创建的未被调度的 Pods,然后尝试寻找合适的 Node,回写一个绑定关系到 API Server
这里也可以体会到 Daemon 的感觉,我们平时搭建的 k8s 集群中运行着一个 Daemon 进程叫做 kube-scheduler,在程序里面也就对应这样一个对象:Scheduler

// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
config *factory.Config
}

Scheduler 结构体中的 Config 属性对象定义在 pkg/scheduler/factory/factory.go,主要属性如下:

// Config is an implementation of the Scheduler's configured input data.
type Config struct {
// It is expected that changes made via SchedulerCache will be observed
// by NodeLister and Algorithm.
SchedulerCache schedulerinternalcache.Cache
// Ecache is used for optimistically invalid affected cache items after
// successfully binding a pod
Ecache *equivalence.Cache
NodeLister algorithm.NodeLister
Algorithm algorithm.ScheduleAlgorithm
GetBinder func(pod *v1.Pod) Binder
// PodConditionUpdater is used only in case of scheduling errors. If we succeed
// with scheduling, PodScheduled condition will be updated in apiserver in /bind
// handler so that binding and setting PodCondition it is atomic.
PodConditionUpdater PodConditionUpdater
// PodPreemptor is used to evict pods and update pod annotations.
PodPreemptor PodPreemptor // NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
// a pod may take some amount of time and we don't want pods to get
// stale while they sit in a channel.
NextPod func() *v1.Pod // SchedulingQueue holds pods to be scheduled
SchedulingQueue internalqueue.SchedulingQueue
}

3. 整体调度框架

3.1 启动

调度的第二层:负责 Scheduler 除了具体 Node 过滤算法外的工作逻辑
Scheduler 对象绑定了一个 Run() 方法,定义如下:

// Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately.
func (sched *Scheduler) Run() {
if !sched.config.WaitForCacheSync() {
return
}
go wait.Until(sched.scheduleOne, , sched.config.StopEverything)
}

Run 函数在 cmd/kube-scheduler/app/server.go 调用:

    // Prepare a reusable runCommand function.
run := func(ctx context.Context) {
sched.Run()
<-ctx.Done()
}

调用了 sched.Run() 之后就等待 ctx.Done()
wait.Until 实现:每隔 n 时间调用 f 一次,除非 channel c 被关闭
这里的 n 就是 0,也就是一直调用,前一次调用返回下一次调用就开始了
f 指 sched.scheduleOne,c 指 sched.config.StopEverything

3.2 一个 Pod 的基本调度流程

scheduleOne 实现 1 个 Pod 的完整调度工作流
这个过程是顺序执行的,也就是非并发的
即前一个 Pod 的 scheduleOne 一完成,下一个 Pod 的 ScheduleOne 立马接着执行
scheduleOne 的主要逻辑:

func (sched *Scheduler) scheduleOne() {
pod := sched.config.NextPod()
suggestedHost, err := sched.schedule(pod)
if err != nil {
if fitError, ok := err.(*core.FitError); ok {
preemptionStartTime := time.Now()
sched.preempt(pod, fitError)
}
return
}
assumedPod := pod.DeepCopy()
allBound, err := sched.assumeVolumes(assumedPod, suggestedHost)
err = sched.assume(assumedPod, suggestedHost)
go func() {
err := sched.bind(assumedPod, &v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
Target: v1.ObjectReference{
Kind: "Node",
Name: suggestedHost,
},
})
}()
}

参考流程如下:

3.3 Schedule 算法计算合适 Node

主流程核心步骤是 suggestedHost, err := sched.schedule(pod)
这里完成了非抢占模式下 Node 的计算,包括预选过程、优选过程等
sched.schedule(pod) 方法定义见 pkg/scheduler/scheduler.go

// schedule implements the scheduling algorithm and returns the suggested host.
func (sched *Scheduler) schedule(pod *v1.Pod) (string, error) {
host, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
if err != nil {
pod = pod.DeepCopy()
sched.config.Error(pod, err)
sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "%v", err)
sched.config.PodConditionUpdater.Update(pod, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
LastProbeTime: metav1.Now(),
Reason: v1.PodReasonUnschedulable,
Message: err.Error(),
})
return "", err
}
return host, err
}

里面调用了 sched.config.Algorithm.Schedule() 方法(本身为接口),定义见 pkg/scheduler/algorithm/scheduler_interface.go

type ScheduleAlgorithm interface {
Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
Predicates() map[string]FitPredicate
Prioritizers() []PriorityConfig
}

这个接口有 4 个方法,实现 ScheduleAlgorithm 接口的对象实现如何调度 Pods 到 Nodes 上
默认的实现是 pkg/scheduler/core/generic_scheduler.gogenericScheduler 对象
Schedule():给定 Pod 和 Nodes,计算出一个适合跑 Pod 的 Node 并返回
Preempt():抢占模式
Predicates():预选过程
Prioritizers():优选过程

4. 一般调度过程

6. 参考文献

《k8s 源码阅读》之 2.2 章节:scheduler

Kubernetes 学习(九)Kubernetes 源码阅读之正式篇------核心组件之 Scheduler的相关教程结束。

《Kubernetes 学习(九)Kubernetes 源码阅读之正式篇------核心组件之 Scheduler.doc》

下载本文的Word格式文档,以方便收藏与打印。