package handler import ( "VideoStream/proto" "VideoStream/service" "VideoStream/worker" "fmt" "github.com/gin-gonic/gin" "github.com/google/uuid" "github.com/gorilla/websocket" "gocv.io/x/gocv" "io" "log" "net/http" "strconv" "time" ) var ( upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, CheckOrigin: func(r *http.Request) bool { // 允许所有来源的连接 return true }, } ) func SetUpToolGroup(router *gin.Engine) { toolGroup := router.Group("/tool") toolGroup.GET("/video_stream", GetVideoStream) toolGroup.GET("/video_real_time", GetRealTimeImage) } // 跨域访问:cross origin resource share func CorsHandler() gin.HandlerFunc { return func(context *gin.Context) { //method := context.Request.Method context.Writer.Header().Set("Access-Control-Allow-Origin", "*") context.Header("Access-Control-Allow-Origin", "*") // 设置允许访问所有域 context.Header("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE,UPDATE") context.Header("Access-Control-Allow-Headers", "Authorization, Content-Length, X-CSRF-Token, Token,session,X_Requested_With,Accept, Origin, Host, Connection, Accept-Encoding, Accept-Language,DNT, X-CustomHeader, Keep-Alive, User-Agent, X-Requested-With, If-Modified-Since, Cache-Control, Content-Type, Pragma,token,openid,opentoken") context.Header("Access-Control-Expose-Headers", "Content-Length, Access-Control-Allow-Origin, Access-Control-Allow-Headers,Cache-Control,Content-Language,Content-Type,Expires,Last-Modified,Pragma,FooBar") context.Header("Access-Control-Max-Age", "172800") context.Header("Access-Control-Allow-Credentials", "false") context.Set("content-type", "application/json") //设置返回格式是json // if method == "OPTIONS" { // context.JSON(http.StatusOK, gin.H{ // "code":1, // "message":"error", // "data":"request error", // }) // } //处理请求 context.Next() } } type videoStreamReq struct { ID int `json:"id" form:"id"` Key string `json:"key" form:"key"` } func GetVideoStream(c *gin.Context) { id, _ := c.Get("id") id1 := id.(int) deviceIDSTR := c.Query("id") deviceID, err := strconv.Atoi(deviceIDSTR) if err != nil { c.JSON(400, gin.H{"error": "device_id error"}) return } key := c.Query("key") //校验权限 device := service.GetDevice(deviceID, id1) if device.ID == 0 { c.JSON(400, gin.H{"error": "device not exist"}) return } rKey := worker.GetRedis("video_stream_get_stream_key") if rKey == "" { rKey = "123456" } //查看id是否存在 index := -1 for _, device_ := range proto.Config.DeviceInfo { if device_.ID == deviceID { index = deviceID break } } if index == -1 { c.JSON(400, gin.H{"error": "id config not exist"}) return } //查看key是否正确 if key != rKey { c.JSON(400, gin.H{"error": "key error"}) return } //查看设备是否在获取 isGetting := worker.GetRedis(fmt.Sprintf("device_%d_is_getting", deviceID)) if isGetting != "true" { c.JSON(400, gin.H{"error": "device is not getting or not exist"}) log.Printf("stream device_id:%d is not getting or not exist", deviceID) return } //设备流 c.Stream(func(w io.Writer) bool { var count int frame := gocv.NewMat() defer func() { //关闭帧 err4 := frame.Close() if err4 != nil { log.Printf("device:%d frame close err:%v", deviceID, err4) } }() errCount := 0 for { if errCount > 10 { log.Printf("stream device:%d errCount > 10", deviceID) return false } cnt := service.GetDeviceCurrentFrameV2(&frame, deviceID) if cnt == count || cnt == -1 { time.Sleep(50 * time.Millisecond) log.Printf("stream device:%d ,cnt =%d,count=%d,errCount=%d", deviceID, cnt, count, errCount) errCount++ continue } if frame.Empty() { log.Printf("stream device:%d frame is empty", deviceID) time.Sleep(100 * time.Millisecond) errCount++ continue } //gocv.Matrix转为jpeg img, err2 := gocv.IMEncode(".jpg", frame) if err2 != nil { log.Printf("stream img encode err:%v", err2) return false } frame_ := img.GetBytes() _, err = w.Write([]byte("--frame\r\nContent-Type: image/jpeg\r\n\r\n")) if err != nil { fmt.Printf("写入头部信息错误: %v\n", err) return false } _, err = w.Write(frame_) if err != nil { fmt.Printf("写入帧数据错误: %v\n", err) return false } _, err = w.Write([]byte("\r\n")) if err != nil { fmt.Printf("写入帧结束标记错误: %v\n", err) return false } time.Sleep(50 * time.Millisecond) // 控制帧率,模拟每秒约20帧,可按实际调整 } }) } // 发送实时视频流 func GetRealTimeImage(c *gin.Context) { id, _ := c.Get("id") id1 := id.(int) deviceId := c.Query("device_id") deviceIdInt, _ := strconv.Atoi(deviceId) device := service.GetDevice(deviceIdInt, id1) if device.ID == 0 { c.JSON(http.StatusOK, gin.H{"code": 4, "message": "device not found"}) return } //查看设备是否在获取 isGetting := worker.GetRedis(fmt.Sprintf("device_%d_is_getting", device.ID)) if isGetting != "true" { c.JSON(http.StatusOK, gin.H{"code": 4, "message": "device is not getting or not exist"}) log.Printf("device_id:%d is not getting or not exist", deviceIdInt) return } ws, err := upgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { log.Printf("connect wss err:%v", err) return } worker.SetRedisWithExpire(strconv.Itoa(int(device.ID))+"_is_play", "1", time.Minute*5) isGetting = worker.GetRedis(fmt.Sprintf("device_%d_is_getting", device.ID)) if isGetting != "true" { c.JSON(http.StatusOK, gin.H{"code": 5, "message": "device is not getting or not exist"}) log.Printf("device_id:%d is not getting or not exist", deviceIdInt) return } subscribeAndHandleMessagesV3(ws, deviceIdInt) } func subscribeAndHandleMessagesV3(ws *websocket.Conn, device_id int) { // 生成唯一连接 uuid con_id := uuid.New().String() online_conn_key := "device_" + strconv.Itoa(device_id) + "_online_conn_ids" // 加入设备在线连接集合 worker.SetRedisSetAddWithExpire(online_conn_key, con_id, time.Minute*5) //图片计数器 count := 0 //定时器,发送计数器 t_count := 0 t := 0 //计算帧率 for { //从service获取当前帧 img, c := service.GetDeviceCurrentFrameV3(device_id) if c != count { if c == -1 { log.Printf("device:%d get frame err!", device_id) worker.SetRedisSetRemove(online_conn_key, con_id) break } //将img转[]byte if img.Empty() { log.Printf("device:%d img is empty! count = %d \n", device_id, c) } else { //gocv.Matrix转为jpeg buf, err := gocv.IMEncode(".jpg", img) if err != nil { log.Printf("img encode err:%v", err) worker.SetRedisSetRemove(online_conn_key, con_id) break } buf1 := buf.GetBytes() err2 := ws.WriteMessage(websocket.BinaryMessage, buf1) if err2 != nil { log.Printf("send message to client err:%v", err2) worker.SetRedisSetRemove(online_conn_key, con_id) break } c = count err5 := img.Close() if err5 != nil { log.Printf("img close err:%v", err) } t++ if t%50 == 0 { log.Printf("device:%d send frame count:%d,img is closed", device_id, c) } } } else { //每秒发送一次心跳检测 if t_count%10 == 0 { err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)) if err != nil { log.Printf("Connection check failed:%v", err) worker.SetRedisSetRemove(online_conn_key, con_id) break } else { log.Printf("Connection check success") } } } time.Sleep(100 * time.Millisecond) t_count++ } // 查看是否还有其他连接,没有则设置 is_play 为 0 if worker.IsContainKey(online_conn_key) == false { worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "1", time.Minute*5) log.Printf("device_id: %d has set is_play to 0", device_id) } }