// Package handler contains the gin HTTP handlers for the video player API.
// This file implements the /device route group: device CRUD, remote restart,
// and live frame streaming over WebSocket and multipart HTTP.
package handler

import (
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"strconv"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/google/uuid"
	"github.com/gorilla/websocket"

	"videoplayer/proto"
	"videoplayer/service"
	"videoplayer/worker"
)

// DeviceAddReq is the request payload bound by AddDevice.
// AuthID is bound but is not forwarded to service.AddDevice.
type DeviceAddReq struct {
	DeviceName     string `json:"device_name" form:"device_name"`
	DeviceIP       string `json:"device_ip" form:"device_ip"`
	DeviceStatus   string `json:"device_status" form:"device_status"`
	AuthID         int    `json:"auth_id" form:"auth_id"`
	DeviceInfo     string `json:"device_info" form:"device_info"`
	DeviceType     string `json:"device_type" form:"device_type"`
	DeviceLocation string `json:"device_location" form:"device_location"`
}

// DeviceUpdateReq is the request payload bound by UpdateDevice.
// AuthID is bound but is not forwarded to service.UpdateDevice.
type DeviceUpdateReq struct {
	ID             int    `json:"id" form:"id"`
	DeviceName     string `json:"device_name" form:"device_name"`
	DeviceIP       string `json:"device_ip" form:"device_ip"`
	DeviceStatus   string `json:"device_status" form:"device_status"`
	AuthID         int    `json:"auth_id" form:"auth_id"`
	DeviceInfo     string `json:"device_info" form:"device_info"`
	DeviceType     string `json:"device_type" form:"device_type"`
	DeviceLocation string `json:"device_location" form:"device_location"`
}

// DeviceStatus is the request payload bound by SetDeviceStatus.
type DeviceStatus struct {
	IP     string `json:"ip" form:"ip"`
	Status string `json:"status" form:"status"`
	ID     int    `json:"id" form:"id"`
}

// DeviceDelReq is the request payload bound by DeleteDevice.
type DeviceDelReq struct {
	ID int `json:"id" form:"id"`
}

// DeviceRestartReq is the request payload bound by RestartDevice.
// Option selects the scope: "one" restarts the device with the given ID,
// "all" restarts every device returned for the current user.
type DeviceRestartReq struct {
	ID     int    `json:"id" form:"id"`
	Option string `json:"option" form:"option"`
}

// deviceRestartClient is shared by Restart so that an unresponsive device
// cannot block a request handler indefinitely.
var deviceRestartClient = &http.Client{Timeout: 5 * time.Second}

// SetUpDeviceGroup registers every /device route on router.
func SetUpDeviceGroup(router *gin.Engine) {
	deviceGroup := router.Group("/device")
	deviceGroup.POST("/get_device_list", GetDeviceList)
	deviceGroup.POST("/restart", RestartDevice)
	deviceGroup.POST("/add_device", AddDevice)
	deviceGroup.POST("/set_device_status", SetDeviceStatus)
	deviceGroup.POST("/update_device", UpdateDevice)
	deviceGroup.POST("/delete_device", DeleteDevice)
	deviceGroup.GET("/get_real_time_image", GetRealTimeImage)
	deviceGroup.POST("/video_feed", VideoFeed)
}

// deviceUserID reads the "id" value from the gin context and converts it to
// an int. The value is expected to be a float64 (presumably placed there by
// an auth middleware that decoded it from JSON; confirm against the
// middleware). When the value is missing or has another type, an error
// response is written and ok is false; the caller must return immediately.
func deviceUserID(c *gin.Context) (userID int, ok bool) {
	raw, exists := c.Get("id")
	if !exists {
		c.JSON(http.StatusInternalServerError, gin.H{"code": proto.DataNotFound, "message": "id not found in context"})
		return 0, false
	}
	asFloat, isFloat := raw.(float64)
	if !isFloat {
		c.JSON(http.StatusInternalServerError, gin.H{"code": proto.DataFormatError, "message": "invalid id type in context"})
		return 0, false
	}
	return int(asFloat), true
}

// DeleteDevice removes the device identified by the request's id on behalf
// of the current user.
func DeleteDevice(c *gin.Context) {
	userID, ok := deviceUserID(c)
	if !ok {
		return
	}

	var req DeviceDelReq
	if err := c.ShouldBind(&req); err != nil {
		c.JSON(http.StatusOK, gin.H{"code": proto.ParameterError, "message": "failed"})
		return
	}

	if !service.DeleteDevice(req.ID, userID) {
		c.JSON(http.StatusOK, gin.H{"code": proto.OperationFailed, "message": "failed"})
		return
	}
	c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "message": "success"})
}

// UpdateDevice overwrites the stored attributes of the device identified by
// the request's id on behalf of the current user.
func UpdateDevice(c *gin.Context) {
	userID, ok := deviceUserID(c)
	if !ok {
		return
	}

	var req DeviceUpdateReq
	if err := c.ShouldBind(&req); err != nil {
		c.JSON(http.StatusOK, gin.H{"code": proto.ParameterError, "message": "failed"})
		return
	}

	updated := service.UpdateDevice(req.DeviceName, req.DeviceIP, req.DeviceStatus, req.DeviceInfo,
		req.DeviceType, req.DeviceLocation, req.ID, userID)
	if !updated {
		c.JSON(http.StatusOK, gin.H{"code": proto.DeviceUpdateFailed, "message": "failed"})
		return
	}
	// NOTE(review): the literal 0 is kept from the original; other handlers
	// use proto.SuccessCode. Confirm the two are equal before unifying.
	c.JSON(http.StatusOK, gin.H{"code": 0, "message": "success"})
}

// SetDeviceStatus updates the status of the device identified by the
// request's id. The request must carry a non-empty ip.
func SetDeviceStatus(c *gin.Context) {
	userID, ok := deviceUserID(c)
	if !ok {
		return
	}

	var req DeviceStatus
	if err := c.ShouldBind(&req); err != nil {
		c.JSON(http.StatusOK, gin.H{"code": proto.ParameterError, "message": "failed", "data": err.Error()})
		return
	}
	if req.IP == "" {
		// Previously this path returned without writing any response body.
		c.JSON(http.StatusOK, gin.H{"code": proto.ParameterError, "message": "failed", "data": "ip is required"})
		return
	}

	if !service.SetDeviceStatus(req.Status, req.ID, userID) {
		// NOTE(review): the literal 1 is kept from the original failure response.
		c.JSON(http.StatusOK, gin.H{"code": 1, "message": "failed"})
		return
	}
	c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "message": "success"})
}

// AddDevice creates a device for the current user and returns the new
// device id in "data". A returned id of 0 is treated as failure.
func AddDevice(c *gin.Context) {
	userID, ok := deviceUserID(c)
	if !ok {
		return
	}

	var req DeviceAddReq
	if err := c.ShouldBind(&req); err != nil {
		c.JSON(http.StatusOK, gin.H{"code": proto.ParameterError, "message": "failed", "data": err.Error()})
		return
	}
	fmt.Println(req)

	deviceID := service.AddDevice(req.DeviceName, req.DeviceIP, req.DeviceStatus, req.DeviceInfo,
		req.DeviceType, req.DeviceLocation, userID)
	if deviceID == 0 {
		c.JSON(http.StatusOK, gin.H{"code": proto.DeviceAddFailed, "message": "failed", "data": "device add failed"})
		return
	}
	c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "message": "success", "data": deviceID})
}

// GetDeviceList returns every device belonging to the current user.
func GetDeviceList(c *gin.Context) {
	userID, ok := deviceUserID(c)
	if !ok {
		return
	}
	devices := service.GetDeviceList(userID)
	c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "message": "success", "data": devices})
}

// RestartDevice restarts one device (option "one") or all of the current
// user's devices (option "all"). Exactly one JSON response is written on
// every path. With "all", devices without an IP are skipped and the first
// failed restart aborts the remaining ones.
func RestartDevice(c *gin.Context) {
	userID, ok := deviceUserID(c)
	if !ok {
		return
	}

	var req DeviceRestartReq
	if err := c.ShouldBind(&req); err != nil {
		c.JSON(http.StatusOK, gin.H{"code": proto.ParameterError, "message": "failed", "data": err.Error()})
		return
	}

	switch req.Option {
	case "one":
		device := service.GetDevice(req.ID, userID)
		if device.ID == 0 {
			// strconv.Itoa, not string(int): the latter yields a rune, not decimal text.
			c.JSON(http.StatusOK, gin.H{"code": proto.DataNotFound, "message": "failed", "data": strconv.Itoa(req.ID) + ": device not found"})
			return
		}
		if device.DeviceIP == "" {
			c.JSON(http.StatusOK, gin.H{"code": proto.DeviceRestartFailed, "message": "failed", "data": "device has no ip"})
			return
		}
		if !Restart(device.DeviceIP) {
			c.JSON(http.StatusOK, gin.H{"code": proto.DeviceRestartFailed, "message": "failed"})
			return
		}
		c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "message": "success"})

	case "all":
		devices := service.GetDeviceList(userID)
		if len(devices) == 0 {
			c.JSON(http.StatusOK, gin.H{"code": proto.DataNotFound, "message": "failed", "data": "device not found"})
			return
		}
		for _, device := range devices {
			if device.DeviceIP == "" {
				continue
			}
			if !Restart(device.DeviceIP) {
				c.JSON(http.StatusOK, gin.H{"code": proto.DeviceRestartFailed, "message": "failed"})
				return
			}
		}
		c.JSON(http.StatusOK, gin.H{"code": proto.SuccessCode, "message": "success"})

	default:
		c.JSON(http.StatusOK, gin.H{"code": proto.ParameterError, "message": "failed", "data": "unknown option: " + req.Option})
	}
}

// Restart asks the device at ip to restart by issuing GET http://<ip>/restart.
// ip is used verbatim as the host part of the URL. It reports true only when
// the device answers with HTTP 200 within the client timeout.
func Restart(ip string) bool {
	resp, err := deviceRestartClient.Get("http://" + ip + "/restart")
	if err != nil {
		return false
	}
	defer resp.Body.Close()
	return resp.StatusCode == http.StatusOK
}

// GetRealTimeImage upgrades the request to a WebSocket and streams the
// real-time frames of the device named by the device_id query parameter.
// The device must belong to the current user.
func GetRealTimeImage(c *gin.Context) {
	userID, ok := deviceUserID(c)
	if !ok {
		return
	}

	deviceID, err := strconv.Atoi(c.Query("device_id"))
	if err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"code": proto.DataFormatError, "message": "invalid device_id format"})
		return
	}

	device := service.GetDevice(deviceID, userID)
	if device.ID == 0 {
		c.JSON(http.StatusOK, gin.H{"code": proto.DataNotFound, "message": "device not found"})
		return
	}

	ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		fmt.Println("connect wss err:", err)
		return
	}
	// Register only a successfully upgraded connection, under the same mutex
	// the streaming goroutine uses when it touches the clients map.
	clientsMux.Lock()
	clients[ws] = true
	clientsMux.Unlock()

	worker.SetRedisWithExpire(strconv.Itoa(int(device.ID))+"_is_play", "1", time.Minute*5)
	fmt.Println("device_id:", deviceID, " has set is_play to 1")

	go subscribeAndHandleMessages(ws, deviceID)
}

// subscribeAndHandleMessages forwards messages published on the Redis channel
// "<deviceID>_frames_msgs" to ws until a write or ping fails or the Redis
// channel closes. Non-empty payloads are sent as "img" messages; every sixth
// consecutive-or-not empty payload is sent as a "check" message, the other
// five are dropped after a 200 ms pause. A ping is sent once per second to
// detect dead connections. On exit the connection id is removed from the
// device's online-connection set and, if that key no longer exists, is_play
// is set to "0".
func subscribeAndHandleMessages(ws *websocket.Conn, deviceID int) {
	pubsub := worker.RedisClient.Subscribe(context.Background(), strconv.Itoa(deviceID)+"_frames_msgs")
	defer pubsub.Close()
	defer ws.Close()

	// A unique id per connection, tracked in the device's online-connection set.
	connID := uuid.New().String()
	onlineConnKey := "device_" + strconv.Itoa(deviceID) + "_online_conn_ids"
	worker.SetRedisSetAddWithExpire(onlineConnKey, connID, time.Minute*5)

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	frames := pubsub.Channel()
	emptyCount := 0

loop:
	for {
		// Block until there is work; the original polled with a default case
		// and spun a CPU core while idle.
		select {
		case msg, open := <-frames:
			if !open {
				// A closed channel yields a nil message; stop instead of dereferencing it.
				fmt.Println("redis channel closed, device_id:", deviceID)
				break loop
			}

			var out proto.Message
			out.From_user_id = -1
			if msg.Payload != "" {
				out.Type = "img"
				out.Msg = msg.Payload
			} else {
				if emptyCount < 5 {
					emptyCount++
					time.Sleep(time.Millisecond * 200)
					continue
				}
				emptyCount = 0
				out.Type = "check"
				out.Msg = "check"
			}

			encoded, err := json.Marshal(out)
			if err != nil {
				fmt.Println("marshal message err:", err)
				continue
			}
			if err := ws.WriteMessage(websocket.TextMessage, encoded); err != nil {
				clientsMux.Lock()
				clients[ws] = false
				clientsMux.Unlock()
				fmt.Println("send message to client err:", err)
				break loop
			}

		case <-ticker.C:
			if err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
				fmt.Println("Connection check failed:", err)
				break loop
			}
		}
	}

	worker.SetRedisSetRemove(onlineConnKey, connID)
	// If no other connection is left for this device, mark it as not playing.
	if !worker.IsContainKey(onlineConnKey) {
		worker.SetRedisWithExpire(strconv.Itoa(deviceID)+"_is_play", "0", time.Minute*5)
		fmt.Println("device_id:", deviceID, " has set is_play to 0")
	}
}

// writeMJPEGFrame writes one part of a multipart/x-mixed-replace stream whose
// boundary is "frame": the boundary line, a JPEG content-type header, the
// image bytes, and a trailing CRLF.
func writeMJPEGFrame(w io.Writer, jpeg []byte) error {
	if _, err := io.WriteString(w, "--frame\r\nContent-Type: image/jpeg\r\n\r\n"); err != nil {
		return err
	}
	if _, err := w.Write(jpeg); err != nil {
		return err
	}
	_, err := io.WriteString(w, "\r\n")
	return err
}

// VideoFeed streams the frames of the device named by the device_id query
// parameter as a multipart/x-mixed-replace response. Frames are read from the
// Redis channel "<device_id>_frames_msgs" as base64 text and decoded before
// being written. Streaming stops when the client disconnects, a write fails,
// or the Redis channel closes.
//
// Unlike GetRealTimeImage, this handler does not register itself in the
// device's online-connection set and never sets is_play back to "0".
func VideoFeed(c *gin.Context) {
	userID, ok := deviceUserID(c)
	if !ok {
		return
	}

	deviceID, err := strconv.Atoi(c.Query("device_id"))
	if err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"code": proto.DataFormatError, "message": "invalid device_id format"})
		return
	}

	device := service.GetDevice(deviceID, userID)
	if device.ID == 0 {
		c.JSON(http.StatusOK, gin.H{"code": proto.DataNotFound, "message": "device not found"})
		return
	}

	// Mark the device as playing; abort if Redis rejects the write.
	if !worker.SetRedisWithExpire(strconv.Itoa(int(device.ID))+"_is_play", "1", time.Minute*5) {
		fmt.Println("set redis is_play error, device_id:", deviceID)
		c.JSON(http.StatusInternalServerError, gin.H{"code": proto.OperationFailed, "message": "failed to set device play status in Redis"})
		return
	}
	fmt.Println("device_id:", deviceID, " has set is_play to 1")

	pubSub := worker.RedisClient.Subscribe(context.Background(), strconv.Itoa(deviceID)+"_frames_msgs")
	defer func() {
		if err := pubSub.Close(); err != nil {
			fmt.Println("close pubSub error:", err)
		}
	}()

	// Set the streaming content type only after validation so that the JSON
	// error responses above are not labelled as multipart.
	c.Header("Content-Type", "multipart/x-mixed-replace; boundary=frame")

	frames := pubSub.Channel()
	clientGone := c.Request.Context().Done()

	// Each step handles at most one frame and then returns, which lets gin
	// flush the writer between frames.
	c.Stream(func(w io.Writer) bool {
		select {
		case <-clientGone:
			return false
		case msg, open := <-frames:
			if !open {
				// The multipart body has already started, so a JSON error
				// can no longer be sent; just stop streaming.
				fmt.Println("Redis channel closed unexpectedly")
				return false
			}
			jpeg, err := base64.StdEncoding.DecodeString(msg.Payload)
			if err != nil {
				fmt.Println("base64 decode error:", err)
				return true // skip the malformed frame, keep the stream open
			}
			if err := writeMJPEGFrame(w, jpeg); err != nil {
				fmt.Println("write video frame error:", err)
				return false
			}
			return true
		}
	})
}