星河

醉后不知天在水,满船清梦压星河

Golang 简易WS服务 - 服务端

mingzaily / 2023-02-21


本次开发主要使用了Gorilla Websocket软件包

客户端结构体

维护socket连接,保存客户端信息

代码

package contorller

import (
	"encoding/json"
	"fmt"
	"git.myscrm.cn/vr/server-yk-app-builder/model"
	"git.myscrm.cn/vr/server-yk-app-builder/pkg/enum"
	"git.myscrm.cn/vr/server-yk-app-builder/pkg/logger"
	"git.myscrm.cn/vr/server-yk-app-builder/pkg/response"
	"github.com/gorilla/websocket"
	"strconv"
	"time"
)

const (
	// pingWait is the maximum time in seconds to wait for a ping from
	pingWait = 20 * time.Second
	// Maximum message size allowed from peer.
	maxMessageSize = 512
)

type wsClient struct {
	manager  *wsManager
	id       string
	desc     string
	conn     *websocket.Conn
	status   enum.ClientStatus
	send     chan []byte
	isClosed chan bool
	lastPing time.Time
}

func newWsClient(manager *wsManager, conn *websocket.Conn, id, name, ip string) *wsClient {
	return &wsClient{
		manager:  manager,
		id:       id,
		desc:     fmt.Sprintf("客户端(%s, %s)", name, ip),
		conn:     conn,
		status:   enum.ClientStatusOffline,
		send:     make(chan []byte),
		isClosed: make(chan bool),
		lastPing: time.Now(),
	}
}

// Read 读取客户端发送过来的消息
func (c *wsClient) Read() {
	defer func() {
		c.unRegister()
		logger.Info(c.desc, "read协程退出")
	}()

	c.conn.SetReadLimit(maxMessageSize)
	c.conn.SetPingHandler(func(text string) error {
		// 只需要知道客户端还活着就行,不需要回复
		c.lastPing = time.Now()
		// 更新客户端状态
		clientStatus, _ := strconv.ParseInt(text, 10, 32)
		c.status = enum.ClientStatus(int32(clientStatus))
		return nil
	})

	for {
		msgType, data, err := c.conn.ReadMessage()
		if err != nil {
			logger.Error(c.desc, "c.conn.ReadMessage", err.Error())
			break
		}

		switch msgType {
		case websocket.TextMessage:
			var msg *model.WsMessage
			err = json.Unmarshal(data, &msg)
			if err != nil {
				logger.Error(c.desc, "json.Unmarshal", err.Error())
				break
			}

			switch msg.Type {
			default:
				logger.Info(c.desc, "未知消息类型", string(data))
				c.send <- data
			}
		}
	}
}

// Write 把对应消息写回客户端
func (c *wsClient) Write() {
	defer func() {
		logger.Info(c.desc, "write协程退出")
		c.unRegister()
	}()
	for {
		select {
		case <-c.isClosed:
			return
		case msg := <-c.send:
			err := c.conn.WriteMessage(websocket.TextMessage, msg)
			if err != nil {
				logger.Error(c.desc, "c.conn.WriteMessage", err.Error())
				return
			}
		}
	}
}

// Check 检测客户端是否超时
func (c *wsClient) Check() {
	defer func() {
		logger.Info(c.desc, "check协程退出")
	}()
	ticker := time.NewTicker(pingWait / 6)
	for {
		select {
		case <-c.isClosed:
			return
		case <-ticker.C:
			// 主动关闭连接
			if time.Now().Sub(c.lastPing) > pingWait {
				response.WsReturnErr(c.conn, enum.WsDataErr, "客户端超时,主动关闭连接")
				logger.Info(c.desc, "客户端超时,主动关闭连接")
				return
			}
		}
	}
}

func (c *wsClient) unRegister() {
	if c.manager.clients[c.id] != nil {
		c.manager.unRegister <- c
	}
}

使用

应当在控制器/入口处

func (c *cSocket) Ws(ctx *gin.Context) {
	ws, err := c.upgrader.Upgrade(ctx.Writer, ctx.Request, nil)
	if err != nil {
		return nil, "", err
	}

	ip := ctx.ClientIP()
	name := ctx.GetString("device_name")

	client := newWsClient(WsManager, ws, clientId, name, ip)
	client.manager.register <- client

	go client.Read()
	go client.Write()
	go client.Check()
}

客户端管理器

管理客户端,支持登录、注销、广播

代码

package contorller

import (
	"encoding/json"
	"git.myscrm.cn/vr/server-yk-app-builder/model"
	"git.myscrm.cn/vr/server-yk-app-builder/pkg/enum"
	"git.myscrm.cn/vr/server-yk-app-builder/pkg/logger"
)

var WsManager = NewWsManager()

type wsManager struct {
	clients    map[string]*wsClient // 记录在线用户
	broadcast  chan []byte          // 触发消息广播
	register   chan *wsClient       // 触发设备或用户登陆
	unRegister chan *wsClient       // 触发设备或用户退出
}

func NewWsManager() *wsManager {
	return &wsManager{
		clients:    make(map[string]*wsClient),
		broadcast:  make(chan []byte),
		register:   make(chan *wsClient),
		unRegister: make(chan *wsClient),
	}
}

func (m *wsManager) Start() {
	for {
		select {
		case conn := <-m.register:
			m.clients[conn.id] = conn
			conn.status = enum.ClientStatusOnline
			logger.Info(conn.desc, "已连接", "当前在线客户端", len(m.clients))
			data, _ := json.Marshal(&model.WsMessage{Type: enum.WsDataNotify, Data: "socket 初始化成功"})
			conn.send <- data
		}
	}
}

func (m *wsManager) BoardCast() {
	for {
		select {
		case message := <-m.broadcast:
			for _, conn := range m.clients {
				conn.send <- message
			}
		}
	}
}

func (m *wsManager) Quit() {
	for {
		select {
		case conn := <-m.unRegister:
			if _, ok := m.clients[conn.id]; ok {
				delete(m.clients, conn.id)
				conn.conn.Close()
				conn.status = enum.ClientStatusOffline
				close(conn.isClosed)
			}
		}
	}
}

func (m *wsManager) GetClient(id string) *wsClient {
	if client, ok := m.clients[id]; ok {
		return client
	}
	return nil
}

使用

一般为main.go或启动处

func RegisterHttpRoute(serverMux *gin.Engine) error {
	go contorller.WsManager.Start()
	go contorller.WsManager.Quit()

	// 设置恐慌恢复,错误处理
	serverMux.Use(middleware.PanicHandler(), middleware.ErrorHandler())
	serverMux.NoMethod(middleware.NotFoundHandler())
	serverMux.NoRoute(middleware.NotFoundHandler())

	// 注册 websocket 接口
	serverMux.GET("/socket", middleware.SocketAuth(), contorller.Socket.Ws)

	...
}