
The project has a performance-optimization requirement: further reduce the high RT caused by remote calls to Redis. The current idea is a two-level cache: put a local cache, backed by an embedded database engine, in front of the remote redis to reduce the dependency on it. The rough structure:

business service -> local database engine -> remote redis

The reason for a database engine rather than plain memory: as long as the QPS target is met, we want to keep as much data local as possible, and holding it all in memory is too expensive.

Reads: read the local store first; on a local miss, read the remote redis and write the returned value back into the local store.

Writes: write locally, then let a background thread sync the change to the remote side asynchronously; the business logic can tolerate short-lived inconsistency.
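To make this read/write policy concrete, here is a minimal sketch in Go. It is illustrative only: Store, TwoLevelCache and the channel-based flusher are hypothetical names, not the API of any particular embedded engine or redis client.

package cache

// Store is a hypothetical key-value interface; in practice the local side
// would wrap the embedded database engine and the remote side a redis client.
type Store interface {
    Get(key string) (value string, ok bool)
    Put(key, value string)
}

// TwoLevelCache puts a local store in front of the remote one.
type TwoLevelCache struct {
    local  Store
    remote Store
    dirty  chan [2]string // locally written entries waiting to be flushed
}

// Get reads locally first; on a miss it falls back to the remote store and
// writes the returned value back into the local store.
func (c *TwoLevelCache) Get(key string) (string, bool) {
    if v, ok := c.local.Get(key); ok {
        return v, true
    }
    v, ok := c.remote.Get(key)
    if ok {
        c.local.Put(key, v)
    }
    return v, ok
}

// Put writes locally and enqueues the entry; a background goroutine running
// flushLoop pushes it to the remote store, so the two sides may briefly diverge.
func (c *TwoLevelCache) Put(key, value string) {
    c.local.Put(key, value)
    c.dirty <- [2]string{key, value}
}

func (c *TwoLevelCache) flushLoop() {
    for kv := range c.dirty {
        c.remote.Put(kv[0], kv[1])
    }
}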

WiredTiger is a storage engine that MongoDB acquired; since MongoDB 3.2 it has been the default engine.

Besides WiredTiger, some other local embedded database engines are listed here: (link)

A performance comparison against Google's LevelDB engine: (link)

This post records the WiredTiger installation procedure plus a simple Java usage example. The installation environment:

$system_profiler SPSoftwareDataType
Software:

System Software Overview:

System Version: macOS 10.14.6 (18G48f)
Kernel Version: Darwin 18.7.0
Boot Volume: Untitled
Boot Mode: Normal
Computer Name: yuanke’s MacBook Ali
User Name: yuanke wei (yuankewei)
Secure Virtual Memory: Enabled
System Integrity Protection: Enabled
Time since boot: 17 days 5:20

(1) Clone the code with git and check out a stable branch

$git clone git://github.com/wiredtiger/wiredtiger.git

$cd wiredtiger/

$git checkout -b mongodb-4.4 origin/mongodb-4.4

(2) Install the build tools

#install with brew
$brew install autoconf automake libtool swig

(3) Build

$./autogen.sh

#on macOS, the command to find JAVA_HOME is:
$/usr/libexec/java_home

#edit the generated configure script: starting at line 17620, replace

case "$host_os" in
darwin*) _JTOPDIR=`echo "$_JTOPDIR" | sed -e 's:/[^/]*$::'`
_JINC="$_JTOPDIR/Headers";;
*) _JINC="$_JTOPDIR/include";;
esac

#with

case "$host_os" in
*) _JINC="$_JTOPDIR/include";;
esac

$./configure --enable-java --enable-python

#build
$make

#install the libraries into /usr/local/
$make install

(4) Create a Maven project in IDEA

#after installing wiredtiger, the Java jar and the JNI native libraries are located at:
$ls /usr/local/share/java/wiredtiger-3.2.0/
libwiredtiger_java.0.dylib libwiredtiger_java.dylib wiredtiger.jar
libwiredtiger_java.a libwiredtiger_java.la

Edit the IDEA project and add the jar:
Project Settings -> Libraries

Also, when running the test program, the JNI native-library path must be added to IDEA's JVM options:
-Djava.library.path=/usr/local/share/java/wiredtiger-3.2.0

The complete test program:

import com.wiredtiger.db.*;

public class Main {
    public static void main(String[] args) {
        Connection conn;
        Session s;
        Cursor c;

        try {
            // The directory /Users/yuankewei/temp/WT_HOME must already exist.
            conn = wiredtiger.open("/Users/yuankewei/temp/WT_HOME", "create");
            s = conn.open_session(null);
        } catch (WiredTigerException wte) {
            System.err.println("WiredTigerException: " + wte);
            return;
        }

        /*! [access example connection] */
        try {
            /*! [access example table create] */
            // Create a table whose key is a String and whose value is also a String.
            s.create("table:t", "key_format=S,value_format=S");
            /*! [access example table create] */
            /*! [access example cursor open] */
            c = s.open_cursor("table:t", null, null);
            /*! [access example cursor open] */
        } catch (WiredTigerException wte) {
            System.err.println("WiredTigerException: " + wte);
            return;
        }

        /*! [access example cursor insert] */
        // Insert one row.
        try {
            c.putKeyString("foo");
            c.putValueString("bar");
            c.insert();
        } catch (WiredTigerPackingException wtpe) {
            System.err.println("WiredTigerPackingException: " + wtpe);
        } catch (WiredTigerException wte) {
            System.err.println("WiredTigerException: " + wte);
        }

        /*! [access example cursor insert] */
        // Insert another row.
        try {
            c.putKeyString("foo1");
            c.putValueByteArray("bar1".getBytes());
            c.insert();
        } catch (WiredTigerPackingException wtpe) {
            System.err.println("WiredTigerPackingException: " + wtpe);
        } catch (WiredTigerException wte) {
            System.err.println("WiredTigerException: " + wte);
        }

        // Look up a value by key.
        try {
            c.reset();
            c.putKeyString("foo1");
            c.search();
            System.out.println(c.getValueString());
        } catch (Exception e) {
            System.err.println("Exception: " + e);
        }

        /*! [access example cursor insert] */
        /*! [access example cursor list] */
        // Iterate over all rows.
        try {
            c.reset();
            while (c.next() == 0) {
                System.out.println("Got: " + c.getKeyString() + ", " + new String(c.getValueByteArray()));
            }
        } catch (WiredTigerPackingException wtpe) {
            System.err.println("WiredTigerPackingException: " + wtpe);
        } catch (WiredTigerException wte) {
            System.err.println("WiredTigerException: " + wte);
        }
    }
}

The program's output:

bar1
Got: foo, bar
Got: foo1, bar1

kube-scheduler is one of the simpler services in k8s: it watches the api server for newly created Pods and, from the many Nodes available, picks a suitable one to run each Pod.

The selection runs in two phases: a predicate phase & a priority phase

  • Predicate phase: filter the Nodes against the Pod's requirements; in principle, every Node that passes this phase can run the target Pod
  • Priority phase: score the Nodes that survived the previous step and pick the one with the highest score (a minimal sketch follows this list)
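A minimal, self-contained sketch of the two phases; the types and names below are made up for illustration and are not kube-scheduler's real API:

package sketch

// Pod and Node stand in for the real API objects.
type Pod struct{ Name string }
type Node struct{ Name string }

// A Predicate reports whether a node can run the pod at all; a Priority
// scores a feasible node.
type Predicate func(Pod, Node) bool
type Priority func(Pod, Node) int

// selectNode filters the nodes with the predicates, then scores the
// survivors with the priorities and returns the highest-scoring one.
func selectNode(pod Pod, nodes []Node, preds []Predicate, prios []Priority) (Node, bool) {
    // Predicate phase: keep only the feasible nodes.
    var feasible []Node
    for _, n := range nodes {
        ok := true
        for _, p := range preds {
            if !p(pod, n) {
                ok = false
                break
            }
        }
        if ok {
            feasible = append(feasible, n)
        }
    }
    if len(feasible) == 0 {
        return Node{}, false
    }

    // Priority phase: pick the feasible node with the highest total score.
    best, bestScore := feasible[0], -1
    for _, n := range feasible {
        score := 0
        for _, f := range prios {
            score += f(pod, n)
        }
        if score > bestScore {
            best, bestScore = n, score
        }
    }
    return best, true
}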

This post briefly walks through kube-scheduler's execution flow.

The entry code:

//cmd/kube-scheduler/app/server.go:62
//like the other k8s components, built on the cobra package
cmd := &cobra.Command{
    Use: "kube-scheduler",
    Long: `The Kubernetes scheduler is a policy-rich, topology-aware,
workload-specific function that significantly impacts availability, performance,
and capacity. The scheduler needs to take into account individual and collective
resource requirements, quality of service requirements, hardware/software/policy
constraints, affinity and anti-affinity specifications, data locality, inter-workload
interference, deadlines, and so on. Workload-specific requirements will be exposed
through the API as necessary.`,
    Run: func(cmd *cobra.Command, args []string) {
        if err := runCommand(cmd, args, opts); err != nil {
            fmt.Fprintf(os.Stderr, "%v\n", err)
            os.Exit(1)
        }
    },
}


// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error {
    // Build the configuration the scheduler needs: mainly the kube client, the event client, ...
    cc, err := opts.Config()

    stopCh := make(chan struct{})

    // Trim the scheduling algorithms according to the current feature gates.
    // Apply algorithms based on feature gates.
    // TODO: make configurable?
    algorithmprovider.ApplyFeatureGates()

    // Start the scheduling service.
    return Run(cc, stopCh)
}

Next, the Run function:

// Run executes the scheduler based on the given configuration. It only returns on error or when stopCh is closed.
func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
    // Create the scheduler.
    // Build a Scheduler: construct the predicate list and the priority list,
    // and attach handler functions to each informer.
    sched, err := scheduler.New(cc.Client, ...,

    // Start all informers.
    // Each informer watches for changes to its resource.
    go cc.PodInformer.Informer().Run(stopCh)
    cc.InformerFactory.Start(stopCh)

    // Wait for all caches to sync before scheduling.
    cc.InformerFactory.WaitForCacheSync(stopCh)

    // Prepare a reusable runCommand function.
    run := func(ctx context.Context) {
        sched.Run()
        <-ctx.Done()
    }

    ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here
    defer cancel()

    go func() {
        select {
        case <-stopCh:
            cancel()
        case <-ctx.Done():
        }
    }()


    // Leader election is disabled, so runCommand inline until done.
    run(ctx)
    return fmt.Errorf("finished without leader elect")
}

There are only two important functions here.

One is scheduler.New(), which builds a Scheduler object and wires up the handlers for each informer;

the other is run(ctx), which starts the scheduling service and ultimately calls scheduleOne.

Let's look at scheduleOne first:

// scheduleOne does the entire scheduling workflow for a single pod.  It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne() {
    // Take one pod that needs scheduling from the pending queue.
    pod := sched.config.NextPod()
    // pod could be nil when schedulerQueue is closed
    if pod == nil {
        return
    }

    // Run the scheduling algorithm to pick a suitable Node for this pod.
    scheduleResult, err := sched.schedule(pod)

    assumedPod := pod.DeepCopy()

    // Based on scheduleResult, bind the pod to the chosen Node (sketched below).
}
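The binding tail that the comment above elides looks roughly like the following. This is a paraphrase of the same source tree, so treat the exact names (sched.assume, sched.bind, klog) as version-dependent rather than authoritative:

// Optimistically mark the pod as placed on the suggested host in the scheduler
// cache, so the next pod can be scheduled without waiting for this bind.
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)

// Bind asynchronously: ask the api server to attach the pod to the Node.
go func() {
    err := sched.bind(assumedPod, &v1.Binding{
        ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
        Target:     v1.ObjectReference{Kind: "Node", Name: scheduleResult.SuggestedHost},
    })
    if err != nil {
        klog.Errorf("error binding pod: %v", err)
    }
}()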

#coding: utf-8
import tensorflow as tf
from numpy.random import RandomState

# Number of samples fed in per training step.
batch_size = 8

'''
A neural network with one hidden layer:

input layer:  x1, x2
hidden layer: a1, a2, a3
output layer: y
Adjacent layers are fully connected.
'''

# w1 holds the fully connected weights between the input layer and the hidden
# layer; w2 holds the weights between the hidden layer and the output layer.
# Both are initialized with TensorFlow's built-in random normal distribution.
w1 = tf.Variable(tf.random_normal([2, 3], stddev=1, seed=1))
w2 = tf.Variable(tf.random_normal([3, 1], stddev=1, seed=1))

# Placeholders for the input data.
x = tf.placeholder(tf.float32, shape=(None, 2), name="x-input")
y_ = tf.placeholder(tf.float32, shape=(None, 1), name="y-input")

# Forward propagation.
a = tf.matmul(x, w1)
y = tf.matmul(a, w2)

# Sigmoid activation.
y = tf.sigmoid(y)

# Cross-entropy loss (the gap between y and y_, where y is the network's
# prediction and y_ is the ground truth).
# clip_by_value clamps a tensor's values into the given range.
val = y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)) + (1 - y_) * tf.log(tf.clip_by_value(1 - y, 1e-10, 1.0))
cross_entropy = -tf.reduce_mean(val)

# Backpropagation optimizer; besides AdamOptimizer, GradientDescentOptimizer
# and MomentumOptimizer are also common. 0.001 is the learning rate; the
# training objective is to minimize the loss.
train_step = tf.train.AdamOptimizer(0.001).minimize(cross_entropy)

# Generate some training data.
rdm = RandomState(1)
dataset_size = 128                           # 128 training samples
X = rdm.rand(dataset_size, 2)                # random training inputs
Y = [[int(x1 + x2 < 1)] for (x1, x2) in X]   # expected outputs

# Run the whole training process.
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())  # initialize all variables

    # Print w1 and w2 right after initialization.
    print(sess.run(w1))
    print(sess.run(w2))

    # 5000 iterations.
    for i in range(5000):
        # Train on batch_size samples each step.
        start = (i * batch_size) % dataset_size
        end = min(start + batch_size, dataset_size)

        sess.run(train_step, feed_dict={x: X[start:end], y_: Y[start:end]})

        # Every 100 steps, print the current training state: the cross entropy.
        if i % 100 == 0:
            total_entropy = sess.run(cross_entropy, feed_dict={x: X, y_: Y})
            print("after %d training step, cross entropy is %g" % (i, total_entropy))

    print(sess.run(w1))
    print(sess.run(w2))
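For reference, the loss computed above is the averaged binary cross entropy, with predictions clipped to [1e-10, 1.0] to avoid log(0):

H(y', y) = -\frac{1}{N} \sum_{i=1}^{N} [ y'_i \log(y_i) + (1 - y'_i) \log(1 - y_i) ]

where y_i is the network's prediction for sample i and y'_i is its label.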



###
2019-07-13 12:05:25.725879: I tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA

[[-0.8113182 1.4845988 0.06532937]
[-2.4427042 0.0992484 0.5912243 ]]
[[-0.8113182 ]
[ 1.4845988 ]
[ 0.06532937]]
after 0 training step, cross entropy is 1.89805
after 100 training step, cross entropy is 1.62943
after 200 training step, cross entropy is 1.40099
after 300 training step, cross entropy is 1.19732
after 400 training step, cross entropy is 1.02375
after 500 training step, cross entropy is 0.887612
after 600 training step, cross entropy is 0.790222
after 700 training step, cross entropy is 0.727325
after 800 training step, cross entropy is 0.689437
after 900 training step, cross entropy is 0.667623
after 1000 training step, cross entropy is 0.655075
after 1100 training step, cross entropy is 0.647813
after 1200 training step, cross entropy is 0.643196
after 1300 training step, cross entropy is 0.639896
after 1400 training step, cross entropy is 0.637246
after 1500 training step, cross entropy is 0.635031
after 1600 training step, cross entropy is 0.633027
after 1700 training step, cross entropy is 0.631151
after 1800 training step, cross entropy is 0.629368
after 1900 training step, cross entropy is 0.627724
after 2000 training step, cross entropy is 0.626172
after 2100 training step, cross entropy is 0.624696
after 2200 training step, cross entropy is 0.623293
after 2300 training step, cross entropy is 0.622006
after 2400 training step, cross entropy is 0.620801
after 2500 training step, cross entropy is 0.619664
after 2600 training step, cross entropy is 0.618592
after 2700 training step, cross entropy is 0.617622
after 2800 training step, cross entropy is 0.616723
after 2900 training step, cross entropy is 0.615883
after 3000 training step, cross entropy is 0.615096
after 3100 training step, cross entropy is 0.614397
after 3200 training step, cross entropy is 0.613756
after 3300 training step, cross entropy is 0.61316
after 3400 training step, cross entropy is 0.612608
after 3500 training step, cross entropy is 0.612126
after 3600 training step, cross entropy is 0.611689
after 3700 training step, cross entropy is 0.611285
after 3800 training step, cross entropy is 0.610913
after 3900 training step, cross entropy is 0.610594
after 4000 training step, cross entropy is 0.610309
after 4100 training step, cross entropy is 0.610046
after 4200 training step, cross entropy is 0.609804
after 4300 training step, cross entropy is 0.609603
after 4400 training step, cross entropy is 0.609423
after 4500 training step, cross entropy is 0.609258
after 4600 training step, cross entropy is 0.609106
after 4700 training step, cross entropy is 0.608983
after 4800 training step, cross entropy is 0.608874
after 4900 training step, cross entropy is 0.608772
[[ 0.02476983 0.56948674 1.6921941 ]
[-2.1977348 -0.23668921 1.1143895 ]]
[[-0.45544702]
[ 0.49110925]
[-0.98110336]]

kubernetes' client library, client-go, provides the following three types of client

ClientSet: can access all of the cluster's native resources, such as pods and deployments; the most commonly used of the three

dynamicClient: can handle every resource in the cluster, including crd (custom resources); its results come back as map[string]interface{} data. It is currently used mainly by the garbage collector and the namespace controller.

RestClient: the foundation underneath the two clients above; lower-level.

Example code:

package main

import (
    "flag"
    "fmt"

    "k8s.io/api/core/v1"
    v12 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/apimachinery/pkg/runtime/serializer"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
)

var (
    // Path to the cluster's kubeconfig file.
    kubeconfigStr = flag.String("kubeconfig", "default value", "kubernetes config file")
)

func main() {
    // Parse the command-line flags.
    flag.Parse()

    testClientSet()

    fmt.Println("\nrest")

    testRestClient()

    fmt.Println("\n.....")
    testDynamicClient()
}

func testRestClient() {
    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfigStr)
    if err != nil {
        panic(err)
    }

    // Native APIs live under /api; extension APIs live under /apis.
    config.APIPath = "/api"
    // The pods resource belongs to the empty ("core") group.
    config.GroupVersion = &schema.GroupVersion{
        Group:   "",
        Version: "v1",
    }
    // Serialization format; currently json or protocol buffers.
    config.ContentType = runtime.ContentTypeJSON
    config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: scheme.Codecs}

    restClient, err := rest.RESTClientFor(config)
    if err != nil {
        panic(err)
    }

    podList := v1.PodList{}
    // Besides Do() there is DoRaw(), which returns the raw bytes; Do() also
    // converts the response into the given type.
    restClient.Get().Resource("pods").Namespace("").Do().Into(&podList)

    fmt.Println(podList)
}

func testClientSet() {
    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfigStr)
    if err != nil {
        panic(err)
    }

    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }

    // One line specifies the group, version, resource and the verb.
    podList, err := clientset.CoreV1().Pods("").List(v12.ListOptions{})

    fmt.Println(podList.Items)
}

func testDynamicClient() {
    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfigStr)
    if err != nil {
        panic(err)
    }

    dynamicClient, err := dynamic.NewForConfig(config)
    if err != nil {
        panic(err)
    }

    // Specify the group, version and resource to access.
    testrcGVR := schema.GroupVersionResource{
        Group:    "",
        Version:  "v1",
        Resource: "pods",
    }

    unstr, err := dynamicClient.Resource(testrcGVR).List(v12.ListOptions{})

    fmt.Println(unstr)
}
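Because the dynamic client's results are unstructured, they often get converted back into typed objects afterwards. A minimal sketch of that step, assuming the same imports as above (the conversion uses apimachinery's runtime.DefaultUnstructuredConverter):

// Convert the first item of the unstructured list back into a typed Pod.
// Each item's Object field is the underlying map[string]interface{}.
var pod v1.Pod
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstr.Items[0].Object, &pod); err != nil {
    panic(err)
}
fmt.Println(pod.Name)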