comet是客户端直接连接的节点,设计上是无状态的。通过rpc与logic服务交互,对外提供TCP、HTTP、WebSocket连接方式,自己也作为push这个rpc服务的提供方
1 | //main.go |
InitTCP
InitXXX的作用是暴露不同的服务给客户端使用,选一个看就可以了。
在多个gorutine中调用了AcceptTCP,充分发挥多核能力1
2
3for i := 0; i < accept; i++ {
go acceptTCP(DefaultServer, listener)
}
accept之后,核心逻辑实现在serveTCP中,首先调用auth服务,获得subKey,然后把channel放进bucket里
1 | // ... |
serveTCP方法中,当前gorutine负责读数据,处理心跳,把数据封装成proto对象然后保存到channel的CliProto中,然后通知dispatchTCP处理1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34for {
if p, err = ch.CliProto.Set(); err != nil { // 从channel中申请一个buffer用来存放proto
break
}
if white {
DefaultWhitelist.Log.Printf("key: %s start read proto\n", key)
}
if err = p.ReadTCP(rr); err != nil { // 读proto
break
}
if white {
DefaultWhitelist.Log.Printf("key: %s read proto:%v\n", key, p)
}
if p.Operation == define.OP_HEARTBEAT { // 维持心跳
tr.Set(trd, hb)
p.Body = nil
p.Operation = define.OP_HEARTBEAT_REPLY
if Debug {
log.Debug("key: %s receive heartbeat", key)
}
} else {
if err = server.operator.Operate(p); err != nil {
break
}
}
if white {
DefaultWhitelist.Log.Printf("key: %s process proto:%v\n", key, p)
}
ch.CliProto.SetAdv()
ch.Signal() //通知dispatchTCP处理
if white {
DefaultWhitelist.Log.Printf("key: %s signal\n", key)
}
}
dispatchTCP中,如果收到proto.ProtoReady,就表示读取到了一个proto,然后原样写回?
1 | case proto.ProtoReady: |
Round
goim自己进行了buffer的管理,避免了频繁申请内存的开销。通过自定义的Pool结构来分配Buffer,因为分配时要加锁,使用Round来组合多个Pool,通过mod运算随机获取一个Pool,来减缓锁的争用。
1 | // round.go |
Pool内部使用一条单链表,维护一个free指针指向未分配的buffer
1 | // libs/buffer.go |
Timer
goim的Timer也是基于堆结构改写的,内部只有一个timer,不断把定时器设置成堆顶元素的触发时间来提高效率。
Channel
TCP连接会被封装到Channel这个结构中,使用CliProto来处理封包拆包
1 | type Channel struct { |
Ring
Ring是Channel内部用来保存并重用proto的一个结构1
2
3
4
5
6
7
8
9
10
11type Ring struct {
// read
rp uint64
num uint64
mask uint64
// TODO split cacheline, many cpu cache line size is 64
// pad [40]byte
// write
wp uint64
data []proto.Proto
}
Bucket
bucket是channel的容器1
2
3
4
5
6
7
8
9
10
11//main.go
buckets := make([]*Bucket, Conf.Bucket)
for i := 0; i < Conf.Bucket; i++ {
buckets[i] = NewBucket(BucketOptions{
ChannelSize: Conf.BucketChannel,
RoomSize: Conf.BucketRoom,
RoutineAmount: Conf.RoutineAmount,
RoutineSize: Conf.RoutineSize,
})
}