VideoStream/handler/tool.go

228 lines
6.3 KiB
Go
Raw Normal View History

package handler
import (
"VideoStream/proto"
"VideoStream/service"
"VideoStream/worker"
"fmt"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"gocv.io/x/gocv"
"io"
"log"
"net/http"
"strconv"
"time"
)
// upgrader performs the HTTP -> WebSocket handshake for the handlers in this
// package. Read and write buffers are 1 KiB each.
var upgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
	// Accept connections from every origin.
	CheckOrigin: func(*http.Request) bool { return true },
}
// SetUpToolGroup registers the GET endpoints of the "/tool" route group on
// router:
//
//	/tool/video_stream    -> GetVideoStream
//	/tool/video_real_time -> GetRealTimeImage
func SetUpToolGroup(router *gin.Engine) {
	group := router.Group("/tool")

	routes := []struct {
		path    string
		handler gin.HandlerFunc
	}{
		{"/video_stream", GetVideoStream},
		{"/video_real_time", GetRealTimeImage},
	}
	for _, r := range routes {
		group.GET(r.path, r.handler)
	}
}
// CorsHandler returns a gin middleware that adds permissive CORS (cross-origin
// resource sharing) response headers to every request and then passes control
// to the next handler in the chain.
//
// OPTIONS (preflight) requests are not short-circuited; they continue down
// the chain like any other request.
func CorsHandler() gin.HandlerFunc {
	// Header name/value pairs applied to every response.
	corsHeaders := [][2]string{
		{"Access-Control-Allow-Origin", "*"}, // allow every origin
		{"Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE,UPDATE"},
		{"Access-Control-Allow-Headers", "Authorization, Content-Length, X-CSRF-Token, Token,session,X_Requested_With,Accept, Origin, Host, Connection, Accept-Encoding, Accept-Language,DNT, X-CustomHeader, Keep-Alive, User-Agent, X-Requested-With, If-Modified-Since, Cache-Control, Content-Type, Pragma,token,openid,opentoken"},
		{"Access-Control-Expose-Headers", "Content-Length, Access-Control-Allow-Origin, Access-Control-Allow-Headers,Cache-Control,Content-Language,Content-Type,Expires,Last-Modified,Pragma,FooBar"},
		{"Access-Control-Max-Age", "172800"},
		{"Access-Control-Allow-Credentials", "false"},
	}

	return func(c *gin.Context) {
		respHeader := c.Writer.Header()
		for _, kv := range corsHeaders {
			respHeader.Set(kv[0], kv[1])
		}

		// Stored as a gin context value under the key "content-type"; this is
		// not a response header.
		c.Set("content-type", "application/json")

		// Hand the request to the remaining handlers.
		c.Next()
	}
}
// videoStreamReq carries the parameters of a GetVideoStream request. Both
// fields can be bound from JSON or from form/query values.
type videoStreamReq struct {
	// ID is the id of the device whose stream is requested.
	ID int `json:"id" form:"id"`
	// Key is the access key supplied by the caller.
	Key string `json:"key" form:"key"`
}
func GetVideoStream(c *gin.Context) {
var req videoStreamReq
2025-01-14 15:11:45 +08:00
id, _ := c.Get("id")
id1 := id.(int)
//将query参数绑定到结构体
if err := c.ShouldBind(&req); err != nil {
c.JSON(400, gin.H{"error": err.Error()})
return
} else {
//校验权限
device := service.GetDevice(req.ID, id1)
if device.ID == 0 {
c.JSON(400, gin.H{"error": "device not exist"})
return
}
//查看id是否存在
index := -1
for _, device := range proto.Config.DeviceInfo {
if device.ID == req.ID {
index = req.ID
break
}
}
if index == -1 {
c.JSON(400, gin.H{"error": "id not exist"})
return
}
//查看key是否正确
if req.Key != "123456" {
c.JSON(400, gin.H{"error": "key error"})
return
}
//设备流
c.Stream(func(w io.Writer) bool {
var count int
var frame gocv.Mat
for {
cnt := service.GetDeviceCurrentFrameV2(&frame, req.ID)
if cnt == count {
time.Sleep(50 * time.Millisecond)
continue
}
//gocv.Matrix转为jpeg
img, err2 := gocv.IMEncode(".jpg", frame)
if err2 != nil {
fmt.Printf("img encode err:%v", err2)
time.Sleep(50 * time.Millisecond)
continue
}
frame_ := img.GetBytes()
_, err = w.Write([]byte("--frame\r\nContent-Type: image/jpeg\r\n\r\n"))
if err != nil {
fmt.Printf("写入头部信息错误: %v\n", err)
return false
}
_, err = w.Write(frame_)
if err != nil {
fmt.Printf("写入帧数据错误: %v\n", err)
return false
}
_, err = w.Write([]byte("\r\n"))
if err != nil {
fmt.Printf("写入帧结束标记错误: %v\n", err)
return false
}
time.Sleep(50 * time.Millisecond) // 控制帧率模拟每秒约20帧可按实际调整
}
})
}
}
// GetRealTimeImage upgrades the request to a WebSocket and starts pushing the
// real-time frames of the device named by the "device_id" query parameter.
//
// The calling user's id is read from the gin context key "id" and must be an
// int (presumably stored by an upstream middleware; verify against the
// router setup). When the device id is not a number, or service.GetDevice
// returns a device with ID 0, the handler answers
// {"code": 4, "message": "device not found"} and does not upgrade.
//
// After a successful upgrade the Redis key "<device.ID>_is_play" is set to
// "1" for five minutes and subscribeAndHandleMessagesV3 is started in its own
// goroutine with the connection.
func GetRealTimeImage(c *gin.Context) {
	// Checked assertion: a missing or non-int "id" must not panic the handler.
	rawID, _ := c.Get("id")
	userID, ok := rawID.(int)
	if !ok {
		c.JSON(http.StatusUnauthorized, gin.H{"message": "user id not found"})
		return
	}

	deviceID, err := strconv.Atoi(c.Query("device_id"))
	if err != nil {
		// A non-numeric id cannot match any device.
		c.JSON(http.StatusOK, gin.H{"code": 4, "message": "device not found"})
		return
	}

	device := service.GetDevice(deviceID, userID)
	if device.ID == 0 {
		c.JSON(http.StatusOK, gin.H{"code": 4, "message": "device not found"})
		return
	}

	ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		log.Printf("connect wss err:%v", err)
		return
	}

	worker.SetRedisWithExpire(strconv.Itoa(int(device.ID))+"_is_play", "1", time.Minute*5)
	log.Printf("device_id:%d has set is_play to 1", deviceID)

	// The goroutine takes over the connection from here on.
	go subscribeAndHandleMessagesV3(ws, deviceID)
}
func subscribeAndHandleMessagesV3(ws *websocket.Conn, device_id int) {
// 生成唯一连接 uuid
con_id := uuid.New().String()
online_conn_key := "device_" + strconv.Itoa(device_id) + "_online_conn_ids"
// 加入设备在线连接集合
worker.SetRedisSetAddWithExpire(online_conn_key, con_id, time.Minute*5)
//图片计数器
count := 0
//定时器,发送计数器
tCount := 0
//从service获取当前帧
var img gocv.Mat
for {
c := service.GetDeviceCurrentFrameV2(&img, device_id)
if c != count {
//将img转[]byte
2025-01-15 00:39:18 +08:00
if img.Empty() {
log.Printf("device:%d img is empty! count=%d", device_id, c)
2025-01-15 00:39:18 +08:00
} else {
2025-01-15 23:04:32 +08:00
//gocv.Matrix转为jpeg
buf, err := gocv.IMEncode(".jpg", img)
if err != nil {
log.Printf("img encode err:%v", err)
2025-01-15 23:04:32 +08:00
worker.SetRedisSetRemove(online_conn_key, con_id)
break
2025-01-15 23:04:32 +08:00
}
2025-01-15 00:39:18 +08:00
buf1 := buf.GetBytes()
2025-01-15 00:39:18 +08:00
err2 := ws.WriteMessage(websocket.BinaryMessage, buf1)
if err2 != nil {
log.Printf("send message to client err:%v", err2)
2025-01-15 00:39:18 +08:00
worker.SetRedisSetRemove(online_conn_key, con_id)
break
2025-01-15 00:39:18 +08:00
}
c = count
err4 := img.Close()
if err4 != nil {
log.Printf("close img err:%v", err)
}
}
} else {
//每秒发送一次心跳检测
if tCount%10 == 0 {
err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second))
if err != nil {
log.Printf("Connection check failed:%v", err)
worker.SetRedisSetRemove(online_conn_key, con_id)
break
}
}
}
time.Sleep(100 * time.Millisecond)
tCount++
}
//关闭img
err := img.Close()
if err != nil {
log.Printf("img close err:%v", err)
}
// 查看是否还有其他连接,没有则设置 is_play 为 0
if worker.IsContainKey(online_conn_key) == false {
worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "1", time.Minute*5)
log.Printf("device_id: %d has set is_play to 0", device_id)
}
}