diff --git a/handler/im.go b/handler/im.go index f4e2a94..ac1b10b 100644 --- a/handler/im.go +++ b/handler/im.go @@ -12,6 +12,7 @@ import ( "strconv" "sync" "time" + "videoplayer/dao" "videoplayer/proto" "videoplayer/service" "videoplayer/worker" @@ -64,6 +65,7 @@ func SetUpIMGroup(router *gin.Engine) { //接受邀请,确认好友关系 imGroup.POST("/accept_invite", AcceptInvite) imGroup.POST("/create_group", CreateGroup) + imGroup.GET("/sse_msg", ServerSendMsg) } func generateRandomHexString(length int) (string, error) { bytes := make([]byte, length/2) // 16字节的字符串需要32个十六进制字符,即16个字节 @@ -326,3 +328,50 @@ func SRMessage(c *gin.Context) { } } } + +func ServerSendMsg(c *gin.Context) { + //sse + c.Writer.Header().Set("Content-Type", "text/event-stream") + c.Writer.Header().Set("Cache-Control", "no-cache") + c.Writer.Header().Set("Connection", "keep-alive") + c.Writer.Header().Set("Access-Control-Allow-Origin", "*") + + id, _ := c.Get("id") + user_id := int(id.(float64)) + + //设置用户在线状态 + worker.SetRedisWithExpire("user_"+strconv.Itoa(user_id)+"_status_v2", "1", time.Second*60) + //发送消息 + + key := "user_" + strconv.Itoa(user_id) + "_msg_ids" + + for { + msg_id := worker.PopRedisListLeft(key) + if msg_id != "" { + msg_id_num, _ := strconv.ParseInt(msg_id, 10, 64) + msgs := dao.FindMessageByID(uint(msg_id_num)) + if len(msgs) > 0 { + msg := msgs[0] + //发送消息 + _, err := c.Writer.Write([]byte("data: " + msg.Msg + "\n\n")) + if err != nil { + worker.SetRedisWithExpire("user_"+strconv.Itoa(user_id)+"_status_v2", "0", time.Second*3600) + break + } + } + } else { + var msg proto.Message + msg.Type = "check" + msg.Msg = "check" + msg.From_user_id = -1 + //发送心跳包 + res3, _ := json.Marshal(msg) + _, err := c.Writer.Write([]byte("data: " + string(res3) + "\n\n")) + if err != nil { + worker.SetRedisWithExpire("user_"+strconv.Itoa(user_id)+"_status_v2", "0", time.Second*3600) + break + } + } + time.Sleep(time.Second * 1) + } +} diff --git a/service/imService.go b/service/imService.go index 7c038d3..2387a19 100644 --- a/service/imService.go +++ b/service/imService.go @@ -2,7 +2,10 @@ package service import ( "errors" + "strconv" + "time" "videoplayer/dao" + "videoplayer/worker" ) func CreateGeneralMessageService(from_id, to_id, msg_type int, content string) (error, uint) { @@ -17,6 +20,11 @@ func CreateGeneralMessageService(from_id, to_id, msg_type int, content string) ( return errors.New("not a friend"), 0 } err, id = dao.CreateSimpleMessage(from_id, to_id, content) + res := worker.GetRedis("user_" + strconv.Itoa(to_id) + "_status_v2") + if res == "1" { + //在线,存入redis + worker.PushRedisListWithExpire("user_"+strconv.Itoa(to_id)+"_msg_ids", strconv.Itoa(int(id)), time.Second*300) + } case 2: err, id = dao.CreateGeneralMessage(from_id, to_id, msg_type, 0, 0, content) case 3: