在写im-go的过程中遇到了一些设计上的问题,于是想找目前有的开源im服务的源码看看。FishChatServer2在一些模块设计上和我的思路很相似,有种英雄所见略同的快感,所以选了它(FishChatServer2的拆包方式和我上一篇文章中提到的使用ReadFull
的方式是一样的,并且连模块名字都一样叫Codec)
主要看了libnet和server两个模块
libnet, 是所有server的基础公共库,封装了诸如Listen Accept之类的调用
server, 具体的服务,看了一下gateway和access两个服务的实现
gateway服务
gateway.go是gateway服务的入口,其实是一个access服务的负载均衡器,核心代码如下
1 | // 初始化对象 |
gwServer.Loop()
的核心代码在server.go中
1 | func (s *Server) sessionLoop(client *client.Client) { |
client.Parse最终调用了proto_proc.go里的client.procReqAccessServer来执行业务逻辑1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33func (c *Client) procReqAccessServer(reqData []byte) (err error) {
var addr string
var accessServerList []string
// 从之前提到的access服务地址数组中获取一个可用的access服务
// 没有看懂为什么要做一次额外的复制数组的操作?
for _, v := range job.AccessServerList {
accessServerList = append(accessServerList, v.IP)
}
// 处理错误情况
if len(accessServerList) == 0 {
if err = c.Session.Send(&external.ResSelectAccessServerForClient{
Cmd: external.ReqAccessServerCMD,
ErrCode: ecode.NoAccessServer.Uint32(),
ErrStr: ecode.NoAccessServer.String(),
}); err != nil {
glog.Error(err)
}
return
}
// 返回一个可用地址
addr = accessServerList[rand.Intn(len(accessServerList))]
if err = c.Session.Send(&external.ResSelectAccessServerForClient{
Cmd: external.ReqAccessServerCMD,
ErrCode: ecode.OK.Uint32(),
ErrStr: ecode.OK.String(),
Addr: addr,
}); err != nil {
glog.Error(err)
}
return
}
到此一次请求就结束了,可用看出代码的结构上非常清晰,很容易就能理解。
libnet
这个模块帮我们屏蔽了大量繁琐的网络细节,接下来就要看一下它的实现了。
从api.go入手,这里定义了对外的接口
1 | type Protocol interface { |
跳过客户的部分的实现,探索一下server.go,负责Accept一个连接,并且封装好一个session对象返回
1 | func (server *Server) Accept() (*Session, error) { |
manager.go用于管理session,会把session根据id mod 32以后,放进对应的map里, 这里使用了lock来保证并发安全, 但golang1.9以后,应该可以用内置的sync.Map替代了
1 |
|
Session
server在Accept之后,返回的是一个session对象,session负责收发数据,并且实现了优雅退出(gracefully shutdown)
1 | type Session struct { |
优雅退出的实现,先通过CAS设置一下closeFlag, 成功设置的gorutine可以执行清理操作,失败的gorutine返回SessionClosedError1
2
3
4
5
6
7
8
9
10
11
12
13func (session *Session) Close() error {
// 如果成功通过CAS设置了closeFlag
if atomic.CompareAndSwapInt32(&session.closeFlag, 0, 1) {
err := session.codec.Close() // 关闭net.Conn
close(session.closeChan) // 退出sendLoop
if session.manager != nil { // 从manager中移除session
session.manager.delSession(session)
}
session.invokeCloseCallbacks() // 执行callback
return err
}
return SessionClosedError
}
发送数据部分
1 | func (session *Session) sendLoop() { |
最后
其实本意是想找找有没有关于心跳和连接保持方面的代码,但没有什么收获.不过也看到了很多高质量的实现,例如idgen,粗粗瞟了一眼就发现,应该是使用了雪花算法,此外还有大量微服务的设计,以及一些我很感兴趣的流行开源技术栈(k8s docker etcd hbase kafka)可以看出是一整套经过深思熟虑的系统,决定过年期间要好好看一看这个库,吸收一下营养。