离线聊天消息使用redis发布订阅模式

This commit is contained in:
junleea 2024-10-29 18:53:08 +08:00
parent e1bae50a04
commit 6ba661c0f8
2 changed files with 79 additions and 1 deletions

View File

@ -1,6 +1,7 @@
package handler package handler
import ( import (
"context"
"crypto/rand" "crypto/rand"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
@ -74,7 +75,7 @@ func SetUpIMGroup(router *gin.Engine) {
imGroup.POST("/get_group", GetGroups) imGroup.POST("/get_group", GetGroups)
imGroup.POST("/get_group_req_user", GetFriendRequest) imGroup.POST("/get_group_req_user", GetFriendRequest)
imGroup.GET("/sse_msg", ServerSendMsg) imGroup.GET("/sse_msg", ServerSendMsg)
imGroup.GET("/ws_v2", ServerSendMsgV2) imGroup.GET("/ws_v2", ServerSendMsgV3)
imGroup.POST("/get_friend_list", GetFriendList) //获取好友列表,包括群聊 imGroup.POST("/get_friend_list", GetFriendList) //获取好友列表,包括群聊
//获取好友请求 //获取好友请求
imGroup.POST("/get_friend_request", GetFriendRequest) imGroup.POST("/get_friend_request", GetFriendRequest)
@ -607,6 +608,28 @@ func ServerSendMsgV2(c *gin.Context) {
time.Sleep(time.Second * 1) time.Sleep(time.Second * 1)
} }
} }
func ServerSendMsgV3(c *gin.Context) {
//ws
id, _ := c.Get("id")
user_id := int(id.(float64))
// 升级HTTP连接为WebSocket连接
ws, err1 := upgrader.Upgrade(c.Writer, c.Request, nil)
clients[ws] = true
if err1 != nil {
// log.Println(err)
fmt.Println(err1)
return
}
//设置用户在线状态
worker.SetRedisWithExpire("user_"+strconv.Itoa(user_id)+"_status_v2", "1", time.Second*60)
worker.SetRedisBitmap("im2_online_users", int64(user_id), 1)
worker.SetRedisSetAdd("im2_online_users_set", strconv.Itoa(user_id))
//发送消息
key := "user_" + strconv.Itoa(user_id) + "_msg_ids"
go subscribeAndHandleIMMessages(ws, key, user_id)
}
func ServerSendMsg(c *gin.Context) { func ServerSendMsg(c *gin.Context) {
//sse //sse
c.Writer.Header().Set("Content-Type", "text/event-stream") c.Writer.Header().Set("Content-Type", "text/event-stream")
@ -654,3 +677,54 @@ func ServerSendMsg(c *gin.Context) {
time.Sleep(time.Second * 1) time.Sleep(time.Second * 1)
} }
} }
func subscribeAndHandleIMMessages(ws *websocket.Conn, chanel string, user_id int) {
ctx := context.Background()
pubsub := worker.RedisClient.Subscribe(ctx, chanel)
defer pubsub.Close()
defer ws.Close()
ch := pubsub.Channel()
for m := range ch {
msg_id := m.Payload //消息id
if msg_id != "" {
msg_id_num, _ := strconv.ParseInt(msg_id, 10, 64)
msgs := dao.FindMessageByID2(uint(msg_id_num))
if len(msgs) > 0 {
msg := msgs[0]
//发送消息
msg_str, _ := json.Marshal(msg)
var msg_ proto.Message
msg_.Type = "msg"
msg_.Msg = string(msg_str)
msg_str2, _ := json.Marshal(msg_)
err2 := ws.WriteMessage(websocket.TextMessage, msg_str2)
if err2 != nil {
worker.SetRedisWithExpire("user_"+strconv.Itoa(user_id)+"_status_v2", "0", time.Second*3600)
clientsMux.Lock()
delete(clients, ws)
clientsMux.Unlock()
break
}
}
} else {
var msg proto.Message
msg.Type = "check"
msg.Msg = "check"
msg.From_user_id = -1
//发送心跳包
res3, _ := json.Marshal(msg)
err2 := ws.WriteMessage(websocket.TextMessage, res3)
if err2 != nil {
worker.SetRedisWithExpire("user_"+strconv.Itoa(user_id)+"_status_v2", "0", time.Second*3600)
worker.SetRedisBitmap("im2_online_users", int64(user_id), 0)
worker.SetRedisSetRemove("im2_online_users_set", strconv.Itoa(user_id))
clientsMux.Lock()
delete(clients, ws)
clientsMux.Unlock()
break
}
}
worker.SetRedisWithExpire("user_"+strconv.Itoa(user_id)+"_status_v2", "1", time.Second*60)
}
}

View File

@ -45,6 +45,8 @@ func CreateGeneralMessageService(from_id, to_id, msg_type, group_id int, content
if res == "1" { if res == "1" {
//在线,存入redis //在线,存入redis
worker.PushRedisListWithExpire("user_"+strconv.Itoa(to_id)+"_msg_ids", strconv.Itoa(int(id)), time.Second*300) worker.PushRedisListWithExpire("user_"+strconv.Itoa(to_id)+"_msg_ids", strconv.Itoa(int(id)), time.Second*300)
//发布消息
worker.Publish("user_"+strconv.Itoa(to_id)+"_msg_ids", strconv.Itoa(int(id)), time.Second*300)
} }
//判断接收方是否是机器人 //判断接收方是否是机器人
id_str := strconv.Itoa(to_id) id_str := strconv.Itoa(to_id)
@ -96,6 +98,8 @@ func CreateGeneralMessageService(from_id, to_id, msg_type, group_id int, content
} }
//在线,存入redis //在线,存入redis
worker.PushRedisListWithExpire("user_"+user_id+"_msg_ids", strconv.Itoa(int(id)), time.Second*300) worker.PushRedisListWithExpire("user_"+user_id+"_msg_ids", strconv.Itoa(int(id)), time.Second*300)
//发布消息
worker.Publish("user_"+user_id+"_msg_ids", strconv.Itoa(int(id)), time.Second*300)
} }
case 3: case 3:
//user := dao.FindUserByID(to_id) //user := dao.FindUserByID(to_id)