From a10d4add523893adb9d5d1bb6c09e6d54bbfb927 Mon Sep 17 00:00:00 2001 From: junleea <354425203@qq.com> Date: Thu, 26 Dec 2024 15:44:56 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E6=97=B6=E8=A7=86=E9=A2=91=E8=BF=94?= =?UTF-8?q?=E5=9B=9E=E7=89=88=E6=9C=AC2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler/device.go | 64 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) diff --git a/handler/device.go b/handler/device.go index 253df8e..06dccb6 100644 --- a/handler/device.go +++ b/handler/device.go @@ -229,7 +229,69 @@ func GetRealTimeImage(c *gin.Context) { } worker.SetRedisWithExpire(strconv.Itoa(int(device.ID))+"_is_play", "1", time.Minute*5) fmt.Println("device_id:", device_id_int, " has set is_play to 1") - go subscribeAndHandleMessages(ws, device_id_int) + go subscribeAndHandleMessagesV2(ws, device_id_int) +} + +func subscribeAndHandleMessagesV2(ws *websocket.Conn, device_id int) { + ctx := context.Background() + pubsub := worker.RedisClient.Subscribe(ctx, strconv.Itoa(device_id)+"_frames_msgs") + // 生成唯一连接 uuid + con_id := uuid.New().String() + online_conn_key := "device_" + strconv.Itoa(device_id) + "_online_conn_ids" + // 加入设备在线连接集合 + worker.SetRedisSetAddWithExpire(online_conn_key, con_id, time.Minute*5) + defer pubsub.Close() + defer ws.Close() + ch := pubsub.Channel() + var ticker *time.Ticker + for { + select { + case msg, _ := <-ch: + frame := "" + if msg.Payload != "" { + frame = msg.Payload + } + //将base64图片数据解析为图片 + buf, err := base64.StdEncoding.DecodeString(frame) + if err != nil { + fmt.Println("base64 decode error:", err) + continue + } + + err2 := ws.WriteMessage(websocket.BinaryMessage, buf) + if err2 != nil { + clientsMux.Lock() + clients[ws] = false + clientsMux.Unlock() + fmt.Println("send message to client err:", err2) + worker.SetRedisSetRemove(online_conn_key, con_id) + goto end + } + default: + if ticker == nil { + ticker = time.NewTicker(time.Second) + } + select { + case <-ticker.C: + err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)) + if err != nil { + fmt.Println("Connection check failed:", err) + worker.SetRedisSetRemove(online_conn_key, con_id) + goto end + } + default: + continue + } + } + } + +end: + // 查看是否还有其他连接,没有则设置 is_play 为 0 + if worker.IsContainKey(online_conn_key) == false { + worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "0", time.Minute*5) + fmt.Println("device_id:", device_id, " has set is_play to 0") + } + } func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) {