package handler import ( "crypto/rand" "encoding/hex" "encoding/json" "fmt" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" "log" "net/http" "strconv" "sync" "time" "videoplayer/dao" "videoplayer/proto" "videoplayer/service" "videoplayer/worker" ) type SMessage struct { To_user_id int `json:"to_user_id" form:"to_user_id"` Type int `json:"type" form:"type"` GroupID int `json:"group_id" form:"group_id"` Msg string `json:"msg" form:"msg"` } type Message struct { ID int `json:"id" form:"id" ` To_user_id int `json:"to_user_id" form:"to_user_id" ` From_user_id int `json:"from_user_id" form:"from_user_id" ` GroupID int `json:"group_id" form:"group_id"` Index int `json:"index" form:"index" ` Type int `json:"type" form:"type" ` } type CGroup struct { Group_name string `json:"group_name" form:"group_name"` Group_info string `json:"group_info" form:"group_info"` Group_type string `json:"group_type" form:"group_type"` Group_icon string `json:"group_icon" form:"group_icon"` } var ( upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, CheckOrigin: func(r *http.Request) bool { // 允许所有来源的连接 return true }, } ) // 创建一个用于存储WebSocket连接的map和互斥锁 var ( clients = make(map[*websocket.Conn]bool) clientsMux sync.Mutex ) func SetUpIMGroup(router *gin.Engine) { imGroup := router.Group("/im") imGroup.POST("/get_imKey", GetImKey) imGroup.GET("/ws", SRMessage) imGroup.POST("/send_message", SendMessage) imGroup.POST("/get_message", GetMessage) //接受邀请,确认好友关系 imGroup.POST("/accept_invite", AcceptInvite) //拒绝邀请 imGroup.POST("/reject_invite", RejectInvite) imGroup.POST("/create_group", CreateGroup) imGroup.POST("/get_group", GetGroups) imGroup.POST("/get_group_req_user", GetFriendRequest) imGroup.GET("/sse_msg", ServerSendMsg) imGroup.GET("/ws_v2", ServerSendMsgV2) imGroup.POST("/get_friend_list", GetFriendList) //获取好友列表,包括群聊 //获取好友请求 imGroup.POST("/get_friend_request", GetFriendRequest) imGroup.POST("/del_friend_or_group", DelFriendOrGroup) imGroup.POST("/get_group_users_info", GetGroupUsersInfo) } func generateRandomHexString(length int) (string, error) { bytes := make([]byte, length/2) // 16字节的字符串需要32个十六进制字符,即16个字节 if _, err := rand.Read(bytes); err != nil { return "", err } return hex.EncodeToString(bytes), nil } func GetGroupUsersInfo(c *gin.Context) { var req Message id, _ := c.Get("id") user_id := int(id.(float64)) if err := c.ShouldBind(&req); err == nil { if req.GroupID == 0 { c.JSON(http.StatusOK, gin.H{"error": "parameter error", "code": proto.ParameterError, "message": "failed"}) return } group := dao.FindGroupByID(req.GroupID) if len(group) == 0 { c.JSON(http.StatusOK, gin.H{"error": "parameter error", "code": proto.ParameterError, "message": "failed"}) return } if group[0].AuthID != user_id { c.JSON(http.StatusOK, gin.H{"error": "no permission", "code": proto.ParameterError, "message": "不是群主"}) return } data := dao.FindGroupUsersInfo(req.GroupID) c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": data, "message": "success"}) } else { c.JSON(http.StatusOK, gin.H{"error": err.Error(), "code": proto.ParameterError, "message": "failed"}) } } func GetGroups(c *gin.Context) { id, _ := c.Get("id") user_id := int(id.(float64)) data := service.GetGroups(user_id) c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": data, "message": "success"}) } func DelFriendOrGroup(c *gin.Context) { var req Message user_id, _ := c.Get("id") cid := int(user_id.(float64)) if err := c.ShouldBind(&req); err == nil { if req.Type == 1 { if req.To_user_id == 0 { c.JSON(http.StatusOK, gin.H{"error": "parameter error", "code": proto.ParameterError, "message": "failed"}) return } err2 := service.DelFriendService(req.To_user_id, cid) if err2 == nil { c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "message": "success"}) } else { c.JSON(http.StatusOK, gin.H{"error": err2.Error(), "code": proto.OperationFailed, "message": "failed"}) } } else if req.Type == 2 { if req.GroupID == 0 { c.JSON(http.StatusOK, gin.H{"error": "parameter error", "code": proto.ParameterError, "message": "failed"}) return } //退出群聊 err2 := service.QuitGroupService(cid, req.GroupID) if err2 == nil { c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "message": "success"}) } else { c.JSON(http.StatusOK, gin.H{"error": err2.Error(), "code": proto.OperationFailed, "message": "failed"}) } } else if req.Type == 3 { //群主解散群 if req.GroupID == 0 { c.JSON(http.StatusOK, gin.H{"error": "parameter error", "code": proto.ParameterError, "message": "failed"}) return } err2 := service.DelGroupService(cid, req.GroupID) if err2 == nil { c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "message": "success"}) } else { c.JSON(http.StatusOK, gin.H{"error": err2.Error(), "code": proto.OperationFailed, "message": "failed"}) } } else if req.Type == 4 { //群管理员删除群成员 if req.GroupID == 0 { c.JSON(http.StatusOK, gin.H{"error": "parameter error", "code": proto.ParameterError, "message": "failed,group_id is null"}) return } //获取群 group := dao.FindGroupByID(req.GroupID) if len(group) == 0 { c.JSON(http.StatusOK, gin.H{"error": "parameter error", "code": proto.ParameterError, "message": "failed group not found"}) return } //判断是否是群主 if group[0].AuthID != cid { c.JSON(http.StatusOK, gin.H{"error": "no permission", "code": proto.ParameterError, "message": "不是群主"}) return } err2 := service.QuitGroupService(req.To_user_id, req.GroupID) if err2 == nil { c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "message": "success"}) } else { c.JSON(http.StatusOK, gin.H{"error": err2.Error(), "code": proto.OperationFailed, "message": "failed"}) } } else { c.JSON(http.StatusOK, gin.H{"error": "parameter error", "code": proto.ParameterError, "message": "failed"}) } } else { c.JSON(http.StatusOK, gin.H{"error": err.Error(), "code": proto.ParameterError, "message": "failed"}) } } func GetFriendList(c *gin.Context) { id, _ := c.Get("id") user_id := int(id.(float64)) data := service.GetFriendList(user_id) c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": data, "message": "success"}) } func GetMessage(c *gin.Context) { var req Message user_id, _ := c.Get("id") id := int(user_id.(float64)) //解析参数 if err := c.ShouldBind(&req); err == nil { if req.Type == 2 { msgs, err2 := dao.GetMsgGroupByIndex(req.GroupID, req.Index) if err2 == nil { c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": msgs, "message": "success"}) } else { c.JSON(http.StatusOK, gin.H{"error": err2.Error(), "code": proto.OperationFailed, "message": "failed"}) } } else { msgs, err2 := service.GetMsgUserByIndexService(req.From_user_id, id, req.Index, req.Type, req.From_user_id, req.GroupID) if err2 == nil { c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": msgs, "message": "success"}) } else { c.JSON(http.StatusOK, gin.H{"error": err2.Error(), "code": proto.OperationFailed, "message": "failed"}) } } } else { c.JSON(http.StatusOK, gin.H{"error": err.Error(), "code": proto.ParameterError, "message": "failed"}) } } func GetFriendRequest(c *gin.Context) { var req Message id, _ := c.Get("id") user_id := int(id.(float64)) if err := c.ShouldBind(&req); err == nil { if req.Type == 1 { data := service.GetGroupRequestUsers(user_id) c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": data, "message": "success"}) } else { data := service.GetFriendRequest(user_id) c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": data, "message": "success"}) } } else { c.JSON(http.StatusOK, gin.H{"error": err.Error(), "code": proto.ParameterError, "message": "failed"}) } } func CreateGroup(c *gin.Context) { var req CGroup id, _ := c.Get("id") user_id := int(id.(float64)) if err := c.ShouldBind(&req); err == nil { err2, id := service.CreateGroup(req.Group_name, req.Group_info, req.Group_type, req.Group_icon, user_id) if err2 == nil { c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": id, "message": "success"}) } else { c.JSON(http.StatusOK, gin.H{"error": err2.Error(), "code": proto.OperationFailed, "message": "failed"}) } } else { c.JSON(http.StatusOK, gin.H{"error": err.Error(), "code": proto.ParameterError, "message": "failed"}) } } func AcceptInvite(c *gin.Context) { var req Message user_id, _ := c.Get("id") cid := int(user_id.(float64)) if err := c.ShouldBind(&req); err == nil { err2 := service.AddFriendService(req.ID, cid, req.To_user_id) if err2 == nil { c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "message": "success"}) } else { c.JSON(http.StatusOK, gin.H{"error": err2.Error(), "code": proto.OperationFailed, "message": "failed"}) } } else { c.JSON(http.StatusOK, gin.H{"error": err.Error(), "code": proto.ParameterError, "message": "failed"}) } } func RejectInvite(c *gin.Context) { var req Message user_id, _ := c.Get("id") cid := int(user_id.(float64)) if err := c.ShouldBind(&req); err == nil { err2 := service.RejectFriendService(req.ID, cid, req.To_user_id) if err2 == nil { c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "message": "success"}) } else { c.JSON(http.StatusOK, gin.H{"error": err2.Error(), "code": proto.OperationFailed, "message": "failed"}) } } else { c.JSON(http.StatusOK, gin.H{"error": err.Error(), "code": proto.ParameterError, "message": "failed"}) } } func SendMessage(c *gin.Context) { var req SMessage user_id, _ := c.Get("id") id := int(user_id.(float64)) if err := c.ShouldBind(&req); err == nil { var err2 error var mid uint err2, mid = service.CreateGeneralMessageService(id, req.To_user_id, req.Type, req.GroupID, req.Msg) if err2 == nil { c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "message": "success", "data": mid}) } else { c.JSON(http.StatusOK, gin.H{"error": err2.Error(), "code": proto.MsgSendFailed, "message": "failed"}) } } else { c.JSON(http.StatusOK, gin.H{"error": err.Error(), "code": proto.ParameterError, "message": "failed"}) } } func GetImKey(c *gin.Context) { id, _ := c.Get("id") var req proto.ImKeyReq if err := c.ShouldBind(&req); err == nil { id1 := int(id.(float64)) var redis_key string if id1 < req.To_user_id { redis_key = strconv.Itoa(id1) + "_" + strconv.Itoa(req.To_user_id) + "_imKey" } else { redis_key = strconv.Itoa(req.To_user_id) + "_" + strconv.Itoa(id1) + "_imKey" } if worker.IsContainKey(redis_key) == true { res := worker.GetRedis(redis_key) var retrievedData map[string]interface{} err2 := json.Unmarshal([]byte(res), &retrievedData) if err2 != nil { c.JSON(http.StatusOK, gin.H{"error": err2.Error(), "code": proto.OperationFailed, "message": "failed"}) return } if int(retrievedData["from_user_id"].(float64)) == id1 { if int(retrievedData["is_read"].(float64)) == 1 { worker.DelRedis(redis_key) } c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": retrievedData, "message": "success"}) return } else if int(retrievedData["to_user_id"].(float64)) == id1 { retrievedData["is_read"] = 1 str, _ := json.Marshal(retrievedData) res3 := worker.SetRedisWithExpire(redis_key, string(str), time.Second*300) res2 := worker.SetRedisWithExpire(redis_key+"_connection", retrievedData["im_session"].(string), time.Second*300) if res2 == false || res3 == false { c.JSON(http.StatusOK, gin.H{"error": "set key failed", "code": proto.OperationFailed, "message": "failed"}) return } c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": retrievedData, "message": "success"}) return } else { c.JSON(http.StatusOK, gin.H{"error": "parameter error", "code": proto.ParameterError, "message": "failed"}) return } } //生成随机字符串 imkey, err2 := generateRandomHexString(16) imSession, err3 := generateRandomHexString(8) if err2 != nil || err3 != nil { c.JSON(http.StatusOK, gin.H{"error": "generate key failed", "code": proto.OperationFailed, "message": "failed"}) return } data := make(map[string]interface{}) data["im_key"] = imkey data["from_user_id"] = id1 data["to_user_id"] = req.To_user_id data["im_session"] = imSession data["is_read"] = 0 str, _ := json.Marshal(data) //将key存入redis res := worker.SetRedisWithExpire(redis_key, string(str), time.Second*300) if res == false { c.JSON(http.StatusOK, gin.H{"error": "set key failed", "code": proto.OperationFailed, "message": "failed"}) return } c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": data, "message": "success"}) return } else { c.JSON(http.StatusOK, gin.H{"error": err.Error(), "code": proto.ParameterError, "message": "failed"}) } } // 接收及发送消息 func SRMessage(c *gin.Context) { id, _ := c.Get("id") id1 := int(id.(float64)) var redis_key string to_user_id := c.Query("to_user_id") to_user_id_num, _ := strconv.ParseInt(to_user_id, 10, 64) if id1 < int(to_user_id_num) { redis_key = strconv.Itoa(id1) + "_" + to_user_id + "_imKey" } else { redis_key = to_user_id + "_" + strconv.Itoa(id1) + "_imKey" } if worker.IsContainKey(redis_key+"_connection") == false { c.JSON(http.StatusOK, gin.H{"code": proto.OperationFailed, "message": "failed"}) return } // 升级HTTP连接为WebSocket连接 ws, err := upgrader.Upgrade(c.Writer, c.Request, nil) clients[ws] = true if err != nil { // log.Println(err) fmt.Println(err) return } defer ws.Close() res := worker.GetRedis(redis_key + "_connection") worker.SetRedisWithExpire("user_"+strconv.Itoa(id1)+"_status", "1", time.Second*5) if res == "" { return } // 接收客户端消息并发送到指定用户 go func(ws *websocket.Conn, session string, to_id string) { for { _, message, err_r := ws.ReadMessage() if err_r != nil { // 当连接关闭时,退出循环 clientsMux.Lock() clients[ws] = false clientsMux.Unlock() //设置ws关闭状态信息 break } // 将接收到的消息解析为JSON var msg proto.Message if err2 := json.Unmarshal(message, &msg); err2 != nil { log.Println("unmarshal:", err2) continue } if msg.Type == "msg" { // 将消息发送到指定用户 worker.PushRedisListWithExpire(session+"_"+to_user_id, msg.Msg, time.Hour*1) //将消息存入redis } } }(ws, res, to_user_id) var status_cnt int // 从Redis中读取消息并发送到客户端 for { if v := clients[ws]; v == true { res2 := worker.PopRedisListLeft(res + "_" + strconv.Itoa(id1)) var res3 []byte var msg proto.Message if res2 != "" { //若有消息则发送消息 msg.Type = "msg" msg.Msg = res2 msg.From_user_id = id1 msg.Session = res res3, _ = json.Marshal(msg) } else { //若无消息则发送心跳包 msg.Type = "check" msg.Msg = "check" msg.From_user_id = -1 msg.Session = res res3, _ = json.Marshal(msg) } //判断对方是否在线,若不在线则发送离线消息,否则正常发送消息 if worker.IsContainKey("user_"+to_user_id+"_status") == true { if worker.GetRedis("user_"+to_user_id+"_status") == "0" { msg.Type = "offline" msg.Msg = "offline" msg.From_user_id = -1 msg.Session = res res3, _ = json.Marshal(msg) } } else { if status_cnt > 5 { //对方不在线 msg.Type = "offline" msg.Msg = "offline" msg.From_user_id = -1 msg.Session = res res3, _ = json.Marshal(msg) } else { status_cnt++ } } err2 := ws.WriteMessage(websocket.TextMessage, res3) if err2 != nil { worker.SetRedisWithExpire("user_"+strconv.Itoa(id1)+"_status", "0", time.Second*120) //设置用户在线状态,1为在线,0为离线,5秒后过期 } else { worker.SetRedisWithExpire("user_"+strconv.Itoa(id1)+"_status", "1", time.Second*5) //设置用户在线状态,1为在线,0为离线,5秒后过期 } time.Sleep(time.Second * 1) // 每1秒查询一次 } else { clientsMux.Lock() delete(clients, ws) clientsMux.Unlock() break } } } func ServerSendMsgV2(c *gin.Context) { //wss id, _ := c.Get("id") user_id := int(id.(float64)) // 升级HTTP连接为WebSocket连接 ws, err1 := upgrader.Upgrade(c.Writer, c.Request, nil) clients[ws] = true if err1 != nil { // log.Println(err) fmt.Println(err1) return } defer ws.Close() //设置用户在线状态 worker.SetRedisWithExpire("user_"+strconv.Itoa(user_id)+"_status_v2", "1", time.Second*60) worker.SetRedisBitmap("im2_online_users", int64(user_id), 1) //发送消息 key := "user_" + strconv.Itoa(user_id) + "_msg_ids" for { msg_id := worker.PopRedisListLeft(key) if msg_id != "" { msg_id_num, _ := strconv.ParseInt(msg_id, 10, 64) msgs := dao.FindMessageByID2(uint(msg_id_num)) if len(msgs) > 0 { msg := msgs[0] //发送消息 msg_str, _ := json.Marshal(msg) var msg_ proto.Message msg_.Type = "msg" msg_.Msg = string(msg_str) msg_str2, _ := json.Marshal(msg_) err2 := ws.WriteMessage(websocket.TextMessage, msg_str2) if err2 != nil { worker.SetRedisWithExpire("user_"+strconv.Itoa(user_id)+"_status_v2", "0", time.Second*3600) clientsMux.Lock() delete(clients, ws) clientsMux.Unlock() break } } } else { var msg proto.Message msg.Type = "check" msg.Msg = "check" msg.From_user_id = -1 //发送心跳包 res3, _ := json.Marshal(msg) err2 := ws.WriteMessage(websocket.TextMessage, res3) if err2 != nil { worker.SetRedisWithExpire("user_"+strconv.Itoa(user_id)+"_status_v2", "0", time.Second*3600) worker.SetRedisBitmap("im2_online_users", int64(user_id), 0) clientsMux.Lock() delete(clients, ws) clientsMux.Unlock() break } } worker.SetRedisWithExpire("user_"+strconv.Itoa(user_id)+"_status_v2", "1", time.Second*60) time.Sleep(time.Second * 1) } } func ServerSendMsg(c *gin.Context) { //sse c.Writer.Header().Set("Content-Type", "text/event-stream") c.Writer.Header().Set("Cache-Control", "no-cache") c.Writer.Header().Set("Connection", "keep-alive") c.Writer.Header().Set("Access-Control-Allow-Origin", "*") id, _ := c.Get("id") user_id := int(id.(float64)) //设置用户在线状态 worker.SetRedisWithExpire("user_"+strconv.Itoa(user_id)+"_status_v2", "1", time.Second*60) //发送消息 key := "user_" + strconv.Itoa(user_id) + "_msg_ids" for { msg_id := worker.PopRedisListLeft(key) if msg_id != "" { msg_id_num, _ := strconv.ParseInt(msg_id, 10, 64) msgs := dao.FindMessageByID(uint(msg_id_num)) if len(msgs) > 0 { msg := msgs[0] //发送消息 msg_str, _ := json.Marshal(msg) _, err := c.Writer.Write([]byte("data: " + string(msg_str) + "\n\n")) if err != nil { worker.SetRedisWithExpire("user_"+strconv.Itoa(user_id)+"_status_v2", "0", time.Second*3600) break } } } else { var msg proto.Message msg.Type = "check" msg.Msg = "check" msg.From_user_id = -1 //发送心跳包 res3, _ := json.Marshal(msg) _, err := c.Writer.Write([]byte("data: " + string(res3) + "\n\n")) if err != nil { worker.SetRedisWithExpire("user_"+strconv.Itoa(user_id)+"_status_v2", "0", time.Second*3600) break } } time.Sleep(time.Second * 1) } }