Compare commits

..

No commits in common. "73e3db98dbd39de3beca811adc55e66e04fc47b0" and "542935cb8fc42747060eea645fcd24b8d12a789d" have entirely different histories.

1 changed files with 20 additions and 25 deletions

View File

@ -242,10 +242,27 @@ func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) {
defer ws.Close() defer ws.Close()
ch := pubsub.Channel() ch := pubsub.Channel()
var check_cnt int var check_cnt int
var ticker *time.Ticker
for { for {
select { select {
case msg, _ := <-ch: case msg, ok := <-ch:
if !ok {
// 通道关闭或没有消息,每秒尝试检测连接
for {
err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second))
if err != nil {
clientsMux.Lock()
clients[ws] = false
clientsMux.Unlock()
// 查看是否还有其他连接,没有则设置 is_play 为 0
if worker.IsContainKey(online_conn_key) == false {
worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "0", time.Minute*5)
fmt.Println("device_id:", device_id, " has set is_play to 0")
}
return
}
time.Sleep(time.Second)
}
}
var res3 []byte var res3 []byte
var msgObj proto.Message var msgObj proto.Message
if msg.Payload != "" { if msg.Payload != "" {
@ -273,31 +290,9 @@ func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) {
clientsMux.Unlock() clientsMux.Unlock()
fmt.Println("send message to client err:", err2) fmt.Println("send message to client err:", err2)
worker.SetRedisSetRemove(online_conn_key, con_id) worker.SetRedisSetRemove(online_conn_key, con_id)
goto end return
}
default:
if ticker == nil {
ticker = time.NewTicker(time.Second)
}
select {
case <-ticker.C:
err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second))
if err != nil {
fmt.Println("Connection check failed:", err)
worker.SetRedisSetRemove(online_conn_key, con_id)
goto end
}
default:
continue
} }
} }
} }
end:
// 查看是否还有其他连接,没有则设置 is_play 为 0
if worker.IsContainKey(online_conn_key) == false {
worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "0", time.Minute*5)
fmt.Println("device_id:", device_id, " has set is_play to 0")
}
} }