前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >k8s first commit 源码分析之 API Server

k8s first commit 源码分析之 API Server

作者头像
菜菜cc
发布2022-11-15 21:36:54
3510
发布2022-11-15 21:36:54
举报
文章被收录于专栏:菜菜的技术博客

穿越回 2014 年,分析下 k8s 第一个提交的源码。

获取 first commit 源码

代码语言:javascript
复制
git clone https://github.com/kubernetes/kubernetes.git
cd kubernetes
git checkout `git rev-list --max-parents=0 HEAD`

简介

api-server 是 k8s 的核心组件之一,用于接收 kubelet 的请求,并将请求信息保存到后端存储 etcd 中。核心功能是提供 k8s 各类资源对象的 CURD 等操作。

源码分析

从 api-server 的命令行入口开始分析,命令行代码位于 cmd/apiserver/apiserver.go

代码语言:javascript
复制
var (
	port                        = flag.Uint("port", 8080, "The port to listen on.  Default 8080.")
	address                     = flag.String("address", "127.0.0.1", "The address on the local server to listen to. Default 127.0.0.1")
	apiPrefix                   = flag.String("api_prefix", "/api/v1beta1", "The prefix for API requests on the server. Default '/api/v1beta1'")
	etcdServerList, machineList util.StringList
)

func init() {
	flag.Var(&etcdServerList, "etcd_servers", "Servers for the etcd (http://ip:port), comma separated")
	flag.Var(&machineList, "machines", "List of machines to schedule onto, comma separated.")
}

最开始定义了 api-server 启动所需要的相关参数,上古版本的 k8s 使用了标准库自带的 flag 库,其中 util.StringList实现了flag.Value接口。

代码语言:javascript
复制
type StringList []string

func (sl *StringList) String() string {
	return fmt.Sprint(*sl)
}

func (sl *StringList) Set(value string) error {
	for _, s := range strings.Split(value, ",") {
		if len(s) == 0 {
			return fmt.Errorf("value should not be an empty string")
		}
		*sl = append(*sl, s)
	}
	return nil
}

可以看到util.StringList用于将以逗号分割的字符串转为[]string类型。各个命令行参数含义如下:

  • port: api-server 监听的 port
  • address: api-server 监听的 ip
  • apiPrefix: 访问 api-server 的 URL 前缀
  • etcdServerList: 后端存储的 etcd 节点列表
  • machineList: 工作节点的列表

从 main 函数开始分析 api-server 的具体实现

代码语言:javascript
复制
var (
    taskRegistry       registry.TaskRegistry
    controllerRegistry registry.ControllerRegistry
    serviceRegistry    registry.ServiceRegistry
)

if len(etcdServerList) > 0 {
    log.Printf("Creating etcd client pointing to %v", etcdServerList)
    etcdClient := etcd.NewClient(etcdServerList)
    taskRegistry = registry.MakeEtcdRegistry(etcdClient, machineList)
    controllerRegistry = registry.MakeEtcdRegistry(etcdClient, machineList)
    serviceRegistry = registry.MakeEtcdRegistry(etcdClient, machineList)
} else {
    taskRegistry = registry.MakeMemoryRegistry()
    controllerRegistry = registry.MakeMemoryRegistry()
    serviceRegistry = registry.MakeMemoryRegistry()
}

registry 是对具体资源对象的后端存储的抽象,这里定义了三个 registry,并根据命令行参数判断是使用 etcd 还是内存作为存储后端。

代码语言:javascript
复制
// 代码路径:pkg/registry/interfaces.go
// TaskRegistry is an interface implemented by things that know how to store Task objects
type TaskRegistry interface {
	// ListTasks obtains a list of tasks that match query.
	// Query may be nil in which case all tasks are returned.
	ListTasks(query *map[string]string) ([]api.Task, error)
	// Get a specific task
	GetTask(taskId string) (*api.Task, error)
	// Create a task based on a specification, schedule it onto a specific machine.
	CreateTask(machine string, task api.Task) error
	// Update an existing task
	UpdateTask(task api.Task) error
	// Delete an existing task
	DeleteTask(taskId string) error
}

其中taskRegistry是对 task 的存储抽象,task 可以当作 pod 的前身看待,实现了对 task 的 list,get,create, update, delete 的操作。

代码语言:javascript
复制
// 代码路径:pkg/registry/interfaces.go
// ControllerRegistry is an interface for things that know how to store Controllers
type ControllerRegistry interface {
	ListControllers() ([]api.ReplicationController, error)
	GetController(controllerId string) (*api.ReplicationController, error)
	CreateController(controller api.ReplicationController) error
	UpdateController(controller api.ReplicationController) error
	DeleteController(controllerId string) error
}

ControllerRegistry是对 RC(Replication Controller) 的存储抽象,而我们现在使用的较多的是 RS(RepicateSet), RS 正是 RC 的升级,同样是实现了对 RC 的 list,get,create,update,delete 操作。

代码语言:javascript
复制
// 代码路径:pkg/registry/service_registry.go
type ServiceRegistry interface {
	ListServices() (ServiceList, error)
	CreateService(svc Service) error
	GetService(name string) (*Service, error)
	DeleteService(name string) error
	UpdateService(svc Service) error
	UpdateEndpoints(e Endpoints) error
}

ServiceRegistry是对 service 的存储抽象

代码语言:javascript
复制
containerInfo := &kube_client.HTTPContainerInfo{
    Client: http.DefaultClient,
    Port:   10250,
}

storage := map[string]apiserver.RESTStorage{
    "tasks":                  registry.MakeTaskRegistryStorage(taskRegistry, containerInfo, registry.MakeFirstFitScheduler(machineList, taskRegistry)),
    "replicationControllers": registry.MakeControllerRegistryStorage(controllerRegistry),
    "services":               registry.MakeServiceRegistryStorage(serviceRegistry),
}

storge 是对所有资源的 registry 的统一抽象,被定义为 REST 风格的资源操作接口。

代码语言:javascript
复制
// 代码路径: pkg/apiserver/api_server.go
// RESTStorage is a generic interface for RESTful storage services
type RESTStorage interface {
	List(*url.URL) (interface{}, error)
	Get(id string) (interface{}, error)
	Delete(id string) error
	Extract(body string) (interface{}, error)
	Create(interface{}) error
	Update(interface{}) error
}

实例化所有资源的 storage 后放在 map 中维护,用于后面 handler 的处理。

代码语言:javascript
复制
s := &http.Server{
    Addr:           fmt.Sprintf("%s:%d", *address, *port),
    Handler:        apiserver.New(storage, *apiPrefix),  // 使用 REST storage 创建请求的 handler
    ReadTimeout:    10 * time.Second,
    WriteTimeout:   10 * time.Second,
    MaxHeaderBytes: 1 << 20,
}
log.Fatal(s.ListenAndServe())

使用前面的 REST Storage map 和 api prefix 创建 handler,启动 HTTP 服务器等待接收请求。接下来转到 handler 分析源码

代码语言:javascript
复制
// 代码路径:pkg/apiserver/api_server.go
// New creates a new ApiServer object.
// 'storage' contains a map of handlers.
// 'prefix' is the hosting path prefix.
func New(storage map[string]RESTStorage, prefix string) *ApiServer {
	return &ApiServer{
		storage: storage,
		prefix:  prefix,
	}
}

// HTTP Handler interface
func (server *ApiServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	log.Printf("%s %s", req.Method, req.RequestURI)
	url, err := url.ParseRequestURI(req.RequestURI)
	if err != nil {
		server.error(err, w)
		return
	}
	if url.Path == "/index.html" || url.Path == "/" || url.Path == "" {
		server.handleIndex(w)
		return
	}
	if !strings.HasPrefix(url.Path, server.prefix) {
		server.notFound(req, w)
		return
	}
	requestParts := strings.Split(url.Path[len(server.prefix):], "/")[1:]
	if len(requestParts) < 1 {
		server.notFound(req, w)
		return
	}
	storage := server.storage[requestParts[0]]
	if storage == nil {
		server.notFound(req, w)
		return
	} else {
		server.handleREST(requestParts, url, req, w, storage)
	}
}

Golang HTTP 的标准库是通过实现 Handler 接口的 ServeHTTP 函数来实现处理请求,通过代码可以看出先对请求的 URL 进行解析获取具体的资源对象,再通过 REST storage map 拿到对应资源对象的 REST storage,最后调用 server.handleREST来处理具体的请求。

代码语言:javascript
复制
// 代码路径:pkg/apiserver/api_server.go
func (server *ApiServer) handleREST(parts []string, url *url.URL, req *http.Request, w http.ResponseWriter, storage RESTStorage) {
	switch req.Method {
	case "GET":
		switch len(parts) {
		case 1:
			controllers, err := storage.List(url)
			if err != nil {
				server.error(err, w)
				return
			}
			server.write(200, controllers, w)
		case 2:
			task, err := storage.Get(parts[1])
			if err != nil {
				server.error(err, w)
				return
			}
			if task == nil {
				server.notFound(req, w)
				return
			}
			server.write(200, task, w)
		default:
			server.notFound(req, w)
		}
		return
	case "POST":
		if len(parts) != 1 {
			server.notFound(req, w)
			return
		}
		body, err := server.readBody(req)
		if err != nil {
			server.error(err, w)
			return
		}
		obj, err := storage.Extract(body)
		if err != nil {
			server.error(err, w)
			return
		}
		storage.Create(obj)
		server.write(200, obj, w)
		return
	case "DELETE":
		if len(parts) != 2 {
			server.notFound(req, w)
			return
		}
		err := storage.Delete(parts[1])
		if err != nil {
			server.error(err, w)
			return
		}
		server.write(200, Status{success: true}, w)
		return
	case "PUT":
		if len(parts) != 2 {
			server.notFound(req, w)
			return
		}
		body, err := server.readBody(req)
		if err != nil {
			server.error(err, w)
		}
		obj, err := storage.Extract(body)
		if err != nil {
			server.error(err, w)
			return
		}
		err = storage.Update(obj)
		if err != nil {
			server.error(err, w)
			return
		}
		server.write(200, obj, w)
		return
	default:
		server.notFound(req, w)
	}
}

可以很清晰的看出,这段逻辑是根据请求方法和请求参数对实际的资源对象进行特定的 REST 的操作。

回到 main 函数,在启动 HTTP server 之前还启动了一个 goroutine 做定时任务

代码语言:javascript
复制
endpoints := registry.MakeEndpointController(serviceRegistry, taskRegistry)
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)

其中 util.Forever就是周期性任务的封装

代码语言:javascript
复制
// 代码路径: pkg/util/util.go
// Loops forever running f every d.  Catches any panics, and keeps going.
func Forever(f func(), period time.Duration) {
	for {
		func() {
			defer HandleCrash()
			f()
		}()
		time.Sleep(period)
	}
}

任务实体endpoints.SyncServiceEndpoints逻辑如下

代码语言:javascript
复制
// 代码路径: pkg/registry/endpoint.go
func (e *EndpointController) SyncServiceEndpoints() error {
	services, err := e.serviceRegistry.ListServices()
	if err != nil {
		return err
	}
	var resultErr error
	for _, service := range services.Items {
		tasks, err := e.taskRegistry.ListTasks(&service.Labels)
		if err != nil {
			log.Printf("Error syncing service: %#v, skipping.", service)
			resultErr = err
			continue
		}
		endpoints := make([]string, len(tasks))
		for ix, task := range tasks {
			// TODO: Use port names in the service object, don't just use port #0
			endpoints[ix] = fmt.Sprintf("%s:%d", task.CurrentState.Host, task.DesiredState.Manifest.Containers[0].Ports[0].HostPort)
		}
		err = e.serviceRegistry.UpdateEndpoints(Endpoints{
			Name:      service.ID,
			Endpoints: endpoints,
		})
		if err != nil {
			log.Printf("Error updating endpoints: %#v", err)
			continue
		}
	}
	return resultErr
}

可以看到主要逻辑就是定时获取所有 service 列表,再遍历 service 列表查询 service 下所有 task,最后根据 task 的 endpoint 来更新 service 的 endpoints。这一段逻辑其实就是为 kubeproxy 做负载均衡用的,让 kubeproxy 知道需要代理的 endpoint 有哪些。这一块逻辑在现在的 k8s 架构中已经从 api-server 中移除了。

笔者只分析了 api-server 主体的逻辑,后续会分析具体 registry 的逻辑。

本文作者: Ifan Tsai  (菜菜)

本文链接: https://cloud.tencent.com/developer/article/2164602

版权声明: 本文采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。转载请注明出处!

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-09-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 获取 first commit 源码
  • 简介
  • 源码分析
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档