在写im-go的过程中遇到了一些设计上的问题,于是想找目前有的开源im服务的源码看看。FishChatServer2 在一些模块设计上和我的思路很相似,有种英雄所见略同的快感,所以选了它(FishChatServer2的拆包方式和我上一篇文章 中提到的使用ReadFull
的方式是一样的,并且连模块名字都一样叫Codec)
主要看了libnet和server两个模块
libnet, 是所有server的基础公共库,封装了诸如Listen Accept之类的调用
server, 具体的服务,看了一下gateway和access两个服务的实现
gateway服务 gateway.go 是gateway服务的入口,其实是一个access服务的负载均衡器,核心代码如下
1 2 3 4 5 6 7 8 9 10 11 12 gwServer := server.New() protobuf := codec.Protobuf() if gwServer.Server, err = libnet.Serve(conf.Conf.Server.Proto, conf.Conf.Server.Addr, protobuf, 0 ); err != nil { glog.Error(err) panic (err) } go job.ConfDiscoveryProc() gwServer.Loop()
gwServer.Loop()
的核心代码在server.go 中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 func (s *Server) sessionLoop (client *client.Client) { for { reqData, err := client.Session.Receive() if err != nil { glog.Error(err) } if reqData != nil { baseCMD := &external.Base{} if err = proto.Unmarshal(reqData, baseCMD); err != nil { if err = client.Session.Send(&external.Error{ Cmd: external.ErrServerCMD, ErrCode: ecode.ServerErr.Uint32(), ErrStr: ecode.ServerErr.String(), }); err != nil { glog.Error(err) } continue } if err = client.Parse(baseCMD.Cmd, reqData); err != nil { glog.Error(err) continue } } } } func (s *Server) Loop () { glog.Info("loop" ) for { session, err := s.Server.Accept() if err != nil { glog.Error(err) } go s.sessionLoop(client.New(session)) } }
client.Parse最终调用了proto_proc.go 里的client.procReqAccessServer 来执行业务逻辑1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 func (c *Client) procReqAccessServer (reqData []byte ) (err error) { var addr string var accessServerList []string for _, v := range job.AccessServerList { accessServerList = append (accessServerList, v.IP) } if len (accessServerList) == 0 { if err = c.Session.Send(&external.ResSelectAccessServerForClient{ Cmd: external.ReqAccessServerCMD, ErrCode: ecode.NoAccessServer.Uint32(), ErrStr: ecode.NoAccessServer.String(), }); err != nil { glog.Error(err) } return } addr = accessServerList[rand.Intn(len (accessServerList))] if err = c.Session.Send(&external.ResSelectAccessServerForClient{ Cmd: external.ReqAccessServerCMD, ErrCode: ecode.OK.Uint32(), ErrStr: ecode.OK.String(), Addr: addr, }); err != nil { glog.Error(err) } return }
到此一次请求就结束了,可用看出代码的结构上非常清晰,很容易就能理解。
libnet 这个模块帮我们屏蔽了大量繁琐的网络细节,接下来就要看一下它的实现了。
从api.go 入手,这里定义了对外的接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 type Protocol interface { NewCodec(rw io.ReadWriter) Codec } type Codec interface { Receive() ([]byte , error) Send(interface {}) error Close() error } func Serve (network, address string , protocol Protocol, sendChanSize int ) (*Server, error) { listener, err := net.Listen(network, address) if err != nil { return nil , err } return NewServer(listener, protocol, sendChanSize), nil } func Connect (network, address string , protocol Protocol, sendChanSize int ) (*Session, error) { conn, err := net.Dial(network, address) if err != nil { return nil , err } return NewSession(protocol.NewCodec(conn), sendChanSize), nil } func ConnectTimeout (network, address string , timeout time.Duration, protocol Protocol, sendChanSize int ) (*Session, error) { conn, err := net.DialTimeout(network, address, timeout) if err != nil { return nil , err } return NewSession(protocol.NewCodec(conn), sendChanSize), nil }
跳过客户的部分的实现,探索一下server.go ,负责Accept一个连接,并且封装好一个session对象返回
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 func (server *Server) Accept () (*Session, error) { var tempDelay time.Duration for { conn, err := server.listener.Accept() if err != nil { if ne, ok := err.(net.Error); ok && ne.Temporary() { if tempDelay == 0 { tempDelay = 5 * time.Millisecond } else { tempDelay *= 2 } if max := 1 * time.Second; tempDelay > max { tempDelay = max } time.Sleep(tempDelay) continue } if strings.Contains(err.Error(), "use of closed network connection" ) { return nil , io.EOF } return nil , err } return server.manager.NewSession( server.protocol.NewCodec(conn), server.sendChanSize, ), nil } }
manager.go 用于管理session,会把session根据id mod 32以后,放进对应的map里, 这里使用了lock来保证并发安全, 但golang1.9以后,应该可以用内置的sync.Map 替代了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 func (manager *Manager) NewSession (codec Codec, sendChanSize int ) *Session { session := newSession(manager, codec, sendChanSize) manager.putSession(session) return session } func (manager *Manager) putSession (session *Session) { smap := &manager.sessionMaps[session.id%sessionMapNum] smap.Lock() defer smap.Unlock() smap.sessions[session.id] = session manager.disposeWait.Add(1 ) }
Session server在Accept之后,返回的是一个session对象,session负责收发数据,并且实现了优雅退出(gracefully shutdown)
1 2 3 4 5 6 7 8 9 10 11 type Session struct { id uint64 codec Codec manager *Manager sendChan chan interface {} closeFlag int32 closeChan chan int closeMutex sync.Mutex closeCallbacks *list.List State interface {} }
优雅退出的实现,先通过CAS设置一下closeFlag, 成功设置的gorutine可以执行清理操作,失败的gorutine返回SessionClosedError1 2 3 4 5 6 7 8 9 10 11 12 13 func (session *Session) Close () error { if atomic.CompareAndSwapInt32(&session.closeFlag, 0 , 1 ) { err := session.codec.Close() close (session.closeChan) if session.manager != nil { session.manager.delSession(session) } session.invokeCloseCallbacks() return err } return SessionClosedError }
发送数据部分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 func (session *Session) sendLoop() { defer session.Close() for { // 使用select语句来保证,关闭closeChan之后可以退出sendLoop select { case msg := <-session.sendChan: if session.codec.Send(msg) != nil { return } case <-session.closeChan: return } } } func (session *Session) Send(msg interface{}) (err error) { // 在每次Send的时候,都会检查closeFlag,实现快速的退出 if session.IsClosed() { return SessionClosedError } if session.sendChan == nil { return session.codec.Send(msg) } // send block, 返回一个异常, 有点粗暴了 select { case session.sendChan <- msg: return nil default: return SessionBlockedError } }
最后 其实本意是想找找有没有关于心跳和连接保持方面的代码,但没有什么收获.不过也看到了很多高质量的实现,例如idgen ,粗粗瞟了一眼就发现,应该是使用了雪花算法,此外还有大量微服务的设计,以及一些我很感兴趣的流行开源技术栈(k8s docker etcd hbase kafka)可以看出是一整套经过深思熟虑的系统,决定过年期间要好好看一看这个库,吸收一下营养。