Compare commits

..

9 Commits

Author SHA1 Message Date
junleea 11008cac8b git修复im传输问题,添加im过期时间 2024-06-29 13:59:47 +08:00
junleea c2b3658f72 修复Im消息发送问题 2024-06-28 18:10:22 +08:00
junleea 372b3127b5 修复查询用户错误 2024-06-28 10:38:55 +08:00
junleea e624700924 添加用户搜索功能,根据id搜索,根据关键字搜索 2024-06-28 10:10:23 +08:00
junleea db90205818 修复im发送无法获取session 2024-06-27 17:00:29 +08:00
junleea f3314d2d1d 修复获取key不交替进行,与imKey未及时删除 2024-06-27 16:50:41 +08:00
junleea f760b7a73f 修复im消息获取key,删除redis queue错误信息输出 2024-06-27 16:38:49 +08:00
junleea 3d1b22487c 修复ws消息发送,添加redis queue方式 2024-06-27 14:54:24 +08:00
junleea 0edfbd22ee 添加im消息部分 2024-06-27 10:53:47 +08:00
10 changed files with 313 additions and 9 deletions

View File

@ -3,6 +3,7 @@ package dao
import ( import (
"fmt" "fmt"
"gorm.io/gorm" "gorm.io/gorm"
"videoplayer/proto"
) )
type User struct { type User struct {
@ -27,10 +28,10 @@ func DeleteUserByID(id int) int {
return id return id
} }
func FindUserByID(id int) User { func FindUserByID(id int) []proto.User {
var user User var users []proto.User
DB.Debug().First(&user, id) DB.Debug().Where("id = ?", id).First(&users)
return user return users
} }
func FindUserByName(name string) User { func FindUserByName(name string) User {
@ -40,6 +41,13 @@ func FindUserByName(name string) User {
return user return user
} }
// 根据name模糊查询邮箱也是,不查询密码
func FindUserByNameLike(name string) []proto.User {
var users []proto.User
DB.Debug().Where("name LIKE ? OR email LIKE ?", "%"+name+"%", "%"+name+"%").Find(&users)
return users
}
func FindUserByEmail(email string) User { func FindUserByEmail(email string) User {
var user User var user User
DB.Debug().Where("email = ?", email).First(&user) DB.Debug().Where("email = ?", email).First(&user)

1
go.mod
View File

@ -7,6 +7,7 @@ require (
github.com/go-redis/redis/v8 v8.11.5 github.com/go-redis/redis/v8 v8.11.5
github.com/golang-jwt/jwt v3.2.2+incompatible github.com/golang-jwt/jwt v3.2.2+incompatible
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3
gorm.io/driver/mysql v1.5.6 gorm.io/driver/mysql v1.5.6
gorm.io/gorm v1.25.7 gorm.io/gorm v1.25.7
) )

2
go.sum
View File

@ -42,6 +42,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=

201
handler/im.go Normal file
View File

@ -0,0 +1,201 @@
package handler
import (
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"log"
"net/http"
"strconv"
"sync"
"time"
"videoplayer/proto"
"videoplayer/worker"
)
var (
upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
// 允许所有来源的连接
return true
},
}
)
// 创建一个用于存储WebSocket连接的map和互斥锁
var (
clients = make(map[*websocket.Conn]bool)
clientsMux sync.Mutex
)
func SetUpIMGroup(router *gin.Engine) {
imGroup := router.Group("/im")
imGroup.POST("/get_imKey", GetImKey)
imGroup.GET("/ws", SRMessage)
}
func generateRandomHexString(length int) (string, error) {
bytes := make([]byte, length/2) // 16字节的字符串需要32个十六进制字符即16个字节
if _, err := rand.Read(bytes); err != nil {
return "", err
}
return hex.EncodeToString(bytes), nil
}
func GetImKey(c *gin.Context) {
id, _ := c.Get("id")
var req proto.ImKeyReq
if err := c.ShouldBind(&req); err == nil {
id1 := int(id.(float64))
var redis_key string
if id1 < req.To_user_id {
redis_key = strconv.Itoa(id1) + "_" + strconv.Itoa(req.To_user_id) + "_imKey"
} else {
redis_key = strconv.Itoa(req.To_user_id) + "_" + strconv.Itoa(id1) + "_imKey"
}
if worker.IsContainKey(redis_key) == true {
res := worker.GetRedis(redis_key)
var retrievedData map[string]interface{}
err2 := json.Unmarshal([]byte(res), &retrievedData)
if err2 != nil {
c.JSON(http.StatusOK, gin.H{"error": err2.Error(), "code": proto.OperationFailed, "message": "failed"})
return
}
if int(retrievedData["from_user_id"].(float64)) == id1 {
if int(retrievedData["is_read"].(float64)) == 1 {
worker.DelRedis(redis_key)
}
c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": retrievedData, "message": "success"})
return
} else if int(retrievedData["to_user_id"].(float64)) == id1 {
retrievedData["is_read"] = 1
str, _ := json.Marshal(retrievedData)
res3 := worker.SetRedisWithExpire(redis_key, string(str), time.Second*300)
res2 := worker.SetRedisWithExpire(redis_key+"_connection", retrievedData["im_session"].(string), time.Second*300)
if res2 == false || res3 == false {
c.JSON(http.StatusOK, gin.H{"error": "set key failed", "code": proto.OperationFailed, "message": "failed"})
return
}
c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": retrievedData, "message": "success"})
return
} else {
c.JSON(http.StatusOK, gin.H{"error": "parameter error", "code": proto.ParameterError, "message": "failed"})
return
}
}
//生成随机字符串
imkey, err2 := generateRandomHexString(16)
imSession, err3 := generateRandomHexString(8)
if err2 != nil || err3 != nil {
c.JSON(http.StatusOK, gin.H{"error": "generate key failed", "code": proto.OperationFailed, "message": "failed"})
return
}
data := make(map[string]interface{})
data["im_key"] = imkey
data["from_user_id"] = id1
data["to_user_id"] = req.To_user_id
data["im_session"] = imSession
data["is_read"] = 0
str, _ := json.Marshal(data)
//将key存入redis
res := worker.SetRedisWithExpire(redis_key, string(str), time.Second*300)
if res == false {
c.JSON(http.StatusOK, gin.H{"error": "set key failed", "code": proto.OperationFailed, "message": "failed"})
return
}
c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "data": data, "message": "success"})
return
} else {
c.JSON(http.StatusOK, gin.H{"error": err.Error(), "code": proto.ParameterError, "message": "failed"})
}
}
// 接收及发送消息
func SRMessage(c *gin.Context) {
id, _ := c.Get("id")
id1 := int(id.(float64))
var redis_key string
to_user_id := c.Query("to_user_id")
to_user_id_num, _ := strconv.ParseInt(to_user_id, 10, 64)
if id1 < int(to_user_id_num) {
redis_key = strconv.Itoa(id1) + "_" + to_user_id + "_imKey"
} else {
redis_key = to_user_id + "_" + strconv.Itoa(id1) + "_imKey"
}
if worker.IsContainKey(redis_key+"_connection") == false {
c.JSON(http.StatusOK, gin.H{"code": proto.OperationFailed, "message": "failed"})
return
}
// 升级HTTP连接为WebSocket连接
ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
clients[ws] = true
if err != nil {
// log.Println(err)
fmt.Println(err)
return
}
defer ws.Close()
res := worker.GetRedis(redis_key + "_connection")
if res == "" {
return
}
// 接收客户端消息并发送到指定用户
go func(ws *websocket.Conn, session string, to_id string) {
for {
_, message, err_r := ws.ReadMessage()
if err_r != nil {
// 当连接关闭时,退出循环
clientsMux.Lock()
clients[ws] = false
clientsMux.Unlock()
//设置ws关闭状态信息
break
}
// 将接收到的消息解析为JSON
var msg proto.Message
if err2 := json.Unmarshal(message, &msg); err2 != nil {
log.Println("unmarshal:", err2)
continue
}
if msg.Type == "msg" {
// 将消息发送到指定用户
worker.PushRedisListWithExpire(session+"_"+to_user_id, msg.Msg, time.Hour*1) //将消息存入redis
}
}
}(ws, res, to_user_id)
// 从Redis中读取消息并发送到客户端
for {
if v := clients[ws]; v == true {
res2 := worker.PopRedisListLeft(res + "_" + strconv.Itoa(id1))
if res2 != "" {
var msg proto.Message
msg.Type = "msg"
msg.Msg = res2
msg.From_user_id = id1
msg.Session = res
res3, _ := json.Marshal(msg)
err2 := ws.WriteMessage(websocket.TextMessage, res3)
if err2 != nil {
break
}
}
time.Sleep(time.Second * 1) // 每100毫秒查询一次
} else {
clientsMux.Lock()
delete(clients, ws)
clientsMux.Unlock()
break
}
}
}

View File

@ -24,6 +24,7 @@ func SetUpUserGroup(router *gin.Engine) {
userGroup.POST("/gqr", GetQRStatus) userGroup.POST("/gqr", GetQRStatus)
userGroup.POST("/sqr", SetQRStatus) userGroup.POST("/sqr", SetQRStatus)
userGroup.POST("/confirm", ConfirmQRLogin) userGroup.POST("/confirm", ConfirmQRLogin)
userGroup.POST("/search", SearchHandler)
} }
type RLReq struct { type RLReq struct {
@ -40,6 +41,11 @@ type QRReq struct {
IP string `json:"ip" form:"ip"` IP string `json:"ip" form:"ip"`
} }
type SearchReq struct {
Keyword string `json:"keyword" form:"keyword"`
ID int `json:"id" form:"id"`
}
func GetScanUUID(c *gin.Context) { func GetScanUUID(c *gin.Context) {
var ReqData QRReq var ReqData QRReq
if err := c.ShouldBind(&ReqData); err != nil { if err := c.ShouldBind(&ReqData); err != nil {
@ -173,6 +179,25 @@ func GetQRStatus(c *gin.Context) {
} }
} }
func SearchHandler(c *gin.Context) {
var req_data SearchReq
if err := c.ShouldBind(&req_data); err == nil {
if req_data.ID != -1 {
user := service.GetUserByID(req_data.ID)
c.JSON(200, gin.H{"code": proto.SuccessCode, "message": "success", "data": user})
return
} else if req_data.Keyword != "" {
users := service.GetUserByNameLike(req_data.Keyword)
c.JSON(200, gin.H{"code": proto.SuccessCode, "message": "success", "data": users})
return
} else {
c.JSON(200, gin.H{"code": proto.ParameterError, "message": "error", "data": "无ID 与 关键字"})
}
} else {
c.JSON(200, gin.H{"error": err.Error(), "code": proto.ParameterError, "message": "error"})
}
}
func loginHandler(c *gin.Context) { func loginHandler(c *gin.Context) {
var req_data RLReq var req_data RLReq
tokenString := "" tokenString := ""

View File

@ -20,9 +20,10 @@ func main() {
worker.InitRedis() worker.InitRedis()
r.Use(handler.CrosHandler()) r.Use(handler.CrosHandler())
r.Use(JWTAuthMiddleware()) // 使用 JWT 认证中间件 r.Use(JWTAuthMiddleware()) // 使用 JWT 认证中间件
handler.SetUpVideoGroup(r) handler.SetUpVideoGroup(r) // Video
handler.SetUpUserGroup(r) handler.SetUpUserGroup(r) // User
handler.SetUpDeviceGroup(r) handler.SetUpDeviceGroup(r) // Device
handler.SetUpIMGroup(r) // IM
r.Run(":8083") // listen and serve on 0.0.0.0:8082 r.Run(":8083") // listen and serve on 0.0.0.0:8082
defer dao.Close() defer dao.Close()
defer worker.CloseRedis() defer worker.CloseRedis()

View File

@ -1,5 +1,7 @@
package proto package proto
import "gorm.io/gorm"
const ( const (
MYSQL_USER = "video_t2" MYSQL_USER = "video_t2"
MYSQL_DB = "video_t2" MYSQL_DB = "video_t2"
@ -14,3 +16,11 @@ const (
TOKEN_SECRET = "mfjurnc_32ndj9dfhj" TOKEN_SECRET = "mfjurnc_32ndj9dfhj"
) )
type User struct {
gorm.Model
Name string `gorm:"column:name"`
Age int `gorm:"column:age"`
Email string `gorm:"column:email"`
Gender string `gorm:"column:gender"`
}

14
proto/im.go Normal file
View File

@ -0,0 +1,14 @@
package proto
type ImKeyReq struct {
To_user_id int `json:"to_user_id" form:"to_user_id" binding:"required"`
}
// 定义WebSocket消息格式
type Message struct {
Type string `json:"type"`
Msg string `json:"data"`
To_user_id int `json:"to_user_id"`
From_user_id int `json:"from_user_id"`
Session string `json:"session"`
}

View File

@ -3,6 +3,7 @@ package service
import ( import (
"regexp" "regexp"
"videoplayer/dao" "videoplayer/dao"
"videoplayer/proto"
) )
func CreateUser(name string, password, email string) uint { func CreateUser(name string, password, email string) uint {
@ -32,3 +33,11 @@ func ContainsUser(name, email string) bool {
} }
return false return false
} }
func GetUserByID(id int) []proto.User {
return dao.FindUserByID(id)
}
func GetUserByNameLike(name string) []proto.User {
return dao.FindUserByNameLike(name)
}

View File

@ -166,6 +166,24 @@ func PopRedisList(key string) string {
return val return val
} }
// pop redis list from left,as queue
func PopRedisListLeft(key string) string {
ctx := context.Background()
val, err := redisClient.LPop(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误
if err != nil {
return ""
}
return val
}
func DelRedis(key string) {
ctx := context.Background()
err := redisClient.Del(ctx, key).Err()
if err != nil {
fmt.Println("Error deleting key: %v", err)
}
}
// push redis list from right // push redis list from right
func PushRedisList(key string, value string) bool { func PushRedisList(key string, value string) bool {
ctx := context.Background() ctx := context.Background()
@ -177,6 +195,21 @@ func PushRedisList(key string, value string) bool {
return true return true
} }
func PushRedisListWithExpire(key string, value string, expire time.Duration) bool {
ctx := context.Background()
err := redisClient.RPush(ctx, key, value).Err()
if err != nil {
fmt.Println("Error setting key: %v", err)
return false
}
err = redisClient.Expire(ctx, key, expire).Err()
if err != nil {
fmt.Println("Error setting key: %v", err)
return false
}
return true
}
// delete redis key // delete redis key
func delRedis(key string) { func delRedis(key string) {
ctx := context.Background() ctx := context.Background()