diff --git a/handler/im.go b/handler/im.go index 32e8a79..bda9e8e 100644 --- a/handler/im.go +++ b/handler/im.go @@ -9,6 +9,7 @@ import ( "log" "net/http" "strconv" + "sync" "time" "videoplayer/proto" "videoplayer/worker" @@ -21,6 +22,12 @@ var ( } ) +// 创建一个用于存储WebSocket连接的map和互斥锁 +var ( + clients = make(map[*websocket.Conn]bool) + clientsMux sync.Mutex +) + func SetUpIMGroup(router *gin.Engine) { imGroup := router.Group("/im") imGroup.POST("/get_imKey", GetImKey) @@ -116,6 +123,7 @@ func SRMessage(c *gin.Context) { // 升级HTTP连接为WebSocket连接 ws, err := upgrader.Upgrade(c.Writer, c.Request, nil) + clients[ws] = true if err != nil { log.Println(err) return @@ -129,9 +137,13 @@ func SRMessage(c *gin.Context) { // 接收客户端消息并发送到指定用户 go func(ws *websocket.Conn, session string, to_id string) { for { - _, message, err := ws.ReadMessage() - if err != nil { - // 当连接关闭时 + _, message, err_r := ws.ReadMessage() + if err_r != nil { + // 当连接关闭时,退出循环 + clientsMux.Lock() + clients[ws] = false + clientsMux.Unlock() + //设置ws关闭状态信息 break } @@ -149,21 +161,27 @@ func SRMessage(c *gin.Context) { }(ws, res, to_user_id) // 从Redis中读取消息并发送到客户端 - go func(ws *websocket.Conn, session string, id int) { - for { - res2 := worker.PopRedisList(session + "_" + strconv.Itoa(id)) + for { + if v := clients[ws]; v == true { + res2 := worker.PopRedisListLeft(res + "_" + strconv.Itoa(id1)) if res2 != "" { var msg proto.Message msg.Type = "msg" msg.Msg = res2 - msg.From_user_id = id - msg.Session = session + msg.From_user_id = id1 + msg.Session = res res3, _ := json.Marshal(msg) err2 := ws.WriteMessage(websocket.TextMessage, res3) if err2 != nil { break } } + time.Sleep(time.Microsecond * 100) // 每100毫秒查询一次 + } else { + clientsMux.Lock() + delete(clients, ws) + clientsMux.Unlock() + break } - }(ws, res, id1) + } } diff --git a/worker/redis.go b/worker/redis.go index 64cc41d..4d5bd37 100644 --- a/worker/redis.go +++ b/worker/redis.go @@ -166,6 +166,17 @@ func PopRedisList(key string) string { return val } +// pop redis list from left,as queue +func PopRedisListLeft(key string) string { + ctx := context.Background() + val, err := redisClient.LPop(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误 + if err != nil { + fmt.Println(key, " Error reading from Redis: %v", err) + return "" + } + return val +} + // push redis list from right func PushRedisList(key string, value string) bool { ctx := context.Background()