VideoStream/handler/tool.go

202 lines
5.6 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package handler
import (
"VideoStream/dao"
"VideoStream/proto"
"VideoStream/service"
"VideoStream/worker"
"fmt"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"gocv.io/x/gocv"
"io"
"net/http"
"strconv"
"time"
)
var (
upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
// 允许所有来源的连接
return true
},
}
)
func SetUpToolGroup(router *gin.Engine) {
toolGroup := router.Group("/tool")
toolGroup.GET("/video_stream", GetVideoStream)
toolGroup.GET("/video_real_time", GetRealTimeImage)
}
// 跨域访问cross origin resource share
func CrosHandler() gin.HandlerFunc {
return func(context *gin.Context) {
//method := context.Request.Method
context.Writer.Header().Set("Access-Control-Allow-Origin", "*")
context.Header("Access-Control-Allow-Origin", "*") // 设置允许访问所有域
context.Header("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE,UPDATE")
context.Header("Access-Control-Allow-Headers", "Authorization, Content-Length, X-CSRF-Token, Token,session,X_Requested_With,Accept, Origin, Host, Connection, Accept-Encoding, Accept-Language,DNT, X-CustomHeader, Keep-Alive, User-Agent, X-Requested-With, If-Modified-Since, Cache-Control, Content-Type, Pragma,token,openid,opentoken")
context.Header("Access-Control-Expose-Headers", "Content-Length, Access-Control-Allow-Origin, Access-Control-Allow-Headers,Cache-Control,Content-Language,Content-Type,Expires,Last-Modified,Pragma,FooBar")
context.Header("Access-Control-Max-Age", "172800")
context.Header("Access-Control-Allow-Credentials", "false")
context.Set("content-type", "application/json") //设置返回格式是json
// if method == "OPTIONS" {
// context.JSON(http.StatusOK, gin.H{
// "code":1,
// "message":"error",
// "data":"request error",
// })
// }
//处理请求
context.Next()
}
}
type videoStreamReq struct {
ID int `json:"id" form:"id"`
Key string `json:"key" form:"key"`
}
func GetVideoStream(c *gin.Context) {
var req videoStreamReq
id, _ := c.Get("id")
id1 := id.(int)
//校验权限
device := dao.FindDeviceByID(req.ID, id1)
if device.ID == 0 {
c.JSON(400, gin.H{"error": "device not exist"})
return
}
if err := c.ShouldBind(&req); err != nil {
c.JSON(400, gin.H{"error": err.Error()})
return
} else {
//查看id是否存在
index := -1
for _, device := range proto.Config.DeviceInfo {
if device.ID == req.ID {
index = req.ID
break
}
}
if index == -1 {
c.JSON(400, gin.H{"error": "id not exist"})
return
}
//查看key是否正确
if req.Key != "123456" {
c.JSON(400, gin.H{"error": "key error"})
return
}
//设备流
c.Stream(func(w io.Writer) bool {
var count int
for {
frame, cnt := service.GetDeviceCurrentFrame(req.ID)
if cnt == count {
time.Sleep(50 * time.Millisecond)
continue
}
//gocv.Matrix转为jpeg
img, err := gocv.IMEncode(".jpg", frame)
frame_ := img.GetBytes()
_, err = w.Write([]byte("--frame\r\nContent-Type: image/jpeg\r\n\r\n"))
if err != nil {
fmt.Printf("写入头部信息错误: %v\n", err)
return false
}
_, err = w.Write(frame_)
if err != nil {
fmt.Printf("写入帧数据错误: %v\n", err)
return false
}
_, err = w.Write([]byte("\r\n"))
if err != nil {
fmt.Printf("写入帧结束标记错误: %v\n", err)
return false
}
time.Sleep(50 * time.Millisecond) // 控制帧率模拟每秒约20帧可按实际调整
}
})
}
}
// 发送实时视频流
func GetRealTimeImage(c *gin.Context) {
id, _ := c.Get("id")
id1 := id.(int)
device_id := c.Query("device_id")
device_id_int, _ := strconv.Atoi(device_id)
device := service.GetDevice(device_id_int, id1)
if device.ID == 0 {
c.JSON(http.StatusOK, gin.H{"code": 4, "message": "device not found"})
return
}
ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
fmt.Println("connect wss err:", err)
return
}
worker.SetRedisWithExpire(strconv.Itoa(int(device.ID))+"_is_play", "1", time.Minute*5)
fmt.Println("device_id:", device_id_int, " has set is_play to 1")
go subscribeAndHandleMessagesV3(ws, device_id_int)
}
func subscribeAndHandleMessagesV3(ws *websocket.Conn, device_id int) {
// 生成唯一连接 uuid
con_id := uuid.New().String()
online_conn_key := "device_" + strconv.Itoa(device_id) + "_online_conn_ids"
// 加入设备在线连接集合
worker.SetRedisSetAddWithExpire(online_conn_key, con_id, time.Minute*5)
//图片计数器
count := 0
//定时器,发送计数器
t_count := 0
for {
//从service获取当前帧
img, c := service.GetDeviceCurrentFrame(device_id)
if c != count {
//将img转[]byte
buf, _ := gocv.IMEncode(".jpg", img)
buf1 := buf.GetBytes()
err2 := ws.WriteMessage(websocket.BinaryMessage, buf1)
if err2 != nil {
fmt.Println("send message to client err:", err2)
worker.SetRedisSetRemove(online_conn_key, con_id)
goto end
}
c = count
} else {
//每秒发送一次心跳检测
if t_count%10 == 0 {
err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second))
if err != nil {
fmt.Println("Connection check failed:", err)
worker.SetRedisSetRemove(online_conn_key, con_id)
goto end
}
}
}
time.Sleep(100 * time.Millisecond)
t_count++
}
end:
// 查看是否还有其他连接,没有则设置 is_play 为 0
if worker.IsContainKey(online_conn_key) == false {
worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "1", time.Minute*5)
fmt.Println("device_id:", device_id, " has set is_play to 0")
}
}