diff --git a/handler/device.go b/handler/device.go index f5f4ee1..705c5d4 100644 --- a/handler/device.go +++ b/handler/device.go @@ -1,6 +1,7 @@ package handler import ( + "context" "encoding/json" "fmt" "github.com/gin-gonic/gin" @@ -211,65 +212,57 @@ func GetRealTimeImage(c *gin.Context) { id, _ := c.Get("id") id1 := int(id.(float64)) device_id := c.Query("device_id") - //字符串转int device_id_int, _ := strconv.Atoi(device_id) device := service.GetDevice(device_id_int, id1) if device.ID == 0 { c.JSON(http.StatusOK, gin.H{"code": proto.DataNotFound, "message": "device not found"}) return } - //建立连接 - // 升级HTTP连接为WebSocket连接 ws, err := upgrader.Upgrade(c.Writer, c.Request, nil) clients[ws] = true if err != nil { - // log.Println(err) fmt.Println(err) return } defer ws.Close() - worker.SetRedisWithExpire(strconv.Itoa(int(device.ID))+"_is_play", "1", time.Minute*5) //设置播放状态 - // 接收客户端消息并发送到指定用户 + worker.SetRedisWithExpire(strconv.Itoa(int(device.ID))+"_is_play", "1", time.Minute*5) + go subscribeAndHandleMessages(ws, device_id_int) +} - go func(ws *websocket.Conn, device_id int) { - - }(ws, device_id_int) +func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) { + ctx := context.Background() + pubsub := worker.RedisClient.Subscribe(ctx, strconv.Itoa(device_id)+"_frames_channel") + defer pubsub.Close() + ch := pubsub.Channel() var check_cnt int - - for { - if v := clients[ws]; v == true { - res2 := worker.PopRedisListLeft(device_id + "_frames") - var res3 []byte - var msg proto.Message - if res2 != "" { - //若有消息则发送消息 - msg.Type = "img" - msg.Msg = res2 - msg.From_user_id = id1 - res3, _ = json.Marshal(msg) - } else { - //若无消息则发送心跳包 - if check_cnt < 5 { - check_cnt++ - time.Sleep(time.Millisecond * 200) //设置延时200ms - continue - } - check_cnt = 0 - msg.Type = "check" - msg.Msg = "check" - msg.From_user_id = -1 - res3, _ = json.Marshal(msg) + for msg := range ch { + var res3 []byte + var msgObj proto.Message + if msg.Payload != "" { + msgObj.Type = "img" + msgObj.Msg = msg.Payload + msgObj.From_user_id = -1 + res3, _ = json.Marshal(msgObj) + } else { + if check_cnt < 5 { + check_cnt++ + time.Sleep(time.Millisecond * 200) + continue } - err2 := ws.WriteMessage(websocket.TextMessage, res3) - if err2 != nil { - clientsMux.Lock() - clients[ws] = false - clientsMux.Unlock() - //设置ws关闭状态信息 - worker.SetRedisWithExpire(strconv.Itoa(int(device.ID))+"_is_play", "0", time.Minute*5) //设置播放状态 - break - } - time.Sleep(time.Millisecond * 200) //设置延时200ms + check_cnt = 0 + msgObj.Type = "check" + msgObj.Msg = "check" + msgObj.From_user_id = -1 + res3, _ = json.Marshal(msgObj) } + err2 := ws.WriteMessage(websocket.TextMessage, res3) + if err2 != nil { + clientsMux.Lock() + clients[ws] = false + clientsMux.Unlock() + worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "0", time.Minute*5) + break + } + time.Sleep(time.Millisecond * 200) } } diff --git a/worker/redis.go b/worker/redis.go index a05fc21..b89116d 100644 --- a/worker/redis.go +++ b/worker/redis.go @@ -12,18 +12,18 @@ import ( "github.com/go-redis/redis/v8" ) -var redisClient *redis.Client // Redis 客户端, 用于连接 Redis 服务器 +var RedisClient *redis.Client // Redis 客户端, 用于连接 Redis 服务器 func InitRedis() { ctx := context.Background() // 连接redis - redisClient = redis.NewClient(&redis.Options{ + RedisClient = redis.NewClient(&redis.Options{ Addr: proto.REDIS_ADDR, // Redis 服务器地址 Password: proto.REDIS_PASSWORD, // 如果 Redis 设置了密码 DB: proto.REIDS_DB, // 使用的数据库编号 }) // 验证 Redis 客户端是否可以正常工作 - _, err := redisClient.Ping(ctx).Result() + _, err := RedisClient.Ping(ctx).Result() if err != nil { fmt.Println("Error connecting to Redis: %v", err) } @@ -31,14 +31,14 @@ func InitRedis() { func CloseRedis() { // 关闭 Redis 客户端 - if err := redisClient.Close(); err != nil { + if err := RedisClient.Close(); err != nil { fmt.Println("Error closing Redis client: %v", err) } } func IsContainKey(key string) bool { ctx := context.Background() - val, err := redisClient.Exists(ctx, key).Result() // 检查键是否存在, 如果存在则返回 1, 否则返回 0 + val, err := RedisClient.Exists(ctx, key).Result() // 检查键是否存在, 如果存在则返回 1, 否则返回 0 if err != nil { fmt.Println("Error getting key: %v", err) return false @@ -53,7 +53,7 @@ func IsContainKey(key string) bool { func SetRedis(key string, value string) bool { ctx := context.Background() // 设置键值对, 0 表示不设置过期时间, 如果需要设置过期时间, 可以设置为 time.Second * 10 等 - err := redisClient.Set(ctx, key, value, time.Minute*30).Err() + err := RedisClient.Set(ctx, key, value, time.Minute*30).Err() if err != nil { fmt.Println("Error setting key: %v", err) return false @@ -73,12 +73,12 @@ func SetHashWithTime(key string, id int, name, email string, duration time.Durat } // 设置哈希表的字段值, 0 表示不设置过期时间, 如果需要设置过期时间, 可以设置为 time.Second * 10 等 - err := redisClient.HSet(ctx, key, fields).Err() + err := RedisClient.HSet(ctx, key, fields).Err() if err != nil { fmt.Println("Error setting key: %v", err) return false } - err = redisClient.Expire(ctx, key, time.Hour*10).Err() + err = RedisClient.Expire(ctx, key, time.Hour*10).Err() if err != nil { fmt.Println("Error setting key: %v", err) return false @@ -89,12 +89,12 @@ func SetHashWithTime(key string, id int, name, email string, duration time.Durat // 设置redis hash,设置过期时间 func SetHash(key string, data map[string]interface{}) bool { ctx := context.Background() - err := redisClient.HSet(ctx, key, data).Err() + err := RedisClient.HSet(ctx, key, data).Err() if err != nil { fmt.Println("%v :Error setting hash: %v", key, err) return false } - err = redisClient.Expire(ctx, key, time.Minute*30).Err() + err = RedisClient.Expire(ctx, key, time.Minute*30).Err() if err != nil { fmt.Println("%v :Error setting expire: %v", key, err) return false @@ -104,7 +104,7 @@ func SetHash(key string, data map[string]interface{}) bool { func SetHashWithField(key string, field string, value string) bool { ctx := context.Background() - err := redisClient.HSet(ctx, key, field, value).Err() + err := RedisClient.HSet(ctx, key, field, value).Err() if err != nil { fmt.Println("Error setting key: %v", err) return false @@ -114,7 +114,7 @@ func SetHashWithField(key string, field string, value string) bool { func GetHash(key string, field string) string { ctx := context.Background() - val, err := redisClient.HGet(ctx, key, field).Result() + val, err := RedisClient.HGet(ctx, key, field).Result() if err != nil { fmt.Println("Error getting hash: %v", err) return "" @@ -124,7 +124,7 @@ func GetHash(key string, field string) string { func GetHashAll(key string) map[string]string { ctx := context.Background() - val, err := redisClient.HGetAll(ctx, key).Result() + val, err := RedisClient.HGetAll(ctx, key).Result() if err != nil { fmt.Println("Error getting hash: %v", err) return nil @@ -136,7 +136,7 @@ func GetHashAll(key string) map[string]string { func SetRedisWithExpire(key string, value string, expire time.Duration) bool { // 设置键值对, 0 表示不设置过期时间, 如果需要设置过期时间, 可以设置为 time.Second * 10 等 ctx := context.Background() // 设置键值对, 0 表示不设置过期时间, 如果需要设置过期时间, 可以设置为 time.Second * 10 等 - err := redisClient.Set(ctx, key, value, expire).Err() + err := RedisClient.Set(ctx, key, value, expire).Err() if err != nil { fmt.Println("Error setting key: %v", err) return false @@ -147,7 +147,7 @@ func SetRedisWithExpire(key string, value string, expire time.Duration) bool { / // 获取redis func GetRedis(key string) string { ctx := context.Background() - val, err := redisClient.Get(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误 + val, err := RedisClient.Get(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误 if err != nil { fmt.Println(key, " Error getting key: %v", err) return "" @@ -158,7 +158,7 @@ func GetRedis(key string) string { // pop redis list from right,as stack func PopRedisList(key string) string { ctx := context.Background() - val, err := redisClient.RPop(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误 + val, err := RedisClient.RPop(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误 if err != nil { fmt.Println(key, " Error reading from Redis: %v", err) return "" @@ -169,7 +169,7 @@ func PopRedisList(key string) string { // pop redis list from left,as queue func PopRedisListLeft(key string) string { ctx := context.Background() - val, err := redisClient.LPop(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误 + val, err := RedisClient.LPop(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误 if err != nil { return "" } @@ -178,7 +178,7 @@ func PopRedisListLeft(key string) string { func DelRedis(key string) { ctx := context.Background() - err := redisClient.Del(ctx, key).Err() + err := RedisClient.Del(ctx, key).Err() if err != nil { fmt.Println("Error deleting key: %v", err) } @@ -187,7 +187,7 @@ func DelRedis(key string) { // push redis list from right func PushRedisList(key string, value string) bool { ctx := context.Background() - err := redisClient.RPush(ctx, key, value).Err() + err := RedisClient.RPush(ctx, key, value).Err() if err != nil { fmt.Println("Error setting key: %v", err) return false @@ -197,12 +197,12 @@ func PushRedisList(key string, value string) bool { func PushRedisListWithExpire(key string, value string, expire time.Duration) bool { ctx := context.Background() - err := redisClient.RPush(ctx, key, value).Err() + err := RedisClient.RPush(ctx, key, value).Err() if err != nil { fmt.Println("Error setting key: %v", err) return false } - err = redisClient.Expire(ctx, key, expire).Err() + err = RedisClient.Expire(ctx, key, expire).Err() if err != nil { fmt.Println("Error setting key: %v", err) return false @@ -213,7 +213,7 @@ func PushRedisListWithExpire(key string, value string, expire time.Duration) boo // delete redis key func delRedis(key string) { ctx := context.Background() - err := redisClient.Del(ctx, key).Err() + err := RedisClient.Del(ctx, key).Err() if err != nil { fmt.Println("Error setting key: %v", err) } @@ -239,7 +239,7 @@ func (u *RUser) toJSONString() string { // put hash to redis func hSetRedis(key string, field string, value string) { ctx := context.Background() - err := redisClient.HSet(ctx, key, field, value).Err() + err := RedisClient.HSet(ctx, key, field, value).Err() if err != nil { fmt.Println("Error setting key: %v", err) } @@ -248,7 +248,7 @@ func hSetRedis(key string, field string, value string) { // get hash from redis func hGetRedis(key string, field string) string { ctx := context.Background() - val, err := redisClient.HGet(ctx, key, field).Result() + val, err := RedisClient.HGet(ctx, key, field).Result() if err != nil { fmt.Println("Error getting key: %v", err) }