From 0edfbd22eefc830f8865502f36d29f39d6907abc Mon Sep 17 00:00:00 2001 From: junleea <354425203@qq.com> Date: Thu, 27 Jun 2024 10:53:47 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0im=E6=B6=88=E6=81=AF=E9=83=A8?= =?UTF-8?q?=E5=88=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 1 + go.sum | 2 + handler/im.go | 169 ++++++++++++++++++++++++++++++++++++++++++++++++++ main.go | 11 ++-- proto/im.go | 14 +++++ 5 files changed, 192 insertions(+), 5 deletions(-) create mode 100644 handler/im.go create mode 100644 proto/im.go diff --git a/go.mod b/go.mod index 833e475..c36b8b9 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/go-redis/redis/v8 v8.11.5 github.com/golang-jwt/jwt v3.2.2+incompatible github.com/google/uuid v1.6.0 + github.com/gorilla/websocket v1.5.3 gorm.io/driver/mysql v1.5.6 gorm.io/gorm v1.25.7 ) diff --git a/go.sum b/go.sum index 86c8ef1..27961a6 100644 --- a/go.sum +++ b/go.sum @@ -42,6 +42,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= diff --git a/handler/im.go b/handler/im.go new file mode 100644 index 0000000..32e8a79 --- /dev/null +++ b/handler/im.go @@ -0,0 +1,169 @@ +package handler + +import ( + "crypto/rand" + "encoding/hex" + "encoding/json" + "github.com/gin-gonic/gin" + "github.com/gorilla/websocket" + "log" + "net/http" + "strconv" + "time" + "videoplayer/proto" + "videoplayer/worker" +) + +var ( + upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + } +) + +func SetUpIMGroup(router *gin.Engine) { + imGroup := router.Group("/im") + imGroup.POST("/get_imKey", GetImKey) + imGroup.GET("/ws", SRMessage) + +} +func generateRandomHexString(length int) (string, error) { + bytes := make([]byte, length/2) // 16字节的字符串需要32个十六进制字符,即16个字节 + if _, err := rand.Read(bytes); err != nil { + return "", err + } + return hex.EncodeToString(bytes), nil +} + +func GetImKey(c *gin.Context) { + id, _ := c.Get("id") + var req proto.ImKeyReq + if err := c.ShouldBindJSON(&req); err == nil { + id1 := int(id.(float64)) + var redis_key string + if id1 < req.To_user_id { + redis_key = strconv.Itoa(id1) + "_" + strconv.Itoa(req.To_user_id) + "_imKey" + } else { + redis_key = strconv.Itoa(req.To_user_id) + "_" + strconv.Itoa(id1) + "_imKey" + } + if worker.IsContainKey(redis_key) == true { + res := worker.GetRedis(redis_key) + var retrievedData map[string]interface{} + err2 := json.Unmarshal([]byte(res), &retrievedData) + if err2 != nil { + c.JSON(http.StatusOK, gin.H{"error": err2.Error(), "code": proto.OperationFailed, "message": "failed"}) + return + } + if retrievedData["from_user_id"] == id1 { + c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": retrievedData, "message": "success"}) + return + } else { + retrievedData["is_read"] = 1 + res2 := worker.SetRedisWithExpire(redis_key+"_connection", retrievedData["im_session"].(string), time.Second*30) + if res2 == false { + c.JSON(http.StatusOK, gin.H{"error": "set key failed", "code": proto.OperationFailed, "message": "failed"}) + return + } + c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": retrievedData, "message": "success"}) + return + } + } + + //生成随机字符串 + imkey, err2 := generateRandomHexString(16) + imSession, err3 := generateRandomHexString(8) + if err2 != nil || err3 != nil { + c.JSON(http.StatusOK, gin.H{"error": "generate key failed", "code": proto.OperationFailed, "message": "failed"}) + return + } + + data := make(map[string]interface{}) + data["im_key"] = imkey + data["from_user_id"] = id1 + data["to_user_id"] = req.To_user_id + data["im_session"] = imSession + data["is_read"] = 0 + str, _ := json.Marshal(data) + //将key存入redis + res := worker.SetRedisWithExpire(redis_key, string(str), time.Second*30) + if res == false { + c.JSON(http.StatusOK, gin.H{"error": "set key failed", "code": proto.OperationFailed, "message": "failed"}) + return + } + c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": data, "message": "success"}) + return + } else { + c.JSON(http.StatusOK, gin.H{"error": err.Error(), "code": proto.ParameterError, "message": "failed"}) + } +} + +// 接收及发送消息 +func SRMessage(c *gin.Context) { + id, _ := c.Get("id") + id1 := int(id.(float64)) + var redis_key string + to_user_id := c.Query("to_user_id") + to_user_id_num, _ := strconv.ParseInt(to_user_id, 10, 64) + if id1 < int(to_user_id_num) { + redis_key = strconv.Itoa(id1) + "_" + to_user_id + "_imKey" + } else { + redis_key = to_user_id + "_" + strconv.Itoa(id1) + "_imKey" + } + if worker.IsContainKey(redis_key) == false { + c.JSON(http.StatusOK, gin.H{"code": proto.OperationFailed, "message": "failed"}) + return + } + + // 升级HTTP连接为WebSocket连接 + ws, err := upgrader.Upgrade(c.Writer, c.Request, nil) + if err != nil { + log.Println(err) + return + } + defer ws.Close() + res := worker.GetRedis(redis_key + "_connection") + if res == "" { + return + } + + // 接收客户端消息并发送到指定用户 + go func(ws *websocket.Conn, session string, to_id string) { + for { + _, message, err := ws.ReadMessage() + if err != nil { + // 当连接关闭时 + break + } + + // 将接收到的消息解析为JSON + var msg proto.Message + if err2 := json.Unmarshal(message, &msg); err2 != nil { + log.Println("unmarshal:", err2) + continue + } + if msg.Type == "msg" { + // 将消息发送到指定用户 + worker.PushRedisList(session+"_"+to_user_id, msg.Msg) //将消息存入redis + } + } + }(ws, res, to_user_id) + + // 从Redis中读取消息并发送到客户端 + go func(ws *websocket.Conn, session string, id int) { + for { + res2 := worker.PopRedisList(session + "_" + strconv.Itoa(id)) + if res2 != "" { + var msg proto.Message + msg.Type = "msg" + msg.Msg = res2 + msg.From_user_id = id + msg.Session = session + res3, _ := json.Marshal(msg) + err2 := ws.WriteMessage(websocket.TextMessage, res3) + if err2 != nil { + break + } + } + } + }(ws, res, id1) +} diff --git a/main.go b/main.go index fcb0673..eeccbda 100644 --- a/main.go +++ b/main.go @@ -19,11 +19,12 @@ func main() { dao.Init() worker.InitRedis() r.Use(handler.CrosHandler()) - r.Use(JWTAuthMiddleware()) // 使用 JWT 认证中间件 - handler.SetUpVideoGroup(r) - handler.SetUpUserGroup(r) - handler.SetUpDeviceGroup(r) - r.Run(":8083") // listen and serve on 0.0.0.0:8082 + r.Use(JWTAuthMiddleware()) // 使用 JWT 认证中间件 + handler.SetUpVideoGroup(r) // Video + handler.SetUpUserGroup(r) // User + handler.SetUpDeviceGroup(r) // Device + handler.SetUpIMGroup(r) // IM + r.Run(":8083") // listen and serve on 0.0.0.0:8082 defer dao.Close() defer worker.CloseRedis() } diff --git a/proto/im.go b/proto/im.go new file mode 100644 index 0000000..49f4c3e --- /dev/null +++ b/proto/im.go @@ -0,0 +1,14 @@ +package proto + +type ImKeyReq struct { + To_user_id int `json:"to_user_id" form:"to_user_id" binding:"required"` +} + +// 定义WebSocket消息格式 +type Message struct { + Type string `json:"type"` + Msg string `json:"data"` + To_user_id int `json:"to_user_id"` + From_user_id int `json:"from_user_id"` + Session string `json:"session"` +}