欢迎关注本专栏,本专栏主要从 K8s 源码出发,深入理解 K8s 一些组件底层的代码逻辑,同时借助 debug Minikube 来进一步了解 K8s 底层的代码运行逻辑细节,帮助我们更好的了解不为人知的运行机制,让自己学会如何调试源码,玩转 K8s。
本专栏适合于运维、开发以及希望精进 K8s 细节的同学。同时本人水平有限,尽量将本人理解的内容最大程度的展现给大家~
前情提要:
《K8s 源码剖析及debug实战(一):Minikube 安装及源码准备》
《K8s 源码剖析及debug实战(二):debug K8s 源码》
本文主要介绍 K8s 的 Kube-Scheduler 源码的启动流程,看看 Kube-Scheduler 是怎么启动的。文中采用的 K8s 版本是 v1.16。
程序入口在 K8s cmd/kube-scheduler/scheduler.go
文件下:
func main() {
rand.Seed(time.Now().UnixNano())
// 关键逻辑,启动一个 scheduler 命令
command := app.NewSchedulerCommand()
pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
logs.InitLogs()
defer logs.FlushLogs()
if err := command.Execute(); err != nil {
os.Exit(1)
}
}
以命令行方式启动程序。
NewSchedulerCommand
在 cmd/kube-scheduler/app/server.go
中,定义命令行的一些参数,看下整体代码,其中核心代码就一句:
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
opts, err := options.NewOptions()
if err != nil {
klog.Fatalf("unable to initialize command options: %v", err)
}
cmd := &cobra.Command{
Use: "kube-scheduler",
Long: `The Kubernetes scheduler is a policy-rich, topology-aware,
workload-specific function that significantly impacts availability, performance,
and capacity. The scheduler needs to take into account individual and collective
resource requirements, quality of service requirements, hardware/software/policy
constraints, affinity and anti-affinity specifications, data locality, inter-workload
interference, deadlines, and so on. Workload-specific requirements will be exposed
through the API as necessary.`,
Run: func(cmd *cobra.Command, args []string) {
if err := runCommand(cmd, args, opts, registryOptions...); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
},
}
fs := cmd.Flags()
namedFlagSets := opts.Flags()
verflag.AddFlags(namedFlagSets.FlagSet("global"))
globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())
for _, f := range namedFlagSets.FlagSets {
fs.AddFlagSet(f)
}
usageFmt := "Usage:\n %s\n"
cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
cmd.SetUsageFunc(func(cmd *cobra.Command) error {
fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine())
cliflag.PrintSections(cmd.OutOrStderr(), namedFlagSets, cols)
return nil
})
cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine())
cliflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols)
})
cmd.MarkFlagFilename("config", "yaml", "yml", "json")
return cmd
}
核心的代码就这一句:
cmd := &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
if err := runCommand(cmd, args, opts, registryOptions...); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
},
}
就是这个 runCommand
,别看就这一句,里面别有洞天,里面的代码就是调度器的核心逻辑。
runCommand
同样在 cmd/kube-scheduler/app/server.go
中,其中关键的代码就两句。
func runCommand(cmd *cobra.Command, args []string, opts *options.Options, registryOptions ...Option) error {
...
// 还记得启动参数有一个scheduler.conf吗,这里就根据传入的配置文件创建:包含连接apiserver的client、pod Informer等
// 如果不知道什么是 Informer,可以先 hold,本质上就是一个缓存,缓存环境里的对象,降低对apiserver的访问
c, err := opts.Config()
...
// 这里就是启动scheduler的代码了!
return Run(cc, stopCh, registryOptions...)
}
Run
同样在 cmd/kube-scheduler/app/server.go
中,这里代码点开看的话会觉得很复杂,可以给大家先看下:
func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}, registryOptions ...Option) error {
// To help debugging, immediately log version
klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get())
registry := framework.NewRegistry()
for _, option := range registryOptions {
if err := option(registry); err != nil {
return err
}
}
// Prepare event clients.
if _, err := cc.Client.Discovery().ServerResourcesForGroupVersion(eventsv1beta1.SchemeGroupVersion.String()); err == nil {
cc.Broadcaster = events.NewBroadcaster(&events.EventSinkImpl{Interface: cc.EventClient.Events("")})
cc.Recorder = cc.Broadcaster.NewRecorder(scheme.Scheme, cc.ComponentConfig.SchedulerName)
} else {
recorder := cc.CoreBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: cc.ComponentConfig.SchedulerName})
cc.Recorder = record.NewEventRecorderAdapter(recorder)
}
// Create the scheduler.
sched, err := scheduler.New(cc.Client,
cc.InformerFactory.Core().V1().Nodes(),
cc.PodInformer,
cc.InformerFactory.Core().V1().PersistentVolumes(),
cc.InformerFactory.Core().V1().PersistentVolumeClaims(),
cc.InformerFactory.Core().V1().ReplicationControllers(),
cc.InformerFactory.Apps().V1().ReplicaSets(),
cc.InformerFactory.Apps().V1().StatefulSets(),
cc.InformerFactory.Core().V1().Services(),
cc.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
cc.InformerFactory.Storage().V1().StorageClasses(),
cc.InformerFactory.Storage().V1beta1().CSINodes(),
cc.Recorder,
cc.ComponentConfig.AlgorithmSource,
stopCh,
registry,
cc.ComponentConfig.Plugins,
cc.ComponentConfig.PluginConfig,
scheduler.WithName(cc.ComponentConfig.SchedulerName),
scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))
if err != nil {
return err
}
// Prepare the event broadcaster.
if cc.Broadcaster != nil && cc.EventClient != nil {
cc.Broadcaster.StartRecordingToSink(stopCh)
}
if cc.CoreBroadcaster != nil && cc.CoreEventClient != nil {
cc.CoreBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.CoreEventClient.Events("")})
}
// Setup healthz checks.
var checks []healthz.HealthChecker
if cc.ComponentConfig.LeaderElection.LeaderElect {
checks = append(checks, cc.LeaderElection.WatchDog)
}
// Start up the healthz server.
if cc.InsecureServing != nil {
separateMetrics := cc.InsecureMetricsServing != nil
handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, separateMetrics, checks...), nil, nil)
if err := cc.InsecureServing.Serve(handler, 0, stopCh); err != nil {
return fmt.Errorf("failed to start healthz server: %v", err)
}
}
if cc.InsecureMetricsServing != nil {
handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig), nil, nil)
if err := cc.InsecureMetricsServing.Serve(handler, 0, stopCh); err != nil {
return fmt.Errorf("failed to start metrics server: %v", err)
}
}
if cc.SecureServing != nil {
handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
// TODO: handle stoppedCh returned by c.SecureServing.Serve
if _, err := cc.SecureServing.Serve(handler, 0, stopCh); err != nil {
// fail early for secure handlers, removing the old error loop from above
return fmt.Errorf("failed to start secure server: %v", err)
}
}
// Start all informers.
go cc.PodInformer.Informer().Run(stopCh)
cc.InformerFactory.Start(stopCh)
// Wait for all caches to sync before scheduling.
cc.InformerFactory.WaitForCacheSync(stopCh)
// Prepare a reusable runCommand function.
run := func(ctx context.Context) {
sched.Run()
<-ctx.Done()
}
ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here
defer cancel()
go func() {
select {
case <-stopCh:
cancel()
case <-ctx.Done():
}
}()
// If leader election is enabled, runCommand via LeaderElector until done and exit.
if cc.LeaderElection != nil {
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
},
}
leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
if err != nil {
return fmt.Errorf("couldn't create leader elector: %v", err)
}
leaderElector.Run(ctx)
return fmt.Errorf("lost lease")
}
// Leader election is disabled, so runCommand inline until done.
run(ctx)
return fmt.Errorf("finished without leader elect")
}
看到倒数第二句吗?这里就启动了调度器!
func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}, registryOptions ...Option) error {
...
run(ctx)
...
}
好了这篇文章暂时讲到这里,主要介绍 Kube-Scheduler 的启动流程,后面文章详细介绍 Run
方法涉及到的细节,持续更新中,欢迎关注!
《K8s 源码剖析及debug实战(一):Minikube 安装及源码准备》
《K8s 源码剖析及debug实战(二):debug K8s 源码》