Moon's blog

write the code, change the world.

也谈服务注册与发现

背景

公司后端是基于ROR的,应用服务器是unicorn,搞活动高峰期,缺乏一种动态改变配置后,就可以控制程序行为的能力。

受限于Rails和Unicorn的多进程模型,初步的方案就是往Cache里扔一个值,接口每次都检查这个值,但这样就多了一次网络IO,不太Geek。

深入思考了一下,觉得可以做一个配置中心,unicorn启动的时候,建立长连接到配置中心获取数据,配置发生改变的时候通过长连接通知到unicorn,从而动态改变了所有服务器上的本地内存。然后又在考虑,是每个unicorn worker都保持一个连接?还是只有master保持连接,通过一些进程间通信技术通知到worker?又或者单独做一个agent,让agent去保持连接,然后写到unix socket里,unicorn读这个socket?…

最终由于改不动unicorn的代码, 放弃了。

后来转念一想,这不就是etcd解决的问题!只不过详细的方案还需要基于etcd自己设计。

设计

服务有一个唯一id,id是调用方和提供方都知道的。

大致流程:

  • 提供方: 往etcd里put一个键值对,key是服务id,值是对应的配置
  • 掉用方: 根据key向etcd获取配置,并进行watch
  • 配置改变时,调用方可以通过watch得知

实践

根据官方文档装好etcd,https://coreos.com/etcd/docs/latest/dl_build.html

1
2
3
4
git clone https://github.com/coreos/etcd.git
cd etcd
./build
./bin/etcd

写一个简单的服务注册

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
"context"
"github.com/coreos/etcd/clientv3"
"log"
"time"
)

type ServiceInfo struct {
id string
address string
}

var info = ServiceInfo{"/service/b", "10.1.1.60:8081"}

func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()

if _, err := cli.Put(context.TODO(), info.id, info.address); err != nil {
log.Fatal(err)
}

if resp, err := cli.Get(context.TODO(), info.id); err != nil {
log.Fatal(err)
} else {
log.Println("resp: ", resp)
}
}

写个简单的go服务a, 通过etcd来发现服务b的地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package main

import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"log"
"net/http"
"time"
)

type ServiceInfo struct {
id string
address string
}

var info = ServiceInfo{"/service/a", "10.1.1.59:8080"}
var infoB = ServiceInfo{id: "/service/b"}

func main() {
go func() {
http.HandleFunc("/", hello)
http.ListenAndServe(info.address, nil)
}()
etcd()
}

func hello(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "hello this is %s, B address is %s", info.id, infoB.address)
}

func etcd() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()

// regist self
if _, err := cli.Put(context.TODO(), info.id, info.address); err != nil {
log.Fatal(err)
}

// get service b info
if resp, err := cli.Get(context.TODO(), infoB.id); err != nil {
log.Fatal(err)
} else {
for _, ev := range resp.Kvs {
fmt.Printf("%s : %s\n", ev.Key, ev.Value)
infoB.address = string(ev.Value)
}
}

// watch changes
rch := cli.Watch(context.TODO(), infoB.id, clientv3.WithPrefix())
for wresp := range rch {
for _, ev := range wresp.Events {
log.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
infoB.address = string(ev.Kv.Value)
}
}
}

结果是, a服务可以获取到b的地址,在/service/b这个key改变时,也可以通过watch得到最新的值。

服务挂掉的处理

以上的例子可能有点不太恰当,因为偏向于一个动态的配置中心,我们不需要监测服务是否存活,因为etcd本身就是服务。

对于一般的服务,如果挂了,以上的方式是无法处理的。解决方式是给key设置一个ttl,每隔一段时间刷新ttl。通过这种类似心跳的方式来实现。

参考