设备实时图片通过redis发布订阅模式实现,修改redis参数

This commit is contained in:
junleea 2024-10-28 17:14:16 +08:00
parent cd3c9a30ce
commit 864445f47e
2 changed files with 60 additions and 67 deletions

View File

@ -1,6 +1,7 @@
package handler package handler
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@ -211,65 +212,57 @@ func GetRealTimeImage(c *gin.Context) {
id, _ := c.Get("id") id, _ := c.Get("id")
id1 := int(id.(float64)) id1 := int(id.(float64))
device_id := c.Query("device_id") device_id := c.Query("device_id")
//字符串转int
device_id_int, _ := strconv.Atoi(device_id) device_id_int, _ := strconv.Atoi(device_id)
device := service.GetDevice(device_id_int, id1) device := service.GetDevice(device_id_int, id1)
if device.ID == 0 { if device.ID == 0 {
c.JSON(http.StatusOK, gin.H{"code": proto.DataNotFound, "message": "device not found"}) c.JSON(http.StatusOK, gin.H{"code": proto.DataNotFound, "message": "device not found"})
return return
} }
//建立连接
// 升级HTTP连接为WebSocket连接
ws, err := upgrader.Upgrade(c.Writer, c.Request, nil) ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
clients[ws] = true clients[ws] = true
if err != nil { if err != nil {
// log.Println(err)
fmt.Println(err) fmt.Println(err)
return return
} }
defer ws.Close() defer ws.Close()
worker.SetRedisWithExpire(strconv.Itoa(int(device.ID))+"_is_play", "1", time.Minute*5) //设置播放状态 worker.SetRedisWithExpire(strconv.Itoa(int(device.ID))+"_is_play", "1", time.Minute*5)
// 接收客户端消息并发送到指定用户 go subscribeAndHandleMessages(ws, device_id_int)
}
go func(ws *websocket.Conn, device_id int) { func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) {
ctx := context.Background()
}(ws, device_id_int) pubsub := worker.RedisClient.Subscribe(ctx, strconv.Itoa(device_id)+"_frames_channel")
defer pubsub.Close()
ch := pubsub.Channel()
var check_cnt int var check_cnt int
for msg := range ch {
for { var res3 []byte
if v := clients[ws]; v == true { var msgObj proto.Message
res2 := worker.PopRedisListLeft(device_id + "_frames") if msg.Payload != "" {
var res3 []byte msgObj.Type = "img"
var msg proto.Message msgObj.Msg = msg.Payload
if res2 != "" { msgObj.From_user_id = -1
//若有消息则发送消息 res3, _ = json.Marshal(msgObj)
msg.Type = "img" } else {
msg.Msg = res2 if check_cnt < 5 {
msg.From_user_id = id1 check_cnt++
res3, _ = json.Marshal(msg) time.Sleep(time.Millisecond * 200)
} else { continue
//若无消息则发送心跳包
if check_cnt < 5 {
check_cnt++
time.Sleep(time.Millisecond * 200) //设置延时200ms
continue
}
check_cnt = 0
msg.Type = "check"
msg.Msg = "check"
msg.From_user_id = -1
res3, _ = json.Marshal(msg)
} }
err2 := ws.WriteMessage(websocket.TextMessage, res3) check_cnt = 0
if err2 != nil { msgObj.Type = "check"
clientsMux.Lock() msgObj.Msg = "check"
clients[ws] = false msgObj.From_user_id = -1
clientsMux.Unlock() res3, _ = json.Marshal(msgObj)
//设置ws关闭状态信息
worker.SetRedisWithExpire(strconv.Itoa(int(device.ID))+"_is_play", "0", time.Minute*5) //设置播放状态
break
}
time.Sleep(time.Millisecond * 200) //设置延时200ms
} }
err2 := ws.WriteMessage(websocket.TextMessage, res3)
if err2 != nil {
clientsMux.Lock()
clients[ws] = false
clientsMux.Unlock()
worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "0", time.Minute*5)
break
}
time.Sleep(time.Millisecond * 200)
} }
} }

View File

@ -12,18 +12,18 @@ import (
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
) )
var redisClient *redis.Client // Redis 客户端, 用于连接 Redis 服务器 var RedisClient *redis.Client // Redis 客户端, 用于连接 Redis 服务器
func InitRedis() { func InitRedis() {
ctx := context.Background() ctx := context.Background()
// 连接redis // 连接redis
redisClient = redis.NewClient(&redis.Options{ RedisClient = redis.NewClient(&redis.Options{
Addr: proto.REDIS_ADDR, // Redis 服务器地址 Addr: proto.REDIS_ADDR, // Redis 服务器地址
Password: proto.REDIS_PASSWORD, // 如果 Redis 设置了密码 Password: proto.REDIS_PASSWORD, // 如果 Redis 设置了密码
DB: proto.REIDS_DB, // 使用的数据库编号 DB: proto.REIDS_DB, // 使用的数据库编号
}) })
// 验证 Redis 客户端是否可以正常工作 // 验证 Redis 客户端是否可以正常工作
_, err := redisClient.Ping(ctx).Result() _, err := RedisClient.Ping(ctx).Result()
if err != nil { if err != nil {
fmt.Println("Error connecting to Redis: %v", err) fmt.Println("Error connecting to Redis: %v", err)
} }
@ -31,14 +31,14 @@ func InitRedis() {
func CloseRedis() { func CloseRedis() {
// 关闭 Redis 客户端 // 关闭 Redis 客户端
if err := redisClient.Close(); err != nil { if err := RedisClient.Close(); err != nil {
fmt.Println("Error closing Redis client: %v", err) fmt.Println("Error closing Redis client: %v", err)
} }
} }
func IsContainKey(key string) bool { func IsContainKey(key string) bool {
ctx := context.Background() ctx := context.Background()
val, err := redisClient.Exists(ctx, key).Result() // 检查键是否存在, 如果存在则返回 1, 否则返回 0 val, err := RedisClient.Exists(ctx, key).Result() // 检查键是否存在, 如果存在则返回 1, 否则返回 0
if err != nil { if err != nil {
fmt.Println("Error getting key: %v", err) fmt.Println("Error getting key: %v", err)
return false return false
@ -53,7 +53,7 @@ func IsContainKey(key string) bool {
func SetRedis(key string, value string) bool { func SetRedis(key string, value string) bool {
ctx := context.Background() ctx := context.Background()
// 设置键值对, 0 表示不设置过期时间, 如果需要设置过期时间, 可以设置为 time.Second * 10 等 // 设置键值对, 0 表示不设置过期时间, 如果需要设置过期时间, 可以设置为 time.Second * 10 等
err := redisClient.Set(ctx, key, value, time.Minute*30).Err() err := RedisClient.Set(ctx, key, value, time.Minute*30).Err()
if err != nil { if err != nil {
fmt.Println("Error setting key: %v", err) fmt.Println("Error setting key: %v", err)
return false return false
@ -73,12 +73,12 @@ func SetHashWithTime(key string, id int, name, email string, duration time.Durat
} }
// 设置哈希表的字段值, 0 表示不设置过期时间, 如果需要设置过期时间, 可以设置为 time.Second * 10 等 // 设置哈希表的字段值, 0 表示不设置过期时间, 如果需要设置过期时间, 可以设置为 time.Second * 10 等
err := redisClient.HSet(ctx, key, fields).Err() err := RedisClient.HSet(ctx, key, fields).Err()
if err != nil { if err != nil {
fmt.Println("Error setting key: %v", err) fmt.Println("Error setting key: %v", err)
return false return false
} }
err = redisClient.Expire(ctx, key, time.Hour*10).Err() err = RedisClient.Expire(ctx, key, time.Hour*10).Err()
if err != nil { if err != nil {
fmt.Println("Error setting key: %v", err) fmt.Println("Error setting key: %v", err)
return false return false
@ -89,12 +89,12 @@ func SetHashWithTime(key string, id int, name, email string, duration time.Durat
// 设置redis hash设置过期时间 // 设置redis hash设置过期时间
func SetHash(key string, data map[string]interface{}) bool { func SetHash(key string, data map[string]interface{}) bool {
ctx := context.Background() ctx := context.Background()
err := redisClient.HSet(ctx, key, data).Err() err := RedisClient.HSet(ctx, key, data).Err()
if err != nil { if err != nil {
fmt.Println("%v :Error setting hash: %v", key, err) fmt.Println("%v :Error setting hash: %v", key, err)
return false return false
} }
err = redisClient.Expire(ctx, key, time.Minute*30).Err() err = RedisClient.Expire(ctx, key, time.Minute*30).Err()
if err != nil { if err != nil {
fmt.Println("%v :Error setting expire: %v", key, err) fmt.Println("%v :Error setting expire: %v", key, err)
return false return false
@ -104,7 +104,7 @@ func SetHash(key string, data map[string]interface{}) bool {
func SetHashWithField(key string, field string, value string) bool { func SetHashWithField(key string, field string, value string) bool {
ctx := context.Background() ctx := context.Background()
err := redisClient.HSet(ctx, key, field, value).Err() err := RedisClient.HSet(ctx, key, field, value).Err()
if err != nil { if err != nil {
fmt.Println("Error setting key: %v", err) fmt.Println("Error setting key: %v", err)
return false return false
@ -114,7 +114,7 @@ func SetHashWithField(key string, field string, value string) bool {
func GetHash(key string, field string) string { func GetHash(key string, field string) string {
ctx := context.Background() ctx := context.Background()
val, err := redisClient.HGet(ctx, key, field).Result() val, err := RedisClient.HGet(ctx, key, field).Result()
if err != nil { if err != nil {
fmt.Println("Error getting hash: %v", err) fmt.Println("Error getting hash: %v", err)
return "" return ""
@ -124,7 +124,7 @@ func GetHash(key string, field string) string {
func GetHashAll(key string) map[string]string { func GetHashAll(key string) map[string]string {
ctx := context.Background() ctx := context.Background()
val, err := redisClient.HGetAll(ctx, key).Result() val, err := RedisClient.HGetAll(ctx, key).Result()
if err != nil { if err != nil {
fmt.Println("Error getting hash: %v", err) fmt.Println("Error getting hash: %v", err)
return nil return nil
@ -136,7 +136,7 @@ func GetHashAll(key string) map[string]string {
func SetRedisWithExpire(key string, value string, expire time.Duration) bool { // 设置键值对, 0 表示不设置过期时间, 如果需要设置过期时间, 可以设置为 time.Second * 10 等 func SetRedisWithExpire(key string, value string, expire time.Duration) bool { // 设置键值对, 0 表示不设置过期时间, 如果需要设置过期时间, 可以设置为 time.Second * 10 等
ctx := context.Background() ctx := context.Background()
// 设置键值对, 0 表示不设置过期时间, 如果需要设置过期时间, 可以设置为 time.Second * 10 等 // 设置键值对, 0 表示不设置过期时间, 如果需要设置过期时间, 可以设置为 time.Second * 10 等
err := redisClient.Set(ctx, key, value, expire).Err() err := RedisClient.Set(ctx, key, value, expire).Err()
if err != nil { if err != nil {
fmt.Println("Error setting key: %v", err) fmt.Println("Error setting key: %v", err)
return false return false
@ -147,7 +147,7 @@ func SetRedisWithExpire(key string, value string, expire time.Duration) bool { /
// 获取redis // 获取redis
func GetRedis(key string) string { func GetRedis(key string) string {
ctx := context.Background() ctx := context.Background()
val, err := redisClient.Get(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误 val, err := RedisClient.Get(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误
if err != nil { if err != nil {
fmt.Println(key, " Error getting key: %v", err) fmt.Println(key, " Error getting key: %v", err)
return "" return ""
@ -158,7 +158,7 @@ func GetRedis(key string) string {
// pop redis list from right,as stack // pop redis list from right,as stack
func PopRedisList(key string) string { func PopRedisList(key string) string {
ctx := context.Background() ctx := context.Background()
val, err := redisClient.RPop(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误 val, err := RedisClient.RPop(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误
if err != nil { if err != nil {
fmt.Println(key, " Error reading from Redis: %v", err) fmt.Println(key, " Error reading from Redis: %v", err)
return "" return ""
@ -169,7 +169,7 @@ func PopRedisList(key string) string {
// pop redis list from left,as queue // pop redis list from left,as queue
func PopRedisListLeft(key string) string { func PopRedisListLeft(key string) string {
ctx := context.Background() ctx := context.Background()
val, err := redisClient.LPop(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误 val, err := RedisClient.LPop(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误
if err != nil { if err != nil {
return "" return ""
} }
@ -178,7 +178,7 @@ func PopRedisListLeft(key string) string {
func DelRedis(key string) { func DelRedis(key string) {
ctx := context.Background() ctx := context.Background()
err := redisClient.Del(ctx, key).Err() err := RedisClient.Del(ctx, key).Err()
if err != nil { if err != nil {
fmt.Println("Error deleting key: %v", err) fmt.Println("Error deleting key: %v", err)
} }
@ -187,7 +187,7 @@ func DelRedis(key string) {
// push redis list from right // push redis list from right
func PushRedisList(key string, value string) bool { func PushRedisList(key string, value string) bool {
ctx := context.Background() ctx := context.Background()
err := redisClient.RPush(ctx, key, value).Err() err := RedisClient.RPush(ctx, key, value).Err()
if err != nil { if err != nil {
fmt.Println("Error setting key: %v", err) fmt.Println("Error setting key: %v", err)
return false return false
@ -197,12 +197,12 @@ func PushRedisList(key string, value string) bool {
func PushRedisListWithExpire(key string, value string, expire time.Duration) bool { func PushRedisListWithExpire(key string, value string, expire time.Duration) bool {
ctx := context.Background() ctx := context.Background()
err := redisClient.RPush(ctx, key, value).Err() err := RedisClient.RPush(ctx, key, value).Err()
if err != nil { if err != nil {
fmt.Println("Error setting key: %v", err) fmt.Println("Error setting key: %v", err)
return false return false
} }
err = redisClient.Expire(ctx, key, expire).Err() err = RedisClient.Expire(ctx, key, expire).Err()
if err != nil { if err != nil {
fmt.Println("Error setting key: %v", err) fmt.Println("Error setting key: %v", err)
return false return false
@ -213,7 +213,7 @@ func PushRedisListWithExpire(key string, value string, expire time.Duration) boo
// delete redis key // delete redis key
func delRedis(key string) { func delRedis(key string) {
ctx := context.Background() ctx := context.Background()
err := redisClient.Del(ctx, key).Err() err := RedisClient.Del(ctx, key).Err()
if err != nil { if err != nil {
fmt.Println("Error setting key: %v", err) fmt.Println("Error setting key: %v", err)
} }
@ -239,7 +239,7 @@ func (u *RUser) toJSONString() string {
// put hash to redis // put hash to redis
func hSetRedis(key string, field string, value string) { func hSetRedis(key string, field string, value string) {
ctx := context.Background() ctx := context.Background()
err := redisClient.HSet(ctx, key, field, value).Err() err := RedisClient.HSet(ctx, key, field, value).Err()
if err != nil { if err != nil {
fmt.Println("Error setting key: %v", err) fmt.Println("Error setting key: %v", err)
} }
@ -248,7 +248,7 @@ func hSetRedis(key string, field string, value string) {
// get hash from redis // get hash from redis
func hGetRedis(key string, field string) string { func hGetRedis(key string, field string) string {
ctx := context.Background() ctx := context.Background()
val, err := redisClient.HGet(ctx, key, field).Result() val, err := RedisClient.HGet(ctx, key, field).Result()
if err != nil { if err != nil {
fmt.Println("Error getting key: %v", err) fmt.Println("Error getting key: %v", err)
} }