videoplayer/handler/device.go

491 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package handler
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"io"
"net/http"
"strconv"
"time"
"videoplayer/proto"
"videoplayer/service"
"videoplayer/worker"
)
// DeviceAddReq is the request payload that AddDevice binds with c.ShouldBind.
// Each field can be supplied as JSON or as a form value under the tag name.
// All fields except AuthID are forwarded to service.AddDevice.
type DeviceAddReq struct {
DeviceName string `json:"device_name" form:"device_name"`
DeviceIP string `json:"device_ip" form:"device_ip"`
DeviceStatus string `json:"device_status" form:"device_status"`
// AuthID is bound from the request but is not read by AddDevice.
AuthID int `json:"auth_id" form:"auth_id"`
DeviceInfo string `json:"device_info" form:"device_info"`
DeviceType string `json:"device_type" form:"device_type"`
DeviceLocation string `json:"device_location" form:"device_location"`
}
// DeviceUpdateReq is the request payload that UpdateDevice binds with
// c.ShouldBind. ID selects the device to update; the remaining fields, except
// AuthID, are forwarded to service.UpdateDevice.
type DeviceUpdateReq struct {
// ID identifies the device being updated.
ID int `json:"id" form:"id"`
DeviceName string `json:"device_name" form:"device_name" `
DeviceIP string `json:"device_ip" form:"device_ip" `
DeviceStatus string `json:"device_status" form:"device_status" `
// AuthID is bound from the request but is not read by UpdateDevice.
AuthID int `json:"auth_id" form:"auth_id" `
DeviceInfo string `json:"device_info" form:"device_info" `
DeviceType string `json:"device_type" form:"device_type" `
DeviceLocation string `json:"device_location" form:"device_location" `
}
// DeviceStatus is the request payload that SetDeviceStatus binds with
// c.ShouldBind.
type DeviceStatus struct {
// IP must be non-empty for SetDeviceStatus to act; it is not passed on to
// the service layer.
IP string `json:"ip" form:"ip" `
// Status is the new status value handed to service.SetDeviceStatus.
Status string `json:"status" form:"status" `
// ID identifies the device whose status is being changed.
ID int `json:"id" form:"id" `
}
// DeviceDelReq is the request payload that DeleteDevice binds with
// c.ShouldBind.
type DeviceDelReq struct {
// ID identifies the device passed to service.DeleteDevice.
ID int `json:"id" form:"id" `
}
// DeviceRestartReq is the request payload that RestartDevice binds with
// c.ShouldBind.
type DeviceRestartReq struct {
// ID identifies the device to restart; RestartDevice reads it only when
// Option is "one".
ID int `json:"id" form:"id" `
// Option selects the scope: RestartDevice recognizes "one" (the device
// named by ID) and "all" (every device returned for the caller).
Option string `json:"option" form:"option" `
}
// SetUpDeviceGroup registers every device endpoint under the "/device" route
// group of router. Routes are declared in a table and registered in the order
// listed.
func SetUpDeviceGroup(router *gin.Engine) {
	routes := []struct {
		method  string
		path    string
		handler gin.HandlerFunc
	}{
		{http.MethodPost, "/get_device_list", GetDeviceList},
		{http.MethodPost, "/restart", RestartDevice},
		{http.MethodPost, "/add_device", AddDevice},
		{http.MethodPost, "/set_device_status", SetDeviceStatus},
		{http.MethodPost, "/update_device", UpdateDevice},
		{http.MethodPost, "/delete_device", DeleteDevice},
		{http.MethodGet, "/get_real_time_image", GetRealTimeImage},
		{http.MethodGet, "/video_feed", VideoFeed},
	}

	group := router.Group("/device")
	for _, r := range routes {
		group.Handle(r.method, r.path, r.handler)
	}
}
// DeleteDevice handles POST /device/delete_device.
//
// It reads the caller's user id from the gin context key "id" (expected to be
// a float64; presumably stored by an authentication middleware — confirm),
// binds a DeviceDelReq from the request and asks service.DeleteDevice to
// remove the device on behalf of that user.
//
// Responses:
//   - HTTP 500 with proto.DataFormatError when the context id is missing or
//     is not a float64 (previously this panicked).
//   - HTTP 200 with proto.ParameterError when the request cannot be bound.
//   - HTTP 200 with proto.OperationFailed when the service reports failure.
//   - HTTP 200 with proto.SuccessCode on success.
func DeleteDevice(c *gin.Context) {
	// A missing key yields a nil value, which fails the assertion below, so
	// the "exists" result of c.Get does not need a separate check.
	rawID, _ := c.Get("id")
	userID, ok := rawID.(float64)
	if !ok {
		c.JSON(http.StatusInternalServerError, gin.H{"code": proto.DataFormatError, "message": "missing or invalid id in context"})
		return
	}

	// Read the POST parameters.
	var req DeviceDelReq
	if err := c.ShouldBind(&req); err != nil {
		c.JSON(http.StatusOK, gin.H{"code": proto.ParameterError, "message": "failed"})
		return
	}

	if !service.DeleteDevice(req.ID, int(userID)) {
		c.JSON(http.StatusOK, gin.H{"code": proto.OperationFailed, "message": "failed"})
		return
	}
	c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "message": "success"})
}
// UpdateDevice handles POST /device/update_device.
//
// It reads the caller's user id from the gin context key "id" (expected to be
// a float64; presumably stored by an authentication middleware — confirm),
// binds a DeviceUpdateReq and forwards its fields (AuthID excluded) to
// service.UpdateDevice.
//
// Responses:
//   - HTTP 500 with proto.DataFormatError when the context id is missing or
//     is not a float64 (previously this panicked).
//   - HTTP 200 with proto.ParameterError when the request cannot be bound.
//   - HTTP 200 with proto.DeviceUpdateFailed when the service reports failure.
//   - HTTP 200 with code 0 on success.
func UpdateDevice(c *gin.Context) {
	// A missing key yields a nil value, which fails the assertion below.
	rawID, _ := c.Get("id")
	userID, ok := rawID.(float64)
	if !ok {
		c.JSON(http.StatusInternalServerError, gin.H{"code": proto.DataFormatError, "message": "missing or invalid id in context"})
		return
	}

	var req DeviceUpdateReq
	if err := c.ShouldBind(&req); err != nil {
		c.JSON(http.StatusOK, gin.H{
			"code":    proto.ParameterError,
			"message": "failed",
		})
		return
	}

	updated := service.UpdateDevice(req.DeviceName, req.DeviceIP, req.DeviceStatus, req.DeviceInfo, req.DeviceType, req.DeviceLocation, req.ID, int(userID))
	if !updated {
		c.JSON(http.StatusOK, gin.H{
			"code":    proto.DeviceUpdateFailed,
			"message": "failed",
		})
		return
	}

	// NOTE(review): the success code is the literal 0 here while sibling
	// handlers use proto.SuccessCode — confirm the two are equal before
	// unifying them.
	c.JSON(http.StatusOK, gin.H{
		"code":    0,
		"message": "success",
	})
}
// SetDeviceStatus handles POST /device/set_device_status.
//
// It reads the caller's user id from the gin context key "id" (expected to be
// a float64; presumably stored by an authentication middleware — confirm),
// binds a DeviceStatus payload and, when the payload carries a non-empty IP,
// asks service.SetDeviceStatus to store the new status for the device.
//
// Responses:
//   - HTTP 500 with proto.DataFormatError when the context id is missing or
//     is not a float64 (previously this panicked).
//   - HTTP 200 with proto.ParameterError when the request cannot be bound or
//     the IP is empty (previously an empty IP produced no response body).
//   - HTTP 200 with code 1 when the service reports failure.
//   - HTTP 200 with proto.SuccessCode on success.
func SetDeviceStatus(c *gin.Context) {
	// A missing key yields a nil value, which fails the assertion below.
	rawID, _ := c.Get("id")
	userID, ok := rawID.(float64)
	if !ok {
		c.JSON(http.StatusInternalServerError, gin.H{"code": proto.DataFormatError, "message": "missing or invalid id in context"})
		return
	}

	var req DeviceStatus
	if err := c.ShouldBind(&req); err != nil {
		c.JSON(http.StatusOK, gin.H{"code": proto.ParameterError, "message": "failed", "data": err.Error()})
		return
	}
	if req.IP == "" {
		// Always answer the client; silently returning left callers with an
		// empty 200 response they could not interpret.
		c.JSON(http.StatusOK, gin.H{"code": proto.ParameterError, "message": "failed", "data": "ip is required"})
		return
	}

	if !service.SetDeviceStatus(req.Status, req.ID, int(userID)) {
		c.JSON(http.StatusOK, gin.H{"code": 1, "message": "failed"})
		return
	}
	c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "message": "success"})
}
// AddDevice handles POST /device/add_device.
//
// It reads the caller's user id from the gin context key "id" (expected to be
// a float64; presumably stored by an authentication middleware — confirm),
// binds a DeviceAddReq and forwards its fields (AuthID excluded) to
// service.AddDevice, which returns the new device id or 0 on failure.
//
// Responses:
//   - HTTP 500 with proto.DataFormatError when the context id is missing or
//     is not a float64 (previously this panicked).
//   - HTTP 200 with proto.ParameterError and the bind error in "data" when
//     the request cannot be bound.
//   - HTTP 200 with proto.DeviceAddFailed when the service returns 0.
//   - HTTP 200 with proto.SuccessCode and the new device id in "data".
func AddDevice(c *gin.Context) {
	// A missing key yields a nil value, which fails the assertion below.
	rawID, _ := c.Get("id")
	userID, ok := rawID.(float64)
	if !ok {
		c.JSON(http.StatusInternalServerError, gin.H{"code": proto.DataFormatError, "message": "missing or invalid id in context"})
		return
	}

	var req DeviceAddReq
	if err := c.ShouldBind(&req); err != nil {
		c.JSON(http.StatusOK, gin.H{
			"code":    proto.ParameterError,
			"message": "failed",
			"data":    err.Error(),
		})
		return
	}

	// Request dump kept from the original implementation for diagnostics.
	fmt.Println(req)

	deviceID := service.AddDevice(req.DeviceName, req.DeviceIP, req.DeviceStatus, req.DeviceInfo, req.DeviceType, req.DeviceLocation, int(userID))
	if deviceID == 0 {
		c.JSON(http.StatusOK, gin.H{
			"code":    proto.DeviceAddFailed,
			"message": "failed",
			"data":    "device add failed",
		})
		return
	}
	c.JSON(http.StatusOK, gin.H{
		"code":    proto.SuccessCode,
		"message": "success",
		"data":    deviceID,
	})
}
// GetDeviceList handles POST /device/get_device_list.
//
// It reads the caller's user id from the gin context key "id" (expected to be
// a float64; presumably stored by an authentication middleware — confirm) and
// returns whatever service.GetDeviceList yields for that user in "data".
//
// Responses:
//   - HTTP 500 with proto.DataFormatError when the context id is missing or
//     is not a float64 (previously this panicked).
//   - HTTP 200 with proto.SuccessCode and the device list otherwise.
func GetDeviceList(c *gin.Context) {
	// A missing key yields a nil value, which fails the assertion below.
	rawID, _ := c.Get("id")
	userID, ok := rawID.(float64)
	if !ok {
		c.JSON(http.StatusInternalServerError, gin.H{"code": proto.DataFormatError, "message": "missing or invalid id in context"})
		return
	}

	devices := service.GetDeviceList(int(userID))
	c.JSON(http.StatusOK, gin.H{
		"code":    proto.SuccessCode,
		"message": "success",
		"data":    devices,
	})
}
// RestartDevice handles POST /device/restart.
//
// It reads the caller's user id from the gin context key "id" (expected to be
// a float64; presumably stored by an authentication middleware — confirm) and
// binds a DeviceRestartReq. Depending on req.Option it restarts:
//
//   - "one": the device identified by req.ID, looked up with
//     service.GetDevice.
//   - "all": every device returned by service.GetDeviceList that has a
//     non-empty IP, stopping at the first failed restart.
//
// Exactly one JSON response is written on every path:
//   - HTTP 500 with proto.DataFormatError when the context id is missing or
//     is not a float64 (previously this panicked).
//   - HTTP 200 with proto.ParameterError when the request cannot be bound or
//     the option is neither "one" nor "all".
//   - HTTP 200 with proto.DataNotFound when no matching device exists.
//   - HTTP 200 with proto.DeviceRestartFailed when the device has no IP or a
//     restart call fails.
//   - HTTP 200 with proto.SuccessCode on success.
func RestartDevice(c *gin.Context) {
	// A missing key yields a nil value, which fails the assertion below.
	rawID, _ := c.Get("id")
	uid, ok := rawID.(float64)
	if !ok {
		c.JSON(http.StatusInternalServerError, gin.H{"code": proto.DataFormatError, "message": "missing or invalid id in context"})
		return
	}
	userID := int(uid)

	var req DeviceRestartReq
	if err := c.ShouldBind(&req); err != nil {
		c.JSON(http.StatusOK, gin.H{"code": proto.ParameterError, "message": "failed", "data": err.Error()})
		return
	}

	switch req.Option {
	case "one":
		device := service.GetDevice(req.ID, userID)
		if device.ID == 0 {
			// strconv.Itoa renders the id as decimal text; string(int)
			// would yield the rune with that code point instead.
			c.JSON(http.StatusOK, gin.H{"code": proto.DataNotFound, "message": "failed", "data": strconv.Itoa(req.ID) + ": device not found"})
			return
		}
		if device.DeviceIP == "" {
			c.JSON(http.StatusOK, gin.H{"code": proto.DeviceRestartFailed, "message": "failed", "data": "device has no ip"})
			return
		}
		if !Restart(device.DeviceIP) {
			c.JSON(http.StatusOK, gin.H{"code": proto.DeviceRestartFailed, "message": "failed"})
			return
		}
		c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "message": "success"})

	case "all":
		devices := service.GetDeviceList(userID)
		if len(devices) == 0 {
			// Return here so that a success body is not appended after the
			// not-found body.
			c.JSON(http.StatusOK, gin.H{"code": proto.DataNotFound, "message": "failed", "data": "device not found"})
			return
		}
		for _, device := range devices {
			if device.DeviceIP == "" {
				continue
			}
			if !Restart(device.DeviceIP) {
				c.JSON(http.StatusOK, gin.H{"code": proto.DeviceRestartFailed, "message": "failed"})
				return
			}
		}
		c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "message": "success"})

	default:
		c.JSON(http.StatusOK, gin.H{"code": proto.ParameterError, "message": "failed", "data": "option must be \"one\" or \"all\""})
	}
}
// restartClient is the shared HTTP client used by Restart. The timeout bounds
// the whole request so that an unresponsive device cannot block the calling
// handler forever (http.Get uses a client without any timeout).
// NOTE(review): 10 seconds is a conservative guess — tune to how long devices
// actually take to acknowledge a restart.
var restartClient = &http.Client{Timeout: 10 * time.Second}

// Restart asks the device at ip to restart by issuing
// GET http://<ip>/restart.
//
// ip is used verbatim as the URL host, so it may include a port. Restart
// returns true only when the request completes and the device answers with
// HTTP 200; transport errors, timeouts and any other status yield false.
func Restart(ip string) bool {
	resp, err := restartClient.Get("http://" + ip + "/restart")
	if err != nil {
		return false
	}
	defer resp.Body.Close()
	return resp.StatusCode == http.StatusOK
}
// GetRealTimeImage handles GET /device/get_real_time_image and streams a
// device's real-time video frames over a WebSocket.
//
// It reads the caller's user id from the gin context key "id" (expected to be
// a float64; presumably stored by an authentication middleware — confirm),
// parses the "device_id" query parameter and looks the device up with
// service.GetDevice. On success the connection is upgraded to a WebSocket,
// recorded in the package-level clients map, the Redis key
// "<device id>_is_play" is set to "1" for five minutes, and frame forwarding
// continues in a background goroutine (subscribeAndHandleMessagesV2).
//
// Error responses written before the upgrade:
//   - HTTP 500 with proto.DataFormatError when the context id is missing or
//     is not a float64 (previously this panicked).
//   - HTTP 400 with proto.DataFormatError when device_id is not an integer.
//   - HTTP 200 with proto.DataNotFound when the device does not exist.
func GetRealTimeImage(c *gin.Context) {
	// A missing key yields a nil value, which fails the assertion below.
	rawID, _ := c.Get("id")
	uid, ok := rawID.(float64)
	if !ok {
		c.JSON(http.StatusInternalServerError, gin.H{"code": proto.DataFormatError, "message": "missing or invalid id in context"})
		return
	}

	deviceID, err := strconv.Atoi(c.Query("device_id"))
	if err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"code": proto.DataFormatError, "message": "invalid device_id format"})
		return
	}

	device := service.GetDevice(deviceID, int(uid))
	if device.ID == 0 {
		c.JSON(http.StatusOK, gin.H{"code": proto.DataNotFound, "message": "device not found"})
		return
	}

	ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		fmt.Println("connect wss err:", err)
		return
	}

	// Register the connection only after a successful upgrade, and under the
	// same mutex the other writers of clients use, so that a nil connection
	// is never stored and the map is not written concurrently.
	clientsMux.Lock()
	clients[ws] = true
	clientsMux.Unlock()

	worker.SetRedisWithExpire(strconv.Itoa(int(device.ID))+"_is_play", "1", time.Minute*5)
	fmt.Println("device_id:", deviceID, " has set is_play to 1")
	go subscribeAndHandleMessagesV2(ws, deviceID)
}
// subscribeAndHandleMessagesV2 forwards a device's video frames from Redis to
// one WebSocket client as binary messages.
//
// It subscribes to the Redis channel "<device_id>_frames_msgs", base64-decodes
// each payload and writes the decoded bytes to ws. Once per second it also
// sends a WebSocket ping so that a dead connection is detected even when no
// frames arrive. The connection is tracked in the Redis set
// "device_<device_id>_online_conn_ids" under a freshly generated UUID.
//
// The function blocks until the Redis channel closes, a frame write fails or
// a ping fails. On exit it closes ws and the subscription, marks ws as
// inactive in clients, removes its UUID from the online set and, when
// worker.IsContainKey reports the set key as gone, sets
// "<device_id>_is_play" to "0".
func subscribeAndHandleMessagesV2(ws *websocket.Conn, device_id int) {
	deviceKey := strconv.Itoa(device_id)

	pubsub := worker.RedisClient.Subscribe(context.Background(), deviceKey+"_frames_msgs")
	defer pubsub.Close()
	defer ws.Close()

	// Generate a unique connection id and add it to the device's set of
	// online connections.
	conID := uuid.New().String()
	onlineConnKey := "device_" + deviceKey + "_online_conn_ids"
	worker.SetRedisSetAddWithExpire(onlineConnKey, conID, time.Minute*5)

	// Heartbeat timer: one ping per second keeps the connection alive and
	// detects clients that disappeared.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	ch := pubsub.Channel()

	// Block on both event sources instead of polling with default branches,
	// which would spin a CPU core while idle.
loop:
	for {
		select {
		case msg, ok := <-ch:
			if !ok {
				// A closed channel delivers nil messages; stop instead of
				// dereferencing one.
				fmt.Println("redis channel closed, device_id:", device_id)
				break loop
			}
			// Decode the base64 image data into raw image bytes.
			buf, err := base64.StdEncoding.DecodeString(msg.Payload)
			if err != nil {
				fmt.Println("base64 decode error:", err)
				continue
			}
			if err := ws.WriteMessage(websocket.BinaryMessage, buf); err != nil {
				fmt.Println("send message to client err:", err)
				break loop
			}
		case <-ticker.C:
			if err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
				fmt.Println("Connection check failed:", err)
				break loop
			}
		}
	}

	// The connection is closed on every exit path, so mark it inactive and
	// drop it from the online set regardless of why the loop ended.
	clientsMux.Lock()
	clients[ws] = false
	clientsMux.Unlock()
	worker.SetRedisSetRemove(onlineConnKey, conID)

	// If no other connection remains, set is_play to 0.
	if !worker.IsContainKey(onlineConnKey) {
		worker.SetRedisWithExpire(deviceKey+"_is_play", "0", time.Minute*5)
		fmt.Println("device_id:", device_id, " has set is_play to 0")
	}
}
// subscribeAndHandleMessages forwards a device's video frames from Redis to
// one WebSocket client as JSON text messages.
//
// It subscribes to the Redis channel "<device_id>_frames_msgs". A non-empty
// payload is wrapped in a proto.Message of type "img". Empty payloads are
// skipped with a 200 ms pause; after five consecutive skips the next empty
// payload is sent as a "check" message instead. Once per second a WebSocket
// ping is sent so that a dead connection is detected. The connection is
// tracked in the Redis set "device_<device_id>_online_conn_ids" under a
// freshly generated UUID.
//
// The function blocks until the Redis channel closes, a message write fails
// or a ping fails. On exit it closes ws and the subscription, marks ws as
// inactive in clients, removes its UUID from the online set and, when
// worker.IsContainKey reports the set key as gone, sets
// "<device_id>_is_play" to "0".
func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) {
	deviceKey := strconv.Itoa(device_id)

	pubsub := worker.RedisClient.Subscribe(context.Background(), deviceKey+"_frames_msgs")
	defer pubsub.Close()
	defer ws.Close()

	// Generate a unique connection id and add it to the device's set of
	// online connections.
	conID := uuid.New().String()
	onlineConnKey := "device_" + deviceKey + "_online_conn_ids"
	worker.SetRedisSetAddWithExpire(onlineConnKey, conID, time.Minute*5)

	// Heartbeat timer: one ping per second.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	ch := pubsub.Channel()
	var checkCnt int

	// Block on both event sources instead of polling with default branches,
	// which would spin a CPU core while idle.
loop:
	for {
		select {
		case msg, ok := <-ch:
			if !ok {
				// A closed channel delivers nil messages; stop instead of
				// dereferencing one.
				fmt.Println("redis channel closed, device_id:", device_id)
				break loop
			}

			var msgObj proto.Message
			msgObj.From_user_id = -1
			if msg.Payload != "" {
				msgObj.Type = "img"
				msgObj.Msg = msg.Payload
			} else {
				if checkCnt < 5 {
					checkCnt++
					time.Sleep(time.Millisecond * 200)
					continue
				}
				checkCnt = 0
				msgObj.Type = "check"
				msgObj.Msg = "check"
			}

			data, err := json.Marshal(msgObj)
			if err != nil {
				fmt.Println("marshal message error:", err)
				continue
			}
			if err := ws.WriteMessage(websocket.TextMessage, data); err != nil {
				fmt.Println("send message to client err:", err)
				break loop
			}
		case <-ticker.C:
			if err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
				fmt.Println("Connection check failed:", err)
				break loop
			}
		}
	}

	// The connection is closed on every exit path, so mark it inactive and
	// drop it from the online set regardless of why the loop ended.
	clientsMux.Lock()
	clients[ws] = false
	clientsMux.Unlock()
	worker.SetRedisSetRemove(onlineConnKey, conID)

	// If no other connection remains, set is_play to 0.
	if !worker.IsContainKey(onlineConnKey) {
		worker.SetRedisWithExpire(deviceKey+"_is_play", "0", time.Minute*5)
		fmt.Println("device_id:", device_id, " has set is_play to 0")
	}
}
// VideoFeed handles GET /device/video_feed and streams a device's video as a
// "multipart/x-mixed-replace" response with one JPEG part per frame.
//
// It reads the caller's user id from the gin context key "id" (expected to be
// a float64), parses the "device_id" query parameter, looks the device up
// with service.GetDevice and sets the Redis key "<device id>_is_play" to "1"
// for five minutes. It then subscribes to the Redis channel
// "<device_id>_frames_msgs", base64-decodes each payload and writes it to the
// client as one multipart part. Payloads that fail to decode are skipped.
//
// The stream ends when the client disconnects, the Redis channel closes or a
// write to the client fails.
//
// Error responses written before streaming starts:
//   - HTTP 500 with proto.DataNotFound when the context has no id.
//   - HTTP 500 with proto.DataFormatError when the id is not a float64.
//   - HTTP 400 with proto.DataFormatError when device_id is not an integer.
//   - HTTP 200 with proto.DataNotFound when the device does not exist.
//   - HTTP 500 with proto.OperationFailed when the Redis play flag cannot be
//     set.
func VideoFeed(c *gin.Context) {
	// Fetch and convert the id strictly, handling every failure mode.
	id, ok := c.Get("id")
	if !ok {
		c.JSON(http.StatusInternalServerError, gin.H{"code": proto.DataNotFound, "message": "id not found in context"})
		return
	}
	floatID, ok := id.(float64)
	if !ok {
		c.JSON(http.StatusInternalServerError, gin.H{"code": proto.DataFormatError, "message": "invalid id type in context"})
		return
	}
	userID := int(floatID)

	deviceID, err := strconv.Atoi(c.Query("device_id"))
	if err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"code": proto.DataFormatError, "message": "invalid device_id format"})
		return
	}

	device := service.GetDevice(deviceID, userID)
	if device.ID == 0 {
		c.JSON(http.StatusOK, gin.H{"code": proto.DataNotFound, "message": "device not found"})
		return
	}

	// Mark the device as playing in Redis (with expiry) and bail out if that
	// fails.
	if !worker.SetRedisWithExpire(strconv.Itoa(int(device.ID))+"_is_play", "1", time.Minute*5) {
		fmt.Println("set redis is_play failed, device_id:", deviceID)
		c.JSON(http.StatusInternalServerError, gin.H{"code": proto.OperationFailed, "message": "failed to set device play status in Redis"})
		return
	}
	fmt.Println("device_id:", deviceID, " has set is_play to 1")

	pubSub := worker.RedisClient.Subscribe(context.Background(), strconv.Itoa(deviceID)+"_frames_msgs")
	defer func() {
		if err := pubSub.Close(); err != nil {
			fmt.Println("close pubSub error:", err)
		}
	}()

	// Disabled in the original: per-connection tracking in the online set.
	//conId := uuid.New().String()
	//onlineConnKey := "device_" + strconv.Itoa(device_id_int) + "_online_conn_ids"
	//worker.SetRedisSetAdd(onlineConnKey, conId)

	// Set the streaming content type only after all validation has passed;
	// gin keeps an already-set Content-Type, so setting it earlier would
	// label the JSON error bodies as multipart.
	c.Header("Content-Type", "multipart/x-mixed-replace; boundary=frame")

	ch := pubSub.Channel()
	// The request context is cancelled when the client's connection closes.
	clientGone := c.Request.Context().Done()

	// Each step delivers at most one frame and then returns, so that gin
	// flushes the frame to the client and re-checks the connection between
	// frames.
	c.Stream(func(w io.Writer) bool {
		select {
		case <-clientGone:
			return false
		case msg, ok := <-ch:
			if !ok {
				// Headers may already be on the wire, so a JSON error body
				// cannot be sent here; log and end the stream.
				fmt.Println("Redis channel closed unexpectedly")
				return false
			}
			// Decode the base64 image data into raw image bytes.
			buf, err := base64.StdEncoding.DecodeString(msg.Payload)
			if err != nil {
				fmt.Println("base64 decode error:", err)
				return true
			}
			// Emit one multipart part: boundary, part header, JPEG bytes,
			// trailing CRLF.
			parts := [][]byte{
				[]byte("--frame\r\n"),
				[]byte("Content-Type: image/jpeg\r\n\r\n"),
				buf,
				[]byte("\r\n"),
			}
			for _, part := range parts {
				if _, err := w.Write(part); err != nil {
					fmt.Println("write video frame error:", err)
					return false
				}
			}
			return true
		}
	})

	// Disabled in the original: reset is_play to 0 when no other connection
	// remains.
	//if worker.IsContainKey(onlineConnKey) == false {
	//	worker.SetRedisWithExpire(strconv.Itoa(device_id_int)+"_is_play", "0", time.Minute*5)
	//	fmt.Println("device_id:", device_id, " has set is_play to 0")
	//}
}