// Launch the prepared command; a failure to start it is fatal.
err := cmd.Start()
if err != nil {
	log.Fatalf("gracefulRestart: Failed to launch, error: %v", err)
}
然后在子进程中使用net.FileListener来从fd创建一个Listener
1 2 3
func FileListener

func FileListener(f *os.File) (ln Listener, err error)

FileListener returns a copy of the network listener corresponding to the open file f. It is the caller's responsibility to close ln when finished. Closing ln does not affect f, and closing f does not affect ln.
1 2 3 4 5 6 7 8 9 10
// The -graceful flag selects between rebuilding a listener from an inherited
// file descriptor and opening a brand-new one.
flag.BoolVar(&gracefulChild, "graceful", false, "listen on fd open 3 (internal use only)")

if gracefulChild {
	log.Print("main: Listening to existing file descriptor 3.")
	// 3 is the listening fd that was passed to this process.
	f := os.NewFile(3, "")
	l, err = net.FileListener(f)
} else {
	log.Print("main: Listening on a new file descriptor.")
	l, err = net.Listen("tcp", server.Addr)
}
// get get a free timer data. func(t *Timer)get()(td *TimerData) { if td = t.free; td == nil { t.grow() td = t.free } t.free = td.next return }
// put put back a timer data. func(t *Timer)put(td *TimerData) { td.fn = nil td.next = t.free t.free = td }
// Push pushes the element x onto the heap. The complexity is // O(log(n)) where n = h.Len(). func(t *Timer)add(td *TimerData) { var d itime.Duration td.index = len(t.timers) // add to the minheap last node t.timers = append(t.timers, td) t.up(td.index) if td.index == 0 { // if first node, signal start goroutine d = td.Delay() t.signal.Reset(d) if Debug { log.Debug("timer: add reset delay %d ms", int64(d)/int64(itime.Millisecond)) } } if Debug { log.Debug("timer: push item key: %s, expire: %s, index: %d", td.Key, td.ExpireString(), td.index) } return }
func(t *Timer)del(td *TimerData) { var ( i = td.index last = len(t.timers) - 1 ) if i < 0 || i > last || t.timers[i] != td { // already remove, usually by expire if Debug { log.Debug("timer del i: %d, last: %d, %p", i, last, td) } return } if i != last { t.swap(i, last) t.down(i, last) t.up(i) } // remove item is the last node t.timers[last].index = -1// for safety t.timers = t.timers[:last] if Debug { log.Debug("timer: remove item key: %s, expire: %s, index: %d", td.Key, td.ExpireString(), td.index) } return }
// Get get a free memory buffer. func(p *Pool)Get()(b *Buffer) { p.lock.Lock() if b = p.free; b == nil { p.grow() b = p.free } p.free = b.next p.lock.Unlock() return }
// Put put back a memory buffer to free. func(p *Pool)Put(b *Buffer) { p.lock.Lock() b.next = p.free p.free = b p.lock.Unlock() return }
type Cleaner struct { cLock sync.Mutex size int root CleanData maps map[int64]*CleanData }
func (c *Cleaner) PushFront(key int64, expire time.Duration) { c.cLock.Lock() if e, ok := c.maps[key]; ok { // update time e.expireTime = time.Now().Add(expire) c.moveToFront(e) } else { e = new(CleanData) e.Key = key e.expireTime = time.Now().Add(expire) c.maps[key] = e at := &c.root n := at.next at.next = e e.prev = at e.next = n n.prev = e c.size++ } c.cLock.Unlock() }
type Ring struct { // read rp uint64 num uint64 mask uint64 // TODO split cacheline, many cpu cache line size is 64 // pad [40]byte // write wp uint64 data []proto.Proto }
funcNewRing(num int) *Ring { r := new(Ring) r.init(uint64(num)) return r }
func(p *Pool)grow() { var ( i int b *Buffer bs []Buffer buf []byte ) buf = make([]byte, p.max) // 所有的Buffer都从这里分配 bs = make([]Buffer, p.num) // Buffer数组 p.free = &bs[0] //构造Buffer链 b = p.free for i = 1; i < p.num; i++ { b.buf = buf[(i-1)*p.size : i*p.size] b.next = &bs[i] b = b.next } b.buf = buf[(i-1)*p.size : i*p.size] b.next = nil return }
// Get get a free memory buffer. func(p *Pool)Get()(b *Buffer) { p.lock.Lock() if b = p.free; b == nil { p.grow() b = p.free } p.free = b.next p.lock.Unlock() return }
type Channel struct { RoomId int32 CliProto Ring signal chan *proto.Proto Writer bufio.Writer Reader bufio.Reader Next *Channel Prev *Channel }
Ring
Ring是Channel内部用来保存并重用proto的一个结构
1 2 3 4 5 6 7 8 9 10 11
type Ring struct { // read rp uint64 num uint64 mask uint64 // TODO split cacheline, many cpu cache line size is 64 // pad [40]byte // write wp uint64 data []proto.Proto }
Bucket
bucket是channel的容器
1 2 3 4 5 6 7 8 9 10 11
// main.go
// Build Conf.Bucket buckets, each configured from the same Conf values.
buckets := make([]*Bucket, Conf.Bucket)
for i := 0; i < Conf.Bucket; i++ {
	buckets[i] = NewBucket(BucketOptions{
		ChannelSize:   Conf.BucketChannel,
		RoomSize:      Conf.BucketRoom,
		RoutineAmount: Conf.RoutineAmount,
		RoutineSize:   Conf.RoutineSize,
	})
}
funcInitSignal() { c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT, syscall.SIGSTOP) for { s := <-c log.Info("router[%s] get a signal %s", VERSION, s.String()) switch s { case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGSTOP, syscall.SIGINT: return case syscall.SIGHUP: reload() default: return } } }
func(c *Cleaner)remove(key int64) { if e, ok := c.maps[key]; ok { //通过map定位 delete(c.maps, key) // 从map中删除 // 从链表中删除 e.prev.next = e.next e.next.prev = e.prev e.next = nil// avoid memory leaks e.prev = nil// avoid memory leaks c.size-- } }
func(c *Cleaner)Clean()(keys []int64) { var ( i int e *CleanData ) keys = make([]int64, 0, maxCleanNum) c.cLock.Lock() for i = 0; i < maxCleanNum; i++ { // 每次最多只清理maxCleanNum个节点,don't know why if e = c.back(); e != nil { if e.expire() { c.remove(e.Key) keys = append(keys, e.Key) continue } } break } // next time c.cLock.Unlock() return }
// get service b info if resp, err := cli.Get(context.TODO(), infoB.id); err != nil { log.Fatal(err) } else { for _, ev := range resp.Kvs { fmt.Printf("%s : %s\n", ev.Key, ev.Value) infoB.address = string(ev.Value) } }
// watch changes rch := cli.Watch(context.TODO(), infoB.id, clientv3.WithPrefix()) for wresp := range rch { for _, ev := range wresp.Events { log.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) infoB.address = string(ev.Kv.Value) } } }