The kubelet source entry point is main() in cmd/kubelet/kubelet.go.
The Run function lives in the cmd/kubelet/app package.
Start with its parameter, kubelet.KubeletDeps:

    type KubeletDeps struct {
        Builder                 KubeletBuilder
        ContainerRuntimeOptions []kubecontainer.Option
        Options                 []Option
        Auth                    server.AuthInterface // an interface; focus on this one (more follow below, this one serves as the example)
        CAdvisorInterface       cadvisor.Interface
        Cloud                   cloudprovider.Interface
        ContainerManager        cm.ContainerManager
        DockerClient            libdocker.Interface
        EventClient             v1core.EventsGetter
        KubeClient              clientset.Interface
        ExternalKubeClient      clientgoclientset.Interface
        Mounter                 mount.Interface
        NetworkPlugins          []network.NetworkPlugin
        OOMAdjuster             *oom.OOMAdjuster
        OSInterface             kubecontainer.OSInterface
        PodConfig               *config.PodConfig
        Recorder                record.EventRecorder
        Writer                  kubeio.Writer
        VolumePlugins           []volume.VolumePlugin
        TLSOptions              *server.TLSOptions
    }

The server.AuthInterface interface is defined as follows:

    // AuthInterface contains all methods required by the auth filters
    type AuthInterface interface {
        authenticator.Request
        authorizer.RequestAttributesGetter
        authorizer.Authorizer
    }

Looking one level deeper at the embedded interfaces:

    type Request interface {
        AuthenticateRequest(req *http.Request) (user.Info, bool, error)
    }

    type RequestAttributesGetter interface {
        GetRequestAttributes(user.Info, *http.Request) Attributes
    }

    type Authorizer interface {
        Authorize(a Attributes) (authorized bool, reason string, err error)
    }

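To make the composition concrete, here is a minimal, self-contained sketch of how three small interfaces can be embedded into one and then consumed by an HTTP filter. It is not the kubelet's code: `UserInfo`, `Attributes`, `headerAuth`, `authFilter`, `statusFor`, and the `X-User` header are all hypothetical stand-ins chosen for illustration, and the real Kubernetes types are richer.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Simplified stand-ins for the Kubernetes types; only the shape matters here.
type UserInfo struct{ Name string }

type Attributes struct {
	User UserInfo
	Verb string
	Path string
}

// The three small interfaces, mirroring the shape of the ones quoted above.
type Request interface {
	AuthenticateRequest(req *http.Request) (UserInfo, bool, error)
}

type RequestAttributesGetter interface {
	GetRequestAttributes(u UserInfo, req *http.Request) Attributes
}

type Authorizer interface {
	Authorize(a Attributes) (authorized bool, reason string, err error)
}

// AuthInterface embeds all three, so any type with these methods satisfies it.
type AuthInterface interface {
	Request
	RequestAttributesGetter
	Authorizer
}

// headerAuth is a hypothetical implementation: it trusts an X-User header
// and only authorizes the user named "admin".
type headerAuth struct{}

func (headerAuth) AuthenticateRequest(req *http.Request) (UserInfo, bool, error) {
	name := req.Header.Get("X-User")
	if name == "" {
		return UserInfo{}, false, nil
	}
	return UserInfo{Name: name}, true, nil
}

func (headerAuth) GetRequestAttributes(u UserInfo, req *http.Request) Attributes {
	return Attributes{User: u, Verb: req.Method, Path: req.URL.Path}
}

func (headerAuth) Authorize(a Attributes) (bool, string, error) {
	if a.User.Name == "admin" {
		return true, "", nil
	}
	return false, "only admin is allowed", nil
}

// Compile-time check that headerAuth satisfies the composite interface.
var _ AuthInterface = headerAuth{}

// authFilter wraps a handler: authenticate, build attributes, then authorize.
func authFilter(auth AuthInterface, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		u, ok, err := auth.AuthenticateRequest(req)
		if err != nil || !ok {
			http.Error(w, "unauthorized", http.StatusUnauthorized)
			return
		}
		allowed, reason, err := auth.Authorize(auth.GetRequestAttributes(u, req))
		if err != nil {
			http.Error(w, "authorization error", http.StatusInternalServerError)
			return
		}
		if !allowed {
			http.Error(w, reason, http.StatusForbidden)
			return
		}
		next.ServeHTTP(w, req)
	})
}

// statusFor sends one request through the filter and returns the status code.
func statusFor(user string) int {
	h := authFilter(headerAuth{}, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprint(w, "ok")
	}))
	req := httptest.NewRequest(http.MethodGet, "/pods", nil)
	if user != "" {
		req.Header.Set("X-User", user)
	}
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(statusFor(""), statusFor("guest"), statusFor("admin")) // prints "401 403 200"
}
```

Because the filter depends only on the composite interface, any implementation can be swapped in, which is presumably why KubeletDeps carries Auth as an interface rather than a concrete type.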
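The other interfaces in the KubeletDeps struct follow the same pattern.
This part of the code mainly checks whether the parameters have been initialized; container management then goes through kubeDeps.ContainerManager.
The function in question returns a ContainerManager interface, shown below: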

    // Manages the containers running on a machine.
    type ContainerManager interface {
        // Runs the container manager's housekeeping.
        // - Ensures that the Docker daemon is in a container.
        // - Creates the system container where all non-containerized processes run.
        Start(*v1.Node, ActivePodsFunc) error

        // Returns resources allocated to system cgroups in the machine.
        // These cgroups include the system and Kubernetes services.
        SystemCgroupsLimit() v1.ResourceList

        // Returns a NodeConfig that is being used by the container manager.
        GetNodeConfig() NodeConfig

        // Returns internal Status.
        Status() Status

        // NewPodContainerManager is a factory method which returns a podContainerManager object
        // Returns a noop implementation if qos cgroup hierarchy is not enabled
        NewPodContainerManager() PodContainerManager

        // GetMountedSubsystems returns the mounted cgroup subsystems on the node
        GetMountedSubsystems() *CgroupSubsystems

        // GetQOSContainersInfo returns the names of top level QoS containers
        GetQOSContainersInfo() QOSContainersInfo

        // GetNodeAllocatable returns the amount of compute resources that have to be reserved from scheduling.
        GetNodeAllocatableReservation() v1.ResourceList

        // GetCapacity returns the amount of compute resources tracked by container manager available on the node.
        GetCapacity() v1.ResourceList

        // UpdateQOSCgroups performs housekeeping updates to ensure that the top
        // level QoS containers have their desired state in a thread-safe way
        UpdateQOSCgroups() error
    }

The struct containerManagerImpl implements the ContainerManager interface.
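The pattern of an unexported struct behind an exported interface can be sketched in a few lines. The version below is a reduced illustration only: the two-method `ContainerManager`, the `ResourceList` map type, and the `newContainerManager` factory are hypothetical simplifications, not the definitions from the Kubernetes tree.

```go
package main

import (
	"errors"
	"fmt"
)

// ResourceList is a simplified stand-in for v1.ResourceList (name -> quantity).
type ResourceList map[string]int64

// ContainerManager is a reduced, hypothetical version of the real interface,
// cut down to two methods so the example stays short.
type ContainerManager interface {
	Start() error
	GetCapacity() ResourceList
}

// containerManagerImpl is unexported: callers only ever see the interface.
type containerManagerImpl struct {
	capacity ResourceList
	started  bool
}

// Compile-time assertion that *containerManagerImpl implements ContainerManager.
var _ ContainerManager = (*containerManagerImpl)(nil)

// Start marks the manager as running and rejects a second call.
func (cm *containerManagerImpl) Start() error {
	if cm.started {
		return errors.New("container manager already started")
	}
	cm.started = true
	return nil
}

// GetCapacity returns a copy so callers cannot mutate internal state.
func (cm *containerManagerImpl) GetCapacity() ResourceList {
	out := make(ResourceList, len(cm.capacity))
	for k, v := range cm.capacity {
		out[k] = v
	}
	return out
}

// newContainerManager is a hypothetical factory that returns the interface type.
func newContainerManager(capacity ResourceList) ContainerManager {
	return &containerManagerImpl{capacity: capacity}
}

func main() {
	cm := newContainerManager(ResourceList{"cpu": 4})
	fmt.Println(cm.Start(), cm.GetCapacity()["cpu"]) // prints "<nil> 4"
}
```

Returning the interface from the factory keeps callers decoupled from the concrete type, so an alternative implementation (a stub for tests, for example) can be substituted without touching them.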
Continuing further down.
Step into the function and read its comments.
It mainly performs some basic validation.
Starting the kubelet service:

    func startKubelet(k kubelet.KubeletBootstrap, podCfg *config.PodConfig, kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps) {
        // start the kubelet
        go wait.Until(func() { k.Run(podCfg.Updates()) }, 0, wait.NeverStop) // goroutine; this mainly handles the interaction with kube-api

        // start the kubelet server
        if kubeCfg.EnableServer {
            go wait.Until(func() {
                k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
            }, 0, wait.NeverStop) // *****
        }
        if kubeCfg.ReadOnlyPort > 0 {
            go wait.Until(func() {
                k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
            }, 0, wait.NeverStop)
        }
    }

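Each goroutine in startKubelet is wrapped in wait.Until with a period of 0 and a stop channel that never closes, so the wrapped function is called again whenever it returns. The sketch below approximates that behavior using only the standard library. `until` and `runUntilStopped` are hypothetical names, and the real wait.Until may differ in details such as panic handling.

```go
package main

import (
	"fmt"
	"time"
)

// until is a hypothetical, simplified stand-in for wait.Until: it calls f
// repeatedly, waiting for period between calls, until stop is closed.
func until(f func(), period time.Duration, stop <-chan struct{}) {
	for {
		// Check for a stop request before each run.
		select {
		case <-stop:
			return
		default:
		}

		f()

		// Wait for the period to elapse, or for a stop request.
		select {
		case <-stop:
			return
		case <-time.After(period):
		}
	}
}

// runUntilStopped restarts a short-lived "service" until it has run n times,
// then closes the stop channel. It returns the number of runs observed.
func runUntilStopped(n int) int {
	stop := make(chan struct{})
	done := make(chan struct{})
	runs := 0

	go func() {
		defer close(done)
		until(func() {
			runs++
			if runs == n {
				close(stop) // ask the loop to end after this run
			}
		}, 0, stop)
	}()

	<-done // wait for the loop goroutine to exit before reading runs
	return runs
}

func main() {
	fmt.Println(runUntilStopped(3)) // prints "3"
}
```

With a stop channel that is never closed, as with wait.NeverStop in the listing, the loop would only end when the process exits.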
Starting the kubelet HTTP server:

    // ListenAndServe runs the kubelet HTTP server.
    func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers, enableContentionProfiling bool) {
        server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers, enableContentionProfiling, kl.containerRuntime, kl.criHandler)
    }

Following the call into ListenAndServeKubeletServer:

    // ListenAndServeKubeletServer initializes a server to respond to HTTP network requests on the Kubelet.
    func ListenAndServeKubeletServer(
        host HostInterface,
        resourceAnalyzer stats.ResourceAnalyzer,
        address net.IP,
        port uint,
        tlsOptions *TLSOptions,
        auth AuthInterface,
        enableDebuggingHandlers,
        enableContentionProfiling bool,
        runtime kubecontainer.Runtime,
        criHandler http.Handler) {
        glog.Infof("Starting to listen on %s:%d", address, port)
        handler := NewServer(host, resourceAnalyzer, auth, enableDebuggingHandlers, enableContentionProfiling, runtime, criHandler)
        s := &http.Server{
            Addr:           net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
            Handler:        &handler,
            MaxHeaderBytes: 1 << 20,
        }
        if tlsOptions != nil {
            s.TLSConfig = tlsOptions.Config
            // Passing empty strings as the cert and key files means no
            // cert/keys are specified and GetCertificate in the TLSConfig
            // should be called instead.
            glog.Fatal(s.ListenAndServeTLS(tlsOptions.CertFile, tlsOptions.KeyFile))
        } else {
            glog.Fatal(s.ListenAndServe())
        }
    }

Reposted from: http://xquaa.baihongyu.com/