#### 2.1.2 加载kubeconfig配置信息
**源码路径**:k8s.io\client-go\tools\clientcmd\loader.go
```go
// Load (excerpt; upstream code that the tutorial elides is marked "......")
// decides which kubeconfig files to read and loads each of them with
// LoadFromFile.
func (rules *ClientConfigLoadingRules) Load() (*clientcmdapi.Config, error) {
......
kubeConfigFiles := []string{}
// Make sure a file we were explicitly told to use exists
if len(rules.ExplicitPath) > 0 {
if _, err := os.Stat(rules.ExplicitPath); os.IsNotExist(err) {
return nil, err
}
// An explicit file path was given: use rules.ExplicitPath as the only kubeconfig file.
kubeConfigFiles = append(kubeConfigFiles, rules.ExplicitPath)
} else {
// No explicit path: use every path in rules.Precedence, which may hold several files.
// NOTE(review): the tutorial states these come from an environment variable — confirm
// against the code that populates rules.Precedence.
kubeConfigFiles = append(kubeConfigFiles, rules.Precedence...)
}
kubeconfigs := []*clientcmdapi.Config{}
// read and cache the config files so that we only look at them once
for _, filename := range kubeConfigFiles {
......
// Read the file and deserialize its contents into a Config object.
config, err := LoadFromFile(filename)
......
}
.......
}
```
**源码路径**:k8s.io\client-go\tools\clientcmd\loader.go
```go
// Load takes a byte slice and deserializes the contents into Config object.
// Encapsulates deserialization without assuming the source is a file.
// (Excerpt: upstream code that the tutorial elides is marked "......".)
func Load(data []byte) (*clientcmdapi.Config, error) {
config := clientcmdapi.NewConfig()
......
// Perform the deserialization, decoding data as a "Config" kind of version clientcmdlatest.Version.
decoded, _, err := clientcmdlatest.Codec.Decode(data, &schema.GroupVersionKind{Version: clientcmdlatest.Version, Kind: "Config"}, config)
......
// Assert the decoded value back to *clientcmdapi.Config for the caller.
return decoded.(*clientcmdapi.Config), nil
}
```
#### 2.1.3 合并多个kubeconfig配置信息
多个配置信息合并如图所示:
源码路径:k8s.io\client-go\tools\clientcmd\loader.go
// Load (excerpt; upstream code that the tutorial elides is marked "......",
// including the end of the file-reading loop) shows how the loaded
// kubeconfigs are combined: one forward pass into mapConfig, one reverse pass
// into nonMapConfig, and a final merge of both into config, all through
// mergo.MergeWithOverwrite.
func (rules *ClientConfigLoadingRules) Load() (*clientcmdapi.Config, error) {
......
// read and cache the config files so that we only look at them once
for _, filename := range kubeConfigFiles {
......
config, err := LoadFromFile(filename)
......
// first merge all of our maps
mapConfig := clientcmdapi.NewConfig()
for _, kubeconfig := range kubeconfigs {
mergo.MergeWithOverwrite(mapConfig, kubeconfig)
}
// merge all of the struct values in the reverse order so that priority is given correctly
// errors are not added to the list the second time
nonMapConfig := clientcmdapi.NewConfig()
for i := len(kubeconfigs) - 1; i >= 0; i-- {
kubeconfig := kubeconfigs[i]
mergo.MergeWithOverwrite(nonMapConfig, kubeconfig)
}
// since values are overwritten, but maps values are not, we can merge the non-map config on top of the map config and
// get the values we expect.
config := clientcmdapi.NewConfig()
mergo.MergeWithOverwrite(config, mapConfig)
mergo.MergeWithOverwrite(config, nonMapConfig)
......
}
// Do executes the request via r.request and returns the Result produced by
// r.transformResponse. If r.request itself returns an error, the returned
// Result carries only that error.
func (r *Request) Do(ctx context.Context) Result {
var result Result
// The callback captures result so the transformed response outlives the r.request call.
err := r.request(ctx, func(req *http.Request, resp *http.Response) {
result = r.transformResponse(resp, req)
})
if err != nil {
return Result{err: err}
}
return result
}
// request (excerpt; upstream code that the tutorial elides is marked "......")
// builds an HTTP request inside a loop, sends it with client.Do and passes
// the request/response pair to fn.
func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
......
// Right now we make about ten retry attempts if we get a Retry-After response.
retries := 0
for {
url := r.URL().String()
// Build the request with the standard library's net/http package.
req, err := http.NewRequest(r.verb, url, r.body)
if err != nil {
return err
}
req = req.WithContext(ctx)
req.Header = r.headers
......
// Do sends the request and receives the response.
resp, err := client.Do(req)
......
if err != nil {
// "Connection reset by peer" or "apiserver is shutting down" are usually a transient errors.
// Thus in case of "GET" operations, we simply retry it.
// We are not automatically retrying "write" operations, as
// they are not idempotent.
if r.verb != "GET" {
return err
}
// For connection errors and apiserver shutdown errors retry.
if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
// For the purpose of retry, we set the artificial "retry-after" response.
// TODO: Should we clean the original response if it exists?
resp = &http.Response{
StatusCode: http.StatusInternalServerError,
Header: http.Header{"Retry-After": []string{"1"}},
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
}
} else {
return err
}
}
done := func() bool {
defer func() {
......
// resp.Body.Close() runs in the deferred function, so the body is closed whenever
// this closure returns and the response body is not leaked.
resp.Body.Close()
}()
......
// fn is the callback passed in by the caller; when called from Do it wraps
// r.transformResponse, which turns the response into a Result.
fn(req, resp)
return true
}()
......
}
}
DiscoveryClient是发现客户端,主要用于发现Kubernetes API Server所支持的资源组、资源版本、资源信息。kubectl的api-versions和api-resources命令输出也是通过DiscoveryClient实现的。其同样是在RESTClient的基础上进行的封装。DiscoveryClient还可以将资源组、资源版本、资源信息等存储在本地,用于本地缓存,减轻对Kubernetes API Server的访问压力,缓存信息默认存储在:~/.kube/cache和~/.kube/http-cache下。
#### 2.5.1 代码练习
// TestDiscoveryClient asks the API server for every resource it advertises
// and prints each resource name with the group and version it belongs to.
//
// The kubeconfig path is machine-specific; point it at a kubeconfig that
// exists in the local environment before running the test.
func TestDiscoveryClient(t *testing.T) {
	config, err := clientcmd.BuildConfigFromFlags("", "F:\\code\\env\\config")
	if err != nil {
		// Report failures through the testing framework instead of panicking,
		// so the run is marked failed with a readable message.
		t.Fatalf("building config from kubeconfig: %v", err)
	}

	discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
	if err != nil {
		t.Fatalf("creating discovery client: %v", err)
	}

	// The first return value (the API groups) is not needed here.
	_, apiResourceLists, err := discoveryClient.ServerGroupsAndResources()
	if err != nil {
		t.Fatalf("listing server groups and resources: %v", err)
	}

	for _, list := range apiResourceLists {
		// list.GroupVersion is a single string; split it into group and version.
		gv, err := schema.ParseGroupVersion(list.GroupVersion)
		if err != nil {
			t.Fatalf("parsing group version %q: %v", list.GroupVersion, err)
		}
		for _, resource := range list.APIResources {
			fmt.Printf("name:%v,group:%v,version:%v\n", resource.Name, gv.Group, gv.Version)
		}
	}
}
#### 2.5.2 获取Kubernetes API Server所支持的资源组、资源版本及资源信息
DiscoveryClient通过RESTClient分别请求/api和/apis接口,从而获取Kubernetes API Server的信息,其核心实现位于ServerGroupsAndResources —> ServerGroups中。
// ServerGroups returns the API groups reported by the server: the group
// derived from the d.LegacyPrefix endpoint (when it reports any versions),
// followed by the groups listed at /apis. NotFound and Forbidden errors from
// either request are tolerated; any other error is returned to the caller.
func (d *DiscoveryClient) ServerGroups() (apiGroupList *metav1.APIGroupList, err error) {
// Get the groupVersions exposed at /api
// The response is decoded into v, a metav1.APIVersions.
v := &metav1.APIVersions{}
err = d.restClient.Get().AbsPath(d.LegacyPrefix).Do(context.TODO()).Into(v)
apiGroup := metav1.APIGroup{}
if err == nil && len(v.Versions) != 0 {
apiGroup = apiVersionsToAPIGroup(v)
}
if err != nil && !errors.IsNotFound(err) && !errors.IsForbidden(err) {
return nil, err
}
// Get the groupVersions exposed at /apis
// The response is decoded into apiGroupList, a metav1.APIGroupList.
apiGroupList = &metav1.APIGroupList{}
err = d.restClient.Get().AbsPath("/apis").Do(context.TODO()).Into(apiGroupList)
if err != nil && !errors.IsNotFound(err) && !errors.IsForbidden(err) {
return nil, err
}
// to be compatible with a v1.0 server, if it's a 403 or 404, ignore and return whatever we got from /api
if err != nil && (errors.IsNotFound(err) || errors.IsForbidden(err)) {
apiGroupList = &metav1.APIGroupList{}
}
// prepend the group retrieved from /api to the list if not empty
// This merges the group found via the first request into apiGroupList.
if len(v.Versions) != 0 {
apiGroupList.Groups = append([]metav1.APIGroup{apiGroup}, apiGroupList.Groups...)
}
return apiGroupList, nil
}
#### 2.5.3 本地缓存的DiscoveryClient
DiscoveryClient可将资源相关信息存储在本地,默认每10分钟与Kubernetes API Server同步一次。DiscoveryClient第一次获取资源组、资源版本、资源信息时,首先会查询本地缓存,如果数据不存在(没有命中)则请求Kubernetes API Server接口(回源),Cache将Kubernetes API Server响应的数据存储在本地一份并返回给DiscoveryClient。当下一次DiscoveryClient再次获取资源信息时,会将数据直接从本地缓存返回(命中)给DiscoveryClient。本地缓存的默认存储周期为10分钟。
// ServerGroups (excerpt; upstream code that the tutorial elides is marked
// "......") returns the API group list from the servergroups.json cache file
// when that file can be read and decoded; otherwise it asks d.delegate and
// writes the answer back to the cache file.
func (d *CachedDiscoveryClient) ServerGroups() (*metav1.APIGroupList, error) {
filename := filepath.Join(d.cacheDirectory, "servergroups.json")
// Read servergroups.json from the cache directory.
cachedBytes, err := d.getCachedFile(filename)
// don't fail on errors, we either don't have a file or won't be able to run the cached check. Either way we can fallback.
if err == nil {
cachedGroups := &metav1.APIGroupList{}
if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), cachedBytes, cachedGroups); err == nil {
klog.V(10).Infof("returning cached discovery info from %v", filename)
// The cached bytes decoded cleanly into cachedGroups, so return them.
return cachedGroups, nil
}
}
// No usable cache entry: fall back to the delegate.
// NOTE(review): the tutorial describes this as querying the API server — confirm what d.delegate is.
liveGroups, err := d.delegate.ServerGroups()
......
// Write liveGroups back to the cache file; a write failure is only logged.
if err := d.writeCachedFile(filename, liveGroups); err != nil {
klog.V(1).Infof("failed to write cache to %v due to %v", filename, err)
}
// Return the result obtained from the delegate.
return liveGroups, nil
}