
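PodWorkers is the piece of logic that ultimately "syncs" pod state.
"Sync" here means making sure the state of the Pods on the kubelet's node matches the desired state stored in etcd (which the kubelet reads through the apiserver): add what should be added, delete what should be deleted, and update what should be updated.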

A sync can be triggered in several ways:
(1) changes observed from the file, apiserver, or HTTP pod sources
(2) a periodic timer, e.g. every 10s

The entry point of PodWorkers is UpdatePod.

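type podWorkers struct {
	// Key: the pod's UID. Value: a channel watched by a dedicated goroutine,
	// which loops forever handling every change to that pod.
	podUpdates map[types.UID]chan UpdatePodOptions

	// Records whether each pod's goroutine is currently busy.
	isWorking map[types.UID]bool

	// When a pod's goroutine finishes its work, it puts the pod's UID into
	// workQueue. A timer periodically triggers the kubelet, which takes the
	// pods due for syncing from workQueue and calls PodWorkers.UpdatePod on
	// them, starting a new round of syncing.
	workQueue queue.WorkQueue
}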

A simple diagram:
[Figure: overview of PodWorkers]
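The one-goroutine-per-pod model described in the struct comments can be sketched as follows. This is a simplified, hypothetical illustration, not kubelet code: `UID`, `Update`, `podWorkers`, `newPodWorkers`, `syncFn`, and `Stop` are invented names. In this sketch an update that arrives while a pod's worker is busy is simply dropped; the real kubelet keeps the latest undelivered update and re-enqueues pods through workQueue.

```go
package main

import (
	"fmt"
	"sync"
)

// UID is a hypothetical stand-in for types.UID.
type UID string

// Update is a hypothetical stand-in for UpdatePodOptions.
type Update struct {
	PodUID UID
	Spec   string
}

// podWorkers (hypothetical) keeps one channel and one goroutine per pod.
type podWorkers struct {
	mu         sync.Mutex
	podUpdates map[UID]chan Update
	isWorking  map[UID]bool
	syncFn     func(Update) // hypothetical per-pod sync logic
	wg         sync.WaitGroup
}

func newPodWorkers(syncFn func(Update)) *podWorkers {
	return &podWorkers{
		podUpdates: make(map[UID]chan Update),
		isWorking:  make(map[UID]bool),
		syncFn:     syncFn,
	}
}

// UpdatePod starts a worker goroutine the first time a pod is seen and
// hands it the update unless that worker is still busy.
func (p *podWorkers) UpdatePod(u Update) {
	p.mu.Lock()
	defer p.mu.Unlock()
	ch, ok := p.podUpdates[u.PodUID]
	if !ok {
		ch = make(chan Update, 1)
		p.podUpdates[u.PodUID] = ch
		p.wg.Add(1)
		go p.managePodLoop(u.PodUID, ch)
	}
	if !p.isWorking[u.PodUID] {
		p.isWorking[u.PodUID] = true
		ch <- u // never blocks: the buffer is empty whenever isWorking is false
	}
	// Sketch only: updates arriving while the worker is busy are dropped.
}

// managePodLoop processes the updates for a single pod, one at a time.
func (p *podWorkers) managePodLoop(uid UID, ch chan Update) {
	defer p.wg.Done()
	for u := range ch {
		p.syncFn(u)
		p.mu.Lock()
		p.isWorking[uid] = false
		p.mu.Unlock()
	}
}

// Stop closes every pod channel and waits for all workers to exit.
// UpdatePod must not be called after Stop.
func (p *podWorkers) Stop() {
	p.mu.Lock()
	for _, ch := range p.podUpdates {
		close(ch)
	}
	p.mu.Unlock()
	p.wg.Wait()
}

func main() {
	pw := newPodWorkers(func(u Update) { fmt.Println("syncing", u.PodUID, u.Spec) })
	pw.UpdatePod(Update{PodUID: "pod-a", Spec: "v1"})
	pw.UpdatePod(Update{PodUID: "pod-b", Spec: "v1"})
	pw.Stop()
}
```

Serializing all changes for one pod through its own channel means two updates for the same pod never run concurrently, while different pods are synced in parallel.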

See the previous article.

Docker today is no longer the monolith it once was; its technology stack has been split into layers:
docker cli -> dockerd -> containerd -> OCI implementation

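The OCI specifications, in short, consist of a container runtime spec and an image spec; for details see:
link
Because of OCI, the concrete implementation underneath the "docker run" command can be swapped almost transparently.
Depending on the needs of your workload, you can choose a high-performance container runtime, or one with lower performance but better security.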

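OCI-compliant container runtimes currently include:

  • runc: Docker's default runtime. Containers share the host kernel and are isolated with Linux namespaces and cgroups; because the kernel is shared, security is weaker, but performance is the best.
  • runV: a hypervisor-based container runtime. Each container gets its own kernel, so security is much better.
  • kata (Kata Containers): billed as "the speed of containers, the security of VMs". It grew out of a merger of Hyper's runV and Intel Clear Containers; Hyper itself was reportedly acquired by Ant Financial.
  • gVisor: from Google; more secure than runc and faster than a VM. It runs a user-space kernel that intercepts the application's system calls. Not all system calls are implemented yet, so not every application can run on it.

Docker currently uses runc as its default container runtime.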

The docker command line has a --runtime option for specifying the underlying container runtime, for example:

docker run --runtime=runc hello-world

The runc project has an official example showing how to run a container with runc:

EXAMPLE:
To run docker's hello-world container one needs to set the args parameter
in the spec to call hello. This can be done using the sed command or a text
editor. The following commands create a bundle for hello-world, change the
default args parameter in the spec from "sh" to "/hello", then run the hello
command in a new hello-world container named container1:

mkdir hello
cd hello
docker pull hello-world
docker export $(docker create hello-world) > hello-world.tar
mkdir rootfs
tar -C rootfs -xf hello-world.tar
runc spec
sed -i 's;"sh";"/hello";' config.json
runc run container1

There is also an article online that walks through running containers with runc and runV; see: link

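Back to Kubernetes: k8s abstracts container runtimes behind an interface called CRI (Container Runtime Interface).

The kubelet talks to different underlying container runtimes through this interface, which decouples it from any particular runtime.
The relationship is shown below:
[Figure: kubelet, CRI shim, and container runtimes]
The kubelet calls the CRI APIs over gRPC; a CRI shim is an implementation of this interface.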

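The kubelet has an option, --container-runtime,
whose default is docker; this can loosely be thought of as non-CRI mode (in practice the kubelet talks to Docker through its built-in dockershim).
Setting it to remote enables CRI mode, and another option,
--container-runtime-endpoint, specifies the address of the CRI service, usually a local Unix domain socket.
The kubelet then manages containers through that CRI endpoint.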

This diagram may make it clearer:
[Figure: kubelet in CRI mode]
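The decoupling idea can be illustrated with a small Go interface. This is a hypothetical sketch only: `ContainerRuntime`, `RunContainer`, `fakeRunc`, `fakeKata`, and `startPod` are invented names. The real CRI is a gRPC API defined in protobuf (with `RuntimeService` and `ImageService`), and the kubelet reaches the shim over a socket rather than through an in-process interface.

```go
package main

import "fmt"

// ContainerRuntime is a hypothetical, much-reduced stand-in for the CRI.
// The "kubelet" code below depends only on this interface.
type ContainerRuntime interface {
	Name() string
	RunContainer(image string) (id string, err error)
}

// fakeRunc and fakeKata are hypothetical runtimes behind the interface.
type fakeRunc struct{ n int }

func (r *fakeRunc) Name() string { return "runc" }
func (r *fakeRunc) RunContainer(image string) (string, error) {
	r.n++
	return fmt.Sprintf("runc-%d-%s", r.n, image), nil
}

type fakeKata struct{ n int }

func (k *fakeKata) Name() string { return "kata" }
func (k *fakeKata) RunContainer(image string) (string, error) {
	k.n++
	return fmt.Sprintf("kata-%d-%s", k.n, image), nil
}

// startPod plays the kubelet's role: it never needs to know which runtime it uses.
func startPod(rt ContainerRuntime, images []string) ([]string, error) {
	ids := make([]string, 0, len(images))
	for _, img := range images {
		id, err := rt.RunContainer(img)
		if err != nil {
			return nil, fmt.Errorf("%s: %w", rt.Name(), err)
		}
		ids = append(ids, id)
	}
	return ids, nil
}

func main() {
	for _, rt := range []ContainerRuntime{&fakeRunc{}, &fakeKata{}} {
		ids, _ := startPod(rt, []string{"nginx", "busybox"})
		fmt.Println(rt.Name(), ids)
	}
}
```

Swapping runtimes only changes which value is passed to `startPod`; the calling code is untouched, which is the property the CRI provides at the process boundary.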

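A project of mine had this requirement: when submitting tasks to a thread pool, if the task queue is full, the call to ThreadPoolExecutor.execute should block and wait. I googled around and am recording what I found here for anyone with the same need.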

https://stackoverflow.com/questions/4521983/java-executorservice-that-blocks-on-submission-after-a-certain-queue-size/4522411#4522411

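A few points about ThreadPoolExecutor:
(1) When execute is called, the executor first starts a new thread if fewer than corePoolSize threads are running. Otherwise it calls the queue's offer method; if offer returns false, the queue is full. At that point, if the current number of threads is below maximumPoolSize, a new thread is started to run the submitted task; if the pool already has maximumPoolSize threads, the submission fails and the RejectedExecutionHandler is called.
(2) Java provides four built-in RejectedExecutionHandlers: AbortPolicy, CallerRunsPolicy, DiscardPolicy, and DiscardOldestPolicy. Three of them are shown here: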

/**
 * A handler for rejected tasks that throws a
 * {@code RejectedExecutionException}.
 */
public static class AbortPolicy implements RejectedExecutionHandler {
    /**
     * Creates an {@code AbortPolicy}.
     */
    public AbortPolicy() { }

    /**
     * Always throws RejectedExecutionException.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws RejectedExecutionException always
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

/**
 * A handler for rejected tasks that silently discards the
 * rejected task.
 */
public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardPolicy}.
     */
    public DiscardPolicy() { }

    /**
     * Does nothing, which has the effect of discarding task r.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

/**
 * A handler for rejected tasks that discards the oldest unhandled
 * request and then retries {@code execute}, unless the executor
 * is shut down, in which case the task is discarded.
 */
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardOldestPolicy} for the given executor.
     */
    public DiscardOldestPolicy() { }

    /**
     * Obtains and ignores the next task that the executor
     * would otherwise execute, if one is immediately available,
     * and then retries execution of task r, unless the executor
     * is shut down, in which case task r is instead discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
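The order of checks described in point (1) can be modeled as a pure function. This is a simplified Go sketch, not the JDK source: `decide`, `Action`, and the parameter names are hypothetical, and the real `execute` also deals with shutdown, concurrent submissions, and re-checks after enqueueing.

```go
package main

import "fmt"

// Action is what a ThreadPoolExecutor-like pool does with a submitted task.
type Action string

const (
	StartCoreThread  Action = "start core thread"
	Enqueue          Action = "enqueue"
	StartExtraThread Action = "start extra thread"
	Reject           Action = "reject (RejectedExecutionHandler)"
)

// decide (hypothetical) models the order of checks in ThreadPoolExecutor.execute:
// core threads first, then the queue, then extra threads up to max, then reject.
func decide(poolSize, corePoolSize, maxPoolSize, queued, queueCap int) Action {
	switch {
	case poolSize < corePoolSize:
		return StartCoreThread
	case queued < queueCap: // offer() succeeds
		return Enqueue
	case poolSize < maxPoolSize: // offer() failed, but the pool may still grow
		return StartExtraThread
	default:
		return Reject
	}
}

func main() {
	fmt.Println(decide(1, 2, 4, 0, 10))  // start core thread
	fmt.Println(decide(2, 2, 4, 3, 10))  // enqueue
	fmt.Println(decide(2, 2, 4, 10, 10)) // start extra thread
	fmt.Println(decide(4, 4, 4, 10, 10)) // reject (RejectedExecutionHandler)
}
```

In this model, a queue whose offer never fails keeps the function from ever reaching the `StartExtraThread` branch, which is why making offer block pins the pool at corePoolSize threads.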

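Solution 1: override the queue's offer method so that it becomes a blocking call.
With this approach the pool never grows beyond corePoolSize threads, because offer no longer fails when the queue is full; the maximumPoolSize setting is effectively ignored.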

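// Both members below live inside an enclosing utility class and use
// java.util.concurrent.{LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}.

/**
 * Overrides offer() so that it blocks instead of failing when the queue is full.
 */
private static class MyBlockingQueue<T> extends LinkedBlockingQueue<T> {

    public MyBlockingQueue(int size) {
        super(size);
    }

    @Override
    public boolean offer(T t) {
        try {
            put(t); // blocks until space is available
            return true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag; returning false sends the executor down its rejection path.
            Thread.currentThread().interrupt();
        }

        return false;
    }
}

/**
 * Fixed-size thread pool that blocks the submitter when the queue is full.
 */
public static ThreadPoolExecutor newFixedThreadPool(int nThreads, int queueSize, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(
            nThreads, nThreads, 0L, TimeUnit.SECONDS, new MyBlockingQueue<>(queueSize), threadFactory);
}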
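For comparison, in Go the behavior of solution 1 (a fixed number of workers, with the submitter blocking when the queue is full) falls out of a bounded buffered channel, because a send on a full channel blocks. A minimal sketch; `FixedPool`, `NewFixedPool`, `Submit`, and `Shutdown` are hypothetical names, not a library API.

```go
package main

import (
	"fmt"
	"sync"
)

// FixedPool (hypothetical) runs tasks on nThreads goroutines fed by a bounded queue.
type FixedPool struct {
	tasks chan func()
	wg    sync.WaitGroup
}

func NewFixedPool(nThreads, queueSize int) *FixedPool {
	p := &FixedPool{tasks: make(chan func(), queueSize)}
	for i := 0; i < nThreads; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for task := range p.tasks {
				task()
			}
		}()
	}
	return p
}

// Submit blocks while the queue is full, like the overridden offer().
func (p *FixedPool) Submit(task func()) { p.tasks <- task }

// Shutdown stops accepting tasks and waits for the queued ones to finish.
func (p *FixedPool) Shutdown() {
	close(p.tasks)
	p.wg.Wait()
}

func main() {
	p := NewFixedPool(2, 4)
	var mu sync.Mutex
	sum := 0
	for i := 1; i <= 100; i++ {
		i := i
		p.Submit(func() {
			mu.Lock()
			sum += i
			mu.Unlock()
		})
	}
	p.Shutdown()
	fmt.Println(sum) // 5050
}
```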

Solution 2: use a Java Semaphore

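import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

public class BoundedExecutor {
    private final Executor exec;
    // One permit per task that is queued or running.
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command)
            throws InterruptedException, RejectedExecutionException {
        semaphore.acquire(); // blocks once `bound` tasks are in flight
        try {
            exec.execute(new Runnable() {
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release(); // free the permit when the task finishes
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release(); // the task never ran, so give the permit back
            throw e;
        }
    }
}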
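The same semaphore idea in Go: a buffered channel of capacity `bound` works as a counting semaphore, so `Submit` blocks once `bound` tasks are in flight. A minimal sketch; this `BoundedExecutor` is a hypothetical Go type mirroring the Java class, and unlike the Java version it starts one goroutine per task instead of delegating to a pool.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// BoundedExecutor (hypothetical) limits the number of in-flight tasks.
type BoundedExecutor struct {
	sem chan struct{} // counting semaphore: one slot per in-flight task
	wg  sync.WaitGroup
}

func NewBoundedExecutor(bound int) *BoundedExecutor {
	return &BoundedExecutor{sem: make(chan struct{}, bound)}
}

// Submit acquires a permit (blocking if none is free), runs the task
// asynchronously, and releases the permit when the task returns.
func (b *BoundedExecutor) Submit(task func()) {
	b.sem <- struct{}{} // acquire
	b.wg.Add(1)
	go func() {
		defer b.wg.Done()
		defer func() { <-b.sem }() // release
		task()
	}()
}

// Wait blocks until every submitted task has finished.
func (b *BoundedExecutor) Wait() { b.wg.Wait() }

func main() {
	b := NewBoundedExecutor(3)
	var mu sync.Mutex
	inFlight, maxInFlight := 0, 0
	for i := 0; i < 20; i++ {
		b.Submit(func() {
			mu.Lock()
			inFlight++
			if inFlight > maxInFlight {
				maxInFlight = inFlight
			}
			mu.Unlock()
			time.Sleep(10 * time.Millisecond) // simulate work
			mu.Lock()
			inFlight--
			mu.Unlock()
		})
	}
	b.Wait()
	fmt.Println("max in flight:", maxInFlight) // never more than 3
}
```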

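This article traces how default parameter values are injected during kubelet startup.

To start the kubelet service, we first need to build the kubelet configuration object, i.e. the kubeletconfig.KubeletConfiguration struct: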

// NewKubeletCommand creates a *cobra.Command object with default parameters
func NewKubeletCommand() *cobra.Command {
	cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
	cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
	kubeletFlags := options.NewKubeletFlags()
	// Returns a kubelet configuration object populated with default values
	kubeletConfig, err := options.NewKubeletConfiguration()
	// programmer error
	// ... (rest of the function omitted)
}
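// NewKubeletConfiguration will create a new KubeletConfiguration with default values
func NewKubeletConfiguration() (*kubeletconfig.KubeletConfiguration, error) {
	// Build a Scheme for kubelet. A Scheme is the concept introduced to handle
	// serialization/deserialization of data structures and the compatibility
	// and conversion of objects across versions. It records the two-way mapping
	// between resource kinds and Go types, the conversion functions between
	// versions, and each type's default-initialization (defaulting) function.
	scheme, _, err := kubeletscheme.NewSchemeAndCodecs()
	if err != nil {
		return nil, err
	}

	// The v1beta1 (versioned) kubelet configuration
	versioned := &v1beta1.KubeletConfiguration{}
	// Set each field's default value. Roughly: take the type of versioned,
	// look up the defaulting function registered for that type, and call it.
	scheme.Default(versioned)

	// The internal (unversioned) kubelet configuration
	config := &kubeletconfig.KubeletConfiguration{}
	// Initialize the internal configuration from the v1beta1 one. Why the detour?
	// The defaulting functions are registered for the versioned v1beta1 type, so
	// defaults are applied there first and then converted into the internal type.
	if err := scheme.Convert(versioned, config, nil); err != nil {
		return nil, err
	}

	// Set default values for the remaining options
	applyLegacyDefaults(config)
	return config, nil
}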
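The default-then-convert sequence of `NewKubeletConfiguration` can be reduced to a toy program. Everything here is hypothetical (`ConfigV1beta1`, `Config`, `setDefaultsV1beta1`, `convertV1beta1ToInternal`, `newConfiguration`) and only mirrors the shape of the real code; in the toy the two types happen to have identical fields, while kubelet's conversion functions are generated.

```go
package main

import (
	"fmt"
	"time"
)

// ConfigV1beta1 is a hypothetical external (versioned) config type.
type ConfigV1beta1 struct {
	SyncFrequency time.Duration
	Address       string
}

// Config is a hypothetical internal (unversioned) config type.
type Config struct {
	SyncFrequency time.Duration
	Address       string
}

// setDefaultsV1beta1 is defined for the versioned type only.
func setDefaultsV1beta1(c *ConfigV1beta1) {
	if c.SyncFrequency == 0 {
		c.SyncFrequency = time.Minute
	}
	if c.Address == "" {
		c.Address = "0.0.0.0"
	}
}

// convertV1beta1ToInternal copies the versioned object field by field.
func convertV1beta1ToInternal(in *ConfigV1beta1, out *Config) {
	out.SyncFrequency = in.SyncFrequency
	out.Address = in.Address
}

// newConfiguration follows the same steps as NewKubeletConfiguration:
// start from an empty versioned object, default it, then convert it.
func newConfiguration() *Config {
	versioned := &ConfigV1beta1{}
	setDefaultsV1beta1(versioned)
	config := &Config{}
	convertV1beta1ToInternal(versioned, config)
	return config
}

func main() {
	fmt.Printf("%+v\n", *newConfiguration()) // {SyncFrequency:1m0s Address:0.0.0.0}
}
```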

The function that builds the kubelet scheme is:

// NewSchemeAndCodecs is a utility function that returns a Scheme and CodecFactory
// that understand the types in the kubeletconfig API group.
func NewSchemeAndCodecs() (*runtime.Scheme, *serializer.CodecFactory, error) {
	// Create an empty Scheme
	scheme := runtime.NewScheme()

	// Call the registration function in register.go. For
	// GroupVersion{Group: GroupName, Version: runtime.APIVersionInternal}
	// it registers two types: KubeletConfiguration and SerializedNodeConfigSource.
	if err := kubeletconfig.AddToScheme(scheme); err != nil {
		return nil, nil, err
	}

	// Call the v1beta1 registration function. Among other things it registers in
	// the scheme the defaulting function that sets the default value of each
	// KubeletConfiguration field; that code lives in defaults.go under v1beta1.
	if err := kubeletconfigv1beta1.AddToScheme(scheme); err != nil {
		return nil, nil, err
	}

	codecs := serializer.NewCodecFactory(scheme)
	return scheme, &codecs, nil
}
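The registration step can be mimicked with a tiny registry keyed by `reflect.Type`, which is the kind of map the `defaulterFuncs` field of `Scheme` holds. A hypothetical sketch: `miniScheme`, `schemeBuilder`, `addDefaultingFuncs`, `AddToScheme`, and `KubeletConfig` are invented here. Each `AddToScheme` is a list of registration functions applied in order, similar in spirit to apimachinery's `runtime.SchemeBuilder`, and `Default` looks up the function by the object's dynamic type.

```go
package main

import (
	"fmt"
	"reflect"
	"time"
)

// miniScheme (hypothetical) keeps one defaulting function per Go type.
type miniScheme struct {
	defaulterFuncs map[reflect.Type]func(interface{})
}

func newMiniScheme() *miniScheme {
	return &miniScheme{defaulterFuncs: map[reflect.Type]func(interface{}){}}
}

// AddTypeDefaultingFunc registers fn for the (pointer) type of srcType.
func (s *miniScheme) AddTypeDefaultingFunc(srcType interface{}, fn func(interface{})) {
	s.defaulterFuncs[reflect.TypeOf(srcType)] = fn
}

// Default runs the function registered for obj's type, if any.
func (s *miniScheme) Default(obj interface{}) {
	if fn, ok := s.defaulterFuncs[reflect.TypeOf(obj)]; ok {
		fn(obj)
	}
}

// schemeBuilder is a list of registration steps, applied in order.
type schemeBuilder []func(*miniScheme) error

func (sb schemeBuilder) AddToScheme(s *miniScheme) error {
	for _, f := range sb {
		if err := f(s); err != nil {
			return err
		}
	}
	return nil
}

// KubeletConfig is a hypothetical versioned config type.
type KubeletConfig struct {
	SyncFrequency time.Duration
}

func addDefaultingFuncs(s *miniScheme) error {
	s.AddTypeDefaultingFunc(&KubeletConfig{}, func(obj interface{}) {
		c := obj.(*KubeletConfig)
		if c.SyncFrequency == 0 {
			c.SyncFrequency = time.Minute
		}
	})
	return nil
}

// AddToScheme is what the "v1beta1 package" would export.
var AddToScheme = schemeBuilder{addDefaultingFuncs}.AddToScheme

func main() {
	s := newMiniScheme()
	if err := AddToScheme(s); err != nil {
		panic(err)
	}
	cfg := &KubeletConfig{}
	s.Default(cfg)
	fmt.Println(cfg.SyncFrequency) // 1m0s
}
```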

The lowest-level defaulting function looks like this:

func SetDefaults_KubeletConfiguration(obj *kubeletconfigv1beta1.KubeletConfiguration) {
	if obj.SyncFrequency == zeroDuration {
		obj.SyncFrequency = metav1.Duration{Duration: 1 * time.Minute}
	}
	if obj.FileCheckFrequency == zeroDuration {
		obj.FileCheckFrequency = metav1.Duration{Duration: 20 * time.Second}
	}
	if obj.HTTPCheckFrequency == zeroDuration {
		obj.HTTPCheckFrequency = metav1.Duration{Duration: 20 * time.Second}
	}
	if obj.Address == "" {
		obj.Address = "0.0.0.0"
	}
	if obj.Port == 0 {
		obj.Port = ports.KubeletPort
	}
	if obj.Authentication.Anonymous.Enabled == nil {
		obj.Authentication.Anonymous.Enabled = utilpointer.BoolPtr(false)
	}
	if obj.Authentication.Webhook.Enabled == nil {
		obj.Authentication.Webhook.Enabled = utilpointer.BoolPtr(true)
	}
	if obj.Authentication.Webhook.CacheTTL == zeroDuration {
		obj.Authentication.Webhook.CacheTTL = metav1.Duration{Duration: 2 * time.Minute}
	}
	if obj.Authorization.Mode == "" {
		obj.Authorization.Mode = kubeletconfigv1beta1.KubeletAuthorizationModeWebhook
	}
	if obj.Authorization.Webhook.CacheAuthorizedTTL == zeroDuration {
		obj.Authorization.Webhook.CacheAuthorizedTTL = metav1.Duration{Duration: 5 * time.Minute}
	}
	if obj.Authorization.Webhook.CacheUnauthorizedTTL == zeroDuration {
		obj.Authorization.Webhook.CacheUnauthorizedTTL = metav1.Duration{Duration: 30 * time.Second}
	}
	if obj.RegistryPullQPS == nil {
		obj.RegistryPullQPS = utilpointer.Int32Ptr(5)
	}
	if obj.RegistryBurst == 0 {
		obj.RegistryBurst = 10
	}
	// ... (remaining fields omitted)
}
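Note the two styles of check in the function above: `obj.RegistryBurst == 0` versus `obj.RegistryPullQPS == nil`. With a plain value field, an explicit 0 cannot be told apart from "not set", so defaulting overwrites it; a pointer field keeps nil for "not set" and lets an explicit 0 survive. A toy illustration with hypothetical types (`plainCfg`, `ptrCfg`, `defaultPlain`, `defaultPtr`), not kubelet code:

```go
package main

import "fmt"

// plainCfg (hypothetical) uses a value field: 0 means both "unset" and "explicitly zero".
type plainCfg struct{ Burst int32 }

// ptrCfg (hypothetical) uses a pointer field: nil means "unset", &0 means "explicitly zero".
type ptrCfg struct{ QPS *int32 }

func int32Ptr(v int32) *int32 { return &v }

func defaultPlain(c *plainCfg) {
	if c.Burst == 0 {
		c.Burst = 10
	}
}

func defaultPtr(c *ptrCfg) {
	if c.QPS == nil {
		c.QPS = int32Ptr(5)
	}
}

func main() {
	p := plainCfg{Burst: 0} // the user explicitly asked for 0...
	defaultPlain(&p)
	fmt.Println(p.Burst) // 10: the explicit 0 was overwritten

	q := ptrCfg{QPS: int32Ptr(0)} // the user explicitly asked for 0
	defaultPtr(&q)
	fmt.Println(*q.QPS) // 0: preserved

	var r ptrCfg // the user said nothing
	defaultPtr(&r)
	fmt.Println(*r.QPS) // 5
}
```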

Finally, here is the Scheme type itself:

// Scheme defines methods for serializing and deserializing API objects, a type
// registry for converting group, version, and kind information to and from Go
// schemas, and mappings between Go schemas of different versions. A scheme is the
// foundation for a versioned API and versioned configuration over time.
//
// In a Scheme, a Type is a particular Go struct, a Version is a point-in-time
// identifier for a particular representation of that Type (typically backwards
// compatible), a Kind is the unique name for that Type within the Version, and a
// Group identifies a set of Versions, Kinds, and Types that evolve over time. An
// Unversioned Type is one that is not yet formally bound to a type and is promised
// to be backwards compatible (effectively a "v1" of a Type that does not expect
// to break in the future).
//
// Schemes are not expected to change at runtime and are only threadsafe after
// registration is complete.
type Scheme struct {
	// versionMap allows one to figure out the go type of an object with
	// the given version and name.
	/*
		All Kubernetes API types implement the Object interface:

		// Object interface must be supported by all API types registered with Scheme. Since objects in a scheme are
		// expected to be serialized to the wire, the interface an Object must provide to the Scheme allows
		// serializers to set the kind, version, and group the object is represented as. An Object may choose
		// to return a no-op ObjectKindAccessor in cases where it is not expected to be serialized.
		type Object interface {
			GetObjectKind() schema.ObjectKind
			DeepCopyObject() Object
		}

		Through this interface you can obtain the GroupVersionKind associated with an
		object: the Group it belongs to, its Version, and its Kind (resource name).
		With the information held in gvkToType and typeToGVK, objects can then be
		serialized and deserialized.
	*/
	gvkToType map[schema.GroupVersionKind]reflect.Type

	// typeToGroupVersion allows one to find metadata for a given go object.
	// The reflect.Type we index by should *not* be a pointer.
	typeToGVK map[reflect.Type][]schema.GroupVersionKind

	// unversionedTypes are transformed without conversion in ConvertToVersion.
	unversionedTypes map[reflect.Type]schema.GroupVersionKind

	// unversionedKinds are the names of kinds that can be created in the context of any group
	// or version
	// TODO: resolve the status of unversioned types.
	unversionedKinds map[string]reflect.Type

	// Map from version and resource to the corresponding func to convert
	// resource field labels in that version to internal version.
	fieldLabelConversionFuncs map[schema.GroupVersionKind]FieldLabelConversionFunc

	// defaulterFuncs is an array of interfaces to be called with an object to provide defaulting
	// the provided object must be a pointer.
	// Defaulting functions: each type has its own.
	defaulterFuncs map[reflect.Type]func(interface{})

	// converter stores all registered conversion functions. It also has
	// default converting behavior.
	converter *conversion.Converter

	// versionPriority is a map of groups to ordered lists of versions for those groups indicating the
	// default priorities of these versions as registered in the scheme
	versionPriority map[string][]string

	// observedVersions keeps track of the order we've seen versions during type registration
	observedVersions []schema.GroupVersion

	// schemeName is the name of this scheme. If you don't specify a name, the stack of the NewScheme caller will be used.
	// This is useful for error reporting to indicate the origin of the scheme.
	schemeName string
}
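The `gvkToType` / `typeToGVK` pair is essentially a two-way index between a group/version/kind triple and a Go type: given the kind recorded in serialized data, the scheme can create an empty object of the right Go type to decode into, and given an object it can recover its kind. A hypothetical miniature (`GVK`, `registry`, `AddKnownType`, `New`, `KindsFor`), not apimachinery's API; like the real field comment says, it indexes by the non-pointer type.

```go
package main

import (
	"fmt"
	"reflect"
)

// GVK is a hypothetical stand-in for schema.GroupVersionKind.
type GVK struct{ Group, Version, Kind string }

// registry (hypothetical) holds both directions of the mapping.
type registry struct {
	gvkToType map[GVK]reflect.Type
	typeToGVK map[reflect.Type][]GVK
}

func newRegistry() *registry {
	return &registry{
		gvkToType: map[GVK]reflect.Type{},
		typeToGVK: map[reflect.Type][]GVK{},
	}
}

// AddKnownType indexes obj's struct type (not the pointer) under gvk in both directions.
func (r *registry) AddKnownType(gvk GVK, obj interface{}) {
	t := reflect.TypeOf(obj).Elem()
	r.gvkToType[gvk] = t
	r.typeToGVK[t] = append(r.typeToGVK[t], gvk)
}

// New returns a pointer to a zero value of the type registered for gvk.
func (r *registry) New(gvk GVK) (interface{}, error) {
	t, ok := r.gvkToType[gvk]
	if !ok {
		return nil, fmt.Errorf("no type registered for %v", gvk)
	}
	return reflect.New(t).Interface(), nil
}

// KindsFor returns the GVKs registered for obj's type.
func (r *registry) KindsFor(obj interface{}) []GVK {
	return r.typeToGVK[reflect.TypeOf(obj).Elem()]
}

// KubeletConfiguration is a hypothetical one-field stand-in.
type KubeletConfiguration struct{ Address string }

func main() {
	r := newRegistry()
	gvk := GVK{"kubelet.config.k8s.io", "v1beta1", "KubeletConfiguration"}
	r.AddKnownType(gvk, &KubeletConfiguration{})

	obj, _ := r.New(gvk)
	fmt.Printf("%T\n", obj)                          // *main.KubeletConfiguration
	fmt.Println(r.KindsFor(&KubeletConfiguration{})) // [{kubelet.config.k8s.io v1beta1 KubeletConfiguration}]
}
```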

The overall call chain is roughly:
options.NewKubeletConfiguration() -> kubeletscheme.NewSchemeAndCodecs() (builds the kubelet Scheme, including registering the types and their defaulting functions) -> scheme.Default() -> SetDefaults_KubeletConfiguration()