diff --git a/handler/device.go b/handler/device.go index 3332c9d..a1b1d28 100644 --- a/handler/device.go +++ b/handler/device.go @@ -242,27 +242,10 @@ func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) { defer ws.Close() ch := pubsub.Channel() var check_cnt int + var ticker *time.Ticker for { select { - case msg, ok := <-ch: - if !ok { - // 通道关闭或没有消息,每秒尝试检测连接 - for { - err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)) - if err != nil { - clientsMux.Lock() - clients[ws] = false - clientsMux.Unlock() - // 查看是否还有其他连接,没有则设置 is_play 为 0 - if worker.IsContainKey(online_conn_key) == false { - worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "0", time.Minute*5) - fmt.Println("device_id:", device_id, " has set is_play to 0") - } - return - } - time.Sleep(time.Second) - } - } + case msg, _ := <-ch: var res3 []byte var msgObj proto.Message if msg.Payload != "" { @@ -290,9 +273,31 @@ func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) { clientsMux.Unlock() fmt.Println("send message to client err:", err2) worker.SetRedisSetRemove(online_conn_key, con_id) - return + goto end + } + default: + if ticker == nil { + ticker = time.NewTicker(time.Second) + } + select { + case <-ticker.C: + err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)) + if err != nil { + fmt.Println("Connection check failed:", err) + worker.SetRedisSetRemove(online_conn_key, con_id) + goto end + } + default: + continue } } } +end: + // 查看是否还有其他连接,没有则设置 is_play 为 0 + if worker.IsContainKey(online_conn_key) == false { + worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "0", time.Minute*5) + fmt.Println("device_id:", device_id, " has set is_play to 0") + } + }