|
|
|
|
@ -233,49 +233,66 @@ func GetRealTimeImage(c *gin.Context) {
|
|
|
|
|
func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) {
|
|
|
|
|
ctx := context.Background()
|
|
|
|
|
pubsub := worker.RedisClient.Subscribe(ctx, strconv.Itoa(device_id)+"_frames_msgs")
|
|
|
|
|
//生成唯一连接uuid
|
|
|
|
|
// 生成唯一连接 uuid
|
|
|
|
|
con_id := uuid.New().String()
|
|
|
|
|
online_conn_key := "device_" + strconv.Itoa(device_id) + "_online_conn_ids"
|
|
|
|
|
//加入设备在线连接集合
|
|
|
|
|
worker.SetRedisSetAdd(online_conn_key, con_id)
|
|
|
|
|
// 加入设备在线连接集合
|
|
|
|
|
worker.SetRedisSetAddWithExpire(online_conn_key, con_id, time.Minute*5)
|
|
|
|
|
defer pubsub.Close()
|
|
|
|
|
defer ws.Close()
|
|
|
|
|
ch := pubsub.Channel()
|
|
|
|
|
var check_cnt int
|
|
|
|
|
for msg := range ch {
|
|
|
|
|
var res3 []byte
|
|
|
|
|
var msgObj proto.Message
|
|
|
|
|
if msg.Payload != "" {
|
|
|
|
|
msgObj.Type = "img"
|
|
|
|
|
msgObj.Msg = msg.Payload
|
|
|
|
|
msgObj.From_user_id = -1
|
|
|
|
|
res3, _ = json.Marshal(msgObj)
|
|
|
|
|
} else {
|
|
|
|
|
if check_cnt < 5 {
|
|
|
|
|
check_cnt++
|
|
|
|
|
time.Sleep(time.Millisecond * 200)
|
|
|
|
|
continue
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case msg, ok := <-ch:
|
|
|
|
|
if !ok {
|
|
|
|
|
// 通道关闭或没有消息,每秒尝试检测连接
|
|
|
|
|
for {
|
|
|
|
|
err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second))
|
|
|
|
|
if err != nil {
|
|
|
|
|
clientsMux.Lock()
|
|
|
|
|
clients[ws] = false
|
|
|
|
|
clientsMux.Unlock()
|
|
|
|
|
// 查看是否还有其他连接,没有则设置 is_play 为 0
|
|
|
|
|
if worker.IsContainKey(online_conn_key) == false {
|
|
|
|
|
worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "0", time.Minute*5)
|
|
|
|
|
fmt.Println("device_id:", device_id, " has set is_play to 0")
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
time.Sleep(time.Second)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
var res3 []byte
|
|
|
|
|
var msgObj proto.Message
|
|
|
|
|
if msg.Payload != "" {
|
|
|
|
|
msgObj.Type = "img"
|
|
|
|
|
msgObj.Msg = msg.Payload
|
|
|
|
|
msgObj.From_user_id = -1
|
|
|
|
|
res3, _ = json.Marshal(msgObj)
|
|
|
|
|
} else {
|
|
|
|
|
if check_cnt < 5 {
|
|
|
|
|
check_cnt++
|
|
|
|
|
time.Sleep(time.Millisecond * 200)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
check_cnt = 0
|
|
|
|
|
msgObj.Type = "check"
|
|
|
|
|
msgObj.Msg = "check"
|
|
|
|
|
msgObj.From_user_id = -1
|
|
|
|
|
res3, _ = json.Marshal(msgObj)
|
|
|
|
|
}
|
|
|
|
|
// fmt.Println("send message to client length:", len(res3))
|
|
|
|
|
err2 := ws.WriteMessage(websocket.TextMessage, res3)
|
|
|
|
|
if err2 != nil {
|
|
|
|
|
clientsMux.Lock()
|
|
|
|
|
clients[ws] = false
|
|
|
|
|
clientsMux.Unlock()
|
|
|
|
|
fmt.Println("send message to client err:", err2)
|
|
|
|
|
worker.SetRedisSetRemove(online_conn_key, con_id)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
check_cnt = 0
|
|
|
|
|
msgObj.Type = "check"
|
|
|
|
|
msgObj.Msg = "check"
|
|
|
|
|
msgObj.From_user_id = -1
|
|
|
|
|
res3, _ = json.Marshal(msgObj)
|
|
|
|
|
}
|
|
|
|
|
//fmt.Println("send message to client length:", len(res3))
|
|
|
|
|
err2 := ws.WriteMessage(websocket.TextMessage, res3)
|
|
|
|
|
if err2 != nil {
|
|
|
|
|
clientsMux.Lock()
|
|
|
|
|
clients[ws] = false
|
|
|
|
|
clientsMux.Unlock()
|
|
|
|
|
fmt.Println("send message to client err:", err2)
|
|
|
|
|
worker.SetRedisSetRemove(online_conn_key, con_id)
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
//查看是否还有其他连接,没有则设置is_play为0
|
|
|
|
|
if worker.IsContainKey(online_conn_key) == false {
|
|
|
|
|
worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "0", time.Minute*5)
|
|
|
|
|
fmt.Println("device_id:", device_id, " has set is_play to 0")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|