diff --git a/handler/device.go b/handler/device.go index 565ca06..3332c9d 100644 --- a/handler/device.go +++ b/handler/device.go @@ -233,49 +233,66 @@ func GetRealTimeImage(c *gin.Context) { func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) { ctx := context.Background() pubsub := worker.RedisClient.Subscribe(ctx, strconv.Itoa(device_id)+"_frames_msgs") - //生成唯一连接uuid + // 生成唯一连接 uuid con_id := uuid.New().String() online_conn_key := "device_" + strconv.Itoa(device_id) + "_online_conn_ids" - //加入设备在线连接集合 - worker.SetRedisSetAdd(online_conn_key, con_id) + // 加入设备在线连接集合 + worker.SetRedisSetAddWithExpire(online_conn_key, con_id, time.Minute*5) defer pubsub.Close() defer ws.Close() ch := pubsub.Channel() var check_cnt int - for msg := range ch { - var res3 []byte - var msgObj proto.Message - if msg.Payload != "" { - msgObj.Type = "img" - msgObj.Msg = msg.Payload - msgObj.From_user_id = -1 - res3, _ = json.Marshal(msgObj) - } else { - if check_cnt < 5 { - check_cnt++ - time.Sleep(time.Millisecond * 200) - continue + for { + select { + case msg, ok := <-ch: + if !ok { + // 通道关闭或没有消息,每秒尝试检测连接 + for { + err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)) + if err != nil { + clientsMux.Lock() + clients[ws] = false + clientsMux.Unlock() + // 查看是否还有其他连接,没有则设置 is_play 为 0 + if worker.IsContainKey(online_conn_key) == false { + worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "0", time.Minute*5) + fmt.Println("device_id:", device_id, " has set is_play to 0") + } + return + } + time.Sleep(time.Second) + } + } + var res3 []byte + var msgObj proto.Message + if msg.Payload != "" { + msgObj.Type = "img" + msgObj.Msg = msg.Payload + msgObj.From_user_id = -1 + res3, _ = json.Marshal(msgObj) + } else { + if check_cnt < 5 { + check_cnt++ + time.Sleep(time.Millisecond * 200) + continue + } + check_cnt = 0 + msgObj.Type = "check" + msgObj.Msg = "check" + msgObj.From_user_id = -1 + res3, _ = json.Marshal(msgObj) + } + // fmt.Println("send message to client length:", len(res3)) + err2 := ws.WriteMessage(websocket.TextMessage, res3) + if err2 != nil { + clientsMux.Lock() + clients[ws] = false + clientsMux.Unlock() + fmt.Println("send message to client err:", err2) + worker.SetRedisSetRemove(online_conn_key, con_id) + return } - check_cnt = 0 - msgObj.Type = "check" - msgObj.Msg = "check" - msgObj.From_user_id = -1 - res3, _ = json.Marshal(msgObj) - } - //fmt.Println("send message to client length:", len(res3)) - err2 := ws.WriteMessage(websocket.TextMessage, res3) - if err2 != nil { - clientsMux.Lock() - clients[ws] = false - clientsMux.Unlock() - fmt.Println("send message to client err:", err2) - worker.SetRedisSetRemove(online_conn_key, con_id) - break } } - //查看是否还有其他连接,没有则设置is_play为0 - if worker.IsContainKey(online_conn_key) == false { - worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "0", time.Minute*5) - fmt.Println("device_id:", device_id, " has set is_play to 0") - } + } diff --git a/worker/redis.go b/worker/redis.go index 9b032a7..a9957bb 100644 --- a/worker/redis.go +++ b/worker/redis.go @@ -292,6 +292,22 @@ func SetRedisSetAdd(key string, value string) bool { return true } +// 设置set,添加元素 +func SetRedisSetAddWithExpire(key string, value string, expire time.Duration) bool { + ctx := context.Background() + err := RedisClient.SAdd(ctx, key, value).Err() + if err != nil { + fmt.Println("Error setting key: %v", err) + return false + } + err = RedisClient.Expire(ctx, key, expire).Err() + if err != nil { + fmt.Println("Error setting key: %v", err) + return false + } + return true +} + // 设置set,删除元素 func SetRedisSetRemove(key string, value string) bool { ctx := context.Background()