使用 Go 和 ReactJS 构建聊天系统 (四)
使用 Go 和 ReactJS 构建聊天系统 (四)
使用 Go 和 ReactJS 构建聊天系统 (四)
- 原文地址:Part 4 - Handling Multiple Clients
- 译文地址:https://github.com/watermelo/dailyTrans
- 原文作者:Elliot Forbes
- 译者:咔叽咔叽
- 译者水平有限,如有翻译或理解谬误,烦请帮忙指出
本节完整代码:GitHub
本文是关于使用 ReactJS 和 Go 构建聊天应用程序的系列文章的第 4 部分。你可以在这里找到第 3 部分 - 前端实现
这节主要实现处理多个客户端消息的功能,并将收到的消息广播到每个连接的客户端。在本系列的这一部分结束时,我们将:
- 实现了一个池机制,可以有效地跟踪 WebSocket 服务中的连接数。
- 能够将任何收到的消息广播到连接池中的所有连接。
- 当另一个客户端连接或断开连接时,能够通知现有的客户端。
在本课程的这一部分结束时,我们的应用程序看起来像这样:
拆分 Websocket 代码
现在已经完成了必要的基本工作,我们可以继续改进代码库。可以将一些应用程序拆分为子包以便于开发。
现在,理想情况下,你的 main.go
文件应该只是 Go 应用程序的入口,它应该相当小,并且可以调用项目中的其他包。
注意 - 我们将参考非官方标准的 Go 项目结构布局 - golang-standards/project-layout
让我们在后端项目目录中创建一个名为 pkg/
的新目录。在此期间,我们将要创建另一个名为 websocket/
的目录,该目录将包含 websocket.go
文件。
我们将把目前在 main.go
文件中使用的许多基于 WebSocket 的代码移动到这个新的 websocket.go
文件中。
注意 - 需要注意的一件事是,当复制函数时,需要将每个函数的第一个字母大写,我们希望这些函数对项目的其余部分可导出。
1package websocket
2
3import (
4 "fmt"
5 "io"
6 "log"
7 "net/http"
8
9 "github.com/gorilla/websocket"
10)
11
12var upgrader = websocket.Upgrader{
13 ReadBufferSize: 1024,
14 WriteBufferSize: 1024,
15 CheckOrigin: func(r *http.Request) bool { return true },
16}
17
18func Upgrade(w http.ResponseWriter, r *http.Request) (*websocket.Conn, error) {
19 ws, err := upgrader.Upgrade(w, r, nil)
20 if err != nil {
21 log.Println(err)
22 return ws, err
23 }
24 return ws, nil
25}
26
27func Reader(conn *websocket.Conn) {
28 for {
29 messageType, p, err := conn.ReadMessage()
30 if err != nil {
31 log.Println(err)
32 return
33 }
34
35 fmt.Println(string(p))
36
37 if err := conn.WriteMessage(messageType, p); err != nil {
38 log.Println(err)
39 return
40 }
41 }
42}
43
44func Writer(conn *websocket.Conn) {
45 for {
46 fmt.Println("Sending")
47 messageType, r, err := conn.NextReader()
48 if err != nil {
49 fmt.Println(err)
50 return
51 }
52 w, err := conn.NextWriter(messageType)
53 if err != nil {
54 fmt.Println(err)
55 return
56 }
57 if _, err := io.Copy(w, r); err != nil {
58 fmt.Println(err)
59 return
60 }
61 if err := w.Close(); err != nil {
62 fmt.Println(err)
63 return
64 }
65 }
66}
现在已经创建了这个新的 websocket
包,然后我们想要更新 main.go
文件来调用这个包。首先必须在文件顶部的导入列表中添加一个新的导入,然后可以通过使用 websocket.
来调用该包中的函数。像这样:
1package main
2
3import (
4 "fmt"
5 "net/http"
6
7 "realtime-chat-go-react/backend/pkg/websocket"
8)
9
10func serveWs(pool *websocket.Pool, w http.ResponseWriter, r *http.Request) {
11 fmt.Println("WebSocket Endpoint Hit")
12 conn, err := websocket.Upgrade(w, r)
13 if err != nil {
14 fmt.Fprintf(w, "%+v\n", err)
15 }
16
17 client := &websocket.Client{
18 Conn: conn,
19 Pool: pool,
20 }
21
22 pool.Register <- client
23 client.Read()
24}
25
26func setupRoutes() {
27 pool := websocket.NewPool()
28 go pool.Start()
29
30 http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
31 serveWs(pool, w, r)
32 })
33}
34
35func main() {
36 fmt.Println("Distributed Chat App v0.01")
37 setupRoutes()
38 http.ListenAndServe(":8080", nil)
39}
经过这些修改,我们应该检查一下这些是否破坏了现有的功能。尝试再次运行后端和前端,确保仍然可以发送和接收消息:
1$ cd backend/
2$ go run main.go
如果成功,我们可以继续扩展代码库来处理多客户端。
到目前为止,目录结构应如下所示:
1- backend/
2- - pkg/
3- - - websocket/
4- - - - websocket.go
5- - main.go
6- - go.mod
7- - go.sum
8- frontend/
9- ...
处理多客户端
现在已经完成了基本的操作,我们可以继续改进后端并实现处理多个客户端的功能。
为此,我们需要考虑如何处理与 WebSocket 服务的连接。每当建立新连接时,我们都必须将它们添加到现有连接池中,并确保每次发送消息时,该池中的每个人都会收到该消息。
使用 Channels
我们需要开发一个具有大量并发连接的系统。在该连接的持续时间内都会启动新的 goroutine
去处理每一个连接。这意味着我们必须关心这些并发 goroutine
之间的通信,并确保线程安全。
当进一步实现 Pool
结构时,我们必须考虑使用 sync.Mutex
来阻塞其他 goroutine
同时访问/修改数据,或者我们也可以使用 channels
。
对于这个项目,我认为最好使用 channels
并且以安全的方式在多个并发的 goroutine
中进行通信。
注意 - 如果想进一步了解 Go 中的
channels
,可以在这里查看我的其他文章:Go Channels Tutorial
client.go
我们先创建一个名为 client.go
新文件,它将存在于 pkg/websocket
目录中,在文件中将定义一个包含以下内容的 Client
结构体:
- ID:特定连接的唯一可识别字符串
- Conn:指向
websocket.Conn
的指针 - Pool:指向
Pool
的指针??
还需要定义一个 Read()
方法,该方法将一直监听此 Client
的 websocket 连接上发出的新消息。
如果收到新消息,它将把这些消息传递给池的 Broadcast
channel,该 channel 随后将接收的消息广播到池中的每个客户端。
1package websocket
2
3import (
4 "fmt"
5 "log"
6
7 "github.com/gorilla/websocket"
8)
9
10type Client struct {
11 ID string
12 Conn *websocket.Conn
13 Pool *Pool
14}
15
16type Message struct {
17 Type int `json:"type"`
18 Body string `json:"body"`
19}
20
21func (c *Client) Read() {
22 defer func() {
23 c.Pool.Unregister <- c
24 c.Conn.Close()
25 }()
26
27 for {
28 messageType, p, err := c.Conn.ReadMessage()
29 if err != nil {
30 log.Println(err)
31 return
32 }
33 message := Message{Type: messageType, Body: string(p)}
34 c.Pool.Broadcast <- message
35 fmt.Printf("Message Received: %+v\n", message)
36 }
37}
太棒了,我们已经在代码中定义了客户端,继续实现池。
Pool 结构体
我们在 pkg/websocket
目录下创建一个新文件 pool.go
。
首先定义一个 Pool
结构体,它将包含我们进行并发通信所需的所有 channels
,以及一个客户端 map
。
1package websocket
2
3import "fmt"
4
5type Pool struct {
6 Register chan *Client
7 Unregister chan *Client
8 Clients map[*Client]bool
9 Broadcast chan Message
10}
11
12func NewPool() *Pool {
13 return &Pool{
14 Register: make(chan *Client),
15 Unregister: make(chan *Client),
16 Clients: make(map[*Client]bool),
17 Broadcast: make(chan Message),
18 }
19}
我们需要确保应用程序中只有一个点能够写入 WebSocket 连接,否则将面临并发写入问题。所以,定义了 Start()
方法,该方法将一直监听传递给 Pool
channels 的内容,然后,如果它收到发送给其中一个 channel 的内容,它将采取相应的行动。
- Register - 当新客户端连接时,
Register channel
将向此池中的所有客户端发送New User Joined...
- Unregister - 注销用户,在客户端断开连接时通知池
- Clients - 客户端的布尔值映射。可以使用布尔值来判断客户端活动/非活动
- Broadcast - 一个 channel,当它传递消息时,将遍历池中的所有客户端并通过套接字发送消息。
代码:
1func (pool *Pool) Start() {
2 for {
3 select {
4 case client := <-pool.Register:
5 pool.Clients[client] = true
6 fmt.Println("Size of Connection Pool: ", len(pool.Clients))
7 for client, _ := range pool.Clients {
8 fmt.Println(client)
9 client.Conn.WriteJSON(Message{Type: 1, Body: "New User Joined..."})
10 }
11 break
12 case client := <-pool.Unregister:
13 delete(pool.Clients, client)
14 fmt.Println("Size of Connection Pool: ", len(pool.Clients))
15 for client, _ := range pool.Clients {
16 client.Conn.WriteJSON(Message{Type: 1, Body: "User Disconnected..."})
17 }
18 break
19 case message := <-pool.Broadcast:
20 fmt.Println("Sending message to all clients in Pool")
21 for client, _ := range pool.Clients {
22 if err := client.Conn.WriteJSON(message); err != nil {
23 fmt.Println(err)
24 return
25 }
26 }
27 }
28 }
29}
websocket.go
太棒了,我们再对 websocket.go
文件进行一些小修改,并删除一些不再需要的函数和方法:
1package websocket
2
3import (
4 "log"
5 "net/http"
6
7 "github.com/gorilla/websocket"
8)
9
10var upgrader = websocket.Upgrader{
11 ReadBufferSize: 1024,
12 WriteBufferSize: 1024,
13 CheckOrigin: func(r *http.Request) bool { return true },
14}
15
16func Upgrade(w http.ResponseWriter, r *http.Request) (*websocket.Conn, error) {
17 conn, err := upgrader.Upgrade(w, r, nil)
18 if err != nil {
19 log.Println(err)
20 return nil, err
21 }
22
23 return conn, nil
24}
更新 main.go
最后,我们需要更新 main.go
文件,在每个连接上创建一个新 Client
,并使用 Pool
注册该客户端:
1package main
2
3import (
4 "fmt"
5 "net/http"
6
7 "github.com/TutorialEdge/realtime-chat-go-react/pkg/websocket"
8)
9
10func serveWs(pool *websocket.Pool, w http.ResponseWriter, r *http.Request) {
11 fmt.Println("WebSocket Endpoint Hit")
12 conn, err := websocket.Upgrade(w, r)
13 if err != nil {
14 fmt.Fprintf(w, "%+v\n", err)
15 }
16
17 client := &websocket.Client{
18 Conn: conn,
19 Pool: pool,
20 }
21
22 pool.Register <- client
23 client.Read()
24}
25
26func setupRoutes() {
27 pool := websocket.NewPool()
28 go pool.Start()
29
30 http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
31 serveWs(pool, w, r)
32 })
33}
34
35func main() {
36 fmt.Println("Distributed Chat App v0.01")
37 setupRoutes()
38 http.ListenAndServe(":8080", nil)
39}
测试
现在已经做了所有必要的修改,我们应该测试已经完成的工作并确保一切按预期工作。
启动你的后端应用程序:
1$ go run main.go
2Distributed Chat App v0.01
如果你在几个浏览器中打开 http://localhost:3000,可以看到到它们会自动连接到后端 WebSocket 服务,现在我们可以发送和接收来自同一池内的其他客户端的消息!
总结
在本节中,我们设法实现了一种处理多个客户端的方法,并向连接池中连接的每个人广播消息。
现在开始变得有趣了。我们可以在下一节中添加新功能,例如自定义消息。
下一节:Part 5 - 优化前端