• 本文实现一个Echo TCP Server

interface/tcp/Handler.go

type Handler interface {
   Handle(ctx context.Context, conn net.Conn)
   Close() error
}
  • Handler:业务逻辑的处理接口
    • Handle(ctx context.Context, conn net.Conn) 处理连接

tcp/server.go

type Config struct {
    Address string
}

func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error {
    closeChan := make(chan struct{})
    listen, err := net.Listen("tcp", cfg.Address)
    if err != nil {
       return err
   }
    logger.Info("start listen")
    ListenAndServe(listen, handler, closeChan)
    return nil
}

func ListenAndServe(listener net.Listener,
                    handler tcp.Handler,
                    closeChan <-chan struct{}) {
    ctx := context.Background()
    var waitDone sync.WaitGroup
    for true {
        conn, err := listener.Accept()
        if err != nil {
            break
        }
        logger.Info("accept link")
        waitDone.Add(1)
        go func() {
            defer func() {
                waitDone.Done()
            }()
            handler.Handler(ctx, conn)
        }()
    }
    waitDone.Wait()
}
  • Config:启动tcp服务器的配置
    • Address:监听地址
  • ListenAndServe:ctx是上下文,可以传递一些参数。死循环中接收到新连接时,让一个协程去处理连接
  • 如果listener.Accept()出错了就会break跳出来,这时候需要等待已经服务的客户端退出。使用WaitGroup等待客服端退出
func ListenAndServe(listener net.Listener,
                    handler tcp.Handler,
                    closeChan <-chan struct{}) {

    go func() {
       <-closeChan
       logger.Info("shutting down...")
       _ = listener.Close()
       _ = handler.Close()
   }()

    defer func() {
       _ = listener.Close()
       _ = handler.Close()
   }()

    ......
}

listener和handler在退出的时候需要关掉。如果用户直接kill掉了程序,我们也需要关掉listener和handler,这时候要使用closeChan,一旦接收到关闭信号,就执行关闭逻辑

func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error {

    closeChan := make(chan struct{})
    sigCh := make(chan os.Signal)
    signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
    go func() {
       sig := <-sigCh
       switch sig {
          case syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
          closeChan <- struct{}{}
      }
   }()
    listen, err := net.Listen("tcp", cfg.Address)
    if err != nil {
       return err
   }
    logger.Info("start listen")
    ListenAndServe(listen, handler, closeChan)
    return nil
}

当系统对程序发送信号时,sigCh会接收到信号

tcp/echo.go

type EchoHandler struct {
   activeConn sync.Map
   closing    atomic.Boolean
}

EchoHandler:

  • activeConn:记录连接
  • closing:是否正在关闭,有并发竞争,使用atomic.Boolean
type EchoClient struct {
   Conn    net.Conn
   Waiting wait.Wait
}

func (c *EchoClient) Close() error {
	c.Waiting.WaitWithTimeout(10 * time.Second)
	_ = c.Conn.Close()
	return nil
}

EchoClient:一个客户端就是一个连接。Close方法关闭客户端连接,超时时间设置为10s

func MakeHandler() *EchoHandler {
	return &EchoHandler{}
}

func (h *EchoHandler) Handle(ctx context.Context, conn net.Conn) {
   // 连接正在关闭,不接收新连接
   if h.closing.Get() {
      _ = conn.Close()
   }

   client := &EchoClient{
      Conn: conn,
   }
   h.activeConn.Store(client, struct{}{})

   reader := bufio.NewReader(conn)
   for {
      msg, err := reader.ReadString('\n')
      if err != nil {
         if err == io.EOF {
            logger.Info("connection close")
            h.activeConn.Delete(client)
         } else {
            logger.Warn(err)
         }
         return
      }
      // 正在处理业务,不要关掉
      client.Waiting.Add(1)
      // 将数据原封不动写回去,测试
      b := []byte(msg)
      _, _ = conn.Write(b)
      client.Waiting.Done()
   }
}

func (h *EchoHandler) Close() error {
   logger.Info("handler shutting down...")
   h.closing.Set(true)
   h.activeConn.Range(func(key interface{}, val interface{}) bool {
      client := key.(*EchoClient)
      _ = client.Close()
      return true
   })
   return nil
}
  • MakeEchoHandler:创建EchoHandler
  • Handle:处理客户端的连接。
    • 1.连接正在关闭时,不接收新连接
    • 2.存储新连接,value用空结构体
    • 3.使用缓存区接收用户发来的数据,使用\n作为结束的标志
  • Close:将所有客户端连接关掉

main.go

const configFile string = "redis.conf"

var defaultProperties = &config.ServerProperties{
   Bind: "0.0.0.0",
   Port: 6379,
}

func fileExists(filename string) bool {
   info, err := os.Stat(filename)
   return err == nil && !info.IsDir()
}

func main() {
   logger.Setup(&logger.Settings{
      Path:       "logs",
      Name:       "godis",
      Ext:        "log",
      TimeFormat: "2022-02-02",
   })

   if fileExists(configFile) {
      config.SetupConfig(configFile)
   } else {
      config.Properties = defaultProperties
   }

   err := tcp.ListenAndServeWithSignal(
      &tcp.Config{
         Address: fmt.Sprintf("%s:%d",
            config.Properties.Bind,
            config.Properties.Port),
      },
      EchoHandler.MakeHandler())
   if err != nil {
      logger.Error(err)
   }
}

测试