videoplayer/handler/im.go

188 lines
5.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package handler
import (
"crypto/rand"
"encoding/hex"
"encoding/json"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"log"
"net/http"
"strconv"
"sync"
"time"
"videoplayer/proto"
"videoplayer/worker"
)
var (
upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
)
// 创建一个用于存储WebSocket连接的map和互斥锁
var (
clients = make(map[*websocket.Conn]bool)
clientsMux sync.Mutex
)
func SetUpIMGroup(router *gin.Engine) {
imGroup := router.Group("/im")
imGroup.POST("/get_imKey", GetImKey)
imGroup.GET("/ws", SRMessage)
}
func generateRandomHexString(length int) (string, error) {
bytes := make([]byte, length/2) // 16字节的字符串需要32个十六进制字符即16个字节
if _, err := rand.Read(bytes); err != nil {
return "", err
}
return hex.EncodeToString(bytes), nil
}
func GetImKey(c *gin.Context) {
id, _ := c.Get("id")
var req proto.ImKeyReq
if err := c.ShouldBindJSON(&req); err == nil {
id1 := int(id.(float64))
var redis_key string
if id1 < req.To_user_id {
redis_key = strconv.Itoa(id1) + "_" + strconv.Itoa(req.To_user_id) + "_imKey"
} else {
redis_key = strconv.Itoa(req.To_user_id) + "_" + strconv.Itoa(id1) + "_imKey"
}
if worker.IsContainKey(redis_key) == true {
res := worker.GetRedis(redis_key)
var retrievedData map[string]interface{}
err2 := json.Unmarshal([]byte(res), &retrievedData)
if err2 != nil {
c.JSON(http.StatusOK, gin.H{"error": err2.Error(), "code": proto.OperationFailed, "message": "failed"})
return
}
if retrievedData["from_user_id"] == id1 {
c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": retrievedData, "message": "success"})
return
} else {
retrievedData["is_read"] = 1
res2 := worker.SetRedisWithExpire(redis_key+"_connection", retrievedData["im_session"].(string), time.Second*30)
if res2 == false {
c.JSON(http.StatusOK, gin.H{"error": "set key failed", "code": proto.OperationFailed, "message": "failed"})
return
}
c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": retrievedData, "message": "success"})
return
}
}
//生成随机字符串
imkey, err2 := generateRandomHexString(16)
imSession, err3 := generateRandomHexString(8)
if err2 != nil || err3 != nil {
c.JSON(http.StatusOK, gin.H{"error": "generate key failed", "code": proto.OperationFailed, "message": "failed"})
return
}
data := make(map[string]interface{})
data["im_key"] = imkey
data["from_user_id"] = id1
data["to_user_id"] = req.To_user_id
data["im_session"] = imSession
data["is_read"] = 0
str, _ := json.Marshal(data)
//将key存入redis
res := worker.SetRedisWithExpire(redis_key, string(str), time.Second*30)
if res == false {
c.JSON(http.StatusOK, gin.H{"error": "set key failed", "code": proto.OperationFailed, "message": "failed"})
return
}
c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": data, "message": "success"})
return
} else {
c.JSON(http.StatusOK, gin.H{"error": err.Error(), "code": proto.ParameterError, "message": "failed"})
}
}
// 接收及发送消息
func SRMessage(c *gin.Context) {
id, _ := c.Get("id")
id1 := int(id.(float64))
var redis_key string
to_user_id := c.Query("to_user_id")
to_user_id_num, _ := strconv.ParseInt(to_user_id, 10, 64)
if id1 < int(to_user_id_num) {
redis_key = strconv.Itoa(id1) + "_" + to_user_id + "_imKey"
} else {
redis_key = to_user_id + "_" + strconv.Itoa(id1) + "_imKey"
}
if worker.IsContainKey(redis_key) == false {
c.JSON(http.StatusOK, gin.H{"code": proto.OperationFailed, "message": "failed"})
return
}
// 升级HTTP连接为WebSocket连接
ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
clients[ws] = true
if err != nil {
log.Println(err)
return
}
defer ws.Close()
res := worker.GetRedis(redis_key + "_connection")
if res == "" {
return
}
// 接收客户端消息并发送到指定用户
go func(ws *websocket.Conn, session string, to_id string) {
for {
_, message, err_r := ws.ReadMessage()
if err_r != nil {
// 当连接关闭时,退出循环
clientsMux.Lock()
clients[ws] = false
clientsMux.Unlock()
//设置ws关闭状态信息
break
}
// 将接收到的消息解析为JSON
var msg proto.Message
if err2 := json.Unmarshal(message, &msg); err2 != nil {
log.Println("unmarshal:", err2)
continue
}
if msg.Type == "msg" {
// 将消息发送到指定用户
worker.PushRedisList(session+"_"+to_user_id, msg.Msg) //将消息存入redis
}
}
}(ws, res, to_user_id)
// 从Redis中读取消息并发送到客户端
for {
if v := clients[ws]; v == true {
res2 := worker.PopRedisListLeft(res + "_" + strconv.Itoa(id1))
if res2 != "" {
var msg proto.Message
msg.Type = "msg"
msg.Msg = res2
msg.From_user_id = id1
msg.Session = res
res3, _ := json.Marshal(msg)
err2 := ws.WriteMessage(websocket.TextMessage, res3)
if err2 != nil {
break
}
}
time.Sleep(time.Microsecond * 100) // 每100毫秒查询一次
} else {
clientsMux.Lock()
delete(clients, ws)
clientsMux.Unlock()
break
}
}
}