videoplayer/handler/im.go

227 lines
6.6 KiB
Go
Raw Normal View History

2024-06-27 10:53:47 +08:00
package handler
import (
"crypto/rand"
"encoding/hex"
"encoding/json"
2024-06-28 18:10:22 +08:00
"fmt"
2024-06-27 10:53:47 +08:00
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"log"
"net/http"
"strconv"
"sync"
2024-06-27 10:53:47 +08:00
"time"
"videoplayer/proto"
"videoplayer/worker"
)
var (
upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
2024-06-28 18:10:22 +08:00
CheckOrigin: func(r *http.Request) bool {
// 允许所有来源的连接
return true
},
2024-06-27 10:53:47 +08:00
}
)
// clients tracks the WebSocket connections opened by SRMessage. An entry is
// set to true when the connection is registered and to false once reading
// from that connection fails.
var clients = map[*websocket.Conn]bool{}

// clientsMux is the mutex used to serialize access to clients.
var clientsMux sync.Mutex
2024-06-27 10:53:47 +08:00
// SetUpIMGroup registers the instant-messaging routes on router under the
// "/im" prefix:
//
//	POST /im/get_imKey -> GetImKey
//	GET  /im/ws        -> SRMessage
func SetUpIMGroup(router *gin.Engine) {
	im := router.Group("/im")
	{
		im.POST("/get_imKey", GetImKey)
		im.GET("/ws", SRMessage)
	}
}
// generateRandomHexString returns a lowercase hexadecimal string of exactly
// length characters, filled from crypto/rand.
//
// A negative length is rejected with an error. A length of zero yields the
// empty string. Any error from the random source is returned unchanged.
func generateRandomHexString(length int) (string, error) {
	if length < 0 {
		return "", fmt.Errorf("invalid hex string length %d", length)
	}
	// One byte encodes to two hex characters. Round up so that odd lengths
	// are covered, then trim the surplus character below.
	buf := make([]byte, (length+1)/2)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return hex.EncodeToString(buf)[:length], nil
}
func GetImKey(c *gin.Context) {
id, _ := c.Get("id")
var req proto.ImKeyReq
2024-06-28 18:10:22 +08:00
if err := c.ShouldBind(&req); err == nil {
2024-06-27 10:53:47 +08:00
id1 := int(id.(float64))
var redis_key string
if id1 < req.To_user_id {
redis_key = strconv.Itoa(id1) + "_" + strconv.Itoa(req.To_user_id) + "_imKey"
} else {
redis_key = strconv.Itoa(req.To_user_id) + "_" + strconv.Itoa(id1) + "_imKey"
}
2024-06-28 18:10:22 +08:00
if worker.IsContainKey(redis_key) == true {
2024-06-27 10:53:47 +08:00
res := worker.GetRedis(redis_key)
var retrievedData map[string]interface{}
err2 := json.Unmarshal([]byte(res), &retrievedData)
if err2 != nil {
c.JSON(http.StatusOK, gin.H{"error": err2.Error(), "code": proto.OperationFailed, "message": "failed"})
return
}
if int(retrievedData["from_user_id"].(float64)) == id1 {
if int(retrievedData["is_read"].(float64)) == 1 {
worker.DelRedis(redis_key)
}
2024-06-27 10:53:47 +08:00
c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": retrievedData, "message": "success"})
return
} else if int(retrievedData["to_user_id"].(float64)) == id1 {
2024-06-27 10:53:47 +08:00
retrievedData["is_read"] = 1
str, _ := json.Marshal(retrievedData)
res3 := worker.SetRedisWithExpire(redis_key, string(str), time.Second*300)
res2 := worker.SetRedisWithExpire(redis_key+"_connection", retrievedData["im_session"].(string), time.Second*300)
if res2 == false || res3 == false {
2024-06-27 10:53:47 +08:00
c.JSON(http.StatusOK, gin.H{"error": "set key failed", "code": proto.OperationFailed, "message": "failed"})
return
}
c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": retrievedData, "message": "success"})
return
} else {
c.JSON(http.StatusOK, gin.H{"error": "parameter error", "code": proto.ParameterError, "message": "failed"})
return
2024-06-27 10:53:47 +08:00
}
}
//生成随机字符串
imkey, err2 := generateRandomHexString(16)
imSession, err3 := generateRandomHexString(8)
if err2 != nil || err3 != nil {
c.JSON(http.StatusOK, gin.H{"error": "generate key failed", "code": proto.OperationFailed, "message": "failed"})
return
}
data := make(map[string]interface{})
data["im_key"] = imkey
data["from_user_id"] = id1
data["to_user_id"] = req.To_user_id
data["im_session"] = imSession
data["is_read"] = 0
str, _ := json.Marshal(data)
//将key存入redis
res := worker.SetRedisWithExpire(redis_key, string(str), time.Second*300)
2024-06-27 10:53:47 +08:00
if res == false {
c.JSON(http.StatusOK, gin.H{"error": "set key failed", "code": proto.OperationFailed, "message": "failed"})
return
}
c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": data, "message": "success"})
return
} else {
c.JSON(http.StatusOK, gin.H{"error": err.Error(), "code": proto.ParameterError, "message": "failed"})
}
}
// 接收及发送消息
func SRMessage(c *gin.Context) {
id, _ := c.Get("id")
id1 := int(id.(float64))
var redis_key string
to_user_id := c.Query("to_user_id")
to_user_id_num, _ := strconv.ParseInt(to_user_id, 10, 64)
if id1 < int(to_user_id_num) {
redis_key = strconv.Itoa(id1) + "_" + to_user_id + "_imKey"
} else {
redis_key = to_user_id + "_" + strconv.Itoa(id1) + "_imKey"
}
2024-06-28 18:10:22 +08:00
if worker.IsContainKey(redis_key+"_connection") == false {
2024-06-27 10:53:47 +08:00
c.JSON(http.StatusOK, gin.H{"code": proto.OperationFailed, "message": "failed"})
return
}
// 升级HTTP连接为WebSocket连接
ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
clients[ws] = true
2024-06-27 10:53:47 +08:00
if err != nil {
2024-06-28 18:10:22 +08:00
// log.Println(err)
fmt.Println(err)
2024-06-27 10:53:47 +08:00
return
}
defer ws.Close()
res := worker.GetRedis(redis_key + "_connection")
if res == "" {
return
}
// 接收客户端消息并发送到指定用户
go func(ws *websocket.Conn, session string, to_id string) {
for {
_, message, err_r := ws.ReadMessage()
if err_r != nil {
// 当连接关闭时,退出循环
clientsMux.Lock()
clients[ws] = false
clientsMux.Unlock()
//设置ws关闭状态信息
2024-06-27 10:53:47 +08:00
break
}
// 将接收到的消息解析为JSON
var msg proto.Message
if err2 := json.Unmarshal(message, &msg); err2 != nil {
log.Println("unmarshal:", err2)
continue
}
if msg.Type == "msg" {
// 将消息发送到指定用户
worker.PushRedisListWithExpire(session+"_"+to_user_id, msg.Msg, time.Hour*1) //将消息存入redis
2024-06-27 10:53:47 +08:00
}
}
}(ws, res, to_user_id)
// 从Redis中读取消息并发送到客户端
for {
if v := clients[ws]; v == true {
res2 := worker.PopRedisListLeft(res + "_" + strconv.Itoa(id1))
2024-07-03 17:37:22 +08:00
var res3 []byte
var msg proto.Message
2024-06-27 10:53:47 +08:00
if res2 != "" {
//若有消息则发送消息
2024-06-27 10:53:47 +08:00
msg.Type = "msg"
msg.Msg = res2
msg.From_user_id = id1
msg.Session = res
2024-07-03 17:37:22 +08:00
res3, _ = json.Marshal(msg)
} else {
//若无消息则发送心跳包
msg.Type = "check"
msg.Msg = "check"
msg.From_user_id = -1
msg.Session = res
res3, _ = json.Marshal(msg)
}
//判断对方是否在线,若不在线则发送离线消息,否则正常发送消息
2024-07-16 16:17:21 +08:00
//if worker.IsContainKey(to_user_id+"_status") == true {
// if worker.GetRedis(to_user_id+"_status") == "0" {
// msg.Type = "offline"
// msg.Msg = "offline"
// msg.From_user_id = -1
// msg.Session = res
// res3, _ = json.Marshal(msg)
// }
//}
2024-07-03 17:37:22 +08:00
err2 := ws.WriteMessage(websocket.TextMessage, res3)
if err2 != nil {
worker.SetRedisWithExpire("user_"+id.(string)+"_status", "0", time.Second*120) //设置用户在线状态,1为在线,0为离线,5秒后过期
2024-07-03 17:37:22 +08:00
clientsMux.Lock()
delete(clients, ws)
clientsMux.Unlock()
break
} else {
worker.SetRedisWithExpire("user_"+id.(string)+"_status", "1", time.Second*5) //设置用户在线状态,1为在线,0为离线,5秒后过期
2024-06-27 10:53:47 +08:00
}
2024-07-03 17:37:22 +08:00
time.Sleep(time.Second * 1) // 每1秒查询一次
} else {
clientsMux.Lock()
delete(clients, ws)
clientsMux.Unlock()
break
2024-06-27 10:53:47 +08:00
}
}
2024-06-27 10:53:47 +08:00
}