From 6ba661c0f82fae6ce02bc58dbcdde3c81db87ff9 Mon Sep 17 00:00:00 2001 From: junleea <354425203@qq.com> Date: Tue, 29 Oct 2024 18:53:08 +0800 Subject: [PATCH] =?UTF-8?q?=E7=A6=BB=E7=BA=BF=E8=81=8A=E5=A4=A9=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E4=BD=BF=E7=94=A8redis=E5=8F=91=E5=B8=83=E8=AE=A2?= =?UTF-8?q?=E9=98=85=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler/im.go | 76 +++++++++++++++++++++++++++++++++++++++++++- service/imService.go | 4 +++ 2 files changed, 79 insertions(+), 1 deletion(-) diff --git a/handler/im.go b/handler/im.go index bc872a1..020f486 100644 --- a/handler/im.go +++ b/handler/im.go @@ -1,6 +1,7 @@ package handler import ( + "context" "crypto/rand" "encoding/hex" "encoding/json" @@ -74,7 +75,7 @@ func SetUpIMGroup(router *gin.Engine) { imGroup.POST("/get_group", GetGroups) imGroup.POST("/get_group_req_user", GetFriendRequest) imGroup.GET("/sse_msg", ServerSendMsg) - imGroup.GET("/ws_v2", ServerSendMsgV2) + imGroup.GET("/ws_v2", ServerSendMsgV3) imGroup.POST("/get_friend_list", GetFriendList) //获取好友列表,包括群聊 //获取好友请求 imGroup.POST("/get_friend_request", GetFriendRequest) @@ -607,6 +608,28 @@ func ServerSendMsgV2(c *gin.Context) { time.Sleep(time.Second * 1) } } + +func ServerSendMsgV3(c *gin.Context) { + //ws + id, _ := c.Get("id") + user_id := int(id.(float64)) + + // 升级HTTP连接为WebSocket连接 + ws, err1 := upgrader.Upgrade(c.Writer, c.Request, nil) + clients[ws] = true + if err1 != nil { + // log.Println(err) + fmt.Println(err1) + return + } + //设置用户在线状态 + worker.SetRedisWithExpire("user_"+strconv.Itoa(user_id)+"_status_v2", "1", time.Second*60) + worker.SetRedisBitmap("im2_online_users", int64(user_id), 1) + worker.SetRedisSetAdd("im2_online_users_set", strconv.Itoa(user_id)) + //发送消息 + key := "user_" + strconv.Itoa(user_id) + "_msg_ids" + go subscribeAndHandleIMMessages(ws, key, user_id) +} func ServerSendMsg(c *gin.Context) { //sse c.Writer.Header().Set("Content-Type", "text/event-stream") @@ -654,3 +677,54 @@ func ServerSendMsg(c *gin.Context) { time.Sleep(time.Second * 1) } } + +func subscribeAndHandleIMMessages(ws *websocket.Conn, chanel string, user_id int) { + ctx := context.Background() + pubsub := worker.RedisClient.Subscribe(ctx, chanel) + defer pubsub.Close() + defer ws.Close() + ch := pubsub.Channel() + for m := range ch { + msg_id := m.Payload //消息id + if msg_id != "" { + msg_id_num, _ := strconv.ParseInt(msg_id, 10, 64) + msgs := dao.FindMessageByID2(uint(msg_id_num)) + if len(msgs) > 0 { + msg := msgs[0] + //发送消息 + msg_str, _ := json.Marshal(msg) + var msg_ proto.Message + msg_.Type = "msg" + msg_.Msg = string(msg_str) + msg_str2, _ := json.Marshal(msg_) + + err2 := ws.WriteMessage(websocket.TextMessage, msg_str2) + if err2 != nil { + worker.SetRedisWithExpire("user_"+strconv.Itoa(user_id)+"_status_v2", "0", time.Second*3600) + clientsMux.Lock() + delete(clients, ws) + clientsMux.Unlock() + break + } + } + } else { + var msg proto.Message + msg.Type = "check" + msg.Msg = "check" + msg.From_user_id = -1 + //发送心跳包 + res3, _ := json.Marshal(msg) + err2 := ws.WriteMessage(websocket.TextMessage, res3) + if err2 != nil { + worker.SetRedisWithExpire("user_"+strconv.Itoa(user_id)+"_status_v2", "0", time.Second*3600) + worker.SetRedisBitmap("im2_online_users", int64(user_id), 0) + worker.SetRedisSetRemove("im2_online_users_set", strconv.Itoa(user_id)) + clientsMux.Lock() + delete(clients, ws) + clientsMux.Unlock() + break + } + } + worker.SetRedisWithExpire("user_"+strconv.Itoa(user_id)+"_status_v2", "1", time.Second*60) + } +} diff --git a/service/imService.go b/service/imService.go index 60185c3..b402d76 100644 --- a/service/imService.go +++ b/service/imService.go @@ -45,6 +45,8 @@ func CreateGeneralMessageService(from_id, to_id, msg_type, group_id int, content if res == "1" { //在线,存入redis worker.PushRedisListWithExpire("user_"+strconv.Itoa(to_id)+"_msg_ids", strconv.Itoa(int(id)), time.Second*300) + //发布消息 + worker.Publish("user_"+strconv.Itoa(to_id)+"_msg_ids", strconv.Itoa(int(id)), time.Second*300) } //判断接收方是否是机器人 id_str := strconv.Itoa(to_id) @@ -96,6 +98,8 @@ func CreateGeneralMessageService(from_id, to_id, msg_type, group_id int, content } //在线,存入redis worker.PushRedisListWithExpire("user_"+user_id+"_msg_ids", strconv.Itoa(int(id)), time.Second*300) + //发布消息 + worker.Publish("user_"+user_id+"_msg_ids", strconv.Itoa(int(id)), time.Second*300) } case 3: //user := dao.FindUserByID(to_id)