自定义 scheduler

本文是翻译文,主要用于理解 kube-scheduler的运行逻辑,仅仅为code snippet笔记,非 extend scheduler,如自定义调度策略、批量调度、磁盘调度等不在本文讨论范围。

原文地址:https://banzaicloud.com/blog/k8s-custom-scheduler

scheduler在 pod 生命周期中的作用:

  • 创建一个Pod,在etcd中保存,这个时候 pod没有 nodename 字段
  • 调度程序scheduler会watch 到没有绑定节点的新容器。
  • 为 pod 找到最适合的节点,并将 node name 打在 pod 上
  • apiserver将Pod绑定到节点->将新的态保存为etcd。
  • Kubelet监听Pod状态,发现有了 node name,并在特定节点上启动容器。

一般来说,我们有4种扩展 Kubernetes 调度器的方法。

  • 一种方法就是直接 clone 官方的 kube-scheduler 源代码,在合适的位置直接修改代码,然后重新编译运行修改后的程序,当然这种方法是最不建议使用的,也不实用,因为需要花费大量额外的精力来和上游的调度程序更改保持一致。

  • 第二种方法就是和默认的调度程序一起运行独立的调度程序,默认的调度器和我们自定义的调度器可以通过 Pod 的 spec.schedulerName 来覆盖各自的 Pod,默认是使用 default 默认的调度器,但是多个调度程序共存的情况下也比较麻烦,比如当多个调度器将 Pod 调度到同一个节点的时候,可能会遇到一些问题,因为很有可能两个调度器都同时将两个 Pod 调度到同一个节点上去,但是很有可能其中一个 Pod 运行后其实资源就消耗完了,并且维护一个高质量的自定义调度程序也不是很容易的,因为我们需要全面了解默认的调度程序,整体 Kubernetes 的架构知识以及各种 Kubernetes API 对象的各种关系或限制。

  • 第三种方法是调度器扩展程序,这个方案目前是一个可行的方案,可以和上游调度程序兼容,所谓的调度器扩展程序其实就是一个可配置的 Webhook 而已,里面包含 过滤器 和 优先级 两个端点,分别对应调度周期中的两个主要阶段(过滤和打分)。

  • 第四种方法是通过调度框架(Scheduling Framework),Kubernetes v1.15 版本中引入了可插拔架构的调度框架,使得定制调度器这个任务变得更加的容易。调库框架向现有的调度器中添加了一组插件化的 API,该 API 在保持调度程序“核心”简单且易于维护的同时,使得大部分的调度功能以插件的形式存在,而且在我们现在的 v1.16 版本中上面的 调度器扩展程序 也已经被废弃了,所以以后调度框架才是自定义调度器的核心方式。

现在一般使用第三种和第四种,可扩展性更高。具体解释可以看自定义 Kubernetes 调度器,下面的自定义调度为第二种。即修改spec.schedulerName

我们在Go中实现一个调度程序,该调度程序完全随机,选择合适的节点。该调度程序不适合生产使用,这是了解调度程序如何工作的一个很好的例子。

初始化 k8s client

使用 incluster 的模式创建 client

func NewScheduler(podQueue chan *v1.Pod, quit chan struct{}) Scheduler {
    config, err := rest.InClusterConfig()
    if err != nil {
        log.Fatal(err)
    }

    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        log.Fatal(err)
    }

    return Scheduler{
        clientset:  clientset,
        podQueue:   podQueue,
        nodeLister: initInformers(clientset, podQueue, quit),
        predicates: []predicateFunc{
            randomPredicate,
        },
        priorities: []priorityFunc{
            randomPriority,
        },
    }
}

使用Informer 监听 node 和 pod 资源

过滤 pod.Spec.NodeName == “”和 pod.Spec.SchedulerName == schedulerName

func initInformers(clientset *kubernetes.Clientset, podQueue chan *v1.Pod, quit chan struct{}) listersv1.NodeLister {
    factory := informers.NewSharedInformerFactory(clientset, 0)

    nodeInformer := factory.Core().V1().Nodes()
    nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            node, ok := obj.(*v1.Node)
            if !ok {
                log.Println("this is not a node")
                return
            }
            log.Printf("New Node Added to Store: %s", node.GetName())
        },
    })

    podInformer := factory.Core().V1().Pods()
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            pod, ok := obj.(*v1.Pod)
            if !ok {
                log.Println("this is not a pod")
                return
            }
            if pod.Spec.NodeName == "" && pod.Spec.SchedulerName == schedulerName {
                podQueue <- pod
            }
        },
    })

    factory.Start(quit)
    return nodeInformer.Lister()
}

为 pod 挑选 node

  1. 得到所有 node
  2. 筛选合适的 node: s.runPredicates方法,二进制运算rand.Intn(2)==0 即可
  3. 按权重s.prioritize得到最合适的 node: s.findBestNode
nodes, err := s.nodeLister.List(labels.Everything())
    if err != nil {
        return "", err
    }

    filteredNodes := s.runPredicates(nodes, pod)
    if len(filteredNodes) == 0 {
        return "", errors.New("failed to find node that fits pod")
    }
    priorities := s.prioritize(filteredNodes, pod)
    return s.findBestNode(priorities), nil

func (s *Scheduler) runPredicates(nodes []*v1.Node, pod *v1.Pod) []*v1.Node {
    filteredNodes := make([]*v1.Node, 0)
    for _, node := range nodes {
        if s.predicatesApply(node, pod) {
            filteredNodes = append(filteredNodes, node)
        }
    }
    log.Println("nodes that fit:")
    for _, n := range filteredNodes {
        log.Println(n.Name)
    }
    return filteredNodes
}

func (s *Scheduler) predicatesApply(node *v1.Node, pod *v1.Pod) bool {
    for _, predicate := range s.predicates {
        if !predicate(node, pod) {
            return false
        }
    }
    return true
}

func randomPredicate(node *v1.Node, pod *v1.Pod) bool {
    r := rand.Intn(2)
    return r == 0
}

func (s *Scheduler) prioritize(nodes []*v1.Node, pod *v1.Pod) map[string]int {
    priorities := make(map[string]int)
    for _, node := range nodes {
        for _, priority := range s.priorities {
            priorities[node.Name] += priority(node, pod)
        }
    }
    log.Println("calculated priorities:", priorities)
    return priorities
}

得到合适的 node 之后,将 nodename 绑定到 pod 上

err = s.bindPod(p, node)
    if err != nil {
        log.Println("failed to bind pod", err.Error())
        return
    }

message := fmt.Sprintf("Placed pod [%s/%s] on %s\n", p.Namespace, p.Name, node)

func (s *Scheduler) bindPod(p *v1.Pod, node string) error {
    return s.clientset.CoreV1().Pods(p.Namespace).Bind(&v1.Binding{
        ObjectMeta: metav1.ObjectMeta{
            Name:      p.Name,
            Namespace: p.Namespace,
        },
        Target: v1.ObjectReference{
            APIVersion: "v1",
            Kind:       "Node",
            Name:       node,
        },
    })
}

记录 events

err = s.emitEvent(p, message)
    if err != nil {
        log.Println("failed to emit scheduled event", err.Error())
        return
    }

func (s *Scheduler) emitEvent(p *v1.Pod, message string) error {
    timestamp := time.Now().UTC()
    _, err := s.clientset.CoreV1().Events(p.Namespace).Create(&v1.Event{
        Count:          1,
        Message:        message,
        Reason:         "Scheduled",
        LastTimestamp:  metav1.NewTime(timestamp),
        FirstTimestamp: metav1.NewTime(timestamp),
        Type:           "Normal",
        Source: v1.EventSource{
            Component: schedulerName,
        },
        InvolvedObject: v1.ObjectReference{
            Kind:      "Pod",
            Name:      p.Name,
            Namespace: p.Namespace,
            UID:       p.UID,
        },
        ObjectMeta: metav1.ObjectMeta{
            GenerateName: p.Name + "-",
        },
    })
    if err != nil {
        return err
    }
    return nil
}

打出 dockerfile:

# build stage
FROM golang:1.11-alpine as backend
RUN apk add --update --no-cache bash ca-certificates curl git make tzdata

RUN mkdir -p /go/src/github.com/martonsereg/scheduler
ADD Gopkg.* Makefile /go/src/github.com/martonsereg/scheduler/
WORKDIR /go/src/github.com/martonsereg/scheduler
RUN make vendor
ADD . /go/src/github.com/martonsereg/scheduler

RUN make build

FROM alpine:3.7
COPY --from=backend /usr/share/zoneinfo/ /usr/share/zoneinfo/
COPY --from=backend /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
COPY --from=backend /go/src/github.com/martonsereg/scheduler/build/scheduler /bin

ENTRYPOINT ["/bin/scheduler"]

部署 scheduler


apiVersion: apps/v1 kind: Deployment metadata: name: random-scheduler labels: app: random-scheduler spec: replicas: 1 selector: matchLabels: app: random-scheduler template: metadata: labels: app: random-scheduler spec: serviceAccount: random-scheduler containers: - name: random-scheduler image: martonsereg/scheduler:v0.3 imagePullPolicy: Always

Written by

说点什么

欢迎讨论

avatar

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据

  Subscribe  
提醒