深入理解 Kubernetes 调度器:从 scheduleOne 到 Pod-Node 绑定

深入理解 Kubernetes 调度器:从 scheduleOne 到 Pod-Node 绑定

11 分钟阅读

这是一篇 kube-scheduler 的源码阅读笔记,重点回答两个问题:

  1. 调度器内部一次完整的调度循环走了哪些代码路径?
  2. Bind 之后,pod 的 spec.nodeName 字段到底是谁写进去的?

代码版本基于 kubernetes 1.18 / 1.19 前后,那个时期 scheduler framework 刚成熟,in-tree 插件大都已经迁移到这个模型。


Scheduler 结构体

调度器的核心结构体定义在 pkg/scheduler/scheduler.go

type Scheduler struct {
	// It is expected that changes made via SchedulerCache will be observed
	// by NodeLister and Algorithm.
	SchedulerCache internalcache.Cache

	Algorithm ScheduleAlgorithm

	Extenders []framework.Extender

	// NextPod should be a function that blocks until the next pod
	// is available. We don't use a channel for this, because scheduling
	// a pod may take some amount of time and we don't want pods to get
	// stale while they sit in a channel.
	NextPod func() *framework.QueuedPodInfo

	// Error is called if there is an error. It is passed the pod in
	// question, and the error
	Error func(*framework.QueuedPodInfo, error)

	// Close this to shut down the scheduler.
	StopEverything <-chan struct{}

	// SchedulingQueue holds pods to be scheduled
	SchedulingQueue internalqueue.SchedulingQueue

	// Profiles are the scheduling profiles.
	Profiles profile.Map

	client clientset.Interface
}

一次调度的入口:scheduleOne

调度器的主循环是 Run 里反复调用的 scheduleOne

scheduleOne

它负责完成一个 Pod 的完整调度。先看整体的调度框架,再回头看主逻辑。

Scheduler Framework:插件化的调度流程

Scheduler Framework 把调度过程切成了一组扩展点,每个扩展点可以挂载多个插件:

  • QueueSortPlugin:对队列中的 Pod 进行排序,选出最先进行调度的 Pod
  • PreFilterPlugin:预过滤,检查 Pod 或集群信息,可以中断调度过程
  • FilterPlugin:过滤节点,过滤掉不适合的节点,备选节点可能是所有节点,也可以是一个提名节点
  • PostFilterPlugin:所有节点都被过滤掉时的兜底(典型应用:抢占调度)
  • PreScorePlugin:预打分
  • ScorePlugin:打分
  • ReservePlugin:预订
  • PermitPlugin:准入
  • PreBindPlugin:预绑定
  • BindPlugin:绑定节点,有一个插件绑定成功后即结束绑定过程
  • PostBindPlugin:结束绑定后的回调

QueueSort:选出下一个待调度的 Pod

调度队列的排序逻辑由 QueueSortPlugin 提供。当前实现里只会选第一个 profile 的 QueueSort:

lessFn := profiles[c.profiles[0].SchedulerName].QueueSortFunc()

一个 profile 由多个插件组成,一个 profile 就是一个 framework。Pod 通过 spec.schedulerName 字段选择 framework。

scheduleOne 主逻辑

  • 根据lessFn从优先级队列取出一个Pod
  • 选择Pod指定的调度器(profile)
  • 检查是否跳过调度
    • pod被删除 DeletionTimestamp != nil
    • 正在被调度,assumed
  • 调用 Algorithm.Schedule 该方法由 generic_shceduler 实现
  • 如果调度失败(err != nil), 进行抢占调度,在 PostFilter 过程中进行,并结束本次调度
  • 标记Pod正在调度 assumed
  • Reserve 预备点
  • Permit 准入点
  • 启动goroutine进行bind
    • WaitOnPermit 等待准入结果
    • PreBind 预Bind点
    • 调用 sched.bind
      • extendersBinding 返回第一个extender的正常结果
      • 如果未绑定,调用framework的Bind 点
    • PostBind 完成Bind点

generic_shceduler中的Schedule过程,

  • 检查Node数量
  • findNodesThatFitPod 预选,过滤不合适的节点
  • 检查预选后结果
    • 如果没有结果返回错误
    • 如果只有一个就直接返回
  • prioritizeNodes 打分,对节点进行打分
  • selectHost 选择最终节点:遍历分数最高,分数相同则随机 O(n)

findNodesThatFitPod 预选的过程,找到合适pod的所有(或一个)节点,

  • PreFilter 预过滤点,失败则返回所有节点均失败
  • 如果提名功能打开且 pod 中设置了 NominatedNodeName 提名节点,调用 evaluateNominatedNode
    • 可能由上一次抢占调度中进行提名
  • 如果有提名则 nodes 为提名节点,否则为所有节点
  • Filter 过滤点, findNodesThatPassFilters: framework RunFilter...
  • extender 过滤,findNodesThatPassExtenders: extender Filter

prioritizeNodes 打分的过程,给所有节点进行打分,各插件之间进行 combined(相加),extender 则加权重 maxscore(100) / extendermax(10)

  • 检查如果没有extenders也没有插件,所有节点分数为1
  • PreScore 预打分点
  • Score 打分点
  • 所有插件分数进行相加
  • 启动 goroutine 对所有 extenders 进行打分(Prioritize),将结果跟进权重进行累加进行零时分
  • 累加的临时分数,根据framework.MaxNodeScore(100) / extenderv1.MaxExtenderPriority(10) 权重放入最终分数

in-tree的调度算法

  • SelectorSpread: PreScore, Score 扩节点部署
  • ImageLocality: Score 选择已存在镜像
  • TaintToleration: Filter, Prescore, Score 污点容忍
  • NodeName: Filter 节点绑定
  • NodePorts: PreFilter, Filter 端口检查
  • NodePreferAvoidPods: Score 基于节点的 注解 scheduler.alpha.kubernetes.io/preferAvoidPods 打分
  • NodeAffinity: Filter, Score 节点选择器和亲和性
  • PodTopologySpread: PreFilter, Filter, PreScore, Score 拓扑区域
  • NodeUnschedulable: Filter 不可调用值
  • NodeResourceFit: PreFilter, Filter 检查资源是否满足
  • NodeResourcesBalancedAllocation: Score 选择资源更为均衡节点
  • NodeResourcesLeastAllocated: Score 选择资源分配少节点
  • VolumeBinding: PreFilter, Filter, Reserve, PreBind, Score 绑定PV
  • VolumeRestrictions: Filter 检查PV
  • VolumeZone: Filter PV可用区
  • NodeVolumeLimits: Filter PV限制
  • PrioritySort: QueueSort 优先级排序
  • DefaultBinder: Bind 默认绑定

看下 DefaultBinder 的实现,比较简单,创建一个Binding对象,

// Bind binds pods to nodes using the k8s client.
func (b DefaultBinder) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
	klog.V(3).InfoS("Attempting to bind pod to node", "pod", klog.KObj(p), "node", nodeName)
	binding := &v1.Binding{
		ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID},
		Target:     v1.ObjectReference{Kind: "Node", Name: nodeName},
	}
	err := b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
	if err != nil {
		return framework.AsStatus(err)
	}
	return nil
}

那么,最终这个 nodename 需要更新到 pod 对象上去的,是由谁来操作的呢, 或者说是不是kubelet在接收到binding对象后即可进行pod的创建。

直接查看 kubelet 的代码,找到3个source中的apiserver,位于 pkg/kubelet/config/apiserver.go

// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- interface{}) {
	lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector("spec.nodeName", string(nodeName)))
    // ...
}

注意到,list and watch 的查询条件是 spec.nodeName 为 nodename。 所以,对于kubelet来说,只关心pod的nodeName字段。 结合上面调度器中的bind过程,我们还需要继续跟进这个字段是谁来设置的, 按照一般思路,在bind过程中,直接 patch pod的这个字段即可,然而k8s并没有选择这样做。

最后通过一些关键字,搜索到 /kubernetes/pkg/registry/core/pod/storage/storage.go 中进行了 binding 的处理。

// Create ensures a pod is bound to a specific host.
func (r *BindingREST) Create(ctx context.Context, name string, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (out runtime.Object, err error) {

    // ...
    err = r.assignPod(ctx, binding.UID, binding.ResourceVersion, binding.Name, binding.Target.Name, binding.Annotations, dryrun.IsDryRun(options.DryRun))
	out = &metav1.Status{Status: metav1.StatusSuccess}
	return
}

`assignPod` 函数最终调用的是 `setPodHostAndAnnotations`,其主要逻辑就是去 store 中修改,
```go
// setPodHostAndAnnotations sets the given pod's host to 'machine' if and only if
// the pod is unassigned and merges the provided annotations with those of the pod.
// Returns the current state of the pod, or an error.
func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podUID types.UID, podResourceVersion, podID, machine string, annotations map[string]string, dryRun bool) (finalPod *api.Pod, err error) {
	podKey, err := r.store.KeyFunc(ctx, podID)
	if err != nil {
		return nil, err
	}

	var preconditions *storage.Preconditions
	if podUID != "" || podResourceVersion != "" {
		preconditions = &storage.Preconditions{}
		if podUID != "" {
			preconditions.UID = &podUID
		}
		if podResourceVersion != "" {
			preconditions.ResourceVersion = &podResourceVersion
		}
	}

	err = r.store.Storage.GuaranteedUpdate(ctx, podKey, &api.Pod{}, false, preconditions, storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
		pod, ok := obj.(*api.Pod)
		if !ok {
			return nil, fmt.Errorf("unexpected object: %#v", obj)
		}
		if pod.DeletionTimestamp != nil {
			return nil, fmt.Errorf("pod %s is being deleted, cannot be assigned to a host", pod.Name)
		}
		if pod.Spec.NodeName != "" {
			return nil, fmt.Errorf("pod %v is already assigned to node %q", pod.Name, pod.Spec.NodeName)
		}
        // 这个操作就是真正的绑定操作
		pod.Spec.NodeName = machine
		if pod.Annotations == nil {
			pod.Annotations = make(map[string]string)
		}
		for k, v := range annotations {
			pod.Annotations[k] = v
		}
		podutil.UpdatePodCondition(&pod.Status, &api.PodCondition{
			Type:   api.PodScheduled,
			Status: api.ConditionTrue,
		})
		finalPod = pod
		return pod, nil
	}), dryRun, nil)
	return finalPod, err

小结

读完这一圈 kube-scheduler 的代码,最容易留下的几个印象:

  1. Framework 是把"插件化"做透的好例子 — 11 个扩展点把调度切得很细,每个插件只关心自己关心的事,很容易扩展自定义调度逻辑(比如把"GPU 拓扑亲和"做成一个 ScorePlugin)。
  2. Bind 不是直接 patch pod,而是走 Binding 子资源 — 这种"调度器只写 Binding,apiserver 负责落到 pod 上"的设计,让 Bind 看起来像一次"原子"的提交,也方便 audit。
  3. 预选 + 优选 + 抢占 — 这个经典的三段式被 framework 模型重新表达,PostFilter 就是抢占调度的入口;早期的 priorities/predicates 模型已经基本被 framework 替代。
  4. Scheduler 与 kubelet 之间的契约只有一条:spec.nodeName — Scheduler 写一下 nodeName,kubelet 通过 list/watch + fieldSelector 看到属于自己的 Pod,开始干活。整个 Pod 的"调度→落地"链路就是靠这个字段串起来的。

后续可以从这条主线延伸出去看:

  • kubelet 源码分析:Pod 调到节点后 kubelet 怎么把容器跑起来
  • 自定义 Score 插件实践(GPU 拓扑/网卡亲和/异构资源)
  • Volcano / Koordinator 这些扩展调度器是怎么在 framework 之上做批调度的
Zoe

Written by

Zoe

AI Infra Engineer · LLM Serving · GPU/RDMA · 造工具的偏执狂

评论