Kubernetes version: 1.23.0
The call-chain diagram for this source-code analysis is shown above. Without further ado, let's walk through the kubelet startup flow step by step.
First, locate kubelet's entry function; by convention the entry point lives under the /cmd directory. The main function here mostly wraps the basic cobra command type and starts it. Cobra is an open-source Go framework for building command-line applications: it provides a simple yet powerful set of tools for defining commands, subcommands, flags, and arguments, and for handling user input and executing the corresponding actions.
Cobra: a Go command-line development framework (Part 1)
Readers who want a deeper look at cobra can refer to that article.
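For readers unfamiliar with cobra, here is a minimal, self-contained sketch of how a cobra.Command is typically wired up (the command name and flag below are made up for illustration); it mirrors the shape of what NewKubeletCommand builds and what run(command) eventually executes via command.Execute().
package main

import (
	"fmt"
	"os"

	"github.com/spf13/cobra"
)

func main() {
	// A toy command, similar in shape to what NewKubeletCommand assembles.
	cmd := &cobra.Command{
		Use:   "demo",
		Short: "a minimal cobra command",
		RunE: func(cmd *cobra.Command, args []string) error {
			fmt.Println("running with args:", args)
			return nil
		},
	}
	// Flags are declared on the command; kubelet instead disables cobra's
	// flag parsing and handles its flags itself (see DisableFlagParsing below).
	cmd.Flags().String("config", "", "path to a config file")

	// Execute parses the command line and dispatches to RunE,
	// which is what run(command) ultimately triggers for kubelet.
	if err := cmd.Execute(); err != nil {
		os.Exit(1)
	}
}
With that in mind, here is kubelet's real entry point: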
\k8s.io\kubernetes\cmd\kubelet\kubelet.go
func main() {
// Assemble the cobra command-line logic. This is the entry point of kubelet's core logic; command is a *cobra.Command, the basic type of the cobra framework
command := app.NewKubeletCommand()
// kubelet uses a config file and does its own special
// parsing of flags and that config file. It initializes
// logging after it is done with that. Therefore it does
// not use cli.Run like other, simpler commands.
// Following run() further, it starts the cobra framework: inside it you can find command.Execute(), which is cobra's command startup logic
code := run(command)
os.Exit(code)
}
Let's follow app.NewKubeletCommand(). This function consists of two main parts:
func NewKubeletCommand() *cobra.Command {
cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
// 1. kubelet configuration comes in two parts:
// KubeletFlags: kubelet's command-line startup parameters
// KubeletConfiguration: the configuration that can be shared across nodes in the cluster and updated dynamically; it is usually parsed from /var/lib/kubelet/config.yaml
kubeletFlags := options.NewKubeletFlags()
kubeletConfig, err := options.NewKubeletConfiguration()
// programmer error
if err != nil {
klog.ErrorS(err, "Failed to create a new kubelet configuration")
os.Exit(1)
}
// 2. Build the cobra command struct
cmd := &cobra.Command{
.....
// Note: with this option the flags do not go through cobra's flag parsing;
// kubelet parses all of its flags itself
DisableFlagParsing: true,
Run: func(cmd *cobra.Command, args []string) {
.....
// 1. The flag/args handling logic is omitted here
// The parsed flags and kubeletConfig from above are stored in kubeletServer
kubeletServer := &options.KubeletServer{
KubeletFlags: *kubeletFlags,
KubeletConfiguration: *kubeletConfig,
}
// 2. kubeletDeps holds all the resources kubelet depends on at startup,
// including the clients used to talk to the cluster ...
kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
if err != nil {
klog.ErrorS(err, "Failed to construct kubelet dependencies")
os.Exit(1)
}
// Only dynamic kubelet config depends on this; a controller appears to
// manage the kubelet config here
kubeletDeps.KubeletConfigController = kubeletConfigController
if err := checkPermissions(); err != nil {
klog.ErrorS(err, "kubelet running with insufficient permissions")
}
ctx := genericapiserver.SetupSignalContext()
config := kubeletServer.KubeletConfiguration.DeepCopy()
for k := range config.StaticPodURLHeader {
config.StaticPodURLHeader[k] = []string{"<masked>"}
}
// log the kubelet's config for inspection
klog.V(5).InfoS("KubeletConfiguration", "configuration", config)
// run the kubelet
// 3. Start the kubelet here
if err := Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate); err != nil {
klog.ErrorS(err, "Failed to run kubelet")
os.Exit(1)
}
},
}
.....
return cmd
}
Let's take a closer look at UnsecuredDependencies to see which resources kubelet depends on at startup. This function initializes the Docker-related options and configures an ExperimentalMounterPath. ExperimentalMounterPath is an experimental kubelet flag that specifies the path of the mounter binary kubelet uses for volume mount operations on the node; when it is empty, the default system mount is used.
func UnsecuredDependencies(s *options.KubeletServer, featureGate featuregate.FeatureGate) (*kubelet.Dependencies, error) {
// Initialize the TLS Options
// Initialize the TLS options (TLSCertFile and TLSPrivateKeyFile), related to the TLS protocol
tlsOptions, err := InitializeTLS(&s.KubeletFlags, &s.KubeletConfiguration)
if err != nil {
return nil, err
}
mounter := mount.New(s.ExperimentalMounterPath)
subpather := subpath.New(mounter)
hu := hostutil.NewHostUtil()
var pluginRunner = exec.New()
var dockerOptions *kubelet.DockerOptions
// If the configured container runtime is docker, set the default docker options here
if s.ContainerRuntime == kubetypes.DockerContainerRuntime {
dockerOptions = &kubelet.DockerOptions{
DockerEndpoint: s.DockerEndpoint,
RuntimeRequestTimeout: s.RuntimeRequestTimeout.Duration,
ImagePullProgressDeadline: s.ImagePullProgressDeadline.Duration,
}
}
plugins, err := ProbeVolumePlugins(featureGate)
if err != nil {
return nil, err
}
// Only a partially-filled Dependencies struct is built here; the remaining fields are presumably initialized later in the flow
return &kubelet.Dependencies{
Auth: nil, // default does not enforce auth[nz]
CAdvisorInterface: nil, // cadvisor.New launches background processes (bg http.ListenAndServe, and some bg cleaners), not set here
Cloud: nil, // cloud provider might start background processes
ContainerManager: nil,
DockerOptions: dockerOptions,
KubeClient: nil,
HeartbeatClient: nil,
EventClient: nil,
HostUtil: hu,
Mounter: mounter,
Subpather: subpather,
OOMAdjuster: oom.NewOOMAdjuster(),
OSInterface: kubecontainer.RealOS{},
VolumePlugins: plugins,
DynamicPluginProber: GetDynamicPluginProber(s.VolumePluginDir, pluginRunner),
TLSOptions: tlsOptions}, nil
}
Next, follow the Run function referenced in NewKubeletCommand above; its logic lives in \k8s.io\kubernetes\cmd\kubelet\app\server.go. The function does roughly the following:
1. Set the default FeatureGates for kubelet. All of kubelet's FeatureGates can be listed via command-line flags; FeatureGates in Alpha are off by default when the component starts, while Beta and GA ones are on by default. This step mainly parses the config file and decides, based on its settings, whether particular features are enabled or disabled.
2. Validate kubelet's parameters.
3. Try to acquire kubelet's lock file, whose path can be specified at startup. On startup kubelet tries to create or acquire this lock file to ensure it is the only kubelet instance managing container lifecycles on the node. If another kubelet process already holds the lock file, the newly started instance normally aborts, since it cannot get exclusive access to node resources. In short, the lock file (pointed to by s.LockFilePath) guarantees a single kubelet instance per node and prevents potential resource conflicts (a minimal flock-based sketch follows this list).
4. Register the current configuration with the http server's /configz URL.
5. Check whether kubelet runs in standalone mode; in this mode it does not talk to the apiserver and is mainly used for debugging kubelet.
6. Initialize kubeDeps, which holds kubelet's dependencies, mainly KubeClient, EventClient, HeartbeatClient, Auth, cadvisor, and ContainerManager.
7. Check whether kubelet is running as root.
8. Set the process's OOM score, -999 by default; the valid range is [-1000, 1000], and the lower the score, the less likely the process is to be killed.
9. Call the RunKubelet method.
10. Check whether dynamic kubelet configuration is enabled.
11. Start the healthz http server.
12. If started via systemd, notify systemd that kubelet is up.
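For item 3, the idea behind the lock file can be sketched with a non-blocking exclusive flock. The snippet below is only an illustration under that assumption (Linux-only, with a made-up path and helper name), not kubelet's own implementation:
package main

import (
	"fmt"
	"os"

	"golang.org/x/sys/unix"
)

// acquireLock takes an exclusive, non-blocking flock on path. If another
// process (for example, a second kubelet) already holds the lock, the call
// fails and the new instance gives up instead of competing for the node.
func acquireLock(path string) error {
	fd, err := unix.Open(path, unix.O_CREAT|unix.O_RDWR, 0600)
	if err != nil {
		return err
	}
	if err := unix.Flock(fd, unix.LOCK_EX|unix.LOCK_NB); err != nil {
		return fmt.Errorf("lock file %s is held by another process: %w", path, err)
	}
	// Keep fd open for the lifetime of the process so the lock is retained.
	return nil
}

func main() {
	if err := acquireLock("/tmp/demo-kubelet.lock"); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println("lock acquired; this is the only instance on the node")
}
Back to the actual code: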
server.go
func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
// Set global feature gates based on the value on the initial KubeletServer
// Parse the feature-gate switches for kubelet's components from the configuration; omitted here
......
// Register current configuration with /configz endpoint
// I could not verify this successfully; in my tests the configz endpoint did not seem reachable after 1.18. /configz is the kubelet API endpoint that exposes the kubelet's current configuration for inspection
err = initConfigz(&s.KubeletConfiguration)
if err != nil {
klog.ErrorS(err, "Failed to register kubelet configuration with configz")
}
if len(s.ShowHiddenMetricsForVersion) > 0 {
metrics.SetShowHidden()
}
// About to get clients and such, detect standaloneMode
// kubelet's standalone mode is a configuration option that runs kubelet on its own. In standalone mode kubelet does not connect to the Kubernetes control plane; it runs independently and manages containers based on local config files and parameters.
// Standalone mode turns a node into an independent container runtime environment detached from a Kubernetes cluster. This can be useful in specific scenarios, for example quickly running containerized applications in a test environment, or running individual containers without full Kubernetes functionality.
standaloneMode := true
if len(s.KubeConfig) > 0 {
standaloneMode = false
}
if kubeDeps == nil {
kubeDeps, err = UnsecuredDependencies(s, featureGate)
if err != nil {
return err
}
}
// Mainly instantiates the interface/services provided by the cloud provider
if kubeDeps.Cloud == nil {
if !cloudprovider.IsExternal(s.CloudProvider) {
cloudprovider.DeprecationWarningForProvider(s.CloudProvider)
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
if err != nil {
return err
}
if cloud != nil {
klog.V(2).InfoS("Successfully initialized cloud provider", "cloudProvider", s.CloudProvider, "cloudConfigFile", s.CloudConfigFile)
}
kubeDeps.Cloud = cloud
}
}
// This is the hostname-override entry point mentioned earlier: HostnameOverride can replace the actual hostname
hostName, err := nodeutil.GetHostname(s.HostnameOverride)
if err != nil {
return err
}
nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
if err != nil {
return err
}
// if in standalone mode, indicate as much by setting all clients to nil
// kubelet's standaloneMode
switch {
case standaloneMode:
// Explicitly nil out the clients
kubeDeps.KubeClient = nil
kubeDeps.EventClient = nil
kubeDeps.HeartbeatClient = nil
klog.InfoS("Standalone mode, no API client")
case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
// Initialize the client connections to the cluster
clientConfig, onHeartbeatFailure, err := buildKubeletClientConfig(ctx, s, nodeName)
if err != nil {
return err
}
if onHeartbeatFailure == nil {
return errors.New("onHeartbeatFailure must be a valid function other than nil")
}
kubeDeps.OnHeartbeatFailure = onHeartbeatFailure
kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
if err != nil {
return fmt.Errorf("failed to initialize kubelet client: %w", err)
}
// make a separate client for events
eventClientConfig := *clientConfig
eventClientConfig.QPS = float32(s.EventRecordQPS)
eventClientConfig.Burst = int(s.EventBurst)
kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
if err != nil {
return fmt.Errorf("failed to initialize kubelet event client: %w", err)
}
// make a separate client for heartbeat with throttling disabled and a timeout attached
heartbeatClientConfig := *clientConfig
heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
// The timeout is the minimum of the lease duration and status update frequency
leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
if heartbeatClientConfig.Timeout > leaseTimeout {
heartbeatClientConfig.Timeout = leaseTimeout
}
heartbeatClientConfig.QPS = float32(-1)
kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
if err != nil {
return fmt.Errorf("failed to initialize kubelet heartbeat client: %w", err)
}
}
if kubeDeps.Auth == nil {
auth, runAuthenticatorCAReload, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
if err != nil {
return err
}
kubeDeps.Auth = auth
runAuthenticatorCAReload(ctx.Done())
}
var cgroupRoots []string
// In my environment this returned ""
nodeAllocatableRoot := cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupsPerQOS, s.CgroupDriver)
cgroupRoots = append(cgroupRoots, nodeAllocatableRoot)
// In my environment this returned ""
kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
if err != nil {
klog.InfoS("Failed to get the kubelet's cgroup. Kubelet system container metrics may be missing.", "err", err)
} else if kubeletCgroup != "" {
cgroupRoots = append(cgroupRoots, kubeletCgroup)
}
// In my environment this returned ""
runtimeCgroup, err := cm.GetRuntimeContainer(s.ContainerRuntime, s.RuntimeCgroups)
if err != nil {
klog.InfoS("Failed to get the container runtime's cgroup. Runtime system container metrics may be missing.", "err", err)
} else if runtimeCgroup != "" {
// RuntimeCgroups is optional, so ignore if it isn't specified
cgroupRoots = append(cgroupRoots, runtimeCgroup)
}
if s.SystemCgroups != "" {
// SystemCgroups is optional, so ignore if it isn't specified
cgroupRoots = append(cgroupRoots, s.SystemCgroups)
}
if kubeDeps.CAdvisorInterface == nil {
// Initialize cadvisor
imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
if err != nil {
return err
}
}
// Setup event recorder if required.
// Initialize the event recorder
makeEventRecorder(kubeDeps, nodeName)
if kubeDeps.ContainerManager == nil {
if s.CgroupsPerQOS && s.CgroupRoot == "" {
klog.InfoS("--cgroups-per-qos enabled, but --cgroup-root was not specified. defaulting to /")
s.CgroupRoot = "/"
}
machineInfo, err := kubeDeps.CAdvisorInterface.MachineInfo()
if err != nil {
return err
}
reservedSystemCPUs, err := getReservedCPUs(machineInfo, s.ReservedSystemCPUs)
if err != nil {
return err
}
if reservedSystemCPUs.Size() > 0 {
// at cmd option validation phase it is tested either --system-reserved-cgroup or --kube-reserved-cgroup is specified, so overwrite should be ok
klog.InfoS("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved and SystemReserved", "kubeReservedCPUs", s.KubeReserved, "systemReservedCPUs", s.SystemReserved)
if s.KubeReserved != nil {
delete(s.KubeReserved, "cpu")
}
if s.SystemReserved == nil {
s.SystemReserved = make(map[string]string)
}
s.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size())
klog.InfoS("After cpu setting is overwritten", "kubeReservedCPUs", s.KubeReserved, "systemReservedCPUs", s.SystemReserved)
}
kubeReserved, err := parseResourceList(s.KubeReserved)
if err != nil {
return err
}
systemReserved, err := parseResourceList(s.SystemReserved)
if err != nil {
return err
}
var hardEvictionThresholds []evictionapi.Threshold
// If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
if err != nil {
return err
}
}
experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
if err != nil {
return err
}
devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)
var cpuManagerPolicyOptions map[string]string
if utilfeature.DefaultFeatureGate.Enabled(features.CPUManager) {
if utilfeature.DefaultFeatureGate.Enabled(features.CPUManagerPolicyOptions) {
cpuManagerPolicyOptions = s.CPUManagerPolicyOptions
} else if s.CPUManagerPolicyOptions != nil {
return fmt.Errorf("CPU Manager policy options %v require feature gates %q, %q enabled",
s.CPUManagerPolicyOptions, features.CPUManager, features.CPUManagerPolicyOptions)
}
}
// Initialize the ContainerManager
kubeDeps.ContainerManager, err = cm.NewContainerManager(
kubeDeps.Mounter,
kubeDeps.CAdvisorInterface,
cm.NodeConfig{
RuntimeCgroupsName: s.RuntimeCgroups,
SystemCgroupsName: s.SystemCgroups,
KubeletCgroupsName: s.KubeletCgroups,
ContainerRuntime: s.ContainerRuntime,
CgroupsPerQOS: s.CgroupsPerQOS,
CgroupRoot: s.CgroupRoot,
CgroupDriver: s.CgroupDriver,
KubeletRootDir: s.RootDirectory,
ProtectKernelDefaults: s.ProtectKernelDefaults,
NodeAllocatableConfig: cm.NodeAllocatableConfig{
KubeReservedCgroupName: s.KubeReservedCgroup,
SystemReservedCgroupName: s.SystemReservedCgroup,
EnforceNodeAllocatable: sets.NewString(s.EnforceNodeAllocatable...),
KubeReserved: kubeReserved,
SystemReserved: systemReserved,
ReservedSystemCPUs: reservedSystemCPUs,
HardEvictionThresholds: hardEvictionThresholds,
},
QOSReserved: *experimentalQOSReserved,
ExperimentalCPUManagerPolicy: s.CPUManagerPolicy,
ExperimentalCPUManagerPolicyOptions: cpuManagerPolicyOptions,
ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
ExperimentalMemoryManagerPolicy: s.MemoryManagerPolicy,
ExperimentalMemoryManagerReservedMemory: s.ReservedMemory,
ExperimentalPodPidsLimit: s.PodPidsLimit,
EnforceCPULimits: s.CPUCFSQuota,
CPUCFSQuotaPeriod: s.CPUCFSQuotaPeriod.Duration,
ExperimentalTopologyManagerPolicy: s.TopologyManagerPolicy,
ExperimentalTopologyManagerScope: s.TopologyManagerScope,
},
s.FailSwapOn,
devicePluginEnabled,
kubeDeps.Recorder)
if err != nil {
return err
}
}
utilruntime.ReallyCrash = s.ReallyCrashForTesting
// TODO(vmarmol): Do this through container config.
oomAdjuster := kubeDeps.OOMAdjuster
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
klog.InfoS("Failed to ApplyOOMScoreAdj", "err", err)
}
// Before kubelet starts, initialize some dependent services; this is where the dockershim service is started when docker is the runtime
err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration,
kubeDeps, &s.ContainerRuntimeOptions,
s.ContainerRuntime,
s.RuntimeCgroups,
s.RemoteRuntimeEndpoint,
s.RemoteImageEndpoint,
s.NonMasqueradeCIDR)
if err != nil {
return err
}
// The core kubelet startup logic appears to live here
if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
return err
}
// If the kubelet config controller is available, and dynamic config is enabled, start the config and status sync loops
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && len(s.DynamicConfigDir.Value()) > 0 &&
kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce {
if err := kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)); err != nil {
return err
}
}
if s.HealthzPort > 0 {
// Start kubelet's healthz web server
mux := http.NewServeMux()
healthz.InstallHandler(mux)
go wait.Until(func() {
err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
if err != nil {
klog.ErrorS(err, "Failed to start healthz server")
}
}, 5*time.Second, wait.NeverStop)
}
if s.RunOnce {
return nil
}
// If systemd is used, notify it that we have started
go daemon.SdNotify(false, "READY=1")
select {
case <-done:
break
case <-ctx.Done():
break
}
return nil
}
Next, follow the logic of RunKubelet. This function does two main things:
// RunKubelet is responsible for setting up and running a kubelet. It is used in three different applications:
//
// 1 Integration tests
// 2 Kubelet binary
// 3 Standalone 'kubernetes' binary
//
// Eventually, #2 will be replaced with instances of #3
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
.......
// 1. Call createAndInitKubelet to create and instantiate a kubelet struct
k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
kubeDeps,
&kubeServer.ContainerRuntimeOptions,
kubeServer.ContainerRuntime,
hostname,
hostnameOverridden,
nodeName,
nodeIPs,
kubeServer.ProviderID,
kubeServer.CloudProvider,
kubeServer.CertDirectory,
kubeServer.RootDirectory,
kubeServer.ImageCredentialProviderConfigFile,
kubeServer.ImageCredentialProviderBinDir,
kubeServer.RegisterNode,
kubeServer.RegisterWithTaints,
kubeServer.AllowedUnsafeSysctls,
kubeServer.ExperimentalMounterPath,
kubeServer.KernelMemcgNotification,
kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount,
kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
kubeServer.MinimumGCAge,
kubeServer.MaxPerPodContainerCount,
kubeServer.MaxContainerCount,
kubeServer.MasterServiceNamespace,
kubeServer.RegisterSchedulable,
kubeServer.KeepTerminatedPodVolumes,
kubeServer.NodeLabels,
kubeServer.NodeStatusMaxImages,
kubeServer.KubeletFlags.SeccompDefault || kubeServer.KubeletConfiguration.SeccompDefault,
)
.......
if runOnce {
if _, err := k.RunOnce(podCfg.Updates()); err != nil {
return fmt.Errorf("runonce failed: %w", err)
}
klog.InfoS("Started kubelet as runonce")
} else {
// 2. Start the kubelet
startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
klog.InfoS("Started kubelet")
}
return nil
}
At this point all of the steps for initializing and starting kubelet are in place; we have essentially reached kubelet's core initialization and startup logic.
This step has two core functions, createAndInitKubelet and startKubelet. As the names suggest, one initializes the kubelet and the other starts it.
First, let's turn to createAndInitKubelet. It does three things: it initializes a kubelet object, it sends a kubelet-started event to the apiserver, and it starts the garbage collection services that reclaim containers and images.
func createAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
kubeDeps *kubelet.Dependencies,
crOptions *config.ContainerRuntimeOptions,
containerRuntime string,
hostname string,
hostnameOverridden bool,
nodeName types.NodeName,
nodeIPs []net.IP,
providerID string,
cloudProvider string,
certDirectory string,
rootDirectory string,
imageCredentialProviderConfigFile string,
imageCredentialProviderBinDir string,
registerNode bool,
registerWithTaints []v1.Taint,
allowedUnsafeSysctls []string,
experimentalMounterPath string,
kernelMemcgNotification bool,
experimentalCheckNodeCapabilitiesBeforeMount bool,
experimentalNodeAllocatableIgnoreEvictionThreshold bool,
minimumGCAge metav1.Duration,
maxPerPodContainerCount int32,
maxContainerCount int32,
masterServiceNamespace string,
registerSchedulable bool,
keepTerminatedPodVolumes bool,
nodeLabels map[string]string,
nodeStatusMaxImages int32,
seccompDefault bool,
) (k kubelet.Bootstrap, err error) {
// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
// up into "per source" synchronizations
// 1. Instantiate the kubelet object and initialize all the modules it depends on
k, err = kubelet.NewMainKubelet(kubeCfg,
kubeDeps,
crOptions,
containerRuntime,
hostname,
hostnameOverridden,
nodeName,
nodeIPs,
providerID,
cloudProvider,
certDirectory,
rootDirectory,
imageCredentialProviderConfigFile,
imageCredentialProviderBinDir,
registerNode,
registerWithTaints,
allowedUnsafeSysctls,
experimentalMounterPath,
kernelMemcgNotification,
experimentalCheckNodeCapabilitiesBeforeMount,
experimentalNodeAllocatableIgnoreEvictionThreshold,
minimumGCAge,
maxPerPodContainerCount,
maxContainerCount,
masterServiceNamespace,
registerSchedulable,
keepTerminatedPodVolumes,
nodeLabels,
nodeStatusMaxImages,
seccompDefault,
)
if err != nil {
return nil, err
}
// 2. Send an event to the apiserver announcing that kubelet has started
k.BirthCry()
// 3. Start the garbage collection services that reclaim containers and images
k.StartGarbageCollection()
return k, nil
}
NewMainKubelet instantiates a new Kubelet object along with all the required internal modules:
1. Initialize PodConfig, i.e. the sources of pod metadata (file, http, apiserver), merging the pod configuration from the different sources into a single structure;
2. Initialize the containerGCPolicy, imageGCPolicy, and evictionConfig settings;
3. Start the serviceInformer and nodeInformer;
4. Initialize containerRefManager and oomWatcher;
5. Initialize the kubelet object;
6. Initialize secretManager and configMapManager;
7. Initialize livenessManager, podManager, statusManager, and resourceAnalyzer;
8. Call kuberuntime.NewKubeGenericRuntimeManager to initialize the containerRuntime;
9. Initialize the pleg;
10. Initialize containerGC, containerDeletor, imageManager, and containerLogManager;
11. Initialize serverCertificateManager, probeManager, tokenManager, volumePluginMgr, pluginManager, and volumeManager;
12. Initialize workQueue, podWorkers, and evictionManager;
13. Finally, register the handlers of the relevant modules.
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
kubeDeps *Dependencies,
crOptions *config.ContainerRuntimeOptions,
containerRuntime string,
hostname string,
hostnameOverridden bool,
nodeName types.NodeName,
nodeIPs []net.IP,
providerID string,
cloudProvider string,
certDirectory string,
rootDirectory string,
imageCredentialProviderConfigFile string,
imageCredentialProviderBinDir string,
registerNode bool,
registerWithTaints []v1.Taint,
allowedUnsafeSysctls []string,
experimentalMounterPath string,
kernelMemcgNotification bool,
experimentalCheckNodeCapabilitiesBeforeMount bool,
experimentalNodeAllocatableIgnoreEvictionThreshold bool,
minimumGCAge metav1.Duration,
maxPerPodContainerCount int32,
maxContainerCount int32,
masterServiceNamespace string,
registerSchedulable bool,
keepTerminatedPodVolumes bool,
nodeLabels map[string]string,
nodeStatusMaxImages int32,
seccompDefault bool,
) (*Kubelet, error) {
......
var nodeHasSynced cache.InformerSynced
var nodeLister corelisters.NodeLister
// If kubeClient == nil, we are running in standalone mode (i.e. no API servers)
// If not nil, we are running as part of a cluster and should sync w/API
// 1. Initialize the informers
if kubeDeps.KubeClient != nil {
kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fields.Set{metav1.ObjectNameField: string(nodeName)}.String()
}))
nodeLister = kubeInformers.Core().V1().Nodes().Lister()
nodeHasSynced = func() bool {
return kubeInformers.Core().V1().Nodes().Informer().HasSynced()
}
kubeInformers.Start(wait.NeverStop)
klog.InfoS("Attempting to sync node with API server")
} else {
// we don't have a client to sync!
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
nodeLister = corelisters.NewNodeLister(nodeIndexer)
nodeHasSynced = func() bool { return true }
klog.InfoS("Kubelet is running in standalone mode, will skip API server sync")
}
// kubeDeps' PodConfig
if kubeDeps.PodConfig == nil {
var err error
// 2. Initialize PodConfig, i.e. the sources of pod metadata (file, http, apiserver), merging pod configuration from the different sources into a single structure
kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, nodeHasSynced)
if err != nil {
return nil, err
}
}
containerGCPolicy := kubecontainer.GCPolicy{
MinAge: minimumGCAge.Duration,
MaxPerPodContainer: int(maxPerPodContainerCount),
MaxContainers: int(maxContainerCount),
}
daemonEndpoints := &v1.NodeDaemonEndpoints{
KubeletEndpoint: v1.DaemonEndpoint{Port: kubeCfg.Port},
}
imageGCPolicy := images.ImageGCPolicy{
MinAge: kubeCfg.ImageMinimumGCAge.Duration,
HighThresholdPercent: int(kubeCfg.ImageGCHighThresholdPercent),
LowThresholdPercent: int(kubeCfg.ImageGCLowThresholdPercent),
}
enforceNodeAllocatable := kubeCfg.EnforceNodeAllocatable
if experimentalNodeAllocatableIgnoreEvictionThreshold {
// Do not provide kubeCfg.EnforceNodeAllocatable to eviction threshold parsing if we are not enforcing Evictions
enforceNodeAllocatable = []string{}
}
thresholds, err := eviction.ParseThresholdConfig(enforceNodeAllocatable, kubeCfg.EvictionHard, kubeCfg.EvictionSoft, kubeCfg.EvictionSoftGracePeriod, kubeCfg.EvictionMinimumReclaim)
if err != nil {
return nil, err
}
evictionConfig := eviction.Config{
PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration,
MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod),
Thresholds: thresholds,
KernelMemcgNotification: kernelMemcgNotification,
PodCgroupRoot: kubeDeps.ContainerManager.GetPodCgroupRoot(),
}
// Start serviceInformer and nodeInformer
.....
klet := &Kubelet{
hostname: hostname,
hostnameOverridden: hostnameOverridden,
nodeName: nodeName,
kubeClient: kubeDeps.KubeClient,
heartbeatClient: kubeDeps.HeartbeatClient,
onRepeatedHeartbeatFailure: kubeDeps.OnHeartbeatFailure,
rootDirectory: rootDirectory,
resyncInterval: kubeCfg.SyncFrequency.Duration,
sourcesReady: config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
registerNode: registerNode,
registerWithTaints: registerWithTaints,
registerSchedulable: registerSchedulable,
dnsConfigurer: dns.NewConfigurer(kubeDeps.Recorder, nodeRef, nodeIPs, clusterDNS, kubeCfg.ClusterDomain, kubeCfg.ResolverConfig),
serviceLister: serviceLister,
serviceHasSynced: serviceHasSynced,
nodeLister: nodeLister,
nodeHasSynced: nodeHasSynced,
masterServiceNamespace: masterServiceNamespace,
streamingConnectionIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
recorder: kubeDeps.Recorder,
cadvisor: kubeDeps.CAdvisorInterface,
cloud: kubeDeps.Cloud,
externalCloudProvider: cloudprovider.IsExternal(cloudProvider),
providerID: providerID,
nodeRef: nodeRef,
nodeLabels: nodeLabels,
nodeStatusUpdateFrequency: kubeCfg.NodeStatusUpdateFrequency.Duration,
nodeStatusReportFrequency: kubeCfg.NodeStatusReportFrequency.Duration,
os: kubeDeps.OSInterface,
oomWatcher: oomWatcher,
cgroupsPerQOS: kubeCfg.CgroupsPerQOS,
cgroupRoot: kubeCfg.CgroupRoot,
mounter: kubeDeps.Mounter,
hostutil: kubeDeps.HostUtil,
subpather: kubeDeps.Subpather,
maxPods: int(kubeCfg.MaxPods),
podsPerCore: int(kubeCfg.PodsPerCore),
syncLoopMonitor: atomic.Value{},
daemonEndpoints: daemonEndpoints,
containerManager: kubeDeps.ContainerManager,
containerRuntimeName: containerRuntime,
nodeIPs: nodeIPs,
nodeIPValidator: validateNodeIP,
clock: clock.RealClock{},
enableControllerAttachDetach: kubeCfg.EnableControllerAttachDetach,
makeIPTablesUtilChains: kubeCfg.MakeIPTablesUtilChains,
iptablesMasqueradeBit: int(kubeCfg.IPTablesMasqueradeBit),
iptablesDropBit: int(kubeCfg.IPTablesDropBit),
experimentalHostUserNamespaceDefaulting: utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalHostUserNamespaceDefaultingGate),
keepTerminatedPodVolumes: keepTerminatedPodVolumes,
nodeStatusMaxImages: nodeStatusMaxImages,
lastContainerStartedTime: newTimeCache(),
}
// Initialize the various manager components
.....
// Initialize kubelet's pod workers
klet.podWorkers = newPodWorkers(
klet.syncPod,
klet.syncTerminatingPod,
klet.syncTerminatedPod,
kubeDeps.Recorder,
klet.workQueue,
klet.resyncInterval,
backOffPeriod,
klet.podCache,
)
// Initialize the container runtime manager
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
klet.livenessManager,
klet.readinessManager,
klet.startupManager,
rootDirectory,
machineInfo,
klet.podWorkers,
kubeDeps.OSInterface,
klet,
httpClient,
imageBackOff,
kubeCfg.SerializeImagePulls,
float32(kubeCfg.RegistryPullQPS),
int(kubeCfg.RegistryBurst),
imageCredentialProviderConfigFile,
imageCredentialProviderBinDir,
kubeCfg.CPUCFSQuota,
kubeCfg.CPUCFSQuotaPeriod,
kubeDeps.RemoteRuntimeService,
kubeDeps.RemoteImageService,
kubeDeps.ContainerManager.InternalContainerLifecycle(),
kubeDeps.dockerLegacyService,
klet.containerLogManager,
klet.runtimeClassManager,
seccompDefault,
kubeCfg.MemorySwap.SwapBehavior,
kubeDeps.ContainerManager.GetNodeAllocatableAbsolute,
*kubeCfg.MemoryThrottlingFactor,
)
if err != nil {
return nil, err
}
klet.containerRuntime = runtime
klet.streamingRuntime = runtime
klet.runner = runtime
runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
if err != nil {
return nil, err
}
klet.runtimeCache = runtimeCache
// common provider to get host file system usage associated with a pod managed by kubelet
hostStatsProvider := stats.NewHostStatsProvider(kubecontainer.RealOS{}, func(podUID types.UID) (string, bool) {
return getEtcHostsPath(klet.getPodDir(podUID)), klet.containerRuntime.SupportsSingleFileMapping()
})
if kubeDeps.useLegacyCadvisorStats {
klet.StatsProvider = stats.NewCadvisorStatsProvider(
klet.cadvisor,
klet.resourceAnalyzer,
klet.podManager,
klet.runtimeCache,
klet.containerRuntime,
klet.statusManager,
hostStatsProvider)
} else {
klet.StatsProvider = stats.NewCRIStatsProvider(
klet.cadvisor,
klet.resourceAnalyzer,
klet.podManager,
klet.runtimeCache,
kubeDeps.RemoteRuntimeService,
kubeDeps.RemoteImageService,
hostStatsProvider,
utilfeature.DefaultFeatureGate.Enabled(features.DisableAcceleratorUsageMetrics),
utilfeature.DefaultFeatureGate.Enabled(features.PodAndContainerStatsFromCRI))
}
// 9. Initialize the pleg (pod lifecycle event generator)
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)
if _, err := klet.updatePodCIDR(kubeCfg.PodCIDR); err != nil {
klog.ErrorS(err, "Pod CIDR update failed")
}
// setup containerGC
containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy, klet.sourcesReady)
if err != nil {
return nil, err
}
klet.containerGC = containerGC
klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))
// setup imageManager
imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, crOptions.PodSandboxImage)
if err != nil {
return nil, fmt.Errorf("failed to initialize image manager: %v", err)
}
klet.imageManager = imageManager
// Initialize the various plugins and managers
....
return klet, nil
}
One field above deserves attention. Recall from part one of this kubelet source analysis that kubelet accepts PodSpec input from three kinds of sources for all of its pod operations: files, the apiserver, and an HTTP URL. Let's focus on kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, nodeHasSynced) and see how kubeDeps.PodConfig obtains pod descriptions, parses them, and hands them off as events for one of kubelet's workers to process.
PodConfig is a configuration multiplexer: it merges many pod configuration sources into a single consistent structure and then delivers incremental change notifications to listeners in order. The entry point is kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, nodeHasSynced) in pkg/kubelet/kubelet.go. The code is as follows:
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, nodeHasSynced func() bool) (*config.PodConfig, error) {
manifestURLHeader := make(http.Header)
if len(kubeCfg.StaticPodURLHeader) > 0 {
for k, v := range kubeCfg.StaticPodURLHeader {
for i := range v {
manifestURLHeader.Add(k, v[i])
}
}
}
// 1. Initialize the PodConfig struct
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
// define file config source
if kubeCfg.StaticPodPath != "" {
klog.InfoS("Adding static pod path", "path", kubeCfg.StaticPodPath)
// 2. Register the file source
config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
}
// define url config source
if kubeCfg.StaticPodURL != "" {
klog.InfoS("Adding pod URL with HTTP header", "URL", kubeCfg.StaticPodURL, "header", manifestURLHeader)
// 3. Register the URL source
config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
}
if kubeDeps.KubeClient != nil {
klog.InfoS("Adding apiserver pod source")
// 4. Register the apiserver source
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(kubetypes.ApiserverSource))
}
return cfg, nil
}
cfg := config.NewPodConfig is the initializer of the PodConfig struct. The next few functions are closely related to each other:
func NewSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
path = strings.TrimRight(path, string(os.PathSeparator))
// 1. Initialize the struct
config := newSourceFile(path, nodeName, period, updates)
klog.V(1).InfoS("Watching path", "path", path)
// 2. Start the data source
config.run()
}
func newSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) *sourceFile {
// 1. It is not yet clear where this closure is invoked, but it appears to take the slice of objects parsed from config files, wrap it in a kubetypes.PodUpdate, and send it to the given channel
send := func(objs []interface{}) {
var pods []*v1.Pod
for _, o := range objs {
pods = append(pods, o.(*v1.Pod))
}
updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
}
// 2. The exact role and call site of this store are not obvious yet; it is probably used from the run method
store := cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc)
return &sourceFile{
path: path,
nodeName: nodeName,
period: period,
store: store,
fileKeyMapping: map[string]string{},
updates: updates,
watchEvents: make(chan *watchEvent, eventBufferLen),
}
}
Back to config.run. The key methods are s.listConfig(), s.consumeWatchEvent(e), and s.startWatch(). This function mainly starts a periodic task that keeps re-reading the config path for changes, so the pod descriptions obtained from the file source stay up to date.
func (s *sourceFile) run() {
// A common Go pattern for running a task periodically
listTicker := time.NewTicker(s.period)
go func() {
// Read path immediately to speed up startup.
if err := s.listConfig(); err != nil {
klog.ErrorS(err, "Unable to read config path", "path", s.path)
}
for {
select {
case <-listTicker.C:
if err := s.listConfig(); err != nil {
klog.ErrorS(err, "Unable to read config path", "path", s.path)
}
case e := <-s.watchEvents:
// This looks like a no-op here, but it is only unimplemented on non-Linux builds; the Linux build consumes fsnotify watch events at this point (file_linux.go)
if err := s.consumeWatchEvent(e); err != nil {
klog.ErrorS(err, "Unable to process watch event")
}
}
}
}()
// Likewise a no-op on non-Linux builds; on Linux this starts an fsnotify watch on the static pod path
s.startWatch()
}
Now let's look at the s.listConfig() method:
func (s *sourceFile) listConfig() error {
// path here is the static pod path
path := s.path
statInfo, err := os.Stat(path)
if err != nil {
if !os.IsNotExist(err) {
return err
}
// Emit an update with an empty PodList to allow FileSource to be marked as seen
// This only becomes visible once the updates channel is consumed; an empty update is sent so the file source can still be marked as seen
s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.FileSource}
return fmt.Errorf("path does not exist, ignoring")
}
switch {
// Handling for the case where path is a directory
case statInfo.Mode().IsDir():
// s.extractFromDir returns []*v1.Pod; presumably this is where files are parsed into pod structs
pods, err := s.extractFromDir(path)
if err != nil {
return err
}
if len(pods) == 0 {
// Emit an update with an empty PodList to allow FileSource to be marked as seen
s.updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
return nil
}
// Core processing logic
return s.replaceStore(pods...)
// Handling for the case where path is a regular file
case statInfo.Mode().IsRegular():
pod, err := s.extractFromFile(path)
if err != nil {
return err
}
return s.replaceStore(pod)
default:
return fmt.Errorf("path is not a directory or file")
}
}
func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) {
klog.V(3).InfoS("Reading config file", "path", filename)
defer func() {
if err == nil && pod != nil {
objKey, keyErr := cache.MetaNamespaceKeyFunc(pod)
if keyErr != nil {
err = keyErr
return
}
// This map caches the mapping from pod file name to pod object key
s.fileKeyMapping[filename] = objKey
}
}()
file, err := os.Open(filename)
if err != nil {
return pod, err
}
defer file.Close()
// Read the file contents
data, err := utilio.ReadAtMost(file, maxConfigLength)
if err != nil {
return pod, err
}
defaultFn := func(pod *api.Pod) error {
return s.applyDefaults(pod, filename)
}
// Parse the pod file
parsed, pod, podErr := tryDecodeSinglePod(data, defaultFn)
if parsed {
if podErr != nil {
return pod, podErr
}
return pod, nil
}
return pod, fmt.Errorf("%v: couldn't parse as pod(%v), please check config file", filename, podErr)
}
tryDecodeSinglePod is the parsing step for a single pod:
// tryDecodeSinglePod takes data and tries to extract valid Pod config information from it.
func tryDecodeSinglePod(data []byte, defaultFn defaultFunc) (parsed bool, pod *v1.Pod, err error) {
// JSON is valid YAML, so this should work for everything.
// Convert the file contents (YAML) to JSON
json, err := utilyaml.ToJSON(data)
if err != nil {
return false, nil, err
}
// Decode the JSON into a runtime.Object
obj, err := runtime.Decode(legacyscheme.Codecs.UniversalDecoder(), json)
if err != nil {
return false, pod, err
}
newPod, ok := obj.(*api.Pod)
// Check whether the object could be converted to single pod.
if !ok {
return false, pod, fmt.Errorf("invalid pod: %#v", obj)
}
// Apply default values and validate the pod.
// Apply default values (status and other fields) to the pod struct
if err = defaultFn(newPod); err != nil {
return true, pod, err
}
if errs := validation.ValidatePodCreate(newPod, validation.PodValidationOptions{}); len(errs) > 0 {
return true, pod, fmt.Errorf("invalid pod: %v", errs)
}
v1Pod := &v1.Pod{}
if err := k8s_api_v1.Convert_core_Pod_To_v1_Pod(newPod, v1Pod, nil); err != nil {
klog.ErrorS(err, "Pod failed to convert to v1", "pod", klog.KObj(newPod))
return true, nil, err
}
return true, v1Pod, nil
}
At this point the steps that parse a file into a v1.Pod{} struct are complete. Going back to s.listConfig(): if everything goes well, all parsed pods end up in s.replaceStore(pods...). This step also answers the earlier open question about store := cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc).
Let's first look at the concrete implementation of the Replace function:
// Replace will delete the contents of current store, using instead the given list.
// 'u' takes ownership of the list, you should not reference the list again
// after calling this function.
// The new contents complete state will be sent by calling PushFunc after replacement.
func (u *UndeltaStore) Replace(list []interface{}, resourceVersion string) error {
if err := u.Store.Replace(list, resourceVersion); err != nil {
return err
}
u.PushFunc(u.Store.List())
return nil
}
This function does two things: first u.Store.Replace(list, resourceVersion), then u.PushFunc(u.Store.List()). u.Store is a thread-safe keyed store used to refresh the cached pod data, and the push function is initialized as shown below; it wraps all parsed pod data into a kubetypes.PodUpdate and sends it to the updates channel:
// cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc)
send := func(objs []interface{}) {
var pods []*v1.Pod
for _, o := range objs {
pods = append(pods, o.(*v1.Pod))
}
updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
}
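To see this contract in isolation, here is a small usage sketch of client-go's cache.NewUndeltaStore (the pod object and the print statement are illustrative only): every mutation re-invokes the push function with the store's complete current contents, which is exactly what lets the file source emit a full SET update after each Replace.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// push is called with the full current contents after every mutation,
	// mirroring how the file source turns "current state" into a SET update.
	push := func(objs []interface{}) {
		fmt.Println("store now holds", len(objs), "pods")
	}
	store := cache.NewUndeltaStore(push, cache.MetaNamespaceKeyFunc)

	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "static-pod", Namespace: "default"}}
	// Replace swaps the whole contents and then calls push with the new list.
	if err := store.Replace([]interface{}{pod}, ""); err != nil {
		fmt.Println("replace failed:", err)
	}
}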
Let's trace where updates is passed in: cfg.Channel hands a channel to the sourceFile struct.
config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
Following cfg.Channel(kubetypes.FileSource), we can trace how the updates channel is created.
// Right here
if kubeCfg.StaticPodPath != "" {
klog.InfoS("Adding static pod path", "path", kubeCfg.StaticPodPath)
config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
}
func (c *PodConfig) Channel(source string) chan<- interface{} {
c.sourcesLock.Lock()
defer c.sourcesLock.Unlock()
// Record the source; c.sources is a set used to deduplicate by source name
c.sources.Insert(source)
// Return the channel
return c.mux.Channel(source)
}
Before tracing the Channel method, let's look at the Mux implementation. The mux is created in NewPodConfig when the PodConfig is initialized, so it helps to read these related pieces of code together. Mux has two fields: merger, which defines how data from multiple sources is merged, and sources, which caches the per-source channels.
// Mux is a class for merging configuration from multiple sources. Changes are
// pushed via channels and sent to the merge function.
type Mux struct {
// Invoked when an update is sent to a source.
// How the multiplexed updates are merged
merger Merger
// Sources and their lock.
sourceLock sync.RWMutex
// Maps source names to channels
// Cache of per-source channels
sources map[string]chan interface{}
}
func NewMux(merger Merger) *Mux {
mux := &Mux{
sources: make(map[string]chan interface{}),
merger: merger,
}
return mux
}
// The Mux is created while the PodConfig struct is being initialized
// NewPodConfig creates an object that can merge many configuration sources into a stream
// of normalized updates to a pod configuration.
func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
updates := make(chan kubetypes.PodUpdate, 50)
storage := newPodStorage(updates, mode, recorder)
podConfig := &PodConfig{
pods: storage,
mux: config.NewMux(storage),
updates: updates,
sources: sets.String{},
}
return podConfig
}
c.mux.Channel(source) is simple: it returns the channel cached in Mux.sources for the given source, creating one if it does not exist yet.
func (m *Mux) Channel(source string) chan interface{} {
if len(source) == 0 {
panic("Channel given an empty name")
}
m.sourceLock.Lock()
defer m.sourceLock.Unlock()
channel, exists := m.sources[source]
if exists {
return channel
}
newChannel := make(chan interface{})
m.sources[source] = newChannel
go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
return newChannel
}
The line go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop) is worth noting: besides returning the channel, it also starts a goroutine that keeps listening on that channel and processes whatever arrives.
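wait.Until comes from k8s.io/apimachinery/pkg/util/wait: it keeps re-invoking the function until the stop channel is closed, and with a period of 0, as used here, it restarts the function immediately whenever it returns (wait.NeverStop is simply a channel that never closes). A tiny standalone sketch, with made-up timing and output:
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stop := make(chan struct{})
	// The function is re-run until stop closes; a period of 0 means it is
	// restarted immediately after returning, just like Mux.Channel does
	// with m.listen(source, newChannel).
	go wait.Until(func() {
		fmt.Println("tick", time.Now().Format(time.RFC3339))
		time.Sleep(time.Second) // stand-in for "drain the channel"
	}, 0, stop)

	time.Sleep(3 * time.Second)
	close(stop)
}
The listen method that this goroutine keeps running is: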
func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
for update := range listenChannel {
// The merger was created when the PodConfig was initialized
m.merger.Merge(source, update)
}
}
Every message that arrives on the channel is handed to m.merger.Merge for processing. The Merger implementation here is the podStorage struct, so let's follow the thread and see how Merge is implemented:
func (s *podStorage) Merge(source string, change interface{}) error {
s.updateLock.Lock()
defer s.updateLock.Unlock()
// sourcesSeen is a set
seenBefore := s.sourcesSeen.Has(source)
// Classify the pod changes and split them into separate buckets
adds, updates, deletes, removes, reconciles := s.merge(source, change)
// Whether this is the first time this source has been seen
firstSet := !seenBefore && s.sourcesSeen.Has(source)
// deliver update notifications
switch s.mode {
// Based on the classification, send PodUpdate events to the updates channel
case PodConfigNotificationIncremental:
if len(removes.Pods) > 0 {
s.updates <- *removes
}
if len(adds.Pods) > 0 {
s.updates <- *adds
}
if len(updates.Pods) > 0 {
s.updates <- *updates
}
if len(deletes.Pods) > 0 {
s.updates <- *deletes
}
if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
// Send an empty update when first seeing the source and there are
// no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
// the source is ready.
s.updates <- *adds
}
// Only add reconcile support here, because kubelet doesn't support Snapshot update now.
if len(reconciles.Pods) > 0 {
s.updates <- *reconciles
}
case PodConfigNotificationSnapshotAndUpdates:
if len(removes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
}
if len(updates.Pods) > 0 {
s.updates <- *updates
}
if len(deletes.Pods) > 0 {
s.updates <- *deletes
}
case PodConfigNotificationSnapshot:
if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || len(removes.Pods) > 0 || firstSet {
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
}
case PodConfigNotificationUnknown:
fallthrough
default:
panic(fmt.Sprintf("unsupported PodConfigNotificationMode: %#v", s.mode))
}
return nil
}
As mentioned above, s.merge classifies the pod changes read from the channel. Let's see what it actually does. First, the PodUpdate type:
type PodUpdate struct {
// pods
Pods []*v1.Pod
// The operation to perform
Op PodOperation
// The data source
Source string
}
PodOperation defines the different kinds of operations kubelet applies when handling pods; each represents a different state or event in a pod's lifecycle:
SET: set the pod configuration for this source. kubelet received a complete update of the pod list, typically the full set of pods from a particular source (for example the apiserver).
ADD: a pod was added by this source. kubelet needs to add the pod to its internal state and work queue.
DELETE: a pod was gracefully deleted from this source. The pod has been deleted, and kubelet must stop its containers and remove the pod after any necessary cleanup.
REMOVE: a pod was removed from this source. In some cases waiting for graceful deletion is undesirable (for example it takes too long); this operation tells kubelet to stop the containers and remove the pod as quickly as possible.
UPDATE: a pod in this source was updated. kubelet needs to inspect the configuration change and update its internal state and work queue accordingly.
RECONCILE: a pod in this source has an unexpected status and kubelet needs to reconcile its state with the source, typically when the observed status diverges from the expected one (for example resource usage or health state).
These operation types are used inside kubelet to manage pod lifecycle events and make sure kubelet acts correctly on the current pod configuration and state (a small consumer sketch follows below).
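As a toy illustration (not kubelet's real sync loop), a consumer of the merged channel might branch on these operation types roughly as follows; the print statements stand in for real handling:
package main

import (
	"fmt"

	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)

// handleUpdates drains the merged channel and branches on the operation type.
func handleUpdates(updates <-chan kubetypes.PodUpdate) {
	for update := range updates {
		switch update.Op {
		case kubetypes.SET:
			fmt.Printf("full pod set from %s: %d pods\n", update.Source, len(update.Pods))
		case kubetypes.ADD:
			fmt.Printf("add %d pods from %s\n", len(update.Pods), update.Source)
		case kubetypes.UPDATE, kubetypes.RECONCILE:
			fmt.Printf("re-sync %d pods from %s\n", len(update.Pods), update.Source)
		case kubetypes.DELETE:
			fmt.Printf("gracefully delete %d pods from %s\n", len(update.Pods), update.Source)
		case kubetypes.REMOVE:
			fmt.Printf("remove %d pods from %s\n", len(update.Pods), update.Source)
		}
	}
}

func main() {
	updates := make(chan kubetypes.PodUpdate, 1)
	updates <- kubetypes.PodUpdate{Op: kubetypes.SET, Source: kubetypes.FileSource}
	close(updates)
	handleUpdates(updates)
}
Now back to podStorage.merge: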
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
// change is the event read from the channel
s.podLock.Lock()
defer s.podLock.Unlock()
// Buckets that classify the pod changes read from the channel into 5 categories
addPods := []*v1.Pod{}
updatePods := []*v1.Pod{}
deletePods := []*v1.Pod{}
removePods := []*v1.Pod{}
reconcilePods := []*v1.Pod{}
// s.pods caches pod data per source
pods := s.pods[source]
if pods == nil {
pods = make(map[types.UID]*v1.Pod)
}
// updatePodFunc is the local function which updates the pod cache *oldPods* with new pods *newPods*.
// After updated, new pod will be stored in the pod cache *pods*.
// Notice that *pods* and *oldPods* could be the same cache.
updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) {
// 1) Filter out invalid pods (including duplicates)
filtered := filterInvalidPods(newPods, source, s.recorder)
// 2) Sort the filtered pods into buckets; checkAndUpdatePod decides which category each change belongs to
for _, ref := range filtered {
// Annotate the pod with the source before any comparison.
if ref.Annotations == nil {
ref.Annotations = make(map[string]string)
}
ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
if existing, found := oldPods[ref.UID]; found {
pods[ref.UID] = existing
needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
if needUpdate {
updatePods = append(updatePods, existing)
} else if needReconcile {
reconcilePods = append(reconcilePods, existing)
} else if needGracefulDelete {
deletePods = append(deletePods, existing)
}
continue
}
recordFirstSeenTime(ref)
pods[ref.UID] = ref
addPods = append(addPods, ref)
}
}
update := change.(kubetypes.PodUpdate)
switch update.Op {
case kubetypes.ADD, kubetypes.UPDATE, kubetypes.DELETE:
if update.Op == kubetypes.ADD {
klog.V(4).InfoS("Adding new pods from source", "source", source, "pods", klog.KObjs(update.Pods))
} else if update.Op == kubetypes.DELETE {
klog.V(4).InfoS("Gracefully deleting pods from source", "source", source, "pods", klog.KObjs(update.Pods))
} else {
klog.V(4).InfoS("Updating pods from source", "source", source, "pods", klog.KObjs(update.Pods))
}
updatePodsFunc(update.Pods, pods, pods)
case kubetypes.REMOVE:
klog.V(4).InfoS("Removing pods from source", "source", source, "pods", klog.KObjs(update.Pods))
for _, value := range update.Pods {
if existing, found := pods[value.UID]; found {
// this is a delete
delete(pods, value.UID)
removePods = append(removePods, existing)
continue
}
// this is a no-op
}
case kubetypes.SET:
klog.V(4).InfoS("Setting pods for source", "source", source)
// Mark this source as seen and rebuild the cached pods for this source
s.markSourceSet(source)
// Clear the old map entries by just creating a new map
oldPods := pods
pods = make(map[types.UID]*v1.Pod)
updatePodsFunc(update.Pods, oldPods, pods)
for uid, existing := range oldPods {
if _, found := pods[uid]; !found {
// this is a delete
removePods = append(removePods, existing)
}
}
default:
klog.InfoS("Received invalid update type", "type", update)
}
s.pods[source] = pods
adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}
return adds, updates, deletes, removes, reconciles
}
At this point we have roughly worked out what config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource)) does.
This code is a classic producer/consumer pattern. Pod data comes from three sources: files, the apiserver, and an HTTP URL. Each source gets its own channel, deserializes its pod data into v1.Pod{} structs, and pushes them onto that channel. When a channel is created, a goroutine is started to consume it; after filtering and deduplication it determines the real operation for each pod, classifies the changes by operation type, and sends them to the shared update-processing channel.
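A stripped-down sketch of the same fan-in pattern, with made-up source names and plain strings instead of PodUpdate values: each source has its own channel, one goroutine per source forwards what it reads into a shared updates channel (the role of Mux.listen plus podStorage.Merge), and a single consumer drains that channel (the role of the kubelet sync loop).
package main

import (
	"fmt"
	"sync"
)

func main() {
	updates := make(chan string, 50)
	sources := map[string]chan string{
		"file":      make(chan string),
		"http":      make(chan string),
		"apiserver": make(chan string),
	}

	var wg sync.WaitGroup
	for name, ch := range sources {
		wg.Add(1)
		go func(name string, ch chan string) { // one listener per source, like Mux.listen
			defer wg.Done()
			for msg := range ch {
				updates <- fmt.Sprintf("[%s] %s", name, msg)
			}
		}(name, ch)
	}

	// Producers: each source pushes its own view of the pods.
	go func() { sources["file"] <- "static pod seen"; close(sources["file"]) }()
	go func() { sources["http"] <- "manifest fetched"; close(sources["http"]) }()
	go func() { sources["apiserver"] <- "pod assigned to this node"; close(sources["apiserver"]) }()

	// Close the merged channel once every per-source listener is done.
	go func() { wg.Wait(); close(updates) }()

	// Single consumer, analogous to the sync loop reading PodUpdate events.
	for u := range updates {
		fmt.Println(u)
	}
}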
So far we have walked one complete path, but we still have not seen how kubelet processes the data once the pod updates converge on the updates channel. Let's go back to RunKubelet: there is one function we have not analyzed yet, startKubelet.
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
// start the kubelet
// This is the core logic entry point; Updates() returns the updates channel
go k.Run(podCfg.Updates())
// start the kubelet server
if enableServer {
go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth)
}
// Read-only API entry point
if kubeCfg.ReadOnlyPort > 0 {
go k.ListenAndServeReadOnly(netutils.ParseIPSloppy(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
}
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
go k.ListenAndServePodResources()
}
}
The core logic appears to land in the Run function; all this jumping back and forth is admittedly painful to follow. This method starts a number of goroutines that run periodic work:
go kl.cloudResourceSyncManager.Run(wait.NeverStop): syncs cloud provider resources;
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop): starts the volumeManager;
go wait.JitterUntil(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, 0.04, true, wait.NeverStop): periodically syncs node status to the apiserver;
go kl.nodeLeaseController.Run(wait.NeverStop): starts the NodeLease mechanism, which kubelet uses to report node health and availability to the control plane;
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop): periodically updates the container runtime status.
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
// 1. Create the log server
if kl.logServer == nil {
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
}
if kl.kubeClient == nil {
klog.InfoS("No API server defined - no node status update will be sent")
}
// Start the cloud provider sync manager
if kl.cloudResourceSyncManager != nil {
go kl.cloudResourceSyncManager.Run(wait.NeverStop)
}
// 3. Call kl.initializeModules to first start the modules that do not depend on the container runtime
if err := kl.initializeModules(); err != nil {
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
klog.ErrorS(err, "Failed to initialize internal modules")
os.Exit(1)
}
// Start volume manager
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
if kl.kubeClient != nil {
// Introduce some small jittering to ensure that over time the requests won't start
// accumulating at approximately the same time from the set of nodes due to priority and
// fairness effect.
// 5. Run kl.syncNodeStatus to sync node status periodically
go wait.JitterUntil(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, 0.04, true, wait.NeverStop)
go kl.fastStatusUpdateOnce()
// start syncing lease
// Enable the NodeLease mechanism
go kl.nodeLeaseController.Run(wait.NeverStop)
}
// 6. Periodically update the runtime status
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
// Set up iptables util rules
// 9. Run kl.syncNetworkUtil to sync iptables rules periodically
if kl.makeIPTablesUtilChains {
kl.initNetworkUtil()
}
// Start component sync loops.
kl.statusManager.Start()
// Start syncing RuntimeClasses if enabled.
if kl.runtimeClassManager != nil {
kl.runtimeClassManager.Start(wait.NeverStop)
}
// Start the pod lifecycle event generator.
// 12. Start the pleg, which continuously watches container changes and syncs them into the pod cache
kl.pleg.Start()
// 13. Call kl.syncLoop to watch for pod changes
kl.syncLoop(updates, kl)
}
That concludes the kubelet startup flow. It is a long road, and we have only just set out. If you search the internet for kubelet source analyses, pleg and syncLoop are probably familiar terms; the actual operations on pods happen in those two places. A follow-up article will continue with how kubelet creates pods. Stay tuned.
As a beginner in cloud native, I will inevitably make some mistakes in this source analysis, and a few gaps remain unfilled. I hope you will read the kubelet source in your editor alongside this walkthrough, point out any errors, and that we can improve together.