Merge branch 'refs/heads/feat-video-stream'

This commit is contained in:
junleea 2024-12-26 13:37:09 +08:00
commit 68d9082f96
2 changed files with 122 additions and 1 deletions

View File

@ -2,11 +2,13 @@ package handler
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"io"
"net/http"
"strconv"
"time"
@ -60,7 +62,7 @@ func SetUpDeviceGroup(router *gin.Engine) {
deviceGroup.POST("/update_device", UpdateDevice)
deviceGroup.POST("/delete_device", DeleteDevice)
deviceGroup.GET("/get_real_time_image", GetRealTimeImage)
deviceGroup.POST("/video_feed", VideoFeed)
}
func DeleteDevice(c *gin.Context) {
@ -301,3 +303,120 @@ end:
}
}
// VideoFeed 函数用于处理视频流的获取与返回,支持根据前端连接情况控制发送
func VideoFeed(c *gin.Context) {
// 更严谨地获取并转换 id 参数,处理可能的错误
id, ok := c.Get("id")
if !ok {
c.JSON(http.StatusInternalServerError, gin.H{"code": proto.DataNotFound, "message": "id not found in context"})
return
}
floatId, ok := id.(float64)
if !ok {
c.JSON(http.StatusInternalServerError, gin.H{"code": proto.DataFormatError, "message": "invalid id type in context"})
return
}
user_id := int(floatId)
device_id := c.Query("device_id")
device_id_int, err := strconv.Atoi(device_id)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"code": proto.DataFormatError, "message": "invalid device_id format"})
return
}
c.Header("Content-Type", "multipart/x-mixed-replace; boundary=frame") // 设置响应头
device := service.GetDevice(device_id_int, user_id)
if device.ID == 0 {
c.JSON(http.StatusOK, gin.H{"code": proto.DataNotFound, "message": "device not found"})
return
}
// 在 Redis 中设置设备正在播放状态及过期时间,处理设置失败的情况
isSuccess := worker.SetRedisWithExpire(strconv.Itoa(int(device.ID))+"_is_play", "1", time.Minute*5)
if isSuccess == false {
fmt.Println("set redis is_play error:", err)
c.JSON(http.StatusInternalServerError, gin.H{"code": proto.OperationFailed, "message": "failed to set device play status in Redis"})
return
}
fmt.Println("device_id:", device_id_int, " has set is_play to 1")
ctx := context.Background()
pubSub := worker.RedisClient.Subscribe(ctx, strconv.Itoa(device_id_int)+"_frames_msgs")
defer func() {
err = pubSub.Close()
if err != nil {
fmt.Println("close pubSub error:", err)
}
}()
// 生成唯一连接 uuid
//conId := uuid.New().String()
//onlineConnKey := "device_" + strconv.Itoa(device_id_int) + "_online_conn_ids"
//// 加入设备在线连接集合,处理添加失败的情况
//worker.SetRedisSetAdd(onlineConnKey, conId)
// 创建一个通道用于接收客户端连接关闭的信号
clientClosed := make(chan struct{})
c.Stream(func(w io.Writer) bool {
// 将读取 Redis 消息通道的逻辑放在这里,方便根据返回值控制循环
ch := pubSub.Channel()
for {
select {
case msg, ok := <-ch:
if !ok {
// 如果 Redis 通道关闭,进行相应处理并返回 false 停止发送
fmt.Println("Redis channel closed unexpectedly")
c.JSON(http.StatusInternalServerError, gin.H{"code": proto.RedisSetError, "message": "Redis channel closed"})
return false
}
// 将 base64 图片数据解析为图片
frame := msg.Payload
buf, err := base64.StdEncoding.DecodeString(frame)
if err != nil {
fmt.Println("base64 decode error:", err)
continue
}
// 按照视频流格式要求构造并返回响应数据
_, err = w.Write([]byte("--frame\r\n"))
if err != nil {
fmt.Println("write video frame error:", err)
continue
}
_, err = w.Write([]byte("Content-Type: image/jpeg\r\n\r\n"))
if err != nil {
fmt.Println("write video frame error:", err)
continue
}
_, err = w.Write(buf)
if err != nil {
fmt.Println("write video frame error:", err)
continue
}
_, err = w.Write([]byte("\r\n"))
if err != nil {
fmt.Println("write video frame error:", err)
continue
}
case <-clientClosed:
// 当接收到客户端关闭连接的信号,返回 false 停止发送视频流
return false
}
}
})
// 启动一个协程来监测客户端连接是否关闭,关闭时向 clientClosed 通道发送信号
go func() {
<-c.Request.Context().Done()
close(clientClosed)
}()
// 查看是否还有其他连接,没有则设置 is_play 为 0
//if worker.IsContainKey(onlineConnKey) == false {
// worker.SetRedisWithExpire(strconv.Itoa(device_id_int)+"_is_play", "0", time.Minute*5)
// fmt.Println("device_id:", device_id, " has set is_play to 0")
//}
}

View File

@ -61,4 +61,6 @@ const (
NoUploadPermissions = 76 // 无上传权限
DeleteFileFailed = 77 // 删除文件失败
DeleteFileInfoFailed = 78 // 删除文件信息失败
DataFormatError = 80 // 数据格式错误
)