本文基于Kubernetes v1.22.4版本进行源码学习
kubelet的工作核心就是一个控制循环,即:SyncLoop(图中的大圆圈)。而驱动这个控制循环运行的事件,包括:Pod更新事件、Pod生命周期变化、kubelet本身设置的执行周期、定时的清理事件
kubelet还负责维护着很多其他的子控制循环(也就是图中的小圆圈),叫做xxxManager,比如:probeManager会定时去监控Pod中容器的健康状况,当前支持两种类型的探针:livenessProbe和readinessProbe;statusManager负责维护状态信息,并把Pod状态更新到APIServer;containerRefManager是容器引用的管理,用来报告容器的创建、失败等事件
kubelet调用下层容器运行时的执行过程,并不会直接调用Docker的API,而是通过一组叫作CRI(Container Runtime Interface,容器运行时接口)的gRPC接口来间接执行的
CRI shim负责响应CRI请求,扮演kubelet与容器项目之间的垫片(shim)。CRI shim实现了CRI规定的每个接口,然后把具体的CRI请求翻译成对后端容器项目的请求或者操作
每一种容器项目都可以自己实现一个CRI shim,自行对CRI请求进行处理,这样,Kubernetes就有了一个统一的容器抽象层,使得下层容器运行时可以自由地对接进入Kubernetes当中
如果使用Docker的话,dockershim负责处理CRI请求,然后组装成Docker API请求发给Docker Daemon
CRI接口可以分为两组:
CRI接口核心方法如下图:
CRD设计的一个重要原则,就是确保这个接口本身,只关注容器,不关注Pod,在CRI的设计里并没有一个直接创建Pod或者启动Pod的接口
PodSandboxManager中包含RunPodSandbox方法,这个PodSandbox对应的并不是Kubernetes里的Pod API对象,而只是抽取了Pod里的一部分与容器运行时相关的字段,比如HostName、DnsConfig、CgroupParent等。所以说,PodSandbox描述的其实是Kubernetes将Pod这个概念映射到容器运行时层面所需要的字段,或者说是一个Pod对象子集
比如,当执行kubectl run创建了一个名叫foo的、包括了A、B两个容器的Pod之后。如果是Docker项目,dockershim就会创建出一个名叫foo的Infra容器(pause容器)用来hold住整个Pod的Network Namespace
pkg/kubelet/kubelet.go
的Run()
方法启动了kubelet各个模块,代码如下:
// pkg/kubelet/kubelet.go
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
if kl.logServer == nil {
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
}
if kl.kubeClient == nil {
klog.InfoS("No API server defined - no node status update will be sent")
}
// Start the cloud provider sync manager
if kl.cloudResourceSyncManager != nil {
go kl.cloudResourceSyncManager.Run(wait.NeverStop)
}
// 启动不依赖container runtime的一些模块
if err := kl.initializeModules(); err != nil {
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
klog.ErrorS(err, "Failed to initialize internal modules")
os.Exit(1)
}
// Start volume manager
// 启动volume manager
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
if kl.kubeClient != nil {
// Start syncing node status immediately, this may set up things the runtime needs to run.
// 定时同步node状态
go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
// 更新pod CIDR、runtime状态以及执行首次node状态同步
go kl.fastStatusUpdateOnce()
// start syncing lease
// 启动node lease机制
go kl.nodeLeaseController.Run(wait.NeverStop)
}
// 定时更新runtime状态
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
// Set up iptables util rules
if kl.makeIPTablesUtilChains {
kl.initNetworkUtil()
}
// Start component sync loops.
// 启动statusManager
kl.statusManager.Start()
// Start syncing RuntimeClasses if enabled.
// 启动runtimeClassManager
if kl.runtimeClassManager != nil {
kl.runtimeClassManager.Start(wait.NeverStop)
}
// Start the pod lifecycle event generator.
// 启动pleg,该模块主要用于周期性地向container runtime刷新当前所有容器的状态
kl.pleg.Start()
// 启动kubelet事件循环,不停监听外部数据的变化执行pod的相应操作
kl.syncLoop(updates, kl)
}
Run()
方法主要逻辑如下:
kl.initializeModules()
方法,启动不依赖container runtime的一些模块kl.fastStatusUpdateOnce()
方法,更新pod CIDR、runtime状态以及执行首次node状态同步kl.updateRuntimeUp()
方法,更新runtime状态kl.pleg.Start()
方法,启动pleg,该模块主要用于周期性地向container runtime刷新当前所有容器的状态kl.syncLoop()
方法,启动kubelet事件循环,不停监听外部数据的变化执行pod的相应操作initializeModules()
方法// pkg/kubelet/kubelet.go
func (kl *Kubelet) initializeModules() error {
// Prometheus metrics.
metrics.Register(
collectors.NewVolumeStatsCollector(kl),
collectors.NewLogMetricsCollector(kl.StatsProvider.ListPodStats),
)
metrics.SetNodeName(kl.nodeName)
servermetrics.Register()
// Setup filesystem directories.
if err := kl.setupDataDirs(); err != nil {
return err
}
// If the container logs directory does not exist, create it.
if _, err := os.Stat(ContainerLogsDir); err != nil {
if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
return fmt.Errorf("failed to create directory %q: %v", ContainerLogsDir, err)
}
}
// Start the image manager.
// 启动imageManager
kl.imageManager.Start()
// Start the certificate manager if it was enabled.
// 启动certificateManager,证书相关
if kl.serverCertificateManager != nil {
kl.serverCertificateManager.Start()
}
// Start out of memory watcher.
// 启动oomWatcher
if kl.oomWatcher != nil {
if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
return fmt.Errorf("failed to start OOM watcher: %w", err)
}
}
// Start resource analyzer
// 启动resource analyzer,刷新volume stats到缓存中
kl.resourceAnalyzer.Start()
return nil
}
initializeModules()
方法主要逻辑如下:
kl.imageManager.Start()
方法代码如下:
// pkg/kubelet/images/image_gc_manager.go
func (im *realImageGCManager) Start() {
go wait.Until(func() {
// Initial detection make detected time "unknown" in the past.
var ts time.Time
if im.initialized {
ts = time.Now()
}
// 找出所有的image,并删除不再使用的image
_, err := im.detectImages(ts)
if err != nil {
klog.InfoS("Failed to monitor images", "err", err)
} else {
im.initialized = true
}
}, 5*time.Minute, wait.NeverStop)
// Start a goroutine periodically updates image cache.
// 更新image的缓存
go wait.Until(func() {
// 调用CRI接口,获取最新的image
images, err := im.runtime.ListImages()
if err != nil {
klog.InfoS("Failed to update image list", "err", err)
} else {
im.imageCache.set(images)
}
}, 30*time.Second, wait.NeverStop)
}
realImageGCManager的Start()
方法启动两个协程
detectImages()
方法,会找出所有正在使用的image,然后删除不再使用的imageimageCache()
方法更新image的缓存fastStatusUpdateOnce()
方法// pkg/kubelet/kubelet.go
func (kl *Kubelet) fastStatusUpdateOnce() {
for {
time.Sleep(100 * time.Millisecond)
node, err := kl.GetNode()
if err != nil {
klog.ErrorS(err, "Error getting node")
continue
}
if len(node.Spec.PodCIDRs) != 0 {
podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
if _, err := kl.updatePodCIDR(podCIDRs); err != nil {
klog.ErrorS(err, "Pod CIDR update failed", "CIDR", podCIDRs)
continue
}
// 更新runtime状态
kl.updateRuntimeUp()
// node状态同步
kl.syncNodeStatus()
return
}
}
}
fastStatusUpdateOnce()
方法启动一个循环,尝试立即更新pod CIDR。更新pod CIDR后,会更新runtime状态并同步node状态。该方法在一次成功的node状态同步后直接返回,仅在kubelet启动期间执行
kl.updateRuntimeUp()
方法代码如下:
// pkg/kubelet/kubelet.go
// 首次执行的时候会初始化runtime依赖模块,然后更新runtimeState
func (kl *Kubelet) updateRuntimeUp() {
kl.updateRuntimeMux.Lock()
defer kl.updateRuntimeMux.Unlock()
// 获取containerRuntime状态
s, err := kl.containerRuntime.Status()
if err != nil {
klog.ErrorS(err, "Container runtime sanity check failed")
return
}
if s == nil {
klog.ErrorS(nil, "Container runtime status is nil")
return
}
// Periodically log the whole runtime status for debugging.
klog.V(4).InfoS("Container runtime status", "status", s)
networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
// 检查network和runtime是否处于ready状态
if networkReady == nil || !networkReady.Status {
klog.ErrorS(nil, "Container runtime network not ready", "networkReady", networkReady)
kl.runtimeState.setNetworkState(fmt.Errorf("container runtime network not ready: %v", networkReady))
} else {
// Set nil if the container runtime network is ready.
kl.runtimeState.setNetworkState(nil)
}
// information in RuntimeReady condition will be propagated to NodeReady condition.
// 获取运行时状态
runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady)
// If RuntimeReady is not set or is false, report an error.
if runtimeReady == nil || !runtimeReady.Status {
klog.ErrorS(nil, "Container runtime not ready", "runtimeReady", runtimeReady)
kl.runtimeState.setRuntimeState(fmt.Errorf("container runtime not ready: %v", runtimeReady))
return
}
kl.runtimeState.setRuntimeState(nil)
// 调用kl.initializeRuntimeDependentModules初始化依赖模块
kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
kl.runtimeState.setRuntimeSync(kl.clock.Now())
}
updateRuntimeUp()
方法会获取containerRuntime状态信息,然后根据返回containerRuntime状态检查网络、runtime是不是已经处于ready状态;接着调用kl.initializeRuntimeDependentModules()
方法初始化依赖模块,这里会启动cadvisor、containerManager、evictionManager、containerLogManager、pluginManager、shutdownManager;最后设置runtime同步时间
syncLoop()
方法// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
klog.InfoS("Starting kubelet main sync loop")
// The syncTicker wakes up kubelet to checks if there are any pod workers
// that need to be sync'd. A one-second period is sufficient because the
// sync interval is defaulted to 10s.
syncTicker := time.NewTicker(time.Second)
defer syncTicker.Stop()
housekeepingTicker := time.NewTicker(housekeepingPeriod)
defer housekeepingTicker.Stop()
plegCh := kl.pleg.Watch()
const (
base = 100 * time.Millisecond
max = 5 * time.Second
factor = 2
)
duration := base
// Responsible for checking limits in resolv.conf
// The limits do not have anything to do with individual pods
// Since this is called in syncLoop, we don't need to call it anywhere else
if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
kl.dnsConfigurer.CheckLimitsForResolvConf()
}
for {
if err := kl.runtimeState.runtimeErrors(); err != nil {
klog.ErrorS(err, "Skipping pod synchronization")
// exponential backoff
time.Sleep(duration)
duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
continue
}
// reset backoff if we have a success
duration = base
kl.syncLoopMonitor.Store(kl.clock.Now())
// 调用kl.syncLoopIteration方法
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
kl.syncLoopMonitor.Store(kl.clock.Now())
}
}
syncLoop()
方法在一个循环中不断的调用syncLoopIteration()
方法
关于syncLoopIteration()
方法中涉及的channel后面会详细介绍,这里只关注syncLoopIteration()
方法中的处理逻辑
1)configCh
// pkg/kubelet/kubelet.go
// 该方法会监听多个channel,当发现任何一个channel有数据就交给handler去处理,在handler中通过调用dispatchWork分发任务
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
// 该模块将同时watch 3个不同来源的pod信息的变化(file,http,apiServer)
// 一旦某个来源的pod信息发生了变化(创建/更新/删除),这个channel中就会出现被更新的pod信息和更新的具体操作
// Update from a config source; dispatch it to the right handler
// callback.
if !open {
klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
return false
}
switch u.Op {
case kubetypes.ADD:
klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", format.Pods(u.Pods))
// After restarting, kubelet will get all existing pods through
// ADD as if they are new pods. These pods will then go through the
// admission process and *may* be rejected. This can be resolved
// once we have checkpointing.
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", format.Pods(u.Pods))
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", format.Pods(u.Pods))
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", format.Pods(u.Pods))
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", format.Pods(u.Pods))
// DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(u.Pods)
case kubetypes.SET:
// TODO: Do we want to support this?
klog.ErrorS(nil, "Kubelet does not support snapshot update")
default:
klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
}
kl.sourcesReady.AddSource(u.Source)
...
}
return true
}
configCh是读取配置事件的管道,该模块将同时watch 3个不同来源的Pod信息的变化(file、http、APIServer),一旦某个来源的Pod信息发生了变化(创建/更新/删除),这个channel中就会出现被更新的Pod信息和更新的具体操作
2)plegCh
// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
...
case e := <-plegCh:
if e.Type == pleg.ContainerStarted {
// record the most recent time we observed a container start for this pod.
// this lets us selectively invalidate the runtimeCache when processing a delete for this pod
// to make sure we don't miss handling graceful termination for containers we reported as having started.
kl.lastContainerStartedTime.Add(e.ID, time.Now())
}
if isSyncPodWorthy(e) {
// PLEG event for a pod; sync it.
if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
handler.HandlePodSyncs([]*v1.Pod{pod})
} else {
// If the pod no longer exists, ignore the event.
klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
}
}
if e.Type == pleg.ContainerDied {
if containerID, ok := e.Data.(string); ok {
kl.cleanUpContainersInPod(e.ID, containerID)
}
}
...
}
return true
}
kl.pleg.Start()
的时候会每秒钟调用一次relist,根据最新的PodStatus生成PodLiftCycleEvent,然后存入到plegCh中
syncLoop()
方法中调用kl.pleg.Watch()
获取plegCh,然后传给syncLoopIteration()
方法,syncLoopIteration()
方法中消费plegCh中的数据,在handler中通过调用dispatchWork分发任务
3)syncCh
// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
...
case <-syncCh:
// Sync pods waiting for sync
podsToSync := kl.getPodsToSync()
if len(podsToSync) == 0 {
break
}
klog.V(4).InfoS("SyncLoop (SYNC) pods", "total", len(podsToSync), "pods", format.Pods(podsToSync))
// 同步最新保存的pod状态
handler.HandlePodSyncs(podsToSync)
...
}
return true
}
syncCh是由syncLoop()
方法里面创建的一个定时任务,每秒钟会向syncCh添加一个数据,这个方法会同步所有等待同步的Pod
4)kl.livenessManager.Updates()
、kl.readinessManager.Updates()
、kl.startupManager.Updates()
// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
...
case update := <-kl.livenessManager.Updates():
// 如果探针健康检查失败,需要更新pod的状态
if update.Result == proberesults.Failure {
handleProbeSync(kl, update, handler, "liveness", "unhealthy")
}
case update := <-kl.readinessManager.Updates():
// 当readiness状态变更时,更新容器和pod的状态
ready := update.Result == proberesults.Success
kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
status := ""
if ready {
status = "ready"
}
handleProbeSync(kl, update, handler, "readiness", status)
case update := <-kl.startupManager.Updates():
// 当startup状态变更时,更新容器和pod的状态
started := update.Result == proberesults.Success
kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
status := "unhealthy"
if started {
status = "started"
}
handleProbeSync(kl, update, handler, "startup", status)
...
}
return true
}
6)housekeepingCh
// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
...
case <-housekeepingCh:
if !kl.sourcesReady.AllReady() {
// If the sources aren't ready or volume manager has not yet synced the states,
// skip housekeeping, as we may accidentally delete pods from unready sources.
klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet")
} else {
start := time.Now()
klog.V(4).InfoS("SyncLoop (housekeeping)")
// 执行一些清理工作,包括终止pod workers、删除不想要的pod,移除volumes、pod目录
if err := handler.HandlePodCleanups(); err != nil {
klog.ErrorS(err, "Failed cleaning pods")
}
duration := time.Since(start)
if duration > housekeepingWarningDuration {
klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than 15s", "seconds", duration.Seconds())
}
klog.V(4).InfoS("SyncLoop (housekeeping) end")
}
}
return true
}
housekeepingCh也是由由syncLoop()
方法创建的,每两秒钟会触发清理,包括:终止Pod Workers、删除不想要的Pod,移除Volumes、Pod目录等
syncLoopIteration()
方法监听如下的channel,根据事件做不同的处理:
kl.livenessManager.Updates()
:如果探针检查失败,需要更新Pod的状态kl.readinessManager.Updates()
:当readiness状态变更时,更新容器和Pod的状态kl.startupManager.Updates()
:当startup状态变更时,更新容器和Pod的状态kubelet启动过程如下图:
syncLoopIteration()
方法中涉及的channelconfigCh相关的代码调用流程如上图,关于syncLoopIteration()
方法中configCh的处理逻辑前面已经讲过了,这里来看下kubelet是如何监听APIServer并将Pod信息变化写入configCh的
// pkg/kubelet/kubelet.go
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, nodeHasSynced func() bool) (*config.PodConfig, error) {
manifestURLHeader := make(http.Header)
if len(kubeCfg.StaticPodURLHeader) > 0 {
for k, v := range kubeCfg.StaticPodURLHeader {
for i := range v {
manifestURLHeader.Add(k, v[i])
}
}
}
// source of all configuration
// 初始化config.PodConfig
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
// 添加三种数据来源,分别是file、http、apiServer
// define file config source
if kubeCfg.StaticPodPath != "" {
klog.InfoS("Adding static pod path", "path", kubeCfg.StaticPodPath)
config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
}
// define url config source
if kubeCfg.StaticPodURL != "" {
klog.InfoS("Adding pod URL with HTTP header", "URL", kubeCfg.StaticPodURL, "header", manifestURLHeader)
config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
}
if kubeDeps.KubeClient != nil {
klog.InfoS("Adding apiserver pod source")
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(kubetypes.ApiserverSource))
}
return cfg, nil
}
makePodSourceConfig()
方法中先初始化config.PodConfig,然后添加三种数据来源:分别是file、http、APIServer,调用cfg.Channel()
方法会创建对应的channel
1)NewSourceApiserver()
这里先来看监听APIServer的部分,NewSourceApiserver()
方法代码如下:
// pkg/kubelet/config/apiserver.go
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- interface{}) {
// 创建ListWatch,监听当前node的pod变化
lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector("spec.nodeName", string(nodeName)))
// The Reflector responsible for watching pods at the apiserver should be run only after
// the node sync with the apiserver has completed.
klog.InfoS("Waiting for node sync before watching apiserver pods")
go func() {
for {
if nodeHasSynced() {
klog.V(4).InfoS("node sync completed")
break
}
time.Sleep(WaitForAPIServerSyncPeriod)
klog.V(4).InfoS("node sync has not completed yet")
}
klog.InfoS("Watching apiserver")
// 如果node sync完成,调用newSourceApiserverFromLW方法
newSourceApiserverFromLW(lw, updates)
}()
}
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
send := func(objs []interface{}) {
var pods []*v1.Pod
for _, o := range objs {
pods = append(pods, o.(*v1.Pod))
}
// 监听到apiServer当前node的pod信息变化后写入channel,后续listen()方法会监听这个channel接收值
updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
}
// 调用client-go API来创建reflector
r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
go r.Run(wait.NeverStop)
}
newSourceApiserverFromLW()
方法中调用client-go API来创建Reflector,当监听到APIServer中当前Node的Pod信息变化后写入channel
2)cfg.Channel()
makePodSourceConfig()
方法中调用cfg.Channel()
方法会创建对应的channel
// pkg/kubelet/config/config.go
// 给每个来源注册一个专用的channel
func (c *PodConfig) Channel(source string) chan<- interface{} {
c.sourcesLock.Lock()
defer c.sourcesLock.Unlock()
c.sources.Insert(source)
// 调用c.mux.Channel方法
return c.mux.Channel(source)
}
// pkg/util/config/config.go
func (m *Mux) Channel(source string) chan interface{} {
if len(source) == 0 {
panic("Channel given an empty name")
}
m.sourceLock.Lock()
defer m.sourceLock.Unlock()
channel, exists := m.sources[source]
if exists {
return channel
}
newChannel := make(chan interface{})
m.sources[source] = newChannel
// 同时启动goroutine去监听新数据
// 这里创建的channel最终会传入newSourceApiserverFromLW中定义的send函数中,当监听到apiServer当前node的pod数据变化后会写入channel
// listen函数会一直监听这个channel来接收数据
go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
return newChannel
}
func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
for update := range listenChannel {
// 调用Merge方法
m.merger.Merge(source, update)
}
}
Channel()
方法中创建的channel最终会传入newSourceApiserverFromLW()
方法中定义的send()
函数中,当监听到APIServer当前Node的Pod信息数据变化后会写入channel,这里的listen()
方法会一直监听这个channel来接收数据,listen()
方法调用Merge()
方法处理接收到的数据
// pkg/kubelet/config/config.go
func (s *podStorage) Merge(source string, change interface{}) error {
s.updateLock.Lock()
defer s.updateLock.Unlock()
seenBefore := s.sourcesSeen.Has(source)
// 区分pod变更类型
adds, updates, deletes, removes, reconciles := s.merge(source, change)
firstSet := !seenBefore && s.sourcesSeen.Has(source)
// deliver update notifications
switch s.mode {
case PodConfigNotificationIncremental:
// 最终将pod变更信息传入configCh
if len(removes.Pods) > 0 {
s.updates <- *removes
}
if len(adds.Pods) > 0 {
s.updates <- *adds
}
if len(updates.Pods) > 0 {
s.updates <- *updates
}
if len(deletes.Pods) > 0 {
s.updates <- *deletes
}
if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
// Send an empty update when first seeing the source and there are
// no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
// the source is ready.
s.updates <- *adds
}
// Only add reconcile support here, because kubelet doesn't support Snapshot update now.
if len(reconciles.Pods) > 0 {
s.updates <- *reconciles
}
...
}
return nil
}
Merge()
方法中会区分Pod变更类型,最终将Pod变更信息传入configCh,kl.syncLoopIteration()
方法中监听configCh,交给handler去处理,在handler中通过调用dispatchWork分发任务
configCh数据写入流程如下图:
初始化pleg并运行代码如下:
// pkg/kubelet/pleg/generic.go
func (g *GenericPLEG) Start() {
go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
}
Start()
方法中启动一个gorounite函数每一秒执行一次relist()
方法
// pkg/kubelet/pleg/generic.go
func (g *GenericPLEG) relist() {
klog.V(5).InfoS("GenericPLEG: Relisting")
if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {
metrics.PLEGRelistInterval.Observe(metrics.SinceInSeconds(lastRelistTime))
}
timestamp := g.clock.Now()
defer func() {
metrics.PLEGRelistDuration.Observe(metrics.SinceInSeconds(timestamp))
}()
// Get all the pods.
// 调用runtime获取当前node的所有pod和container信息(最终调用CRI接口)
podList, err := g.runtime.GetPods(true)
if err != nil {
klog.ErrorS(err, "GenericPLEG: Unable to retrieve pods")
return
}
g.updateRelistTime(timestamp)
pods := kubecontainer.Pods(podList)
// update running pod and container count
updateRunningPodAndContainerMetrics(pods)
g.podRecords.setCurrent(pods)
// Compare the old and the current pods, and generate events.
eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
for pid := range g.podRecords {
oldPod := g.podRecords.getOld(pid)
pod := g.podRecords.getCurrent(pid)
// Get all containers in the old and the new pod.
// 获得pod中的所有container
allContainers := getContainersFromPods(oldPod, pod)
for _, container := range allContainers {
// 检查container是否有变化,如果有变化,生成podLifecycleEvent
events := computeEvents(oldPod, pod, &container.ID)
for _, e := range events {
updateEvents(eventsByPodID, e)
}
}
}
var needsReinspection map[types.UID]*kubecontainer.Pod
if g.cacheEnabled() {
needsReinspection = make(map[types.UID]*kubecontainer.Pod)
}
// If there are events associated with a pod, we should update the
// podCache.
// 遍历所有发生的event的pod
for pid, events := range eventsByPodID {
pod := g.podRecords.getCurrent(pid)
if g.cacheEnabled() {
// updateCache() will inspect the pod and update the cache. If an
// error occurs during the inspection, we want PLEG to retry again
// in the next relist. To achieve this, we do not update the
// associated podRecord of the pod, so that the change will be
// detect again in the next relist.
// TODO: If many pods changed during the same relist period,
// inspecting the pod and getting the PodStatus to update the cache
// serially may take a while. We should be aware of this and
// parallelize if needed.
if err := g.updateCache(pod, pid); err != nil {
// Rely on updateCache calling GetPodStatus to log the actual error.
klog.V(4).ErrorS(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))
// make sure we try to reinspect the pod during the next relisting
needsReinspection[pid] = pod
continue
} else {
// this pod was in the list to reinspect and we did so because it had events, so remove it
// from the list (we don't want the reinspection code below to inspect it a second time in
// this relist execution)
delete(g.podsToReinspect, pid)
}
}
// Update the internal storage and send out the events.
g.podRecords.update(pid)
// Map from containerId to exit code; used as a temporary cache for lookup
containerExitCode := make(map[string]int)
// 遍历这个pod的所有event变化
for i := range events {
// Filter out events that are not reliable and no other components use yet.
if events[i].Type == ContainerChanged {
continue
}
select {
// 推送到kubelet的plegCh中
case g.eventChannel <- events[i]:
default:
metrics.PLEGDiscardEvents.Inc()
klog.ErrorS(nil, "Event channel is full, discard this relist() cycle event")
}
// Log exit code of containers when they finished in a particular event
if events[i].Type == ContainerDied {
// Fill up containerExitCode map for ContainerDied event when first time appeared
if len(containerExitCode) == 0 && pod != nil && g.cache != nil {
// Get updated podStatus
status, err := g.cache.Get(pod.ID)
if err == nil {
for _, containerStatus := range status.ContainerStatuses {
containerExitCode[containerStatus.ID.ID] = containerStatus.ExitCode
}
}
}
if containerID, ok := events[i].Data.(string); ok {
if exitCode, ok := containerExitCode[containerID]; ok && pod != nil {
klog.V(2).InfoS("Generic (PLEG): container finished", "podID", pod.ID, "containerID", containerID, "exitCode", exitCode)
}
}
}
}
}
if g.cacheEnabled() {
// reinspect any pods that failed inspection during the previous relist
if len(g.podsToReinspect) > 0 {
klog.V(5).InfoS("GenericPLEG: Reinspecting pods that previously failed inspection")
for pid, pod := range g.podsToReinspect {
if err := g.updateCache(pod, pid); err != nil {
// Rely on updateCache calling GetPodStatus to log the actual error.
klog.V(5).ErrorS(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name))
needsReinspection[pid] = pod
}
}
}
// Update the cache timestamp. This needs to happen *after*
// all pods have been properly updated in the cache.
g.cache.UpdateTime(timestamp)
}
// make sure we retain the list of pods that need reinspecting the next time relist is called
g.podsToReinspect = needsReinspection
}
relist()
方法中主要逻辑如下:
所有的Pod进入syncLoopIteration()
方法后,最终会走到managePodLoop()
方法中,会将Pod信息添加到workQueue队列里
// pkg/kubelet/pleg/generic.go
func (p *podWorkers) completeWork(pod *v1.Pod, syncErr error) {
// Requeue the last update if the last sync returned error.
switch {
case syncErr == nil:
// No error; requeue at the regular resync interval.
p.workQueue.Enqueue(pod.UID, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor))
case strings.Contains(syncErr.Error(), NetworkNotReadyErrorMsg):
// Network is not ready; back off for short period of time and retry as network might be ready soon.
p.workQueue.Enqueue(pod.UID, wait.Jitter(backOffOnTransientErrorPeriod, workerBackOffPeriodJitterFactor))
default:
// Error occurred during the sync; back off and then retry.
p.workQueue.Enqueue(pod.UID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
}
p.completeWorkQueueNext(pod.UID)
}
syncCh是由syncLoop()
方法里面创建的一个定时任务,每秒钟会调用getPodsToSync()
方法从workQueue中获取等待同步的Pod进行同步
kubelet核心流程如下图:
参考:
kubelet源码分析 syncLoopIteration(一) configCh