videoplayer/handler/im.go

613 lines
19 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package handler
import (
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"log"
"net/http"
"strconv"
"sync"
"time"
"videoplayer/dao"
"videoplayer/proto"
"videoplayer/service"
"videoplayer/worker"
)
// SMessage is the request payload bound by SendMessage; its four fields are
// forwarded, together with the caller's ID, to
// service.CreateGeneralMessageService.
type SMessage struct {
// To_user_id is the user the message is addressed to.
To_user_id int `json:"to_user_id" form:"to_user_id"`
// Type is a message-kind discriminator interpreted by the service layer
// (its values are not defined in this file).
Type int `json:"type" form:"type"`
// GroupID is the target group; presumably only meaningful for group
// messages — TODO confirm against the service.
GroupID int `json:"group_id" form:"group_id"`
// Msg is the message body.
Msg string `json:"msg" form:"msg"`
}
// Message is the general-purpose request payload shared by most handlers in
// this file (message history, invitations, friend/group deletion, group
// queries). Each handler reads only the fields it needs.
type Message struct {
// ID is passed as the first argument to the accept/reject invite services.
ID int `json:"id" form:"id" `
// To_user_id is the other user involved in the operation.
To_user_id int `json:"to_user_id" form:"to_user_id" `
// From_user_id is the peer whose conversation GetMessage fetches.
From_user_id int `json:"from_user_id" form:"from_user_id" `
// GroupID identifies the group for group-related operations.
GroupID int `json:"group_id" form:"group_id"`
// Index is the message index handed to the message-history queries.
Index int `json:"index" form:"index" `
// Type selects the operation variant; its meaning depends on the handler
// (e.g. in GetMessage the value 2 selects group messages).
Type int `json:"type" form:"type" `
}
// CGroup is the request payload bound by CreateGroup; all four fields are
// passed unchanged to service.CreateGroup.
type CGroup struct {
// Group_name is the name of the group to create.
Group_name string `json:"group_name" form:"group_name"`
// Group_info is the free-form group description.
Group_info string `json:"group_info" form:"group_info"`
// Group_type is the group type, interpreted by the service layer.
Group_type string `json:"group_type" form:"group_type"`
// Group_icon is the group icon, interpreted by the service layer.
Group_icon string `json:"group_icon" form:"group_icon"`
}
// upgrader turns incoming HTTP requests into WebSocket connections using
// 1024-byte read and write buffers.
var (
upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
// Accept connections from every origin.
// NOTE(review): this disables the upgrader's origin check, so any web
// page can open a socket on behalf of a logged-in user — confirm this
// is intended.
return true
},
}
)
// clients tracks the open WebSocket connections; the bool value is the
// connection's liveness flag (set to false by the reader goroutine in
// SRMessage when reading fails). clientsMux is the mutex meant to guard the
// map: every access to clients should hold it, because the map is touched
// from several goroutines.
var (
clients = make(map[*websocket.Conn]bool)
clientsMux sync.Mutex
)
// SetUpIMGroup registers every instant-messaging endpoint under the "/im"
// route group of router.
func SetUpIMGroup(router *gin.Engine) {
	type route struct {
		path    string
		handler gin.HandlerFunc
	}

	postRoutes := []route{
		{"/get_imKey", GetImKey},
		{"/send_message", SendMessage},
		{"/get_message", GetMessage},
		// Accept an invitation and confirm the friendship.
		{"/accept_invite", AcceptInvite},
		// Reject an invitation.
		{"/reject_invite", RejectInvite},
		{"/create_group", CreateGroup},
		{"/get_group", GetGroups},
		{"/get_group_req_user", GetFriendRequest},
		// Friend list, including group chats.
		{"/get_friend_list", GetFriendList},
		// Pending friend requests.
		{"/get_friend_request", GetFriendRequest},
		{"/del_friend_or_group", DelFriendOrGroup},
		{"/get_group_users_info", GetGroupUsersInfo},
	}
	getRoutes := []route{
		{"/ws", SRMessage},
		{"/sse_msg", ServerSendMsg},
		{"/ws_v2", ServerSendMsgV2},
	}

	im := router.Group("/im")
	for _, r := range postRoutes {
		im.POST(r.path, r.handler)
	}
	for _, r := range getRoutes {
		im.GET(r.path, r.handler)
	}
}
// generateRandomHexString returns a string of exactly length lowercase
// hexadecimal characters drawn from crypto/rand.
//
// Each random byte encodes to two hex characters, so (length+1)/2 bytes are
// generated and the encoding is truncated to length; odd lengths therefore
// yield the requested size instead of one character fewer. A negative length
// is reported as an error rather than panicking in make. An error from the
// random source is returned unchanged.
func generateRandomHexString(length int) (string, error) {
	if length < 0 {
		return "", fmt.Errorf("invalid hex string length %d", length)
	}
	buf := make([]byte, (length+1)/2)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return hex.EncodeToString(buf)[:length], nil
}
// GetGroupUsersInfo responds with the member information of the group named
// by the request's group_id. Only the user whose ID equals the group's AuthID
// may query it; everyone else receives a "no permission" response.
//
// The caller's ID is read from the "id" context value and must be a float64
// (the unchecked assertion panics otherwise).
func GetGroupUsersInfo(c *gin.Context) {
	rawID, _ := c.Get("id")
	callerID := int(rawID.(float64))

	var req Message
	if bindErr := c.ShouldBind(&req); bindErr != nil {
		c.JSON(http.StatusOK, gin.H{"error": bindErr.Error(), "code": proto.ParameterError, "message": "failed"})
		return
	}

	// A zero group ID means the parameter was missing.
	if req.GroupID == 0 {
		c.JSON(http.StatusOK, gin.H{"error": "parameter error", "code": proto.ParameterError, "message": "failed"})
		return
	}

	groups := dao.FindGroupByID(req.GroupID)
	if len(groups) == 0 {
		c.JSON(http.StatusOK, gin.H{"error": "parameter error", "code": proto.ParameterError, "message": "failed"})
		return
	}

	// Only the group owner is allowed to list members.
	if groups[0].AuthID != callerID {
		c.JSON(http.StatusOK, gin.H{"error": "no permission", "code": proto.ParameterError, "message": "不是群主"})
		return
	}

	members := dao.FindGroupUsersInfo(req.GroupID)
	c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": members, "message": "success"})
}
// GetGroups responds with whatever service.GetGroups returns for the caller,
// whose ID is taken from the "id" context value (must be a float64).
func GetGroups(c *gin.Context) {
	rawID, _ := c.Get("id")
	groups := service.GetGroups(int(rawID.(float64)))
	c.JSON(http.StatusOK, gin.H{
		"code":    proto.SuccessCode,
		"data":    groups,
		"message": "success",
	})
}
// DelFriendOrGroup removes a friendship or a group membership, depending on
// the request's type:
//
//	1 - delete the friend to_user_id
//	2 - the caller leaves group group_id
//	3 - the caller dissolves group group_id
//	4 - the group owner removes member to_user_id from group group_id
//
// Any other type, or a missing required ID, yields a parameter-error
// response. Service failures are reported with proto.OperationFailed.
//
// The type-4 case used to be chained onto the bind-error branch, which made it
// unreachable for every successfully bound request; it is now handled
// alongside the other types.
//
// The caller's ID is read from the "id" context value and must be a float64
// (the unchecked assertion panics otherwise).
func DelFriendOrGroup(c *gin.Context) {
	var req Message
	user_id, _ := c.Get("id")
	cid := int(user_id.(float64))

	if err := c.ShouldBind(&req); err != nil {
		c.JSON(http.StatusOK, gin.H{"error": err.Error(), "code": proto.ParameterError, "message": "failed"})
		return
	}

	paramError := func() {
		c.JSON(http.StatusOK, gin.H{"error": "parameter error", "code": proto.ParameterError, "message": "failed"})
	}
	opFailed := func(reason string) {
		c.JSON(http.StatusOK, gin.H{"error": reason, "code": proto.OperationFailed, "message": "failed"})
	}

	switch req.Type {
	case 1:
		// Delete a friend.
		if req.To_user_id == 0 {
			paramError()
			return
		}
		if err2 := service.DelFriendService(req.To_user_id, cid); err2 != nil {
			opFailed(err2.Error())
			return
		}
	case 2:
		// Leave a group.
		if req.GroupID == 0 {
			paramError()
			return
		}
		if err2 := service.QuitGroupService(cid, req.GroupID); err2 != nil {
			opFailed(err2.Error())
			return
		}
	case 3:
		// The owner dissolves the group.
		if req.GroupID == 0 {
			paramError()
			return
		}
		if err2 := service.DelGroupService(cid, req.GroupID); err2 != nil {
			opFailed(err2.Error())
			return
		}
	case 4:
		// The owner removes a member from the group.
		if req.GroupID == 0 || req.To_user_id == 0 {
			paramError()
			return
		}
		group := dao.FindGroupByID(req.GroupID)
		if len(group) == 0 {
			paramError()
			return
		}
		// Only the group owner may remove other members.
		if group[0].AuthID != cid {
			c.JSON(http.StatusOK, gin.H{"error": "no permission", "code": proto.ParameterError, "message": "不是群主"})
			return
		}
		if err2 := service.QuitGroupService(req.To_user_id, req.GroupID); err2 != nil {
			opFailed(err2.Error())
			return
		}
	default:
		paramError()
		return
	}

	c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "message": "success"})
}
// GetFriendList responds with the result of service.GetFriendList for the
// caller, whose ID is taken from the "id" context value (must be a float64).
func GetFriendList(c *gin.Context) {
	rawID, _ := c.Get("id")
	friends := service.GetFriendList(int(rawID.(float64)))
	c.JSON(http.StatusOK, gin.H{
		"code":    proto.SuccessCode,
		"data":    friends,
		"message": "success",
	})
}
// GetMessage responds with stored messages starting from the request's index.
// Type 2 fetches the messages of group group_id directly from the DAO; every
// other type is delegated to service.GetMsgUserByIndexService for the
// conversation with from_user_id.
//
// NOTE(review): the group branch does not check here that the caller belongs
// to the group — confirm that membership is enforced elsewhere.
//
// The caller's ID is read from the "id" context value and must be a float64.
func GetMessage(c *gin.Context) {
	rawID, _ := c.Get("id")
	callerID := int(rawID.(float64))

	// Parse the request parameters.
	var req Message
	if bindErr := c.ShouldBind(&req); bindErr != nil {
		c.JSON(http.StatusOK, gin.H{"error": bindErr.Error(), "code": proto.ParameterError, "message": "failed"})
		return
	}

	if req.Type == 2 {
		groupMsgs, groupErr := dao.GetMsgGroupByIndex(req.GroupID, req.Index)
		if groupErr != nil {
			c.JSON(http.StatusOK, gin.H{"error": groupErr.Error(), "code": proto.OperationFailed, "message": "failed"})
			return
		}
		c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": groupMsgs, "message": "success"})
		return
	}

	userMsgs, userErr := service.GetMsgUserByIndexService(req.From_user_id, callerID, req.Index, req.Type, req.From_user_id, req.GroupID)
	if userErr != nil {
		c.JSON(http.StatusOK, gin.H{"error": userErr.Error(), "code": proto.OperationFailed, "message": "failed"})
		return
	}
	c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": userMsgs, "message": "success"})
}
// GetFriendRequest responds with the caller's pending requests: type 1 returns
// service.GetGroupRequestUsers, any other type returns
// service.GetFriendRequest.
//
// The caller's ID is read from the "id" context value and must be a float64.
func GetFriendRequest(c *gin.Context) {
	rawID, _ := c.Get("id")
	callerID := int(rawID.(float64))

	var req Message
	if bindErr := c.ShouldBind(&req); bindErr != nil {
		c.JSON(http.StatusOK, gin.H{"error": bindErr.Error(), "code": proto.ParameterError, "message": "failed"})
		return
	}

	if req.Type == 1 {
		groupRequests := service.GetGroupRequestUsers(callerID)
		c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": groupRequests, "message": "success"})
		return
	}

	friendRequests := service.GetFriendRequest(callerID)
	c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": friendRequests, "message": "success"})
}
// CreateGroup creates a group owned by the caller from the bound CGroup
// payload and responds with the second value returned by service.CreateGroup
// in the "data" field.
//
// The caller's ID is read from the "id" context value and must be a float64.
func CreateGroup(c *gin.Context) {
	rawID, _ := c.Get("id")
	ownerID := int(rawID.(float64))

	var req CGroup
	if bindErr := c.ShouldBind(&req); bindErr != nil {
		c.JSON(http.StatusOK, gin.H{"error": bindErr.Error(), "code": proto.ParameterError, "message": "failed"})
		return
	}

	createErr, groupID := service.CreateGroup(req.Group_name, req.Group_info, req.Group_type, req.Group_icon, ownerID)
	if createErr != nil {
		c.JSON(http.StatusOK, gin.H{"error": createErr.Error(), "code": proto.OperationFailed, "message": "failed"})
		return
	}
	c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": groupID, "message": "success"})
}
// AcceptInvite accepts an invitation by calling service.AddFriendService with
// the request's id, the caller's ID and the request's to_user_id.
//
// The caller's ID is read from the "id" context value and must be a float64.
func AcceptInvite(c *gin.Context) {
	rawID, _ := c.Get("id")
	callerID := int(rawID.(float64))

	var req Message
	if bindErr := c.ShouldBind(&req); bindErr != nil {
		c.JSON(http.StatusOK, gin.H{"error": bindErr.Error(), "code": proto.ParameterError, "message": "failed"})
		return
	}

	if addErr := service.AddFriendService(req.ID, callerID, req.To_user_id); addErr != nil {
		c.JSON(http.StatusOK, gin.H{"error": addErr.Error(), "code": proto.OperationFailed, "message": "failed"})
		return
	}
	c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "message": "success"})
}
// RejectInvite rejects an invitation by calling service.RejectFriendService
// with the request's id, the caller's ID and the request's to_user_id.
//
// The caller's ID is read from the "id" context value and must be a float64.
func RejectInvite(c *gin.Context) {
	rawID, _ := c.Get("id")
	callerID := int(rawID.(float64))

	var req Message
	if bindErr := c.ShouldBind(&req); bindErr != nil {
		c.JSON(http.StatusOK, gin.H{"error": bindErr.Error(), "code": proto.ParameterError, "message": "failed"})
		return
	}

	if rejectErr := service.RejectFriendService(req.ID, callerID, req.To_user_id); rejectErr != nil {
		c.JSON(http.StatusOK, gin.H{"error": rejectErr.Error(), "code": proto.OperationFailed, "message": "failed"})
		return
	}
	c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "message": "success"})
}
// SendMessage stores a message sent by the caller through
// service.CreateGeneralMessageService and responds with the new message ID in
// the "data" field. A service failure is reported with proto.MsgSendFailed.
//
// The caller's ID is read from the "id" context value and must be a float64.
func SendMessage(c *gin.Context) {
	rawID, _ := c.Get("id")
	senderID := int(rawID.(float64))

	var req SMessage
	if bindErr := c.ShouldBind(&req); bindErr != nil {
		c.JSON(http.StatusOK, gin.H{"error": bindErr.Error(), "code": proto.ParameterError, "message": "failed"})
		return
	}

	sendErr, messageID := service.CreateGeneralMessageService(senderID, req.To_user_id, req.Type, req.GroupID, req.Msg)
	if sendErr != nil {
		c.JSON(http.StatusOK, gin.H{"error": sendErr.Error(), "code": proto.MsgSendFailed, "message": "failed"})
		return
	}
	c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "message": "success", "data": messageID})
}
// imKeyIntField reads record[key] as the float64 that encoding/json produces
// for JSON numbers and converts it to int. ok is false when the field is
// missing or is not a number.
func imKeyIntField(record map[string]interface{}, key string) (value int, ok bool) {
	f, ok := record[key].(float64)
	if !ok {
		return 0, false
	}
	return int(f), true
}

// GetImKey creates or retrieves the key-exchange record shared by the caller
// and the user to_user_id. The record lives in Redis for 300 seconds under
// "<lowerID>_<higherID>_imKey".
//
//   - No record yet: a random im_key (16 hex chars) and im_session (8 hex
//     chars) are generated, stored with is_read = 0, and returned.
//   - Record exists and the caller created it: the record is returned, and it
//     is deleted once the peer has read it (is_read == 1).
//   - Record exists and the caller is its recipient: the record is marked
//     read, the session is stored under "<key>_connection", and the record is
//     returned.
//   - Record exists but the caller is neither party: parameter error.
//
// A stored record that cannot be decoded, or that lacks a field this handler
// needs, produces an OperationFailed response instead of a panic from an
// unchecked type assertion.
//
// The caller's ID is read from the "id" context value and must be a float64.
func GetImKey(c *gin.Context) {
	id, _ := c.Get("id")
	var req proto.ImKeyReq
	if err := c.ShouldBind(&req); err != nil {
		c.JSON(http.StatusOK, gin.H{"error": err.Error(), "code": proto.ParameterError, "message": "failed"})
		return
	}
	id1 := int(id.(float64))

	// The smaller ID always comes first so both parties derive the same key.
	var redis_key string
	if id1 < req.To_user_id {
		redis_key = strconv.Itoa(id1) + "_" + strconv.Itoa(req.To_user_id) + "_imKey"
	} else {
		redis_key = strconv.Itoa(req.To_user_id) + "_" + strconv.Itoa(id1) + "_imKey"
	}

	if worker.IsContainKey(redis_key) {
		res := worker.GetRedis(redis_key)
		var retrievedData map[string]interface{}
		if err2 := json.Unmarshal([]byte(res), &retrievedData); err2 != nil {
			c.JSON(http.StatusOK, gin.H{"error": err2.Error(), "code": proto.OperationFailed, "message": "failed"})
			return
		}
		malformed := func() {
			c.JSON(http.StatusOK, gin.H{"error": "malformed im key record", "code": proto.OperationFailed, "message": "failed"})
		}

		fromID, ok := imKeyIntField(retrievedData, "from_user_id")
		if !ok {
			malformed()
			return
		}
		if fromID == id1 {
			// The creator polls the record; drop it once the peer has read it.
			isRead, ok := imKeyIntField(retrievedData, "is_read")
			if !ok {
				malformed()
				return
			}
			if isRead == 1 {
				worker.DelRedis(redis_key)
			}
			c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": retrievedData, "message": "success"})
			return
		}

		toID, ok := imKeyIntField(retrievedData, "to_user_id")
		if !ok {
			malformed()
			return
		}
		if toID != id1 {
			c.JSON(http.StatusOK, gin.H{"error": "parameter error", "code": proto.ParameterError, "message": "failed"})
			return
		}

		// The recipient fetches the record: mark it read and publish the
		// session so the WebSocket endpoint will accept the pair.
		session, ok := retrievedData["im_session"].(string)
		if !ok {
			malformed()
			return
		}
		retrievedData["is_read"] = 1
		str, errMarshal := json.Marshal(retrievedData)
		if errMarshal != nil {
			c.JSON(http.StatusOK, gin.H{"error": errMarshal.Error(), "code": proto.OperationFailed, "message": "failed"})
			return
		}
		res3 := worker.SetRedisWithExpire(redis_key, string(str), time.Second*300)
		res2 := worker.SetRedisWithExpire(redis_key+"_connection", session, time.Second*300)
		if !res2 || !res3 {
			c.JSON(http.StatusOK, gin.H{"error": "set key failed", "code": proto.OperationFailed, "message": "failed"})
			return
		}
		c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": retrievedData, "message": "success"})
		return
	}

	// No record yet: generate the random key and session.
	imkey, err2 := generateRandomHexString(16)
	imSession, err3 := generateRandomHexString(8)
	if err2 != nil || err3 != nil {
		c.JSON(http.StatusOK, gin.H{"error": "generate key failed", "code": proto.OperationFailed, "message": "failed"})
		return
	}
	data := make(map[string]interface{})
	data["im_key"] = imkey
	data["from_user_id"] = id1
	data["to_user_id"] = req.To_user_id
	data["im_session"] = imSession
	data["is_read"] = 0
	str, errMarshal := json.Marshal(data)
	if errMarshal != nil {
		c.JSON(http.StatusOK, gin.H{"error": errMarshal.Error(), "code": proto.OperationFailed, "message": "failed"})
		return
	}
	// Store the record in Redis.
	if !worker.SetRedisWithExpire(redis_key, string(str), time.Second*300) {
		c.JSON(http.StatusOK, gin.H{"error": "set key failed", "code": proto.OperationFailed, "message": "failed"})
		return
	}
	c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": data, "message": "success"})
}
// SRMessage upgrades the request to a WebSocket and relays one-to-one messages
// between the caller and the user named by the to_user_id query parameter.
//
// The pair must already have a "<lowerID>_<higherID>_imKey_connection" entry
// in Redis (written by GetImKey); its value is the session string used to name
// the per-user Redis message lists. A reader goroutine pushes every incoming
// frame of type "msg" onto the peer's list, while this function polls the
// caller's own list once per second and writes either the next message, a
// "check" heartbeat, or an "offline" notice about the peer. The caller's
// online flag "user_<id>_status" is refreshed after every write.
//
// Changes from the previous implementation:
//   - the connection is registered in clients only after a successful upgrade,
//     every access to the map holds clientsMux, and the entry is removed on
//     every exit path;
//   - a message popped from Redis is delivered even when the peer is offline,
//     instead of being overwritten by the "offline" notice and lost;
//   - a non-numeric to_user_id is rejected as a parameter error.
//
// NOTE(review): delivered messages carry From_user_id = the caller's own ID,
// although they were pushed by the peer — confirm what clients expect.
// NOTE(review): a message popped from Redis is lost if the write to the socket
// fails.
//
// The caller's ID is read from the "id" context value and must be a float64.
func SRMessage(c *gin.Context) {
	id, _ := c.Get("id")
	id1 := int(id.(float64))
	selfID := strconv.Itoa(id1)

	to_user_id := c.Query("to_user_id")
	to_user_id_num, errParse := strconv.ParseInt(to_user_id, 10, 64)
	if errParse != nil {
		c.JSON(http.StatusOK, gin.H{"error": "parameter error", "code": proto.ParameterError, "message": "failed"})
		return
	}

	// The smaller ID always comes first so both parties derive the same key.
	var redis_key string
	if id1 < int(to_user_id_num) {
		redis_key = selfID + "_" + to_user_id + "_imKey"
	} else {
		redis_key = to_user_id + "_" + selfID + "_imKey"
	}
	connKey := redis_key + "_connection"
	if !worker.IsContainKey(connKey) {
		c.JSON(http.StatusOK, gin.H{"code": proto.OperationFailed, "message": "failed"})
		return
	}

	// Upgrade the HTTP connection to a WebSocket connection.
	ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer ws.Close()

	clientsMux.Lock()
	clients[ws] = true
	clientsMux.Unlock()
	defer func() {
		clientsMux.Lock()
		delete(clients, ws)
		clientsMux.Unlock()
	}()

	res := worker.GetRedis(connKey)
	statusKey := "user_" + selfID + "_status"
	worker.SetRedisWithExpire(statusKey, "1", time.Second*5)
	if res == "" {
		return
	}

	// Receive client frames and queue them for the peer.
	go func(ws *websocket.Conn, session string, to_id string) {
		for {
			_, message, err_r := ws.ReadMessage()
			if err_r != nil {
				// The connection is gone: flag it so the writer loop stops.
				// Only flip an entry that still exists, so a finished handler
				// is never re-registered.
				clientsMux.Lock()
				if _, ok := clients[ws]; ok {
					clients[ws] = false
				}
				clientsMux.Unlock()
				return
			}
			// Decode the received frame as JSON.
			var msg proto.Message
			if err2 := json.Unmarshal(message, &msg); err2 != nil {
				log.Println("unmarshal:", err2)
				continue
			}
			if msg.Type == "msg" {
				// Store the message in the peer's Redis list.
				worker.PushRedisListWithExpire(session+"_"+to_id, msg.Msg, time.Hour*1)
			}
		}
	}(ws, res, to_user_id)

	peerStatusKey := "user_" + to_user_id + "_status"
	inboxKey := res + "_" + selfID
	var status_cnt int

	// Read messages from Redis and send them to the client.
	for {
		clientsMux.Lock()
		alive := clients[ws]
		clientsMux.Unlock()
		if !alive {
			return
		}

		res2 := worker.PopRedisListLeft(inboxKey)

		// Decide whether the peer counts as offline: an explicit "0" status,
		// or no status key at all for more than five consecutive polls.
		peerOffline := false
		if worker.IsContainKey(peerStatusKey) {
			peerOffline = worker.GetRedis(peerStatusKey) == "0"
		} else if status_cnt > 5 {
			peerOffline = true
		} else {
			status_cnt++
		}

		var msg proto.Message
		msg.Session = res
		switch {
		case res2 != "":
			// A queued message always wins, so it is never dropped.
			msg.Type = "msg"
			msg.Msg = res2
			msg.From_user_id = id1
		case peerOffline:
			msg.Type = "offline"
			msg.Msg = "offline"
			msg.From_user_id = -1
		default:
			// Nothing to deliver: send a heartbeat.
			msg.Type = "check"
			msg.Msg = "check"
			msg.From_user_id = -1
		}

		res3, errMarshal := json.Marshal(msg)
		if errMarshal != nil {
			log.Println("marshal:", errMarshal)
		} else if err2 := ws.WriteMessage(websocket.TextMessage, res3); err2 != nil {
			// Mark the caller offline ("0"); the flag expires after 120 seconds.
			worker.SetRedisWithExpire(statusKey, "0", time.Second*120)
		} else {
			// Mark the caller online ("1"); the flag expires after 5 seconds.
			worker.SetRedisWithExpire(statusKey, "1", time.Second*5)
		}
		time.Sleep(time.Second * 1) // poll once per second
	}
}
// ServerSendMsgV2 upgrades the request to a WebSocket and pushes the caller's
// pending messages to it.
//
// Message IDs are popped from the Redis list "user_<id>_msg_ids" once per
// second. Each ID is resolved through dao.FindMessageByID2 and sent, JSON
// encoded, inside a proto.Message of type "msg". When the list is empty a
// "check" heartbeat is sent instead. While the connection is healthy the
// caller's "user_<id>_status_v2" flag is kept at "1" (60-second expiry) and
// the caller's bit in the "im2_online_users" bitmap is set.
//
// Changes from the previous implementation:
//   - the connection is registered in clients only after a successful upgrade,
//     under clientsMux, and removed on exit;
//   - a failed message write now clears the online bitmap as well, matching
//     the heartbeat path;
//   - an unparsable message ID or an encoding failure is logged and skipped.
//
// NOTE(review): a message ID popped from Redis is lost if the write to the
// socket fails.
//
// The caller's ID is read from the "id" context value and must be a float64.
func ServerSendMsgV2(c *gin.Context) {
	id, _ := c.Get("id")
	user_id := int(id.(float64))

	// Upgrade the HTTP connection to a WebSocket connection.
	ws, err1 := upgrader.Upgrade(c.Writer, c.Request, nil)
	if err1 != nil {
		fmt.Println(err1)
		return
	}
	defer ws.Close()

	clientsMux.Lock()
	clients[ws] = true
	clientsMux.Unlock()
	defer func() {
		clientsMux.Lock()
		delete(clients, ws)
		clientsMux.Unlock()
	}()

	// Mark the user as online.
	statusKey := "user_" + strconv.Itoa(user_id) + "_status_v2"
	worker.SetRedisWithExpire(statusKey, "1", time.Second*60)
	worker.SetRedisBitmap("im2_online_users", int64(user_id), 1)
	markOffline := func() {
		worker.SetRedisWithExpire(statusKey, "0", time.Second*3600)
		worker.SetRedisBitmap("im2_online_users", int64(user_id), 0)
	}

	key := "user_" + strconv.Itoa(user_id) + "_msg_ids"
	for {
		var out proto.Message
		send := false

		if msg_id := worker.PopRedisListLeft(key); msg_id != "" {
			msg_id_num, errParse := strconv.ParseInt(msg_id, 10, 64)
			if errParse != nil {
				log.Println("parse message id:", errParse)
			} else if msgs := dao.FindMessageByID2(uint(msg_id_num)); len(msgs) > 0 {
				msg_str, errMarshal := json.Marshal(msgs[0])
				if errMarshal != nil {
					log.Println("marshal:", errMarshal)
				} else {
					out.Type = "msg"
					out.Msg = string(msg_str)
					send = true
				}
			}
		} else {
			// Nothing queued: send a heartbeat.
			out.Type = "check"
			out.Msg = "check"
			out.From_user_id = -1
			send = true
		}

		if send {
			payload, errMarshal := json.Marshal(out)
			if errMarshal != nil {
				log.Println("marshal:", errMarshal)
			} else if err2 := ws.WriteMessage(websocket.TextMessage, payload); err2 != nil {
				// The write failed, so treat the client as disconnected.
				markOffline()
				return
			}
		}

		worker.SetRedisWithExpire(statusKey, "1", time.Second*60)
		time.Sleep(time.Second * 1)
	}
}
// ServerSendMsg streams the caller's pending messages as Server-Sent Events.
//
// Message IDs are popped from the Redis list "user_<id>_msg_ids" once per
// second. Each ID is resolved through dao.FindMessageByID and written as a
// "data:" event holding the JSON-encoded message. When the list is empty a
// "check" heartbeat event is written instead.
//
// Changes from the previous implementation:
//   - every event is flushed immediately; without a flush the response stays
//     in the server's write buffer and the client sees nothing until the
//     buffer fills;
//   - the loop stops as soon as the request context is cancelled (client
//     disconnect) instead of waiting for a write to fail;
//   - the "user_<id>_status_v2" online flag is refreshed on every iteration,
//     as ServerSendMsgV2 does, so it no longer expires after 60 seconds while
//     the stream is still open;
//   - an unparsable message ID or an encoding failure is logged and skipped.
//
// The caller's ID is read from the "id" context value and must be a float64.
func ServerSendMsg(c *gin.Context) {
	c.Writer.Header().Set("Content-Type", "text/event-stream")
	c.Writer.Header().Set("Cache-Control", "no-cache")
	c.Writer.Header().Set("Connection", "keep-alive")
	c.Writer.Header().Set("Access-Control-Allow-Origin", "*")

	id, _ := c.Get("id")
	user_id := int(id.(float64))

	// Mark the user as online.
	statusKey := "user_" + strconv.Itoa(user_id) + "_status_v2"
	worker.SetRedisWithExpire(statusKey, "1", time.Second*60)

	key := "user_" + strconv.Itoa(user_id) + "_msg_ids"
	done := c.Request.Context().Done()
	for {
		var payload []byte

		if msg_id := worker.PopRedisListLeft(key); msg_id != "" {
			msg_id_num, errParse := strconv.ParseInt(msg_id, 10, 64)
			if errParse != nil {
				log.Println("parse message id:", errParse)
			} else if msgs := dao.FindMessageByID(uint(msg_id_num)); len(msgs) > 0 {
				msg_str, errMarshal := json.Marshal(msgs[0])
				if errMarshal != nil {
					log.Println("marshal:", errMarshal)
				} else {
					payload = msg_str
				}
			}
		} else {
			// Nothing queued: send a heartbeat.
			var msg proto.Message
			msg.Type = "check"
			msg.Msg = "check"
			msg.From_user_id = -1
			res3, errMarshal := json.Marshal(msg)
			if errMarshal != nil {
				log.Println("marshal:", errMarshal)
			} else {
				payload = res3
			}
		}

		if payload != nil {
			if _, err := c.Writer.Write([]byte("data: " + string(payload) + "\n\n")); err != nil {
				worker.SetRedisWithExpire(statusKey, "0", time.Second*3600)
				return
			}
			// Push the event to the client now instead of leaving it buffered.
			c.Writer.Flush()
			worker.SetRedisWithExpire(statusKey, "1", time.Second*60)
		}

		select {
		case <-done:
			// The client went away.
			worker.SetRedisWithExpire(statusKey, "0", time.Second*3600)
			return
		case <-time.After(time.Second * 1):
		}
	}
}