Moon's blog

write the code, change the world.

Time Wheel-高效定时器

之前的文章中有提到过基于最小堆构建定时器的方式,这里介绍另一种更酷的实现–Time Wheel(时间轮)

数据结构分析

1348926970_9123.png

解释一下上图,环里是一个个的时间间隔,链表中是到达这个时间点后需要触发的定时事件。

举个例子,假设环中的一格代表1s。

如果我有一个需要延迟3s执行的任务,当前TimeWheel位于1处,那么就把它放入第4格的链表中。

如果需要延迟10s,那么10%8 + 1 = 3, 就放入3处。当然,我们需要记录一下,这个事件需要等TimeWheel第二次转到3处才能触发。

复杂度

底层结构 添加定时器 触发定时器 PerTickBookkeeping
最小堆 O(lg(n)) O(1) O(1)
时间轮 O(1) O(1) O(1)

缺点与优化

可以看出,如果我需要延迟1天执行,那么会创建一个很大的环形链表,虽然可以通过降低精度来减少内存消耗,但我们需要更好的方案。

我们可以组合多个timewheel,来减少内存开销,同时保持很高的效率

TimingWheels2.png

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package timewheel

import (
"container/list"
"time"
)

// @author qiang.ou<qingqianludao@gmail.com>

// Job 延时任务回调函数
type Job func(TaskData)

// TaskData 回调函数参数类型
type TaskData map[interface{}]interface{}

// TimeWheel 时间轮
type TimeWheel struct {
interval time.Duration // 指针每隔多久往前移动一格
ticker *time.Ticker
slots []*list.List // 时间轮槽
// key: 定时器唯一标识 value: 定时器所在的槽, 主要用于删除定时器, 不会出现并发读写,不加锁直接访问
timer map[interface{}]int
currentPos int // 当前指针指向哪一个槽
slotNum int // 槽数量
job Job // 定时器回调函数
addTaskChannel chan Task // 新增任务channel
removeTaskChannel chan interface{} // 删除任务channel
stopChannel chan bool // 停止定时器channel
}

// Task 延时任务
type Task struct {
delay time.Duration // 延迟时间
circle int // 时间轮需要转动几圈
key interface{} // 定时器唯一标识, 用于删除定时器
data TaskData // 回调函数参数
}

// New 创建时间轮
func New(interval time.Duration, slotNum int, job Job) *TimeWheel {
if interval <= 0 || slotNum <= 0 || job == nil {
return nil
}
tw := &TimeWheel{
interval: interval,
slots: make([]*list.List, slotNum),
timer: make(map[interface{}]int),
currentPos: 0,
job: job,
slotNum: slotNum,
addTaskChannel: make(chan Task),
removeTaskChannel: make(chan interface{}),
stopChannel: make(chan bool),
}

for i := 0; i < tw.slotNum; i++ {
tw.slots[i] = list.New()
}

return tw
}

// Start 启动时间轮
func (tw *TimeWheel) Start() {
tw.ticker = time.NewTicker(tw.interval)
go tw.start()
}

// Stop 停止时间轮
func (tw *TimeWheel) Stop() {
tw.stopChannel <- true
}

// AddTimer 添加定时器 key为定时器唯一标识
func (tw *TimeWheel) AddTimer(delay time.Duration, key interface{}, data TaskData) {
if delay <= 0 || key == nil {
return
}
tw.addTaskChannel <- Task{delay: delay, key: key, data: data}
}

// RemoveTimer 删除定时器 key为添加定时器时传递的定时器唯一标识
func (tw *TimeWheel) RemoveTimer(key interface{}) {
if key == nil {
return
}
tw.removeTaskChannel <- key
}

func (tw *TimeWheel) start() {
for {
select {
case <-tw.ticker.C:
tw.tickHandler()
case task := <-tw.addTaskChannel:
tw.addTask(&task)
case key := <-tw.removeTaskChannel:
tw.removeTask(key)
case <-tw.stopChannel:
tw.ticker.Stop()
return
}
}
}

func (tw *TimeWheel) tickHandler() {
l := tw.slots[tw.currentPos]
tw.scanAndRunTask(l)
if tw.currentPos == tw.slotNum-1 {
tw.currentPos = 0
} else {
tw.currentPos++
}
}

// 扫描链表中过期定时器, 并执行回调函数
func (tw *TimeWheel) scanAndRunTask(l *list.List) {
for e := l.Front(); e != nil; {
task := e.Value.(*Task)
if task.circle > 0 {
task.circle--
e = e.Next()
continue
}

go tw.job(task.data)
next := e.Next()
l.Remove(e)
delete(tw.timer, task.key)
e = next
}
}

// 新增任务到链表中
func (tw *TimeWheel) addTask(task *Task) {
pos, circle := tw.getPositionAndCircle(task.delay)
task.circle = circle

tw.slots[pos].PushBack(task)

tw.timer[task.key] = pos
}

// 获取定时器在槽中的位置, 时间轮需要转动的圈数
func (tw *TimeWheel) getPositionAndCircle(d time.Duration) (pos int, circle int) {
delaySeconds := int(d.Seconds())
intervalSeconds := int(tw.interval.Seconds())
circle = int(delaySeconds / intervalSeconds / tw.slotNum)
pos = int(tw.currentPos+delaySeconds/intervalSeconds) % tw.slotNum

return
}

// 从链表中删除任务
func (tw *TimeWheel) removeTask(key interface{}) {
// 获取定时器所在的槽
position, ok := tw.timer[key]
if !ok {
return
}
// 获取槽指向的链表
l := tw.slots[position]
for e := l.Front(); e != nil; {
task := e.Value.(*Task)
if task.key == key {
delete(tw.timer, task.key)
l.Remove(e)
}

e = e.Next()
}
}

参考

https://github.com/ouqiang/timewheel
https://www.ibm.com/developerworks/cn/linux/l-cn-timers/
http://www.lpnote.com/2017/11/16/hashed-and-hierarchical-timing-wheels
https://www.confluent.io/blog/apache-kafka-purgatory-hierarchical-timing-wheels/

awk like a boss!

linux提供了很多文本处理的屠龙刀,awk就是其中的佼佼者,称为一门面向文本的编程语言也不为过(awk中可以定义变量、进行运算,处理分支条件等等)。

如果早点学会awk,财务系统就不会写的这么费劲了,直接导出一份csv,写写awk脚本就好。

执行流程

完整的awd命令,可以由BEGIN BODY END三部分组成,其中BEGIN和END只会在awk执行前后运行一次,而BODY部分会每行不断处理。

QQ截图20180403123540.png

Example

下面是几个几个awk的例子

1
2
3
4
5
6
7
8
9
10
# 只有 body 区域:
awk -F: '{print $1}' /etc/passwd

# 同时具有 begin,body 和 end 区域:
awk –F: 'BEGIN{printf "username\n-------\n"}\
{ print $1 }\
END {print "----------" }' /etc/passwd

# 只有 begin 和 body 区域:
awk –F: 'BEGIN {print "UID"} {print $3}' /etc/passwd

我们以awk -F: '{print $1}' /etc/passwd为例来看分析一下

  1. -F:指定了以:作为分隔符
  2. ‘{ print $1 }’ 表示输出第一个
  3. /etc/passwd是输入的文件

这个命令会输出

1
2
3
4
5
6
7
8
9
10
11
root
daemon
bin
sys
sync
games
man
lp
mail
news
...

指定分隔符

默认的输入分隔符是空格,可以通过-F选项来指定输入字段分隔符,例如

1
2
3
awk -F: '{print $2}' /etc/passwd

awk -F, '{print $2}' example.csv

也可以通过设置OFS变量来指定输出字段分隔符

1
awk -F, 'BEGIN {OFS=":"} {print $2, $3}' employee.txt

当一行容纳了多组数据时,awk允许你指定数据之间的分隔符,输入记录分隔符

1
awk -F, 'BEGIN { RS=":" } {print $2}' employee-one-line.txt

在输出时,记录直接允许指定输出记录分隔符

1
awk 'BEGIN {FS=",";ORS="\n---\n"} {print $2,$3}' employee.txt

$1、$2这样的变量代表分割后的字符串数组中的位置, $0代表整行

Pattern Maching

当然,我们可以只处理一行中的某些部分,通过正则实现

1
2
3
4
5
6
7
8
9
10
11
# employee.txt
# Jason Smith IT Manager
# Jane Miller Sales Manager
# vagrant@z:~/awk$ cat employee.txt
# 101,John Doe,CEO
# 102,Jason Smith,IT Manager
# 103,Raj Reddy,Sysadmin
# 104,Anand Ram,Developer
# 105,Jane Miller,Sales Manager

awk -F, '/Manager/ {print $2, $3}' employee.txt

awk变量、运算

一旦涉及变量定义这种比较复杂的功能,我就偏向于写awk脚本,然后awk -f xx.awk somefile这样来执行,常见的数学操作(+-*/%,++ –)都支持。

1
2
3
4
5
6
7
8
9
10
11
BEGIN {
FS=",";
total=0;
}
{
print $2 "'s salary is: " $4;
total+=$4;
}
END {
print "----\nTotal Company Salary=$" total;
}

awk比较

awk支持以下比较

QQ截图20180403134602.png

可以理解成sql中的where条件

1
2
3
4
awk -F, '$5<=5' items.txt
awk -F "," '$4 < 900 || $5 <= 5' items.txt
awk -F ':' '$3 > maxuid { maxuid = $3; maxline = $0 } END { print maxuid,maxline }' /etc/passwd
awk -F ':' '$3 >= 100 && $NF ~ /\/bin\/sh/ ' /etc/passwd # $NF 代表最后一列, ~表示正则匹配, !~也就是不匹配咯

awk控制流

awk支持if,if\else,while、do-while都常见控制流,for(;;)当然也不在话下,连break、continue、exit都有哦。

1
2
3
4
5
6
7
8
9
10
11
12
cat dowhile.awk

{
i=2;
total=0;
do {
total = total + $i;
i++;
}
while(i<=NF)
print "Item",$1,":",total,"quantities sold";
}
1
echo "1 2 3 4" | awk '{ for (i=1;i<=NF;i++) total = total + $i } END { print total }'

awk关联数组

没错,这个就是lua中的关联数组的概念

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
$ cat array-assign.awk
BEGIN {
item[101]="HD Camcorder";
item[102]="Refrigerator";
item[103]="MP3 Player";
item[104]="Tennis Racket";
item[105]="Laser Printer";
item[1001]="Tennis Ball";
item[55]="Laptop";
item["na"]="Not Available";
print item["101"];
print item[102];
print item["103"];
print item[104];
print item["105"];
print item[1001];
print item["na"];
}
$ awk -f array-assign.awk
HD Camcorder
Refrigerator
MP3 Player
Tennis Racket
Laser Printer
Tennis Ball
Not Available

数组遍历

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
$ cat array-for-loop.awk
BEGIN {
item[101]="HD Camcorder";
item[102]="Refrigerator";
item[103]="MP3 Player";
item[104]="Tennis Racket";
item[105]="Laser Printer";
item[1001]="Tennis Ball";
item[55]="Laptop";
item["no"]="Not Available";
for(x in item)
print item[x]
}
$ awk -f array-for-loop.awk
Not Available
Laptop
HD Camcorder
Refrigerator
MP3 Player
Tennis Racket
Laser Printer
Tennis Ball

数组删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
$ cat array-delete.awk
BEGIN {
item[101]="HD Camcorder";
item[102]="Refrigerator";
item[103]="MP3 Player";
item[104]="Tennis Racket";
item[105]="Laser Printer";
item[1001]="Tennis Ball";
item[55]="Laptop";
item["no"]="Not Available";
delete item[102]
item[103]=""
delete item[104]
delete item[1001]
delete item["na"]
for(x in item)
print "Index",x,"contains",item[x]
}
$ awk -f array-delete.awk
Index no contains Not Available
Index 55 contains Laptop
Index 101 contains HD Camcorder
Index 103 contains
Index 105 contains Laser Printer

awk支持的功能还有很多,这里不一一介绍,总之,凡是文本数据处理,用awk就没错了!

参考

https://www.thegeekstuff.com/sed-awk-101-hacks-ebook/

golang热更新的魔法

当我们写一个服务端程序的时候,在更新时可能不可避免的需要停止程序再重启,这里介绍一种非常酷的热更新实现,真正做到zero downtime。

思路

  1. 更换硬盘上的可执行程序
  2. 以相同的参数启动一个子进程,并把正在listen的fd传递给子进程
  3. 子进程通过这个fd进行listen,这样父子进程可以同时Accept连接
  4. 立马通知父进程停止接受连接,然后父进程gracefully shutdown

实现细节

POSIX提供了fork和exec调用来启动一个新进程,fork复制父进程,然后通过exec来替换自己要执行的程序。在go中,我们使用exec.Command或者os.StartProcess来达到类似效果。
在启动子进程时,需要让子进程知道,我正处于热更新过程中。通常使用环境变量或者参数来实现,例子中使用了-graceful这个参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
file := netListener.File() // this returns a Dup()
path := "/path/to/executable"
args := []string{
"-graceful"}

cmd := exec.Command(path, args...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.ExtraFiles = []*os.File{file}

err := cmd.Start()
if err != nil {
log.Fatalf("gracefulRestart: Failed to launch, error: %v", err)
}

然后在子进程中使用net.FileListener来从fd创建一个Listener

1
2
3
func FileListener
func FileListener(f *os.File) (ln Listener, err error)
FileListener returns a copy of the network listener corresponding to the open file f. It is the caller's responsibility to close ln when finished. Closing ln does not affect f, and closing f does not affect ln.
1
2
3
4
5
6
7
8
9
10
flag.BoolVar(&gracefulChild, "graceful", false, "listen on fd open 3 (internal use only)")

if gracefulChild {
log.Print("main: Listening to existing file descriptor 3.")
f := os.NewFile(3, "") // 3就是我们传递的listening fd
l, err = net.FileListener(f)
} else {
log.Print("main: Listening on a new file descriptor.")
l, err = net.Listen("tcp", server.Addr)
}

到这里,子进程就可以Accept并接受连接了,现在我们还需要立刻干掉父进程。使用getpid调用获取到父进程的id,然后kill它。

1
2
parent := syscall.Getppid()
syscall.Kill(parent, syscall.SIGTERM)

当然,更加完美的方式还需要父进程可以优雅退出,即不再接受新连接,并且处理完当前所有连接后再退出,如果一段时间内没能处理完,也可以选择直接退出。准备另写文章介绍这个内容。

参考

http://grisha.org/blog/2014/06/03/graceful-restart-in-golang

goim中的数据结构

goim中数据结构的设计非常出彩,值得仔细品味。

Timer

在长连接这样的场景下,有N条连接需要维护心跳信息,凡人的做法可能就是开启N个gorutine,但goim使用最小堆高效处理了这个问题。

Timer就是定时器的结构,对外提供Add、Del、Set三个方法用于添加,删除、修改TimerData。

TimerData存储单个定时器的信息,到期则执行回调函数fn。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// libs/time/timer.go

type Timer struct {
lock sync.Mutex
free *TimerData
timers []*TimerData
signal *itime.Timer
num int
}

type TimerData struct {
Key string
expire itime.Time
fn func()
index int
next *TimerData
}

先看一下添加删除timer的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// libs/time/timer.go

func (t *Timer) Add(expire itime.Duration, fn func()) (td *TimerData) {
t.lock.Lock()
td = t.get()
td.expire = itime.Now().Add(expire)
td.fn = fn
t.add(td)
t.lock.Unlock()
return
}


func (t *Timer) Del(td *TimerData) {
t.lock.Lock()
t.del(td)
t.put(td)
t.lock.Unlock()
return
}

func (t *Timer) Set(td *TimerData, expire itime.Duration) {
t.lock.Lock()
t.del(td)
td.expire = itime.Now().Add(expire)
t.add(td)
t.lock.Unlock()
return
}

除去加锁部分,内部就是调用了add、、del、get、put这几个方法。

get和put非常简单,只是根据当前的free指针获得或者放回去一个TimerData。

add和del是典型的堆操作,就是往timers这个堆里添加删除元素。

Timer在初始化时就会构造好一条free链表,在Add时,先取出free指向的节点,加入到timers堆中。在Del时,先从堆中删除,再放回链表中。

这条free链表是为了避免频繁申请内存做的优化!get和put负责在链表中申请和释放节点,add和del在获取到节点(TimerData)后进行堆的调整!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// libs/time/timer.go

// get get a free timer data.
func (t *Timer) get() (td *TimerData) {
if td = t.free; td == nil {
t.grow()
td = t.free
}
t.free = td.next
return
}

// put put back a timer data.
func (t *Timer) put(td *TimerData) {
td.fn = nil
td.next = t.free
t.free = td
}

// Push pushes the element x onto the heap. The complexity is
// O(log(n)) where n = h.Len().
func (t *Timer) add(td *TimerData) {
var d itime.Duration
td.index = len(t.timers)
// add to the minheap last node
t.timers = append(t.timers, td)
t.up(td.index)
if td.index == 0 {
// if first node, signal start goroutine
d = td.Delay()
t.signal.Reset(d)
if Debug {
log.Debug("timer: add reset delay %d ms", int64(d)/int64(itime.Millisecond))
}
}
if Debug {
log.Debug("timer: push item key: %s, expire: %s, index: %d", td.Key, td.ExpireString(), td.index)
}
return
}

func (t *Timer) del(td *TimerData) {
var (
i = td.index
last = len(t.timers) - 1
)
if i < 0 || i > last || t.timers[i] != td {
// already remove, usually by expire
if Debug {
log.Debug("timer del i: %d, last: %d, %p", i, last, td)
}
return
}
if i != last {
t.swap(i, last)
t.down(i, last)
t.up(i)
}
// remove item is the last node
t.timers[last].index = -1 // for safety
t.timers = t.timers[:last]
if Debug {
log.Debug("timer: remove item key: %s, expire: %s, index: %d", td.Key, td.ExpireString(), td.index)
}
return
}

那么,timer定时这块是怎么实现的呢?

1
2
3
4
5
6
7
func (t *Timer) init(num int) {
t.signal = itime.NewTimer(infiniteDuration)
t.timers = make([]*TimerData, 0, num)
t.num = num
t.grow()
go t.start() // 此处开始轮询
}

可以看到,只启动了一个gorutine来管理所有的timer,start内部是一个无限循环,expire()负责设置一个最近的定期器,然后阻塞等待即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
func (t *Timer) start() {
for {
t.expire()
<-t.signal.C
}
}

func (t *Timer) expire() {
var (
fn func()
td *TimerData
d itime.Duration
)
t.lock.Lock()
for {
if len(t.timers) == 0 { // 没有定时器,无限睡眠
d = infiniteDuration
if Debug {
log.Debug("timer: no other instance")
}
break
}
td = t.timers[0] // 取第一个元素,如何还没到期就根据剩余时间重置定时器
if d = td.Delay(); d > 0 {
break
}
fn = td.fn
// let caller put back, usually by Del()
t.del(td) // 从堆中删除
t.lock.Unlock()
if fn == nil {
log.Warn("expire timer no fn")
} else {
if Debug {
log.Debug("timer key: %s, expire: %s, index: %d expired, call fn", td.Key, td.ExpireString(), td.index)
}
fn() // 执行回调
}
t.lock.Lock()
}
t.signal.Reset(d)
if Debug {
log.Debug("timer: expire reset delay %d ms", int64(d)/int64(itime.Millisecond))
}
t.lock.Unlock()
return
}

Buffer Pool

Pool可以理解成内存,free是空闲内存的指针,Buffer是内存分配的基本单元

1
2
3
4
5
6
7
8
9
10
11
type Pool struct {
lock sync.Mutex
free *Buffer
max int
num int
}

type Buffer struct {
buf []byte
next *Buffer // next free buffer
}

在Pool初始化时,会申请一块很大的buf,然后构建Buffer链表,每个Buffer都通过slice指向这个buf的一部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (p *Pool) grow() {
var (
i int
b *Buffer
bs []Buffer
buf []byte
)
buf = make([]byte, p.max)
bs = make([]Buffer, p.num)
p.free = &bs[0]
b = p.free
for i = 1; i < p.num; i++ {
b.buf = buf[(i-1)*p.size : i*p.size]
b.next = &bs[i]
b = b.next
}
b.buf = buf[(i-1)*p.size : i*p.size]
b.next = nil
return
}

这样,申请和释放Buffer只需要操作链表的指针即可,复杂度O(1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Get get a free memory buffer.
func (p *Pool) Get() (b *Buffer) {
p.lock.Lock()
if b = p.free; b == nil {
p.grow()
b = p.free
}
p.free = b.next
p.lock.Unlock()
return
}

// Put put back a memory buffer to free.
func (p *Pool) Put(b *Buffer) {
p.lock.Lock()
b.next = p.free
p.free = b
p.lock.Unlock()
return
}

Router模块中的Cleaner

Cleaner中主要使用了一条双向循环链表,但额外设计了一个map,来提供快速的节点定位。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
type CleanData struct {
Key int64
expireTime time.Time
next, prev *CleanData
}

type Cleaner struct {
cLock sync.Mutex
size int
root CleanData
maps map[int64]*CleanData
}

func (c *Cleaner) PushFront(key int64, expire time.Duration) {
c.cLock.Lock()
if e, ok := c.maps[key]; ok {
// update time
e.expireTime = time.Now().Add(expire)
c.moveToFront(e)
} else {
e = new(CleanData)
e.Key = key
e.expireTime = time.Now().Add(expire)
c.maps[key] = e
at := &c.root
n := at.next
at.next = e
e.prev = at
e.next = n
n.prev = e
c.size++
}
c.cLock.Unlock()
}

Comet模块中的RingBuf

RingBuf是一个环形缓冲区,其中保存的是空闲的proto对象,负责TCP数据的拆包封包。每个连接都会初始化自己的RingBuf。

值得一提的是,RingBuf使用sequence & (array length-1) = array index这样的方式来定位元素,非常高效。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
type Ring struct {
// read
rp uint64
num uint64
mask uint64
// TODO split cacheline, many cpu cache line size is 64
// pad [40]byte
// write
wp uint64
data []proto.Proto
}

func NewRing(num int) *Ring {
r := new(Ring)
r.init(uint64(num))
return r
}

func (r *Ring) Init(num int) {
r.init(uint64(num))
}

func (r *Ring) init(num uint64) {
// 2^N
if num&(num-1) != 0 {
for num&(num-1) != 0 {
num &= (num - 1)
}
num = num << 1
}
r.data = make([]proto.Proto, num)
r.num = num
r.mask = r.num - 1
}

func (r *Ring) Get() (proto *proto.Proto, err error) {
if r.rp == r.wp {
return nil, ErrRingEmpty
}
proto = &r.data[r.rp&r.mask]
return
}

func (r *Ring) GetAdv() {
r.rp++
if Debug {
log.Debug("ring rp: %d, idx: %d", r.rp, r.rp&r.mask)
}
}

func (r *Ring) Set() (proto *proto.Proto, err error) {
if r.wp-r.rp >= r.num {
return nil, ErrRingFull
}
proto = &r.data[r.wp&r.mask]
return
}

func (r *Ring) SetAdv() {
r.wp++
if Debug {
log.Debug("ring wp: %d, idx: %d", r.wp, r.wp&r.mask)
}
}

func (r *Ring) Reset() {
r.rp = 0
r.wp = 0
// prevent pad compiler optimization
// r.pad = [40]byte{}
}

参考资料

http://ifeve.com/dissecting-disruptor-whats-so-special/
https://github.com/Terry-Mao/goim

goim-comet模块源码分析

comet是客户端直接连接的节点,设计上是无状态的。通过rpc与logic服务交互,对外提供TCP、HTTP、WebSocket连接方式,自己也作为push这个rpc服务的提供方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//main.go

if err := InitLogicRpc(Conf.LogicAddrs); err != nil {
log.Warn("logic rpc current can't connect, retry")
}
// start monitor
if Conf.MonitorOpen {
InitMonitor(Conf.MonitorAddrs)
}

... //bucket round server的初始化,下面会讲

// white list
// tcp comet
if err := InitTCP(Conf.TCPBind, Conf.MaxProc); err != nil {
panic(err)
}
// websocket comet
if err := InitWebsocket(Conf.WebsocketBind); err != nil {
panic(err)
}
// flash safe policy
if Conf.FlashPolicyOpen {
if err := InitFlashPolicy(); err != nil {
panic(err)
}
}
// wss comet
if Conf.WebsocketTLSOpen {
if err := InitWebsocketWithTLS(Conf.WebsocketTLSBind, Conf.WebsocketCertFile, Conf.WebsocketPrivateFile); err != nil {
panic(err)
}
}
// start rpc
if err := InitRPCPush(Conf.RPCPushAddrs); err != nil {
panic(err)
}

InitTCP

InitXXX的作用是暴露不同的服务给客户端使用,选一个看就可以了。

在多个gorutine中调用了AcceptTCP,充分发挥多核能力

1
2
3
for i := 0; i < accept; i++ {
go acceptTCP(DefaultServer, listener)
}

accept之后,核心逻辑实现在serveTCP中,首先调用auth服务,获得subKey,然后把channel放进bucket里

1
2
3
4
5
6
7
// ... 
if p, err = ch.CliProto.Set(); err == nil {
if key, ch.RoomId, hb, err = server.authTCP(rr, wr, p); err == nil {
b = server.Bucket(key)
err = b.Put(key, ch)
}
}

serveTCP方法中,当前gorutine负责读数据,处理心跳,把数据封装成proto对象然后保存到channel的CliProto中,然后通知dispatchTCP处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
for {
if p, err = ch.CliProto.Set(); err != nil { // 从channel中申请一个buffer用来存放proto
break
}
if white {
DefaultWhitelist.Log.Printf("key: %s start read proto\n", key)
}
if err = p.ReadTCP(rr); err != nil { // 读proto
break
}
if white {
DefaultWhitelist.Log.Printf("key: %s read proto:%v\n", key, p)
}
if p.Operation == define.OP_HEARTBEAT { // 维持心跳
tr.Set(trd, hb)
p.Body = nil
p.Operation = define.OP_HEARTBEAT_REPLY
if Debug {
log.Debug("key: %s receive heartbeat", key)
}
} else {
if err = server.operator.Operate(p); err != nil {
break
}
}
if white {
DefaultWhitelist.Log.Printf("key: %s process proto:%v\n", key, p)
}
ch.CliProto.SetAdv()
ch.Signal() //通知dispatchTCP处理
if white {
DefaultWhitelist.Log.Printf("key: %s signal\n", key)
}
}

dispatchTCP中,如果收到proto.ProtoReady,就表示读取到了一个proto,然后原样写回?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
case proto.ProtoReady:
// fetch message from svrbox(client send)
for {
if p, err = ch.CliProto.Get(); err != nil {
err = nil // must be empty error
break
}
if white {
DefaultWhitelist.Log.Printf("key: %s start write client proto%v\n", key, p)
}
if err = p.WriteTCP(wr); err != nil {
goto failed
}
if white {
DefaultWhitelist.Log.Printf("key: %s write client proto%v\n", key, p)
}
p.Body = nil // avoid memory leak
ch.CliProto.GetAdv()
}

Round

goim自己进行了buffer的管理,避免了频繁申请内存的开销。通过自定义的Pool结构来分配Buffer,因为分配时要加锁,使用Round来组合多个Pool,通过mod运算随机获取一个Pool,来减缓锁的争用。

1
2
3
4
5
6
7
8
9
10
// round.go
// Reader get a reader memory buffer.
func (r *Round) Reader(rn int) *bytes.Pool {
return &(r.readers[rn%r.options.Reader])
}

// Writer get a writer memory buffer pool.
func (r *Round) Writer(rn int) *bytes.Pool {
return &(r.writers[rn%r.options.Writer])
}

Pool内部使用一条单链表,维护一个free指针指向未分配的buffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// libs/buffer.go

func (p *Pool) grow() {
var (
i int
b *Buffer
bs []Buffer
buf []byte
)
buf = make([]byte, p.max) // 所有的Buffer都从这里分配
bs = make([]Buffer, p.num) // Buffer数组
p.free = &bs[0] //构造Buffer链
b = p.free
for i = 1; i < p.num; i++ {
b.buf = buf[(i-1)*p.size : i*p.size]
b.next = &bs[i]
b = b.next
}
b.buf = buf[(i-1)*p.size : i*p.size]
b.next = nil
return
}

// Get get a free memory buffer.
func (p *Pool) Get() (b *Buffer) {
p.lock.Lock()
if b = p.free; b == nil {
p.grow()
b = p.free
}
p.free = b.next
p.lock.Unlock()
return
}

Timer

goim的Timer也是基于堆结构改写的,内部只有一个timer,不断把定时器设置成堆顶元素的触发时间来提高效率。

Channel

TCP连接会被封装到Channel这个结构中,使用CliProto来处理封包拆包

1
2
3
4
5
6
7
8
9
type Channel struct {
RoomId int32
CliProto Ring
signal chan *proto.Proto
Writer bufio.Writer
Reader bufio.Reader
Next *Channel
Prev *Channel
}

Ring

Ring是Channel内部用来保存并重用proto的一个结构

1
2
3
4
5
6
7
8
9
10
11
type Ring struct {
// read
rp uint64
num uint64
mask uint64
// TODO split cacheline, many cpu cache line size is 64
// pad [40]byte
// write
wp uint64
data []proto.Proto
}

Bucket

bucket是channel的容器

1
2
3
4
5
6
7
8
9
10
11
//main.go

buckets := make([]*Bucket, Conf.Bucket)
for i := 0; i < Conf.Bucket; i++ {
buckets[i] = NewBucket(BucketOptions{
ChannelSize: Conf.BucketChannel,
RoomSize: Conf.BucketRoom,
RoutineAmount: Conf.RoutineAmount,
RoutineSize: Conf.RoutineSize,
})
}

goim-router模块源码分析

这个模块是用于保存状态信息的(例如在线的session)

文档里是这样描述的
router 属于有状态节点,logic可以使用一致性hash配置节点,增加多个router节点(目前还不支持动态扩容),提前预估好在线和压力情况

从main.go入手

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func main() {
flag.Parse()
if err := InitConfig(); err != nil {
panic(err)
}
runtime.GOMAXPROCS(Conf.MaxProc)
log.LoadConfiguration(Conf.Log)
defer log.Close()
log.Info("router[%s] start", VERSION)
// start prof
perf.Init(Conf.PprofAddrs)
// start monitor
if Conf.MonitorOpen {
InitMonitor(Conf.MonitorAddrs)
}
// start rpc
buckets := make([]*Bucket, Conf.Bucket)
for i := 0; i < Conf.Bucket; i++ {
buckets[i] = NewBucket(Conf.Session, Conf.Server, Conf.Cleaner)
}
if err := InitRPC(buckets); err != nil {
panic(err)
}
// block until a signal is received.
InitSignal()
}

忽略flag和config部分的处理,这里主要涉及了perfmonitor监控,主RPC逻辑,以及signal处理

perf

内部使用了net/http/pprof进行性能分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func Init(pprofBind []string) {
pprofServeMux := http.NewServeMux()
pprofServeMux.HandleFunc("/debug/pprof/", pprof.Index)
pprofServeMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
pprofServeMux.HandleFunc("/debug/pprof/profile", pprof.Profile)
pprofServeMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
for _, addr := range pprofBind {
go func() {
if err := http.ListenAndServe(addr, pprofServeMux); err != nil {
log.Error("http.ListenAndServe(\"%s\", pprofServeMux) error(%v)", addr, err)
panic(err)
}
}()
}
}

monitor

是一个简单ping请求处理,一看就是用来监测服务存活状态的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func InitMonitor(binds []string) {
m := new(Monitor)
monitorServeMux := http.NewServeMux()
monitorServeMux.HandleFunc("/monitor/ping", m.Ping)
for _, addr := range binds {
go func(bind string) {
if err := http.ListenAndServe(bind, monitorServeMux); err != nil {
log.Error("http.ListenAndServe(\"%s\", pprofServeMux) error(%v)", addr, err)
panic(err)
}
}(addr)
}
}

// monitor ping
func (m *Monitor) Ping(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("ok"))
}

signal

信号处理,收到SIGHUP重载配置,这是符合linux上惯用约定的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func InitSignal() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGSTOP)
for {
s := <-c
log.Info("router[%s] get a signal %s", VERSION, s.String())
switch s {
case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGSTOP, syscall.SIGINT:
return
case syscall.SIGHUP:
reload()
default:
return
}
}
}

func reload() {
newConf, err := ReloadConfig()
if err != nil {
log.Error("ReloadConfig() error(%v)", err)
return
}
Conf = newConf
}

主RPC逻辑

这里有Session、Cleaner、Bucket这三个主要的结构

Bucket是Session的容器,为了减少锁争夺,会有多个Bucket,根据用户id与Bucket数量进行mod运算来确定,这个Session放到哪个Bucket中,是一种很常见的sharding

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//rpc.go
func (r *RouterRPC) bucket(userId int64) *Bucket {
idx := int(userId % r.BucketIdx) // mod
// fix panic
if idx < 0 {
idx = 0
}
return r.Buckets[idx]
}

func (r *RouterRPC) Put(arg *proto.PutArg, reply *proto.PutReply) error {
reply.Seq = r.bucket(arg.UserId).Put(arg.UserId, arg.Server, arg.RoomId)
return nil
}

// bucket.go
func (b *Bucket) Put(userId int64, server int32, roomId int32) (seq int32) {
var (
s *Session
ok bool
)
b.bLock.Lock() //加锁,只影响这个Bucket
if s, ok = b.sessions[userId]; !ok {
s = NewSession(b.server)
b.sessions[userId] = s
}
if roomId != define.NoRoom {
seq = s.PutRoom(server, roomId)
} else {
seq = s.Put(server)
}
b.counter(userId, server, roomId, true)
b.bLock.Unlock()
return
}

Cleaner是与Session一一对应的一个结构,用于清理Session信息,每个Bucket会有一个单独的gorutine进行定时清理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//bucket.go
func NewBucket(session, server, cleaner int) *Bucket {
b := new(Bucket)
b.sessions = make(map[int64]*Session, session)
b.roomCounter = make(map[int32]int32)
b.serverCounter = make(map[int32]int32)
b.userServerCounter = make(map[int32]map[int64]int32)
b.cleaner = NewCleaner(cleaner)
b.server = server
b.session = session
go b.clean() //启动清理gorutine
return b
}

func (b *Bucket) clean() {
var (
i int
userIds []int64
)
for {
userIds = b.cleaner.Clean()
if len(userIds) != 0 {
b.bLock.Lock()
for i = 0; i < len(userIds); i++ {
b.delEmpty(userIds[i]) 从sessions map中删掉对应的session
}
b.bLock.Unlock()
continue
}
time.Sleep(Conf.BucketCleanPeriod) //休息一段时间
}
}

Cleaner本身的结构是经过精心设计的,使用了一条双向循环链表来记录当前所有Session的信息,为了克服移除一个节点时需要遍历链表,额外用了一个map来快速定位到节点,然后操作这个节点的指针来进行删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (c *Cleaner) remove(key int64) {
if e, ok := c.maps[key]; ok { //通过map定位
delete(c.maps, key) // 从map中删除
// 从链表中删除
e.prev.next = e.next
e.next.prev = e.prev
e.next = nil // avoid memory leaks
e.prev = nil // avoid memory leaks
c.size--
}
}

func (c *Cleaner) Clean() (keys []int64) {
var (
i int
e *CleanData
)
keys = make([]int64, 0, maxCleanNum)
c.cLock.Lock()
for i = 0; i < maxCleanNum; i++ { // 每次最多只清理maxCleanNum个节点,don't know why
if e = c.back(); e != nil {
if e.expire() {
c.remove(e.Key)
keys = append(keys, e.Key)
continue
}
}
break
}
// next time
c.cLock.Unlock()
return
}

有一个问题是,没有看懂过期的逻辑。从链表尾部开始清理,却在Del时把节点放到头部?

使用influxdb收集用户行为数据

目前我们需要收集一些用户在站点上的行为数据,又不想投入精力研发一整套的体系,于是找到了influxdb。

influxdb内置了HTTP API,免去了编写接入代码的繁琐,并带有数据查询和展示的组件,非常适合。

安装过程略去不提,参考官方文档

目录

  • Influxdb
  • Chronograf
  • 例子

Influxdb

概念

一条influxdb的记录的结构是这样的

1
2
3
4
5
6
7
8
<measurement>[,<tag-key>=<tag-value>...] <field-key>=<field-value>[,<field2-key>=<field2-value>...] [unix-nano-timestamp]

例如

cpu,host=serverA,region=us_west value=0.64
payment,device=mobile,product=Notepad,method=credit billed=33,licenses=3i 1434067467100293230
stock,symbol=AAPL bid=127.46,ask=127.48
temperature,machine=unit42,type=assembly external=25,internal=37 1434067467000000000

类比于RDMBS

influxdb rdbms
unix-nano-timestamp primary key
measurement table
tag indexed column
fields column

HTTP API

influxdb的所有操作都可以通过HTTP API来完成

写数据

1
curl -i -XPOST 'http://localhost:8086/write?db=mydb' --data-binary 'cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000'

Authentication & Authorization

通过HTTP API可以进行DDL操作,我们需要有一些权限机制来保证安全.

  1. 修改配置文件/etc/influxdb/influxdb.conf并重启
1
2
3
4
[http]
...
auth-enabled = true
...
  1. 创建用户

创建超级用户admin

1
CREATE USER admin WITH PASSWORD '123456' WITH ALL PRIVILEGES

创建普通用户

1
2
CREATE USER reader WITH PASSWORD '123456'
GRANT READ ON "mydb" TO "reader"

Rentention Policy

时序数据的量可能会非常大,需要定义保存数据的策略

1
CREATE RETENTION POLICY "one_day_only" ON "mydb" DURATION 1d REPLICATION 1 DEFAULT

Chronograf

官方有一个叫TICK的技术栈推荐,但对于我们的场景,只要结合其中的IC就可以了,即influxdb和chronograf.

安装启动过程不提

在使用它之前,需要先学习一下基本的influxdb查询语法

这样,就可以轻松的在Chronograf中查询数据了,如果需要额外的数据,可以让业务方导出csv自行分析。

linux服务管理-upstart


当我们写完一个go程序时,部署只需要把二进制包拷贝到服务器上即可。

但在真正的生产环境中,如果程序出于某种原因崩溃了,我们会面临两个问题

  1. 如何得知程序崩溃,如果靠人工的方式,无疑是有巨大时间滞后的
  2. 如何方便的在崩溃后重启

在Ruby的世界中,例如unicorn这样的应用服务器,会有一个master进程,专门解决以上问题,当worker崩溃时,master就会重新fork出一个worker进程。

upstart简介

upstart是linux主流发行版(ubuntu,RHEL…)本自带的进程管理系统

程序开发时需要注意的事项

作为程序开发人员,在编写系统服务时,需要了解 upstart 的一些特殊要求。只有符合这些要求的软件才可以被 upstart 管理。

规则一,派生次数需声明。

很多 Linux 后台服务都通过派生两次的技巧将自己变成后台服务程序。如果您编写的服务也采用了这个技术,就必须通过文档或其它的某种方式明确地让 upstart 的维护人员知道这一点,这将影响 upstart 的 expect stanza,我们在前面已经详细介绍过这个 stanza 的含义。

规则二,派生后即可用。

后台程序在完成第二次派生的时候,必须保证服务已经可用。因为 upstart 通过派生计数来决定服务是否处于就绪状态。

规则三,遵守 SIGHUP 的要求。

upstart 会给守护进程发送 SIGHUP 信号,此时,upstart 希望该守护进程做以下这些响应工作:

•完成所有必要的重新初始化工作,比如重新读取配置文件。这是因为 upstart 的命令”initctl reload”被设计为可以让服务在不重启的情况下更新配置。

•守护进程必须继续使用现有的 PID,即收到 SIGHUP 时不能调用 fork。如果服务必须在这里调用 fork,则等同于派生两次,参考上面的规则一的处理。这个规则保证了 upstart 可以继续使用 PID 管理本服务。

规则四,收到 SIGTEM 即 shutdown。

•当收到 SIGTERM 信号后,upstart 希望守护进程进程立即干净地退出,释放所有资源。如果一个进程在收到 SIGTERM 信号后不退出,upstart 将对其发送 SIGKILL 信号。

日常upstart命令

全部 简写
initctl start start
initctl restart restart
initctl reload reload
initctl stop stop

编写upstart配置文件

  1. upstart脚本必须包含一个exec或者script片段,用于启动你的程序
  2. pre-start script and post-stop script是一些钩子,可以在程序启动前后做一些事
  3. start on and stop on定义了何时启动、停止你的程序

golang多用于写网络服务,以Nginx的官方upstart脚本为参考比较合适

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# http://wiki.nginx.org/upstart
# nginx

description "nginx http daemon"
author "George Shammas <georgyo@gmail.com>"

start on (filesystem and net-device-up IFACE!=lo) 两个条件 文件系统和网络设备启动以后
stop on runlevel [!2345]

env DAEMON=/usr/sbin/nginx
env PID=/var/run/nginx.pid

expect fork
respawn
respawn limit 10 5
#oom never

pre-start script
$DAEMON -t
if [ $? -ne 0 ] 判断nginx -t命令的返回值是否是0(不是EXIT_SUCCESS)
then exit $?
fi
end script

exec $DAEMON

简单写一个

1
2
3
4
5
6
7
8
9
10
11
12
# /etc/init/naievehttpserver.conf
description "naive http golang server"
author "xxx"

start on (filesystem and net-device-up IFACE!=lo)
stop on runlevel [!2345]

respawn

chdir /vagrant

exec ./naivehttpserver

这样,就可以使用sudo start naivehttpserver这样的命令来管理naivehttpserver服务了。

更全的用法可以看http://upstart.ubuntu.com/cookbook/http://upstart.ubuntu.com/wiki/

systemd

目前upstart有被systemd取代的趋势, 列一些资料

参考资料

浅析 Linux 初始化 init 系统,第 2 部分

http://upstart.ubuntu.com/getting-started.html

http://blog.fens.me/linux-upstart/

请求优化-百倍性能提升

发现慢请求

我们的整个web服务是基于Rails的,应用服务器是基于多进程模型的unicorn,因此,慢请求会对系统造成比较大的影响。

最近又到了春节促销活动时期,为了更及时地发现整个后端系统的问题,我基于Kibana的Visulize功能制作了一个慢请求监控图

通过这个统计图,可以很方便的看出过去N分钟内,响应时间最慢的接口分布情况。

优化-Cache

对于图中这个api/v1/publishers接口,会出现很多耗时100ms以上的请求,这个接口做了以下几件事。

  1. 查询数据库,加载一个几乎不怎么变动的publisher列表
  2. 渲染成json返回

显而易见的优化策略就是cache掉数据库查询,做完这一步的确也能获得比较大的提升,普通的优化工作可能到这一步就结束了。

但是还有一个特殊的地方,这个列表可能有几百个条目,相对来说还是比较大的。而ruby在做这种CPU密集的渲染时,性能是非常差的。因此,也要想办法解决掉渲染的问题。

优化-Render

只render一次。

相比于之前cache数据库查询的结果的方式,这次在代码中计算出最终返回的json,直接cache这个json字符串。

然后,在渲染的时候,使用渲染文本,并手动设置content-type的方式来返回json

1
render plain: "字符串", content_type: 'application/json'

效果

做完以上两步,在日志中观察到单个请求的时间降低到0.8ms左右!

Future

以上优化差不多是Rails框架内的极致,再往上,可以考虑直接把数据静态化成json文件,交给Nginx管理。

也谈服务注册与发现

背景

公司后端是基于ROR的,应用服务器是unicorn,搞活动高峰期,缺乏一种动态改变配置后,就可以控制程序行为的能力。

受限于Rails和Unicorn的多进程模型,初步的方案就是往Cache里扔一个值,接口每次都检查这个值,但这样就多了一次网络IO,不太Geek。

深入思考了一下,觉得可以做一个配置中心,unicorn启动的时候,建立长连接到配置中心获取数据,配置发生改变的时候通过长连接通知到unicorn,从而动态改变了所有服务器上的本地内存。然后又在考虑,是每个unicorn worker都保持一个连接?还是只有master保持连接,通过一些进程间通信技术通知到worker?又或者单独做一个agent,让agent去保持连接,然后写到unix socket里,unicorn读这个socket?…

最终由于改不动unicorn的代码, 放弃了。

后来转念一想,这不就是etcd解决的问题!只不过详细的方案还需要基于etcd自己设计。

设计

服务有一个唯一id,id是调用方和提供方都知道的。

大致流程:

  • 提供方: 往etcd里put一个键值对,key是服务id,值是对应的配置
  • 掉用方: 根据key向etcd获取配置,并进行watch
  • 配置改变时,调用方可以通过watch得知

实践

根据官方文档装好etcd,https://coreos.com/etcd/docs/latest/dl_build.html

1
2
3
4
git clone https://github.com/coreos/etcd.git
cd etcd
./build
./bin/etcd

写一个简单的服务注册

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
"context"
"github.com/coreos/etcd/clientv3"
"log"
"time"
)

type ServiceInfo struct {
id string
address string
}

var info = ServiceInfo{"/service/b", "10.1.1.60:8081"}

func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()

if _, err := cli.Put(context.TODO(), info.id, info.address); err != nil {
log.Fatal(err)
}

if resp, err := cli.Get(context.TODO(), info.id); err != nil {
log.Fatal(err)
} else {
log.Println("resp: ", resp)
}
}

写个简单的go服务a, 通过etcd来发现服务b的地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package main

import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"log"
"net/http"
"time"
)

type ServiceInfo struct {
id string
address string
}

var info = ServiceInfo{"/service/a", "10.1.1.59:8080"}
var infoB = ServiceInfo{id: "/service/b"}

func main() {
go func() {
http.HandleFunc("/", hello)
http.ListenAndServe(info.address, nil)
}()
etcd()
}

func hello(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "hello this is %s, B address is %s", info.id, infoB.address)
}

func etcd() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()

// regist self
if _, err := cli.Put(context.TODO(), info.id, info.address); err != nil {
log.Fatal(err)
}

// get service b info
if resp, err := cli.Get(context.TODO(), infoB.id); err != nil {
log.Fatal(err)
} else {
for _, ev := range resp.Kvs {
fmt.Printf("%s : %s\n", ev.Key, ev.Value)
infoB.address = string(ev.Value)
}
}

// watch changes
rch := cli.Watch(context.TODO(), infoB.id, clientv3.WithPrefix())
for wresp := range rch {
for _, ev := range wresp.Events {
log.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
infoB.address = string(ev.Kv.Value)
}
}
}

结果是, a服务可以获取到b的地址,在/service/b这个key改变时,也可以通过watch得到最新的值。

服务挂掉的处理

以上的例子可能有点不太恰当,因为偏向于一个动态的配置中心,我们不需要监测服务是否存活,因为etcd本身就是服务。

对于一般的服务,如果挂了,以上的方式是无法处理的。解决方式是给key设置一个ttl,每隔一段时间刷新ttl。通过这种类似心跳的方式来实现。

参考