diff --git a/handler/tool.go b/handler/tool.go index ea048a2..6c71e8c 100644 --- a/handler/tool.go +++ b/handler/tool.go @@ -185,6 +185,59 @@ func GetRealTimeImage(c *gin.Context) { subscribeAndHandleMessagesV3(ws, deviceIdInt) } +func subscribeAndHandleMessagesV5(ws *websocket.Conn, deviceId int) { + // 生成唯一连接 uuid + con_id := uuid.New().String() + online_conn_key := "device_" + strconv.Itoa(deviceId) + "_online_conn_ids" + // 加入设备在线连接集合 + worker.SetRedisSetAddWithExpire(online_conn_key, con_id, time.Minute*5) + //图片计数器 + count := 0 + //定时器,发送计数器 + tCount := 0 + var buf []byte + //计算帧率 + for { + //将buf置空 + buf = nil + //从service获取当前帧 + c := service.GetDeviceCurrentFrameV5(deviceId, count, &buf) + if c != count { + count = c + err2 := ws.WriteMessage(websocket.BinaryMessage, buf) + if err2 != nil { + log.Printf("send message to client err:%v", err2) + worker.SetRedisSetRemove(online_conn_key, con_id) + break + } + c = count + } else { + //每秒发送一次心跳检测 + if tCount%10 == 0 { + err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)) + if err != nil { + log.Printf("Connection check failed:%v", err) + worker.SetRedisSetRemove(online_conn_key, con_id) + break + } else { + log.Printf("Connection check success") + } + } + } + time.Sleep(100 * time.Millisecond) + tCount++ + } + + // 查看是否还有其他连接,没有则设置 is_play 为 0 + if worker.IsContainKey(online_conn_key) == false { + worker.SetRedisWithExpire(strconv.Itoa(deviceId)+"_is_play", "1", time.Minute*5) + log.Printf("device_id: %d has set is_play to 0", deviceId) + } + + //垃圾回收 + runtime.GC() +} + func subscribeAndHandleMessagesV3(ws *websocket.Conn, deviceId int) { // 生成唯一连接 uuid con_id := uuid.New().String() diff --git a/service/tool.go b/service/tool.go index b45ee95..d7d433a 100644 --- a/service/tool.go +++ b/service/tool.go @@ -92,6 +92,63 @@ func SetDeviceCurrentFrameV2(frame *gocv.Mat, deviceId int) error { DeviceFrameCount[deviceId] = frameCount return nil } +func GetDeviceCurrentFrameV5(deviceId int, curCount int, buf *[]byte) (cnt int) { + defer func() { + if err := recover(); err != nil { + log.Printf("设备:%d 错误: %v\n", deviceId, err) + // 异常时释放 buf + buf = nil + } + }() + //获取读写锁 + mutex_, ok := DeviceRWMap.Load(deviceId) + if !ok { + log.Printf("DeviceRWMap 读写锁不存在,device_id: %d \n", deviceId) + return -1 + } + mutex, ok := mutex_.(*sync.RWMutex) + if !ok { + log.Printf("DeviceRWMap 存储的不是 *sync.RWMutex 类型,device_id: %d \n", deviceId) + return -1 + } + mutex.RLock() + defer mutex.RUnlock() + frameCount, ok := DeviceFrameCount[deviceId] + if !ok { + return -1 + } + if frameCount == curCount { + return frameCount + } + //获取当前帧 + var tempBuf []byte + switch deviceId { + case 1: + if Device1CurrentFrame.Empty() { + return -1 + } + tempBuf = Device1CurrentFrame.ToBytes() + case 50: + if Device50CurrentFrame.Empty() { + return -1 + } + tempBuf = Device50CurrentFrame.ToBytes() + case 73: + if Device73CurrentFrame.Empty() { + return -1 + } + tempBuf = Device73CurrentFrame.ToBytes() + default: + return -1 + } + // 避免直接返回大数组,可考虑复制必要的数据 + if len(tempBuf) > 0 { + copy(*buf, tempBuf) + // 释放临时变量 + tempBuf = nil + } + return frameCount +} func GetDeviceCurrentFrameV4(deviceId int, curCount int) (buf []byte, cnt int) { defer func() {