client-go源码结构及Client客户端对象

1. client-go源码结构及说明

源码目录结构及说明:

源码目录

说明

discovery

提供DiscoveryClient发现客户端

dynamic

提供DynamicClient动态客户端

informers

每种kubernetes资源的动态实现

kubernetes

提供ClientSet客户端

listers

为每一个kubernetes资源提供Lister功能,该功能对Get和List请求提供只读的缓存数据

plugin

提供OpenStack、GCP和Azure等云服务商授权插件

rest

提供RESTClient客户端,对Kuberntes API Server执行RESTful操作

scale

提供ScaleClient客户端,用于扩容或缩容Deployment、ReplicaSet、Replication Controller等资源对象

tools

提供常用工具,例如Sharedinformer、Reflector、DealtFIFO及Indexers。提供Client查询和缓存机制,以减少向kube-apiserver发起的请求数等

transport

提供安全的TCP连接,支持Http Stream,某些操作需要在客户端和容器之间传输二进制流,例如exec、attach等操作。该功能由内部的spdy包提供支持

util

提供常用方法,例如WorkQueue工作队列、Certificate证书管理

2. Client客户端对象

client-go支持4种客户端对象与Kubernetes API Server进行交互,如图所示:

RESTClient是最基础的客户端。RESTClient对HTTP Request进行了封装,实现了RESTful风格的API。ClientSet、DynamicClient及DiscoveryClient客户端都是基于RESTClient实现的。

  • ClientSet在RESTClient的基础上封装了对Resource和Version的管理方案。每一个Resource可以理解为一个客户端,而ClientSet则是多个客户端的集合,每一个Resource和Version都以函数的方式暴露给开发者。ClientSet只能够处理Kubernetes内置资源,它是通过client-gen代码生成器自动生成的。

  • DynamicClient与ClientSet最大的不同之处是,ClientSet仅能访问Kubernetes自带的资源(即Client集合内的资源),不能直接访问CRD自定义资源。DynamicClient能够处理Kubernetes中的所有资源对象,包括Kubernetes内置资源与CRD自定义资源

  • DiscoveryClient发现客户端,用于发现kube-apiserver所支持的资源组、资源版本、资源信息(即Group、Version、Resources)

以上4种客户端都可以通过kubeconfig配置信息连接到指定到Kubernetes API Server。

2.1 kubeconfig配置管理

kubeconfig可用于管理访问kube-apiserver的配置信息,也支持多集群的配置管理,可在不同环境下管理不同kube-apiserver集群配置。kubernetes的其他组件都是用kubeconfig配置信息来连接kube-apiserver组件。kubeconfig存储了集群、用户、命名空间和身份验证等信息,默认kubeconfig存放在$HOME/.kube/config路径下。kubeconfig的配置信息如下:

apiVersion: v1
clusters:
- cluster:
    insecure-skip-tls-verify: true
    server: https://127.0.0.1:6443
  name: kubernetes
contexts:
- context:
    cluster: kubernetes
    namespace: default
    user: username
  name: kubernetes
current-context: kubernetes
kind: Config
preferences: {}
users:
- name: username
  user:
    token: asdf

kubeconfig配置信息分为3部分,分别为:

  • clusters:定义kubernetes集群信息,例如kube-apiserver的服务地址及集群的证书信息

  • users:定义kubernetes集群用户身份验证的客户端凭据,例如client-certificate、client-key、token及username/password等。

  • context:定义kuberntes集群用户信息和命名空间等,用于将请求发送到指定的集群

2.1.1 代码练习

由于在windows上进行,所以采用的环境变量的形式设置的kubeconfig文件,查询pod信息,代码如下:

package main
import (
    "context"
    "fmt"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "os"
)
func main() {
    kubeconfig := ""
    // 获取环境变量并赋值给kubeconfig
    if envvar := os.Getenv("KUBECONFIG"); len(envvar) > 0 {
        kubeconfig = envvar
    }
    // 通过kubeconfig文件实例化config对象
    config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    if err != nil {
        fmt.Printf("The kubeconfig cannot be loaded: %v\n", err)
        panic(err)
    }
    // 通过config对象得到ClientSet客户端对象
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        fmt.Printf("The kubeconfig cannot be loaded: %v\n", err)
        panic(err)
    }
    // 使用ClientSet客户端对象通过资源函数进行Pod资源的访问
    pod, err := clientset.CoreV1().Pods("my-system").Get(context.Background(), "example", metav1.GetOptions{})
    //打印pod的状态信息
    fmt.Println(pod.Status.String())
}
#### 2.1.2 加载kubeconfig配置信息

**源码路径**:k8s.io\client-go\tools\clientcmd\loader.go

```go
func (rules *ClientConfigLoadingRules) Load() (*clientcmdapi.Config, error) {

    ......
    kubeConfigFiles := []string{}
    // Make sure a file we were explicitly told to use exists
    if len(rules.ExplicitPath) > 0 {
        if _, err := os.Stat(rules.ExplicitPath); os.IsNotExist(err) {
            return nil, err
        }
        // 通过文件路径获取kubeconfig配置信息 -> rules.ExplicitPath
        kubeConfigFiles = append(kubeConfigFiles, rules.ExplicitPath)
    } else {
        // 通过环境变量获取kubeconfig配置信息,rules.Precedence...可指定多个路径
        kubeConfigFiles = append(kubeConfigFiles, rules.Precedence...)
    }
    kubeconfigs := []*clientcmdapi.Config{}
    // read and cache the config files so that we only look at them once
    for _, filename := range kubeConfigFiles {
        ......
        // 读取数据并把读取的数据反序列化到Config对象中
        config, err := LoadFromFile(filename)
        ......
    }
    .......
}
**源码路径**:k8s.io\client-go\tools\clientcmd\loader.go
```go
// Load takes a byte slice and deserializes the contents into Config object.
// Encapsulates deserialization without assuming the source is a file.
func Load(data []byte) (*clientcmdapi.Config, error) {
    config := clientcmdapi.NewConfig()
    ......
    // 执行反序列化操作
    decoded, _, err := clientcmdlatest.Codec.Decode(data, &schema.GroupVersionKind{Version: clientcmdlatest.Version, Kind: "Config"}, config)
    ......
    return decoded.(*clientcmdapi.Config), nil
}

2.1.3 合并多个kubeconfig配置信息

多个配置信息合并如图所示:

源码路径:k8s.io\client-go\tools\clientcmd\loader.go

func (rules *ClientConfigLoadingRules) Load() (*clientcmdapi.Config, error) {

    ......

    // read and cache the config files so that we only look at them once
    for _, filename := range kubeConfigFiles {
        ......
        config, err := LoadFromFile(filename)
        ......

    // first merge all of our maps
    mapConfig := clientcmdapi.NewConfig()
    for _, kubeconfig := range kubeconfigs {
        mergo.MergeWithOverwrite(mapConfig, kubeconfig)
    }
    // merge all of the struct values in the reverse order so that priority is given correctly
    // errors are not added to the list the second time
    nonMapConfig := clientcmdapi.NewConfig()
    for i := len(kubeconfigs) - 1; i >= 0; i-- {
        kubeconfig := kubeconfigs[i]
        mergo.MergeWithOverwrite(nonMapConfig, kubeconfig)
    }
    // since values are overwritten, but maps values are not, we can merge the non-map config on top of the map config and
    // get the values we expect.
    config := clientcmdapi.NewConfig()
    mergo.MergeWithOverwrite(config, mapConfig)
    mergo.MergeWithOverwrite(config, nonMapConfig)
    ......
}

从代码中可以看出,merge分为四步,①先将map类型的config进行合并到mapConfig中;②将非map类型的配置合并到nonMapConfig中;@将mapConfig合并到config中;④将nonMapConfig合并到config中;

2.2 RESTClient客户端

RESTClient是最基础的客户端,其他的ClientSet、DynamicClient及DiscoveryClient都是基于RESTClient实现的。RESTClient对HTTP Request进行了封装,实现了RESTful风格的API。

2.2.1 代码练习

import (
    "context"
    "fmt"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    scheme "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
    "testing"
)
func TestRestClient(t *testing.T) {
    //加载kubeconfig配置信息
    kubeconfig, err := clientcmd.BuildConfigFromFlags("", "F:\\code\\env\\config")
    if err != nil {
        panic(err)
    }
    //设置config.APIPath请求的HTTP路径
    kubeconfig.APIPath = "api"
    //设置config.GroupVersion请求的资源组/资源版本
    kubeconfig.GroupVersion = &corev1.SchemeGroupVersion
    //设置config.NegotiatedSerializer数据的解码器
    kubeconfig.NegotiatedSerializer = scheme.Codecs
    //实例化Client对象
    restClient, err := rest.RESTClientFor(kubeconfig)
    if err != nil {
        panic(err)
    }
    result := &corev1.PodList{}
    //构建HTTP请求参数
    //请求方法可以是Get、Post、Put、Delete、Patch等
    //Namespace函数设置请求的命名空间
    //Resource函数设置请求的资源名称
    //VersionParams函数将一些查询选项(如limit、TimeoutSeconds等)添加到请求参数中
    // Do函数执行请求,并将kube-apiserver返回的result对象解析到corev1.PodList对象中
    err = restClient.Get().
        Namespace("istio-system").
        Resource("pods").
        VersionedParams(&metav1.ListOptions{Limit: 500}, scheme.ParameterCodec).
        Do(context.Background()).
        Into(result)
    if err != nil {
        panic(err)
    }
    for _, d := range result.Items {
        fmt.Printf("NAMESPACE:%v \t NAME:%v \t STATU:%v\n", d.Namespace, d.Name, d.Status.Phase)
    }
}

2.2.2 源码分析

由上练习得知,RESTClient发送请求的过程对Go语言的标准库net/http进行了封装,由Do—>request函数实现,源码如下:

源码路径:k8s.io\client-go\rest\request.go

func (r *Request) Do(ctx context.Context) Result {
    var result Result
    err := r.request(ctx, func(req *http.Request, resp *http.Response) {
        result = r.transformResponse(resp, req)
    })
    if err != nil {
        return Result{err: err}
    }
    return result
}

其中,r.transformResponse(resp, req)作为传入函数的函数体;是真正对result对象进行解析的函数。request函数源码如下:

源码路径:k8s.io\client-go\rest\request.go

func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
    ......
    // Right now we make about ten retry attempts if we get a Retry-After response.
    retries := 0
    for {
        url := r.URL().String()
        //调用Go标准库net/http进行request构建
        req, err := http.NewRequest(r.verb, url, r.body)
        if err != nil {
            return err
        }
        req = req.WithContext(ctx)
        req.Header = r.headers
        ......
        // Do发送request请求并接受resp
        resp, err := client.Do(req)
        ......
        if err != nil {
            // "Connection reset by peer" or "apiserver is shutting down" are usually a transient errors.
            // Thus in case of "GET" operations, we simply retry it.
            // We are not automatically retrying "write" operations, as
            // they are not idempotent.
            if r.verb != "GET" {
                return err
            }
            // For connection errors and apiserver shutdown errors retry.
            if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
                // For the purpose of retry, we set the artificial "retry-after" response.
                // TODO: Should we clean the original response if it exists?
                resp = &http.Response{
                    StatusCode: http.StatusInternalServerError,
                    Header:     http.Header{"Retry-After": []string{"1"}},
                    Body:       ioutil.NopCloser(bytes.NewReader([]byte{})),
                }
            } else {
                return err
            }
        }
        done := func() bool {
            defer func() {
                ......
                //Body.close()在defer函数中,保证函数退出时,对其进行关闭,防止内存溢出
                resp.Body.Close()
            }()

            ......
            //fn函数即为之前传入的transformResponse函数,将结果转为资源对象
            fn(req, resp)
            return true
        }()
        ......
    }
}

2.3 ClientSet客户端

RESTClient是最基础的客户端,使用时需要指定Resource和Version等信息,编写代码时需要提前知道Resource所在的Group和对应的Version信息。ClientSet相比而言使用更加便捷,一般情况,对Kubernetes进行二次开发时通常使用ClientSet。 ClientSet在RESTClient的基础上封装了对Resource和Version的管理方法,每个Resource可以理解为一个客户端,而ClientSet则是多个客户端的集合,每个Resource和Version都以函数的方式暴露给开发者。

注意:ClientSet仅能访问Kubernetes自身的内置资源,不能直接访问CRD自定义资源;如果需要使用ClientSet访问CRD,则需要通过client-gen代码生成器重新生成ClientSet;DynamicClient可以访问CRD资源

2.3.1 代码练习

func TestClientSet(t *testing.T) {
    config, err := clientcmd.BuildConfigFromFlags("", "F:\\code\\env\\config")
    if err != nil {
        fmt.Printf("The kubeconfig cannot be loaded: %v\n", err)
        panic(err)
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }
    //请求core核心资源组v1资源版本下的pod资源对象,其内部设置了APIPath请求的HTTP路径
    //GroupVersion请求的资源组、资源版本,NegotiatedSerializer数据的编解码器
    //Pods函数是个资源接口对象,用于管理Pod资源对象的管理,包括Create、Update、Delete、Get、List、Watch、Pathc等操作
    podCliet := clientset.CoreV1().Pods(corev1.NamespaceDefault)
    list, err := podCliet.List(context.Background(), metav1.ListOptions{Limit: 500})
    for _, d := range list.Items {
        fmt.Printf("NAMESPACE:%v \t NAME:%v \t STATU:%v\n", d.Namespace, d.Name, d.Status.Phase)
    }
}

2.3.2 源码分析

  1. 追踪kubernetes.NewForConfig(config)函数,可以看到,针对不同的资源类型,都有一个NewForConfig函数,源码如下:

源码路径:k8s.io\client-go\kubernetes\clientset.go

func NewForConfig(c *rest.Config) (*Clientset, error) {
    ......
    var cs Clientset
    ......
    cs.appsV1, err = appsv1.NewForConfig(&configShallowCopy)
    ......
    cs.coreV1, err = corev1.NewForConfig(&configShallowCopy)
    ......
    return &cs, nil
}

继续进入一层的NewForConfig中,调用的是rest.RESTClientFor(&config)来生成客户端。源码如下:

func NewForConfig(c *rest.Config) (*AppsV1Client, error) {
    ......
    client, err := rest.RESTClientFor(&config)
    ......              
}
  1. 追踪Pods函数的代码,可以看到返回的pods函数对象中的client为RESTClient,源码如下:

源码路径:k8s.io\client-go\kubernetes\typed\core\v1\pod.go

func newPods(c *CoreV1Client, namespace string) *pods {
    return &pods{
        //返回的客户端依然是RESTClient
        client: c.RESTClient(),
        ns:     namespace,
    }
}
  1. 追踪的Pods函数的List方法源码如下:

源码路径:k8s.io\client-go\kubernetes\typed\core\v1\pod.go

func (c *pods) List(ctx context.Context, opts metav1.ListOptions) (result *v1.PodList, err error) {
    var timeout time.Duration
    if opts.TimeoutSeconds != nil {
        timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
    }
    result = &v1.PodList{}
    //可以看到依然是利用RESTClient进行调用,命名空间通过传值给到,而资源则写死为pods
    err = c.client.Get().
        Namespace(c.ns).
        Resource("pods").
        VersionedParams(&opts, scheme.ParameterCodec).
        Timeout(timeout).
        Do(ctx).
        Into(result)
    return
}

2.4 DynamicClient客户端

DynamicClient客户端是一种动态客户端,可以对任意的Kubernetes资源进行RESTful操作,包括CRD资源。 DynamicClient内部实现了Unstructured,用于处理非结构化数据结构(即无法提前预知的数据结构),这也是DynamicClient能够处理CRD资源的关键。

DynamicClient不是类型安全的,因此在访问CRD自定义资源是要注意,例如,在操作不当时可能会导致程序崩溃。 DynamicClient的处理过程是将Resource(如PodList)转换成Unstructured结构类型,Kubernetes的所有Resource都可以转换为该结构类型。处理完后再将Unstructured转换成PodList。过程类似于Go语言的interface{}断言转换过程。另外,Unstructured结构类型是通过map[string]interface{}转换的。

2.4.1 代码练习

func TestDynamicClient(t *testing.T) {
    config, err := clientcmd.BuildConfigFromFlags("", "F:\\code\\env\\config")
    if err != nil {
        panic(err)
    }
    dynamicClient, err := dynamic.NewForConfig(config)
    if err != nil {
        panic(err)
    }
    gvr := schema.GroupVersionResource{Version: "v1", Resource: "pods"}
    //用于设置请求的资源组,资源版本,资源名称即命名空间;List函数用于获取Pod列表
    unstructObj, err := dynamicClient.Resource(gvr).Namespace(corev1.NamespaceDefault).
        List(context.Background(), metav1.ListOptions{Limit: 500})
    if err != nil {
        panic(err)
    }
    podList := &corev1.PodList{}
    //通过runtime的函数将unstructured.UnstructuredList转换为PodList
    err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructObj.UnstructuredContent(), podList)
    if err != nil {
        panic(err)
    }
    for _, d := range podList.Items {
        fmt.Printf("NAMESPACE:%v \t NAME:%v \t STATU:%v\n", d.Namespace, d.Name, d.Status.Phase)
    }
}

2.5 DiscoveryClient客户端

DiscoveryClient是发现客户端,主要用于发现Kubenetes API Server所支持的资源组、资源版本、资源信息。 kubectl的api-versions和api-resources命令输出也是通过DiscoveryClient实现的。其同样是在RESTClient的基础上进行的封装。DiscoveryClient还可以将资源组、资源版本、资源信息等存储在本地,用于本地缓存,减轻对kubernetes api sever的访问压力,缓存信息默认存储在:~/.kube/cache和~/.kube/http-cache下。

2.5.1 代码练习

func TestDiscoveryClient(t *testing.T) {
    config, err := clientcmd.BuildConfigFromFlags("", "F:\\code\\env\\config")
    if err != nil {
        panic(err)
    }
    discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
    if err != nil {
        panic(err)
    }
    _, APIResourceList, err := discoveryClient.ServerGroupsAndResources()
    if err != nil {
        panic(err)
    }
    for _, list := range APIResourceList {
        gv, err := schema.ParseGroupVersion(list.GroupVersion)
        if err != nil {
            panic(err)
        }
        for _, resource := range list.APIResources {
            fmt.Printf("name:%v,group:%v,version:%v\n", resource.Name, gv.Group, gv.Version)
        }
    }
}

2.5.2获取Kubernetes API Server所支持的资源组、资源版本及资源信息

DiscoveryClient通过RESTClient分别请求/api和/apis接口,从而获取Kubernetes API Server的信息,其核心实现位于ServerGroupsAndResources —> ServerGroups中。

源码路径:k8s.io\client-go\discovery\discovery_client.go

func (d *DiscoveryClient) ServerGroups() (apiGroupList *metav1.APIGroupList, err error) {
    // Get the groupVersions exposed at /api
    // 在/api中获取groupVersions并将结构放于metav1.APIVersions
    v := &metav1.APIVersions{}
    err = d.restClient.Get().AbsPath(d.LegacyPrefix).Do(context.TODO()).Into(v)
    apiGroup := metav1.APIGroup{}
    if err == nil && len(v.Versions) != 0 {
        apiGroup = apiVersionsToAPIGroup(v)
    }
    if err != nil && !errors.IsNotFound(err) && !errors.IsForbidden(err) {
        return nil, err
    }
    // Get the groupVersions exposed at /apis
    // 在/apis中获取groupVersions并将结构放于metav1.APIGroupList
    apiGroupList = &metav1.APIGroupList{}
    err = d.restClient.Get().AbsPath("/apis").Do(context.TODO()).Into(apiGroupList)
    if err != nil && !errors.IsNotFound(err) && !errors.IsForbidden(err) {
        return nil, err
    }
    // to be compatible with a v1.0 server, if it's a 403 or 404, ignore and return whatever we got from /api
    if err != nil && (errors.IsNotFound(err) || errors.IsForbidden(err)) {
        apiGroupList = &metav1.APIGroupList{}
    }
    // prepend the group retrieved from /api to the list if not empty
    //将api接口检索到的资源组信息合并到apiGroupList列表中
    if len(v.Versions) != 0 {
        apiGroupList.Groups = append([]metav1.APIGroup{apiGroup}, apiGroupList.Groups...)
    }
    return apiGroupList, nil
}

2.5.3 本地缓存的DiscoveryClient

DiscoveryClient可将资源相关信息存储在本地,默认每10分钟与kubernetes api server同步一次。 DiscoveryClient第一次获取资源组,资源版本,资源信息时,首先会查询本地缓存,如果数据不存在(没有命中)则请求Kubernetes API Server接口(回源),Cache将Kubernetes API Server响应的数据存储在本地一份并返回给DiscoveryClient。当下一次DiscoveryClient再次获取资源信息时,会将数据直接从本地缓存返回(命中)给DiscoveryClient。本地缓存的默认存储周期为10分钟。

流程如图所示:

源码路径:k8s.io\client-go\discovery\cached\disk\cached_discovery.go

func (d *CachedDiscoveryClient) ServerGroups() (*metav1.APIGroupList, error) {
    filename := filepath.Join(d.cacheDirectory, "servergroups.json")
    // 获取缓存Cache中的servergroups.json文件信息
    cachedBytes, err := d.getCachedFile(filename)
    // don't fail on errors, we either don't have a file or won't be able to run the cached check. Either way we can fallback.
    if err == nil {
        cachedGroups := &metav1.APIGroupList{}
        if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), cachedBytes, cachedGroups); err == nil {
            klog.V(10).Infof("returning cached discovery info from %v", filename)
            //将文件中的信息反序列化后转为cachedGroup对象并返回
            return cachedGroups, nil
        }
    }
    //如果没有命中返回,则从api-server中去获取信息
    liveGroups, err := d.delegate.ServerGroups()

    ......
    // 将最终的liveGroups回写到文件中
    if err := d.writeCachedFile(filename, liveGroups); err != nil {
        klog.V(1).Infof("failed to write cache to %v due to %v", filename, err)
    }
    //返回liveGroups
    return liveGroups, nil
}

通过源码可知,DiscoveryClient是通过创建的Client是DiscoveryClient还是CachedDiscoveryClient来决定是从缓存中读取还是从api-server中进行读取。

Last updated