429 lines
13 KiB
Go
429 lines
13 KiB
Go
package handler
|
||
|
||
import (
|
||
"context"
|
||
"encoding/base64"
|
||
"encoding/json"
|
||
"fmt"
|
||
"github.com/gin-gonic/gin"
|
||
"github.com/google/uuid"
|
||
"github.com/gorilla/websocket"
|
||
"io"
|
||
"net/http"
|
||
"strconv"
|
||
"time"
|
||
"videoplayer/proto"
|
||
"videoplayer/service"
|
||
"videoplayer/worker"
|
||
)
|
||
|
||
// DeviceAddReq is the payload bound by the AddDevice handler. Each field can
// be supplied either as JSON or as form data under the same snake_case key.
//
// AuthID is accepted from the client but is not read by the AddDevice handler
// in this file.
type DeviceAddReq struct {
	DeviceName     string `json:"device_name" form:"device_name"`
	DeviceIP       string `json:"device_ip" form:"device_ip"`
	DeviceStatus   string `json:"device_status" form:"device_status"`
	AuthID         int    `json:"auth_id" form:"auth_id"`
	DeviceInfo     string `json:"device_info" form:"device_info"`
	DeviceType     string `json:"device_type" form:"device_type"`
	DeviceLocation string `json:"device_location" form:"device_location"`
}
|
||
|
||
// DeviceUpdateReq is the payload bound by the UpdateDevice handler. ID selects
// the device to modify; the remaining fields carry the new attribute values.
// Every field may arrive as JSON or as form data.
//
// AuthID is accepted from the client but is not read by the UpdateDevice
// handler in this file.
type DeviceUpdateReq struct {
	ID             int    `json:"id" form:"id"`
	DeviceName     string `json:"device_name" form:"device_name" `
	DeviceIP       string `json:"device_ip" form:"device_ip" `
	DeviceStatus   string `json:"device_status" form:"device_status" `
	AuthID         int    `json:"auth_id" form:"auth_id" `
	DeviceInfo     string `json:"device_info" form:"device_info" `
	DeviceType     string `json:"device_type" form:"device_type" `
	DeviceLocation string `json:"device_location" form:"device_location" `
}
|
||
|
||
// DeviceStatus is the payload bound by the SetDeviceStatus handler. ID names
// the device, Status is the value handed to the service layer, and IP must be
// non-empty for the handler to act on the request.
type DeviceStatus struct {
	IP     string `json:"ip" form:"ip" `
	Status string `json:"status" form:"status" `
	ID     int    `json:"id" form:"id" `
}
|
||
|
||
// DeviceDelReq is the payload bound by the DeleteDevice handler; ID is the
// identifier of the device to remove.
type DeviceDelReq struct {
	ID int `json:"id" form:"id" `
}
|
||
|
||
// DeviceRestartReq is the payload bound by the RestartDevice handler. Option
// selects the scope ("one" restarts the device named by ID, "all" restarts
// every device returned for the caller).
type DeviceRestartReq struct {
	ID     int    `json:"id" form:"id" `
	Option string `json:"option" form:"option" `
}
|
||
|
||
func SetUpDeviceGroup(router *gin.Engine) {
|
||
deviceGroup := router.Group("/device")
|
||
deviceGroup.POST("/get_device_list", GetDeviceList)
|
||
deviceGroup.POST("/restart", RestartDevice)
|
||
deviceGroup.POST("/add_device", AddDevice)
|
||
deviceGroup.POST("/set_device_status", SetDeviceStatus)
|
||
deviceGroup.POST("/update_device", UpdateDevice)
|
||
deviceGroup.POST("/delete_device", DeleteDevice)
|
||
deviceGroup.GET("/get_real_time_image", GetRealTimeImage)
|
||
deviceGroup.GET("/video_feed", VideoFeed)
|
||
}
|
||
|
||
func DeleteDevice(c *gin.Context) {
|
||
id, _ := c.Get("id")
|
||
//获取post参数
|
||
var req DeviceDelReq
|
||
if err := c.ShouldBind(&req); err == nil {
|
||
if service.DeleteDevice(req.ID, int(id.(float64))) {
|
||
c.JSON(200, gin.H{"code": proto.SuccessCode, "message": "success"})
|
||
} else {
|
||
c.JSON(200, gin.H{"code": proto.OperationFailed, "message": "failed"})
|
||
}
|
||
} else {
|
||
c.JSON(200, gin.H{"code": proto.ParameterError, "message": "failed"})
|
||
}
|
||
}
|
||
|
||
func UpdateDevice(c *gin.Context) {
|
||
var req DeviceUpdateReq
|
||
user_id, _ := c.Get("id")
|
||
if err := c.ShouldBind(&req); err == nil {
|
||
res := service.UpdateDevice(req.DeviceName, req.DeviceIP, req.DeviceStatus, req.DeviceInfo, req.DeviceType, req.DeviceLocation, req.ID, int(user_id.(float64)))
|
||
if res {
|
||
c.JSON(200, gin.H{
|
||
"code": 0,
|
||
"message": "success",
|
||
})
|
||
} else {
|
||
c.JSON(200, gin.H{
|
||
"code": proto.DeviceUpdateFailed,
|
||
"message": "failed",
|
||
})
|
||
}
|
||
} else {
|
||
c.JSON(200, gin.H{
|
||
"code": proto.ParameterError,
|
||
"message": "failed",
|
||
})
|
||
}
|
||
}
|
||
func SetDeviceStatus(c *gin.Context) {
|
||
var req DeviceStatus
|
||
id, _ := c.Get("id")
|
||
if err := c.ShouldBind(&req); err == nil {
|
||
if req.IP != "" {
|
||
if service.SetDeviceStatus(req.Status, req.ID, int(id.(float64))) {
|
||
c.JSON(200, gin.H{"code": proto.SuccessCode, "message": "success"})
|
||
} else {
|
||
c.JSON(200, gin.H{"code": 1, "message": "failed"})
|
||
}
|
||
}
|
||
} else {
|
||
c.JSON(200, gin.H{"code": proto.ParameterError, "message": "failed", "data": err.Error()})
|
||
}
|
||
}
|
||
|
||
func AddDevice(c *gin.Context) {
|
||
var req DeviceAddReq
|
||
id, _ := c.Get("id")
|
||
if err := c.ShouldBind(&req); err == nil {
|
||
user_id := int(id.(float64))
|
||
fmt.Println(req)
|
||
device_id := service.AddDevice(req.DeviceName, req.DeviceIP, req.DeviceStatus, req.DeviceInfo, req.DeviceType, req.DeviceLocation, user_id)
|
||
if device_id != 0 {
|
||
c.JSON(200, gin.H{
|
||
"code": proto.SuccessCode,
|
||
"message": "success",
|
||
"data": device_id,
|
||
})
|
||
} else {
|
||
c.JSON(200, gin.H{
|
||
"code": proto.DeviceAddFailed,
|
||
"message": "failed",
|
||
"data": "device add failed",
|
||
})
|
||
}
|
||
} else {
|
||
c.JSON(200, gin.H{
|
||
"code": proto.ParameterError,
|
||
"message": "failed",
|
||
"data": err.Error(),
|
||
})
|
||
}
|
||
}
|
||
|
||
func GetDeviceList(c *gin.Context) {
|
||
id, _ := c.Get("id")
|
||
devices := service.GetDeviceList(int(id.(float64)))
|
||
c.JSON(200, gin.H{
|
||
"code": proto.SuccessCode,
|
||
"message": "success",
|
||
"data": devices,
|
||
})
|
||
}
|
||
|
||
func RestartDevice(c *gin.Context) {
|
||
user_id, _ := c.Get("id")
|
||
var req DeviceRestartReq
|
||
if err := c.ShouldBind(&req); err != nil {
|
||
c.JSON(200, gin.H{"code": proto.ParameterError, "message": "failed", "data": err.Error()})
|
||
return
|
||
}
|
||
device_id := req.ID
|
||
if req.Option == "one" {
|
||
device := service.GetDevice(device_id, int(user_id.(float64)))
|
||
if device.ID != 0 {
|
||
if device.DeviceIP != "" {
|
||
if Restart(device.DeviceIP) {
|
||
c.JSON(200, gin.H{"code": proto.SuccessCode, "message": "success"})
|
||
} else {
|
||
c.JSON(200, gin.H{"code": proto.DeviceRestartFailed, "message": "failed"})
|
||
}
|
||
}
|
||
} else {
|
||
c.JSON(200, gin.H{"code": proto.DataNotFound, "message": "failed", "data": string(device_id) + ": device not found"})
|
||
}
|
||
} else if req.Option == "all" {
|
||
devices := service.GetDeviceList(int(user_id.(float64)))
|
||
if len(devices) > 0 {
|
||
for _, device := range devices {
|
||
if device.DeviceIP != "" {
|
||
if !Restart(device.DeviceIP) {
|
||
c.JSON(200, gin.H{"code": proto.DeviceRestartFailed, "message": "failed"})
|
||
return
|
||
}
|
||
}
|
||
}
|
||
} else {
|
||
c.JSON(200, gin.H{"code": proto.DataNotFound, "message": "failed", "data": "device not found"})
|
||
}
|
||
c.JSON(200, gin.H{"code": proto.SuccessCode, "message": "success"})
|
||
}
|
||
}
|
||
|
||
// restartTimeout bounds a single restart call so that an unreachable or hung
// device cannot block the calling HTTP handler indefinitely.
const restartTimeout = 5 * time.Second

// restartClient is shared by all Restart calls; http.Client is safe for
// concurrent use and reusing it lets connections be pooled.
var restartClient = &http.Client{Timeout: restartTimeout}

// Restart asks the device at ip to restart by issuing
// GET http://<ip>/restart.
//
// ip is the host (optionally host:port) of the device. The function returns
// true only when the request completes within restartTimeout and the device
// answers with HTTP 200; any transport error, timeout or other status code
// yields false.
func Restart(ip string) bool {
	resp, err := restartClient.Get("http://" + ip + "/restart")
	if err != nil {
		return false
	}
	defer resp.Body.Close()

	return resp.StatusCode == http.StatusOK
}
|
||
|
||
// 发送实时视频流
|
||
func GetRealTimeImage(c *gin.Context) {
|
||
id, _ := c.Get("id")
|
||
id1 := int(id.(float64))
|
||
device_id := c.Query("device_id")
|
||
device_id_int, _ := strconv.Atoi(device_id)
|
||
device := service.GetDevice(device_id_int, id1)
|
||
if device.ID == 0 {
|
||
c.JSON(http.StatusOK, gin.H{"code": proto.DataNotFound, "message": "device not found"})
|
||
return
|
||
}
|
||
ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
|
||
clients[ws] = true
|
||
if err != nil {
|
||
fmt.Println("connect wss err:", err)
|
||
return
|
||
}
|
||
worker.SetRedisWithExpire(strconv.Itoa(int(device.ID))+"_is_play", "1", time.Minute*5)
|
||
fmt.Println("device_id:", device_id_int, " has set is_play to 1")
|
||
go subscribeAndHandleMessages(ws, device_id_int)
|
||
}
|
||
|
||
func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) {
|
||
ctx := context.Background()
|
||
pubsub := worker.RedisClient.Subscribe(ctx, strconv.Itoa(device_id)+"_frames_msgs")
|
||
// 生成唯一连接 uuid
|
||
con_id := uuid.New().String()
|
||
online_conn_key := "device_" + strconv.Itoa(device_id) + "_online_conn_ids"
|
||
// 加入设备在线连接集合
|
||
worker.SetRedisSetAddWithExpire(online_conn_key, con_id, time.Minute*5)
|
||
defer pubsub.Close()
|
||
defer ws.Close()
|
||
ch := pubsub.Channel()
|
||
var check_cnt int
|
||
var ticker *time.Ticker
|
||
for {
|
||
select {
|
||
case msg, _ := <-ch:
|
||
var res3 []byte
|
||
var msgObj proto.Message
|
||
if msg.Payload != "" {
|
||
msgObj.Type = "img"
|
||
msgObj.Msg = msg.Payload
|
||
msgObj.From_user_id = -1
|
||
res3, _ = json.Marshal(msgObj)
|
||
} else {
|
||
if check_cnt < 5 {
|
||
check_cnt++
|
||
time.Sleep(time.Millisecond * 200)
|
||
continue
|
||
}
|
||
check_cnt = 0
|
||
msgObj.Type = "check"
|
||
msgObj.Msg = "check"
|
||
msgObj.From_user_id = -1
|
||
res3, _ = json.Marshal(msgObj)
|
||
}
|
||
// fmt.Println("send message to client length:", len(res3))
|
||
err2 := ws.WriteMessage(websocket.TextMessage, res3)
|
||
if err2 != nil {
|
||
clientsMux.Lock()
|
||
clients[ws] = false
|
||
clientsMux.Unlock()
|
||
fmt.Println("send message to client err:", err2)
|
||
worker.SetRedisSetRemove(online_conn_key, con_id)
|
||
goto end
|
||
}
|
||
default:
|
||
if ticker == nil {
|
||
ticker = time.NewTicker(time.Second)
|
||
}
|
||
select {
|
||
case <-ticker.C:
|
||
err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second))
|
||
if err != nil {
|
||
fmt.Println("Connection check failed:", err)
|
||
worker.SetRedisSetRemove(online_conn_key, con_id)
|
||
goto end
|
||
}
|
||
default:
|
||
continue
|
||
}
|
||
}
|
||
}
|
||
|
||
end:
|
||
// 查看是否还有其他连接,没有则设置 is_play 为 0
|
||
if worker.IsContainKey(online_conn_key) == false {
|
||
worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "0", time.Minute*5)
|
||
fmt.Println("device_id:", device_id, " has set is_play to 0")
|
||
}
|
||
|
||
}
|
||
|
||
// VideoFeed 函数用于处理视频流的获取与返回,支持根据前端连接情况控制发送
|
||
func VideoFeed(c *gin.Context) {
|
||
// 更严谨地获取并转换 id 参数,处理可能的错误
|
||
id, ok := c.Get("id")
|
||
if !ok {
|
||
c.JSON(http.StatusInternalServerError, gin.H{"code": proto.DataNotFound, "message": "id not found in context"})
|
||
return
|
||
}
|
||
floatId, ok := id.(float64)
|
||
if !ok {
|
||
c.JSON(http.StatusInternalServerError, gin.H{"code": proto.DataFormatError, "message": "invalid id type in context"})
|
||
return
|
||
}
|
||
user_id := int(floatId)
|
||
|
||
device_id := c.Query("device_id")
|
||
device_id_int, err := strconv.Atoi(device_id)
|
||
if err != nil {
|
||
c.JSON(http.StatusBadRequest, gin.H{"code": proto.DataFormatError, "message": "invalid device_id format"})
|
||
return
|
||
}
|
||
|
||
c.Header("Content-Type", "multipart/x-mixed-replace; boundary=frame") // 设置响应头
|
||
|
||
device := service.GetDevice(device_id_int, user_id)
|
||
if device.ID == 0 {
|
||
c.JSON(http.StatusOK, gin.H{"code": proto.DataNotFound, "message": "device not found"})
|
||
return
|
||
}
|
||
|
||
// 在 Redis 中设置设备正在播放状态及过期时间,处理设置失败的情况
|
||
isSuccess := worker.SetRedisWithExpire(strconv.Itoa(int(device.ID))+"_is_play", "1", time.Minute*5)
|
||
if isSuccess == false {
|
||
fmt.Println("set redis is_play error:", err)
|
||
c.JSON(http.StatusInternalServerError, gin.H{"code": proto.OperationFailed, "message": "failed to set device play status in Redis"})
|
||
return
|
||
}
|
||
fmt.Println("device_id:", device_id_int, " has set is_play to 1")
|
||
|
||
ctx := context.Background()
|
||
pubSub := worker.RedisClient.Subscribe(ctx, strconv.Itoa(device_id_int)+"_frames_msgs")
|
||
|
||
defer func() {
|
||
err = pubSub.Close()
|
||
if err != nil {
|
||
fmt.Println("close pubSub error:", err)
|
||
}
|
||
}()
|
||
|
||
// 生成唯一连接 uuid
|
||
//conId := uuid.New().String()
|
||
//onlineConnKey := "device_" + strconv.Itoa(device_id_int) + "_online_conn_ids"
|
||
//// 加入设备在线连接集合,处理添加失败的情况
|
||
//worker.SetRedisSetAdd(onlineConnKey, conId)
|
||
|
||
// 创建一个通道用于接收客户端连接关闭的信号
|
||
clientClosed := make(chan struct{})
|
||
defer close(clientClosed)
|
||
c.Stream(func(w io.Writer) bool {
|
||
// 将读取 Redis 消息通道的逻辑放在这里,方便根据返回值控制循环
|
||
ch := pubSub.Channel()
|
||
for {
|
||
select {
|
||
case msg, ok := <-ch:
|
||
if !ok {
|
||
// 如果 Redis 通道关闭,进行相应处理并返回 false 停止发送
|
||
fmt.Println("Redis channel closed unexpectedly")
|
||
c.JSON(http.StatusInternalServerError, gin.H{"code": proto.RedisSetError, "message": "Redis channel closed"})
|
||
return false
|
||
}
|
||
// 将 base64 图片数据解析为图片
|
||
frame := msg.Payload
|
||
buf, err := base64.StdEncoding.DecodeString(frame)
|
||
if err != nil {
|
||
fmt.Println("base64 decode error:", err)
|
||
continue
|
||
}
|
||
// 按照视频流格式要求构造并返回响应数据
|
||
_, err = w.Write([]byte("--frame\r\n"))
|
||
if err != nil {
|
||
fmt.Println("write video frame error:", err)
|
||
return false
|
||
}
|
||
_, err = w.Write([]byte("Content-Type: image/jpeg\r\n\r\n"))
|
||
if err != nil {
|
||
fmt.Println("write video frame error:", err)
|
||
return false
|
||
}
|
||
_, err = w.Write(buf)
|
||
if err != nil {
|
||
fmt.Println("write video frame error:", err)
|
||
return false
|
||
}
|
||
_, err = w.Write([]byte("\r\n"))
|
||
if err != nil {
|
||
fmt.Println("write video frame error:", err)
|
||
return false
|
||
}
|
||
case <-clientClosed:
|
||
// 当接收到客户端关闭连接的信号,返回 false 停止发送视频流
|
||
return false
|
||
}
|
||
}
|
||
})
|
||
|
||
// 启动一个协程来监测客户端连接是否关闭,关闭时向 clientClosed 通道发送信号
|
||
go func() {
|
||
<-c.Request.Context().Done()
|
||
//如果clientClosed通道已经关闭,不再发送
|
||
if _, ok := <-clientClosed; !ok {
|
||
return
|
||
} else {
|
||
clientClosed <- struct{}{}
|
||
}
|
||
}()
|
||
|
||
// 查看是否还有其他连接,没有则设置 is_play 为 0
|
||
//if worker.IsContainKey(onlineConnKey) == false {
|
||
// worker.SetRedisWithExpire(strconv.Itoa(device_id_int)+"_is_play", "0", time.Minute*5)
|
||
// fmt.Println("device_id:", device_id, " has set is_play to 0")
|
||
//}
|
||
}
|