Compare commits

..

No commits in common. "542935cb8fc42747060eea645fcd24b8d12a789d" and "69f04c09a65e30f78f6d453df731241432a2f249" have entirely different histories.

2 changed files with 36 additions and 69 deletions

View File

@ -233,66 +233,49 @@ func GetRealTimeImage(c *gin.Context) {
func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) { func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) {
ctx := context.Background() ctx := context.Background()
pubsub := worker.RedisClient.Subscribe(ctx, strconv.Itoa(device_id)+"_frames_msgs") pubsub := worker.RedisClient.Subscribe(ctx, strconv.Itoa(device_id)+"_frames_msgs")
// 生成唯一连接 uuid //生成唯一连接uuid
con_id := uuid.New().String() con_id := uuid.New().String()
online_conn_key := "device_" + strconv.Itoa(device_id) + "_online_conn_ids" online_conn_key := "device_" + strconv.Itoa(device_id) + "_online_conn_ids"
// 加入设备在线连接集合 //加入设备在线连接集合
worker.SetRedisSetAddWithExpire(online_conn_key, con_id, time.Minute*5) worker.SetRedisSetAdd(online_conn_key, con_id)
defer pubsub.Close() defer pubsub.Close()
defer ws.Close() defer ws.Close()
ch := pubsub.Channel() ch := pubsub.Channel()
var check_cnt int var check_cnt int
for { for msg := range ch {
select { var res3 []byte
case msg, ok := <-ch: var msgObj proto.Message
if !ok { if msg.Payload != "" {
// 通道关闭或没有消息,每秒尝试检测连接 msgObj.Type = "img"
for { msgObj.Msg = msg.Payload
err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)) msgObj.From_user_id = -1
if err != nil { res3, _ = json.Marshal(msgObj)
clientsMux.Lock() } else {
clients[ws] = false if check_cnt < 5 {
clientsMux.Unlock() check_cnt++
// 查看是否还有其他连接,没有则设置 is_play 为 0 time.Sleep(time.Millisecond * 200)
if worker.IsContainKey(online_conn_key) == false { continue
worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "0", time.Minute*5)
fmt.Println("device_id:", device_id, " has set is_play to 0")
}
return
}
time.Sleep(time.Second)
}
}
var res3 []byte
var msgObj proto.Message
if msg.Payload != "" {
msgObj.Type = "img"
msgObj.Msg = msg.Payload
msgObj.From_user_id = -1
res3, _ = json.Marshal(msgObj)
} else {
if check_cnt < 5 {
check_cnt++
time.Sleep(time.Millisecond * 200)
continue
}
check_cnt = 0
msgObj.Type = "check"
msgObj.Msg = "check"
msgObj.From_user_id = -1
res3, _ = json.Marshal(msgObj)
}
// fmt.Println("send message to client length:", len(res3))
err2 := ws.WriteMessage(websocket.TextMessage, res3)
if err2 != nil {
clientsMux.Lock()
clients[ws] = false
clientsMux.Unlock()
fmt.Println("send message to client err:", err2)
worker.SetRedisSetRemove(online_conn_key, con_id)
return
} }
check_cnt = 0
msgObj.Type = "check"
msgObj.Msg = "check"
msgObj.From_user_id = -1
res3, _ = json.Marshal(msgObj)
}
//fmt.Println("send message to client length:", len(res3))
err2 := ws.WriteMessage(websocket.TextMessage, res3)
if err2 != nil {
clientsMux.Lock()
clients[ws] = false
clientsMux.Unlock()
fmt.Println("send message to client err:", err2)
worker.SetRedisSetRemove(online_conn_key, con_id)
break
} }
} }
//查看是否还有其他连接没有则设置is_play为0
if worker.IsContainKey(online_conn_key) == false {
worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "0", time.Minute*5)
fmt.Println("device_id:", device_id, " has set is_play to 0")
}
} }

View File

@ -292,22 +292,6 @@ func SetRedisSetAdd(key string, value string) bool {
return true return true
} }
// 设置set,添加元素
func SetRedisSetAddWithExpire(key string, value string, expire time.Duration) bool {
ctx := context.Background()
err := RedisClient.SAdd(ctx, key, value).Err()
if err != nil {
fmt.Println("Error setting key: %v", err)
return false
}
err = RedisClient.Expire(ctx, key, expire).Err()
if err != nil {
fmt.Println("Error setting key: %v", err)
return false
}
return true
}
// 设置set,删除元素 // 设置set,删除元素
func SetRedisSetRemove(key string, value string) bool { func SetRedisSetRemove(key string, value string) bool {
ctx := context.Background() ctx := context.Background()