实时视频返回版本2
This commit is contained in:
parent
09f1a3e38d
commit
a10d4add52
|
|
@ -229,7 +229,69 @@ func GetRealTimeImage(c *gin.Context) {
|
||||||
}
|
}
|
||||||
worker.SetRedisWithExpire(strconv.Itoa(int(device.ID))+"_is_play", "1", time.Minute*5)
|
worker.SetRedisWithExpire(strconv.Itoa(int(device.ID))+"_is_play", "1", time.Minute*5)
|
||||||
fmt.Println("device_id:", device_id_int, " has set is_play to 1")
|
fmt.Println("device_id:", device_id_int, " has set is_play to 1")
|
||||||
go subscribeAndHandleMessages(ws, device_id_int)
|
go subscribeAndHandleMessagesV2(ws, device_id_int)
|
||||||
|
}
|
||||||
|
|
||||||
|
func subscribeAndHandleMessagesV2(ws *websocket.Conn, device_id int) {
|
||||||
|
ctx := context.Background()
|
||||||
|
pubsub := worker.RedisClient.Subscribe(ctx, strconv.Itoa(device_id)+"_frames_msgs")
|
||||||
|
// 生成唯一连接 uuid
|
||||||
|
con_id := uuid.New().String()
|
||||||
|
online_conn_key := "device_" + strconv.Itoa(device_id) + "_online_conn_ids"
|
||||||
|
// 加入设备在线连接集合
|
||||||
|
worker.SetRedisSetAddWithExpire(online_conn_key, con_id, time.Minute*5)
|
||||||
|
defer pubsub.Close()
|
||||||
|
defer ws.Close()
|
||||||
|
ch := pubsub.Channel()
|
||||||
|
var ticker *time.Ticker
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case msg, _ := <-ch:
|
||||||
|
frame := ""
|
||||||
|
if msg.Payload != "" {
|
||||||
|
frame = msg.Payload
|
||||||
|
}
|
||||||
|
//将base64图片数据解析为图片
|
||||||
|
buf, err := base64.StdEncoding.DecodeString(frame)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("base64 decode error:", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
err2 := ws.WriteMessage(websocket.BinaryMessage, buf)
|
||||||
|
if err2 != nil {
|
||||||
|
clientsMux.Lock()
|
||||||
|
clients[ws] = false
|
||||||
|
clientsMux.Unlock()
|
||||||
|
fmt.Println("send message to client err:", err2)
|
||||||
|
worker.SetRedisSetRemove(online_conn_key, con_id)
|
||||||
|
goto end
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
if ticker == nil {
|
||||||
|
ticker = time.NewTicker(time.Second)
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second))
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("Connection check failed:", err)
|
||||||
|
worker.SetRedisSetRemove(online_conn_key, con_id)
|
||||||
|
goto end
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
end:
|
||||||
|
// 查看是否还有其他连接,没有则设置 is_play 为 0
|
||||||
|
if worker.IsContainKey(online_conn_key) == false {
|
||||||
|
worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "0", time.Minute*5)
|
||||||
|
fmt.Println("device_id:", device_id, " has set is_play to 0")
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) {
|
func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue