有了前两篇的铺垫,相信大家已经对 Golang 中 WebSocket
的使用有一定的了解了,
今天我们以一个更加真实的例子来学习如何在 Golang 中使用 WebSocket
。
在实际的项目中,往往有一些任务耗时比较长,然后我们会把这些任务做异步的处理,但是又要及时给客户端反馈任务的处理进度。
对于这种场景,我们可以使用 WebSocket
来实现。其他可以使用 WebSocket
进行通知的场景还有像管理后台一些通知(比如新订单通知)等。
在本篇文章中,就是要实现一个这样的消息推送系统,具体来说,它会有以下功能:
WebSocket
连接与用户 ID 之间的关联WebSocket
连接与用户的关联,并且关闭这个 WebSocket
连接WebSocket
消息:只要传递用户 ID 以及需要推送的消息即可下面是一个最简单版本的框架图:
它包含如下几个角色:
Client
客户端,也就是实际中接收消息通知的浏览器Server
服务端,在我们的例子中,服务端实际不处理业务逻辑,只处理跟客户端的消息交互:维持 WebSocket
连接,推送消息到特定的 WebSocket
连接Server
推送的数据是来自业务逻辑处理的结果设计成这样的目的是为了将技术跟业务进行分离,业务逻辑上的变化不影响到底层技术,同样的,WebSocket
推送中心的技术上的变动也不会影响到实际的业务。
Client
结构体的变化type Client struct {
hub *Hub
conn *websocket.Conn
send chan []byte
// 新增字段
uid int
}
因为我们需要建立起 WebSocket
连接与用户之间的关联,因此我们需要一个额外的字段来记录用户 ID,也就是上面的 uid
字段。
这个字段会在客户端建立连接后写入。
Hub
结构体的变化type Hub struct {
clients map[*Client]bool
register chan *Client
unregister chan *Client
// 记录 uid 跟 client 的对应关系
userClients map[int]*Client
// 读写锁,保护 userClients 以及 clients 的读写
sync.RWMutex
}
Hub
中的 broadcast
字段。取而代之的是,我们会直接在消息推送接口中写入到 uid
对应的 Client
的 send
通道。
当然我们也可以在 Hub
中另外加一个字段来记录要推送给不同 uid
的消息,但是我们的 Hub
的 run
方法是一个协程处理的,当需要推送的数据较多或者其中有
网络延迟的时候,会直接影响到推送给其他用户的消息。当然我们也可以改造一下 run
方法,启动多个协程来处理,不过这样比较复杂,本文会在 writePump
中处理。
(也就是建立 WebSocket
连接时的那个写操作协程)
uid
来获取对应的 WebSocket
连接,新增了一个 userClients
字段。这是一个 map
类型的字段,key
是 uid
,值是对应的 Client
指针。
Mutex
互斥锁因为,在用户实际进行登录的时候需要写入 userClients
字段,而这是一个 map
类型字段,并不支持并发读写。
如果我们在接受并发连接的时候同时修改 userClients
的时候会导致 panic
,因此我们使用了一个互斥锁来保证 userClients
的读写安全。
同时,clients
也是一个 map
,但上一篇文章中没有使用 sync.Mutex
来保护它的读写,在并发操作的时候也是会有问题的,
所以 Mutex
同时也需要保护 clients
的读写。
func (h *Hub) run() {
for {
select {
case client := <-h.register:
h.Lock()
h.clients[client] = true
h.Unlock()
case client := <-h.unregister:
if _, ok := h.clients[client]; ok {
h.Lock()
delete(h.userClients, client.uid)
delete(h.clients, client)
h.Unlock()
close(client.send)
}
}
}
}
最后,我们会在 Hub
的 run
方法中写 userClients
或者 clients
字段的时候,先获取锁,写成功的时候释放锁。
在本篇中,将会继续沿用上一篇的代码,只是其中一些细节会有所改动。建立连接这步操作,跟上一篇的一样:
// 将 HTTP 转换为 WebSocket 连接的 Upgrader
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
// 处理 WebSocket 连接请求
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
// 升级为 WebSocket 连接
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
// 新建一个 Client
client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
// 注册到 Hub
client.hub.register <- client
// 推送消息的协程
go client.writePump()
// 结束消息的协程
go client.readPump()
}
由于我们要做的只是一个推送消息的系统,所以我们只处理用户发来的登录请求,其他的消息会全部丢弃:
func (c *Client) readPump() {
defer func() {
c.hub.unregister <- c
_ = c.conn.Close()
}()
c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Time{}) // 永不超时
for {
// 从客户端接收消息
_, message, err := c.conn.ReadMessage()
if err != nil {
log.Println("readPump error: ", err)
break
}
// 只处理登录消息
var data = make(map[string]string)
err = json.Unmarshal(message, &data)
if err != nil {
break
}
// 写入 uid 以及 Hub 的 userClients
if uid, ok := data["uid"]; ok {
c.uid = uid
c.hub.Lock()
c.hub.userClients[uid] = c
c.hub.Unlock()
}
}
}
在本文中,假设客户端的登录消息格式为 {"uid": "123456"}
这种 json
格式。
在这里也操作了
userClients
字段,同样需要使用互斥锁来保证操作的安全性。
// 发送消息的接口
// 参数:
// 1. uid:接收消息的用户 ID
// 2. message:需要发送给这个用户的消息
http.HandleFunc("/send", func(w http.ResponseWriter, r *http.Request) {
send(hub, w, r)
})
// 发送消息的方法
func send(hub *Hub, w http.ResponseWriter, r *http.Request) {
uid := r.FormValue("uid")
// 参数错误
if uid == "" {
w.WriteHeader(http.StatusBadRequest)
return
}
// 从 hub 中获取 client
hub.Lock()
client, ok := hub.userClients[uid]
hub.Unlock()
// 尚未建立连接
if !ok {
w.WriteHeader(http.StatusBadRequest)
return
}
// 发送消息
message := r.FormValue("message")
client.send <- []byte(message)
}
在 writePump
方法中,我们会将从 /send
接收到的数据发送给对应的用户:
// 发送消息的协程
func (c *Client) writePump() {
defer func() {
_ = c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
// 设置写超时时间
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
// 连接已经被关闭了
if !ok {
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
// 获取一个发送消息的 Writer
w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
// 写入消息到 Writer
w.Write(message)
// 关闭 Writer
if err := w.Close(); err != nil {
return
}
}
}
}
在这个方法中,我们会从 c.send
这个 chan
中获取需要发送给客户端的消息,然后进行发送操作。
main
程序go run main.go
ws = new WebSocket('ws://127.0.0.1:8181/ws')
ws.send('{"uid": "123"}')
这两行代码的作用是与 WebSocket
服务器建立连接,然后发送一个登录信息。
然后我们打开控制台的 Network -> WS -> Message
就可以看到浏览器发给服务端的消息:
假设我们的
WebSocket
服务器绑定的端口为8181
打开终端,执行以下命令:
curl "http://localhost:8181/send?uid=123&message=Hello%20World"
然后我们可以在 Network -> WS -> Message
看到接收到了消息 Hello World
。
到此为止,我们已经实现了一个初步可工作的 WebSocket
应用,当然还有很多可以优化的地方,
比如:
Hub
状态目前对外部来说是一个黑盒子,我们可以加个接口返回一下 Hub
的当前状态,比如当前连接数这些功能会在后续继续完善,今天就到此为止了。