From 7770efd21a65aee642271cb56595af67a49dd12f Mon Sep 17 00:00:00 2001 From: junleea <354425203@qq.com> Date: Fri, 25 Oct 2024 18:45:04 +0800 Subject: [PATCH 01/17] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E8=A7=86=E9=A2=91?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E5=88=9D=E6=9C=AA=E8=AE=BE=E7=BD=AE=E6=83=85?= =?UTF-8?q?=E5=86=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler/video.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/handler/video.go b/handler/video.go index 5246d4f..341e005 100644 --- a/handler/video.go +++ b/handler/video.go @@ -160,6 +160,10 @@ func GetVideoList(c *gin.Context) { const layout = "2006-01-02 15:04:05" tm1 := time.Unix(gvl_req.StartTime, 0).Format(layout) tm2 := time.Unix(gvl_req.EndTime, 0).Format(layout) + if gvl_req.StartTime == 0 || gvl_req.EndTime == 0 { + tm1 = "" + tm2 = "" + } videos := service.GetVideoList(int(id.(float64)), tm1, tm2, gvl_req.Hour) c.JSON(http.StatusOK, gin.H{"data": videos, "code": proto.SuccessCode, "message": "success"}) } else { From 1ce27b543d6bc61912335f1e48453859a950c8c9 Mon Sep 17 00:00:00 2001 From: junleea <354425203@qq.com> Date: Mon, 28 Oct 2024 17:17:23 +0800 Subject: [PATCH 02/17] =?UTF-8?q?=E8=AE=BE=E5=A4=87=E5=AE=9E=E6=97=B6?= =?UTF-8?q?=E5=9B=BE=E7=89=87=E9=80=9A=E8=BF=87redis=E5=8F=91=E5=B8=83?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E6=A8=A1=E5=BC=8F=E5=AE=9E=E7=8E=B0=EF=BC=8C?= =?UTF-8?q?=E4=BF=AE=E6=94=B9redis=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler/device.go | 82 +++++++++++++++++++++-------------------------- worker/redis.go | 74 +++++++++++++++++++++--------------------- 2 files changed, 74 insertions(+), 82 deletions(-) diff --git a/handler/device.go b/handler/device.go index e33c975..27955c2 100644 --- a/handler/device.go +++ b/handler/device.go @@ -1,6 +1,7 @@ package handler import ( + "context" "encoding/json" "fmt" "github.com/gin-gonic/gin" @@ -206,71 +207,62 @@ func Restart(ip string) bool { return false } -// 接收及发送消息 +// 发送实时视频流 func GetRealTimeImage(c *gin.Context) { id, _ := c.Get("id") id1 := int(id.(float64)) device_id := c.Query("device_id") - //字符串转int device_id_int, _ := strconv.Atoi(device_id) device := service.GetDevice(device_id_int, id1) if device.ID == 0 { c.JSON(http.StatusOK, gin.H{"code": proto.DataNotFound, "message": "device not found"}) return } - //建立连接 - // 升级HTTP连接为WebSocket连接 ws, err := upgrader.Upgrade(c.Writer, c.Request, nil) clients[ws] = true if err != nil { - // log.Println(err) fmt.Println(err) return } defer ws.Close() - worker.SetRedisWithExpire(strconv.Itoa(int(device.ID))+"_is_play", "1", time.Minute*5) //设置播放状态 - // 接收客户端消息并发送到指定用户 + worker.SetRedisWithExpire(strconv.Itoa(int(device.ID))+"_is_play", "1", time.Minute*5) + go subscribeAndHandleMessages(ws, device_id_int) +} - go func(ws *websocket.Conn, device_id int) { - - }(ws, device_id_int) +func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) { + ctx := context.Background() + pubsub := worker.RedisClient.Subscribe(ctx, strconv.Itoa(device_id)+"_frames_channel") + defer pubsub.Close() + ch := pubsub.Channel() var check_cnt int - - for { - if v := clients[ws]; v == true { - res2 := worker.PopRedisListLeft(device_id + "_frames") - var res3 []byte - var msg proto.Message - if res2 != "" { - //若有消息则发送消息 - msg.Type = "img" - msg.Msg = res2 - msg.From_user_id = id1 - res3, _ = json.Marshal(msg) - } else { - //若无消息则发送心跳包 - if check_cnt < 5 { - check_cnt++ - time.Sleep(time.Millisecond * 200) //设置延时200ms - continue - } - check_cnt = 0 - msg.Type = "check" - msg.Msg = "check" - msg.From_user_id = -1 - res3, _ = json.Marshal(msg) + for msg := range ch { + var res3 []byte + var msgObj proto.Message + if msg.Payload != "" { + msgObj.Type = "img" + msgObj.Msg = msg.Payload + msgObj.From_user_id = -1 + res3, _ = json.Marshal(msgObj) + } else { + if check_cnt < 5 { + check_cnt++ + time.Sleep(time.Millisecond * 200) + continue } - err2 := ws.WriteMessage(websocket.TextMessage, res3) - worker.SetRedisWithExpire(strconv.Itoa(int(device.ID))+"_is_play", "1", time.Minute*5) //设置播放状态 - if err2 != nil { - clientsMux.Lock() - clients[ws] = false - clientsMux.Unlock() - //设置ws关闭状态信息 - worker.SetRedisWithExpire(strconv.Itoa(int(device.ID))+"_is_play", "0", time.Minute*5) //设置播放状态 - break - } - time.Sleep(time.Millisecond * 200) //设置延时200ms + check_cnt = 0 + msgObj.Type = "check" + msgObj.Msg = "check" + msgObj.From_user_id = -1 + res3, _ = json.Marshal(msgObj) } + err2 := ws.WriteMessage(websocket.TextMessage, res3) + if err2 != nil { + clientsMux.Lock() + clients[ws] = false + clientsMux.Unlock() + worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "0", time.Minute*5) + break + } + time.Sleep(time.Millisecond * 200) } } diff --git a/worker/redis.go b/worker/redis.go index e2695bd..9b032a7 100644 --- a/worker/redis.go +++ b/worker/redis.go @@ -12,19 +12,19 @@ import ( "github.com/go-redis/redis/v8" ) -var redisClient *redis.Client // Redis 客户端, 用于连接 Redis 服务器 +var RedisClient *redis.Client // Redis 客户端, 用于连接 Redis 服务器 func InitRedis() error { ctx := context.Background() if proto.Config.REDIS_User_PW == false { // 连接redis - redisClient = redis.NewClient(&redis.Options{ + RedisClient = redis.NewClient(&redis.Options{ Addr: proto.Config.REDIS_ADDR, // Redis 服务器地址 DB: proto.Config.REDIS_DB, // 使用的数据库编号 }) } else { // 连接redis - redisClient = redis.NewClient(&redis.Options{ + RedisClient = redis.NewClient(&redis.Options{ Addr: proto.Config.REDIS_ADDR, // Redis 服务器地址 Password: proto.Config.REDIS_PASSWORD, // 如果 Redis 设置了密码 DB: proto.Config.REDIS_DB, // 使用的数据库编号 @@ -32,7 +32,7 @@ func InitRedis() error { } // 验证 Redis 客户端是否可以正常工作 - _, err := redisClient.Ping(ctx).Result() + _, err := RedisClient.Ping(ctx).Result() if err != nil { fmt.Println("Error connecting to Redis: %v", err) } @@ -41,14 +41,14 @@ func InitRedis() error { func CloseRedis() { // 关闭 Redis 客户端 - if err := redisClient.Close(); err != nil { + if err := RedisClient.Close(); err != nil { fmt.Println("Error closing Redis client: %v", err) } } func IsContainKey(key string) bool { ctx := context.Background() - val, err := redisClient.Exists(ctx, key).Result() // 检查键是否存在, 如果存在则返回 1, 否则返回 0 + val, err := RedisClient.Exists(ctx, key).Result() // 检查键是否存在, 如果存在则返回 1, 否则返回 0 if err != nil { fmt.Println("Error getting key: %v", err) return false @@ -63,7 +63,7 @@ func IsContainKey(key string) bool { func SetRedis(key string, value string) bool { ctx := context.Background() // 设置键值对, 0 表示不设置过期时间, 如果需要设置过期时间, 可以设置为 time.Second * 10 等 - err := redisClient.Set(ctx, key, value, time.Minute*30).Err() + err := RedisClient.Set(ctx, key, value, time.Minute*30).Err() if err != nil { fmt.Println("Error setting key: %v", err) return false @@ -83,12 +83,12 @@ func SetHashWithTime(key string, id int, name, email string, duration time.Durat } // 设置哈希表的字段值, 0 表示不设置过期时间, 如果需要设置过期时间, 可以设置为 time.Second * 10 等 - err := redisClient.HSet(ctx, key, fields).Err() + err := RedisClient.HSet(ctx, key, fields).Err() if err != nil { fmt.Println("Error setting key: %v", err) return false } - err = redisClient.Expire(ctx, key, time.Hour*10).Err() + err = RedisClient.Expire(ctx, key, time.Hour*10).Err() if err != nil { fmt.Println("Error setting key: %v", err) return false @@ -99,12 +99,12 @@ func SetHashWithTime(key string, id int, name, email string, duration time.Durat // 设置redis hash,设置过期时间 func SetHash(key string, data map[string]interface{}) bool { ctx := context.Background() - err := redisClient.HSet(ctx, key, data).Err() + err := RedisClient.HSet(ctx, key, data).Err() if err != nil { fmt.Println("%v :Error setting hash: %v", key, err) return false } - err = redisClient.Expire(ctx, key, time.Minute*30).Err() + err = RedisClient.Expire(ctx, key, time.Minute*30).Err() if err != nil { fmt.Println("%v :Error setting expire: %v", key, err) return false @@ -114,7 +114,7 @@ func SetHash(key string, data map[string]interface{}) bool { func SetHashWithField(key string, field string, value string) bool { ctx := context.Background() - err := redisClient.HSet(ctx, key, field, value).Err() + err := RedisClient.HSet(ctx, key, field, value).Err() if err != nil { fmt.Println("Error setting key: %v", err) return false @@ -124,7 +124,7 @@ func SetHashWithField(key string, field string, value string) bool { func GetHash(key string, field string) string { ctx := context.Background() - val, err := redisClient.HGet(ctx, key, field).Result() + val, err := RedisClient.HGet(ctx, key, field).Result() if err != nil { fmt.Println("Error getting hash: %v", err) return "" @@ -134,7 +134,7 @@ func GetHash(key string, field string) string { func GetHashAll(key string) map[string]string { ctx := context.Background() - val, err := redisClient.HGetAll(ctx, key).Result() + val, err := RedisClient.HGetAll(ctx, key).Result() if err != nil { fmt.Println("Error getting hash: %v", err) return nil @@ -146,7 +146,7 @@ func GetHashAll(key string) map[string]string { func SetRedisWithExpire(key string, value string, expire time.Duration) bool { // 设置键值对, 0 表示不设置过期时间, 如果需要设置过期时间, 可以设置为 time.Second * 10 等 ctx := context.Background() // 设置键值对, 0 表示不设置过期时间, 如果需要设置过期时间, 可以设置为 time.Second * 10 等 - err := redisClient.Set(ctx, key, value, expire).Err() + err := RedisClient.Set(ctx, key, value, expire).Err() if err != nil { fmt.Println("Error setting key: %v", err) return false @@ -157,7 +157,7 @@ func SetRedisWithExpire(key string, value string, expire time.Duration) bool { / // 获取redis func GetRedis(key string) string { ctx := context.Background() - val, err := redisClient.Get(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误 + val, err := RedisClient.Get(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误 if err != nil { fmt.Println(key, " Error getting key: %v", err) return "" @@ -168,7 +168,7 @@ func GetRedis(key string) string { // pop redis list from right,as stack func PopRedisList(key string) string { ctx := context.Background() - val, err := redisClient.RPop(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误 + val, err := RedisClient.RPop(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误 if err != nil { fmt.Println(key, " Error reading from Redis: %v", err) return "" @@ -179,7 +179,7 @@ func PopRedisList(key string) string { // pop redis list from left,as queue func PopRedisListLeft(key string) string { ctx := context.Background() - val, err := redisClient.LPop(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误 + val, err := RedisClient.LPop(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误 if err != nil { return "" } @@ -188,7 +188,7 @@ func PopRedisListLeft(key string) string { func DelRedis(key string) { ctx := context.Background() - err := redisClient.Del(ctx, key).Err() + err := RedisClient.Del(ctx, key).Err() if err != nil { fmt.Println("Error deleting key: %v", err) } @@ -197,7 +197,7 @@ func DelRedis(key string) { // push redis list from right func PushRedisList(key string, value string) bool { ctx := context.Background() - err := redisClient.RPush(ctx, key, value).Err() + err := RedisClient.RPush(ctx, key, value).Err() if err != nil { fmt.Println("Error setting key: %v", err) return false @@ -207,12 +207,12 @@ func PushRedisList(key string, value string) bool { func PushRedisListWithExpire(key string, value string, expire time.Duration) bool { ctx := context.Background() - err := redisClient.RPush(ctx, key, value).Err() + err := RedisClient.RPush(ctx, key, value).Err() if err != nil { fmt.Println("Error setting key: %v", err) return false } - err = redisClient.Expire(ctx, key, expire).Err() + err = RedisClient.Expire(ctx, key, expire).Err() if err != nil { fmt.Println("Error setting key: %v", err) return false @@ -223,7 +223,7 @@ func PushRedisListWithExpire(key string, value string, expire time.Duration) boo // delete redis key func delRedis(key string) { ctx := context.Background() - err := redisClient.Del(ctx, key).Err() + err := RedisClient.Del(ctx, key).Err() if err != nil { fmt.Println("Error setting key: %v", err) } @@ -249,7 +249,7 @@ func (u *RUser) toJSONString() string { // put hash to redis func hSetRedis(key string, field string, value string) { ctx := context.Background() - err := redisClient.HSet(ctx, key, field, value).Err() + err := RedisClient.HSet(ctx, key, field, value).Err() if err != nil { fmt.Println("Error setting key: %v", err) } @@ -258,7 +258,7 @@ func hSetRedis(key string, field string, value string) { // get hash from redis func hGetRedis(key string, field string) string { ctx := context.Background() - val, err := redisClient.HGet(ctx, key, field).Result() + val, err := RedisClient.HGet(ctx, key, field).Result() if err != nil { fmt.Println("Error getting key: %v", err) } @@ -268,12 +268,12 @@ func hGetRedis(key string, field string) string { // 设置set,有过期时间 func SetRedisSet(key string, values []string, expire time.Duration) bool { ctx := context.Background() - err := redisClient.SAdd(ctx, key, values).Err() + err := RedisClient.SAdd(ctx, key, values).Err() if err != nil { fmt.Println("Error setting key: %v", err) return false } - err = redisClient.Expire(ctx, key, expire).Err() + err = RedisClient.Expire(ctx, key, expire).Err() if err != nil { fmt.Println("Error setting key: %v", err) return false @@ -284,7 +284,7 @@ func SetRedisSet(key string, values []string, expire time.Duration) bool { // 设置set,添加元素 func SetRedisSetAdd(key string, value string) bool { ctx := context.Background() - err := redisClient.SAdd(ctx, key, value).Err() + err := RedisClient.SAdd(ctx, key, value).Err() if err != nil { fmt.Println("Error setting key: %v", err) return false @@ -295,7 +295,7 @@ func SetRedisSetAdd(key string, value string) bool { // 设置set,删除元素 func SetRedisSetRemove(key string, value string) bool { ctx := context.Background() - err := redisClient.SRem(ctx, key, value).Err() + err := RedisClient.SRem(ctx, key, value).Err() if err != nil { fmt.Println("Error setting key: %v", err) return false @@ -306,7 +306,7 @@ func SetRedisSetRemove(key string, value string) bool { // 获取两个set的交集 func GetRedisSetIntersect(key1 string, key2 string) []string { ctx := context.Background() - val, err := redisClient.SInter(ctx, key1, key2).Result() + val, err := RedisClient.SInter(ctx, key1, key2).Result() if err != nil { fmt.Println("Error getting key: %v", err) return nil @@ -317,7 +317,7 @@ func GetRedisSetIntersect(key1 string, key2 string) []string { // 查看set是否包含元素 func IsContainSet(key string, value string) bool { ctx := context.Background() - val, err := redisClient.SIsMember(ctx, key, value).Result() + val, err := RedisClient.SIsMember(ctx, key, value).Result() if err != nil { fmt.Println("Error getting key: %v", err) return false @@ -328,7 +328,7 @@ func IsContainSet(key string, value string) bool { // 查看set的所有元素 func GetRedisSetMembers(key string) []string { ctx := context.Background() - val, err := redisClient.SMembers(ctx, key).Result() + val, err := RedisClient.SMembers(ctx, key).Result() if err != nil { fmt.Println("Error getting key: %v", err) return nil @@ -339,7 +339,7 @@ func GetRedisSetMembers(key string) []string { // BITMAP func SetRedisBitmap(key string, offset int64, value int) bool { ctx := context.Background() - err := redisClient.SetBit(ctx, key, offset, value).Err() + err := RedisClient.SetBit(ctx, key, offset, value).Err() if err != nil { fmt.Println("Error setting key: %v", err) return false @@ -350,7 +350,7 @@ func SetRedisBitmap(key string, offset int64, value int) bool { // BITMAP获取 func GetRedisBitmap(key string, offset int64) int { ctx := context.Background() - val, err := redisClient.GetBit(ctx, key, offset).Result() + val, err := RedisClient.GetBit(ctx, key, offset).Result() if err != nil { fmt.Println("Error getting key: %v", err) return 0 @@ -361,11 +361,11 @@ func GetRedisBitmap(key string, offset int64) int { // 发布订阅者模式-发布消息 func Publish(channel string, message string, expire time.Duration) { ctx := context.Background() - err := redisClient.Publish(ctx, channel, message).Err() + err := RedisClient.Publish(ctx, channel, message).Err() if err != nil { fmt.Println("Error publishing message: %v", err) } - err = redisClient.Expire(ctx, channel, expire).Err() + err = RedisClient.Expire(ctx, channel, expire).Err() if err != nil { fmt.Println("Error setting key: %v", err) } @@ -374,7 +374,7 @@ func Publish(channel string, message string, expire time.Duration) { // 发布订阅者模式-订阅消息 func Subscribe(channel string) []string { ctx := context.Background() - pubsub := redisClient.Subscribe(ctx, channel) + pubsub := RedisClient.Subscribe(ctx, channel) ch := pubsub.Channel() defer pubsub.Close() var messages []string From 81e0c2a5bf40b5fa8a08906b2f9100288dc49719 Mon Sep 17 00:00:00 2001 From: junleea <354425203@qq.com> Date: Mon, 28 Oct 2024 20:56:26 +0800 Subject: [PATCH 03/17] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=AE=9E=E6=97=B6?= =?UTF-8?q?=E6=9F=A5=E7=9C=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler/device.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/handler/device.go b/handler/device.go index 27955c2..a0030aa 100644 --- a/handler/device.go +++ b/handler/device.go @@ -221,11 +221,11 @@ func GetRealTimeImage(c *gin.Context) { ws, err := upgrader.Upgrade(c.Writer, c.Request, nil) clients[ws] = true if err != nil { - fmt.Println(err) + fmt.Println("connect wss err:", err) return } - defer ws.Close() worker.SetRedisWithExpire(strconv.Itoa(int(device.ID))+"_is_play", "1", time.Minute*5) + fmt.Println("device_id:", device_id_int, " has set is_play to 1") go subscribeAndHandleMessages(ws, device_id_int) } @@ -255,11 +255,13 @@ func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) { msgObj.From_user_id = -1 res3, _ = json.Marshal(msgObj) } + fmt.Println("send message to client length:", len(res3)) err2 := ws.WriteMessage(websocket.TextMessage, res3) if err2 != nil { clientsMux.Lock() clients[ws] = false clientsMux.Unlock() + ws.Close() worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "0", time.Minute*5) break } From 9a234f50fb1dc426e85a0028d30bfbbe3cbc1c9a Mon Sep 17 00:00:00 2001 From: junleea <354425203@qq.com> Date: Mon, 28 Oct 2024 21:01:23 +0800 Subject: [PATCH 04/17] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=AE=9E=E6=97=B6?= =?UTF-8?q?=E6=9F=A5=E7=9C=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler/device.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/handler/device.go b/handler/device.go index a0030aa..5ea231f 100644 --- a/handler/device.go +++ b/handler/device.go @@ -231,7 +231,7 @@ func GetRealTimeImage(c *gin.Context) { func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) { ctx := context.Background() - pubsub := worker.RedisClient.Subscribe(ctx, strconv.Itoa(device_id)+"_frames_channel") + pubsub := worker.RedisClient.Subscribe(ctx, strconv.Itoa(device_id)+"_frames_msgs") defer pubsub.Close() ch := pubsub.Channel() var check_cnt int From f53af2c09746c35d5a3dea0311e8e30280231bc4 Mon Sep 17 00:00:00 2001 From: junleea <354425203@qq.com> Date: Mon, 28 Oct 2024 21:11:44 +0800 Subject: [PATCH 05/17] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=AE=9E=E6=97=B6?= =?UTF-8?q?=E6=9F=A5=E7=9C=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler/device.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/handler/device.go b/handler/device.go index 5ea231f..83424bc 100644 --- a/handler/device.go +++ b/handler/device.go @@ -233,6 +233,7 @@ func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) { ctx := context.Background() pubsub := worker.RedisClient.Subscribe(ctx, strconv.Itoa(device_id)+"_frames_msgs") defer pubsub.Close() + defer ws.Close() ch := pubsub.Channel() var check_cnt int for msg := range ch { @@ -261,10 +262,9 @@ func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) { clientsMux.Lock() clients[ws] = false clientsMux.Unlock() - ws.Close() + fmt.Println("send message to client err:", err2) worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "0", time.Minute*5) break } - time.Sleep(time.Millisecond * 200) } } From e1bae50a0460d12aa07ee6ae157f7dedf1c9c124 Mon Sep 17 00:00:00 2001 From: junleea <354425203@qq.com> Date: Mon, 28 Oct 2024 21:18:54 +0800 Subject: [PATCH 06/17] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=AE=9E=E6=97=B6?= =?UTF-8?q?=E6=9F=A5=E7=9C=8Bredis=E5=8F=91=E5=B8=83=E8=AE=A2=E9=98=85?= =?UTF-8?q?=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler/device.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/handler/device.go b/handler/device.go index 83424bc..0bf7d3f 100644 --- a/handler/device.go +++ b/handler/device.go @@ -256,7 +256,7 @@ func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) { msgObj.From_user_id = -1 res3, _ = json.Marshal(msgObj) } - fmt.Println("send message to client length:", len(res3)) + //fmt.Println("send message to client length:", len(res3)) err2 := ws.WriteMessage(websocket.TextMessage, res3) if err2 != nil { clientsMux.Lock() From 6ba661c0f82fae6ce02bc58dbcdde3c81db87ff9 Mon Sep 17 00:00:00 2001 From: junleea <354425203@qq.com> Date: Tue, 29 Oct 2024 18:53:08 +0800 Subject: [PATCH 07/17] =?UTF-8?q?=E7=A6=BB=E7=BA=BF=E8=81=8A=E5=A4=A9?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E4=BD=BF=E7=94=A8redis=E5=8F=91=E5=B8=83?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler/im.go | 76 +++++++++++++++++++++++++++++++++++++++++++- service/imService.go | 4 +++ 2 files changed, 79 insertions(+), 1 deletion(-) diff --git a/handler/im.go b/handler/im.go index bc872a1..020f486 100644 --- a/handler/im.go +++ b/handler/im.go @@ -1,6 +1,7 @@ package handler import ( + "context" "crypto/rand" "encoding/hex" "encoding/json" @@ -74,7 +75,7 @@ func SetUpIMGroup(router *gin.Engine) { imGroup.POST("/get_group", GetGroups) imGroup.POST("/get_group_req_user", GetFriendRequest) imGroup.GET("/sse_msg", ServerSendMsg) - imGroup.GET("/ws_v2", ServerSendMsgV2) + imGroup.GET("/ws_v2", ServerSendMsgV3) imGroup.POST("/get_friend_list", GetFriendList) //获取好友列表,包括群聊 //获取好友请求 imGroup.POST("/get_friend_request", GetFriendRequest) @@ -607,6 +608,28 @@ func ServerSendMsgV2(c *gin.Context) { time.Sleep(time.Second * 1) } } + +func ServerSendMsgV3(c *gin.Context) { + //ws + id, _ := c.Get("id") + user_id := int(id.(float64)) + + // 升级HTTP连接为WebSocket连接 + ws, err1 := upgrader.Upgrade(c.Writer, c.Request, nil) + clients[ws] = true + if err1 != nil { + // log.Println(err) + fmt.Println(err1) + return + } + //设置用户在线状态 + worker.SetRedisWithExpire("user_"+strconv.Itoa(user_id)+"_status_v2", "1", time.Second*60) + worker.SetRedisBitmap("im2_online_users", int64(user_id), 1) + worker.SetRedisSetAdd("im2_online_users_set", strconv.Itoa(user_id)) + //发送消息 + key := "user_" + strconv.Itoa(user_id) + "_msg_ids" + go subscribeAndHandleIMMessages(ws, key, user_id) +} func ServerSendMsg(c *gin.Context) { //sse c.Writer.Header().Set("Content-Type", "text/event-stream") @@ -654,3 +677,54 @@ func ServerSendMsg(c *gin.Context) { time.Sleep(time.Second * 1) } } + +func subscribeAndHandleIMMessages(ws *websocket.Conn, chanel string, user_id int) { + ctx := context.Background() + pubsub := worker.RedisClient.Subscribe(ctx, chanel) + defer pubsub.Close() + defer ws.Close() + ch := pubsub.Channel() + for m := range ch { + msg_id := m.Payload //消息id + if msg_id != "" { + msg_id_num, _ := strconv.ParseInt(msg_id, 10, 64) + msgs := dao.FindMessageByID2(uint(msg_id_num)) + if len(msgs) > 0 { + msg := msgs[0] + //发送消息 + msg_str, _ := json.Marshal(msg) + var msg_ proto.Message + msg_.Type = "msg" + msg_.Msg = string(msg_str) + msg_str2, _ := json.Marshal(msg_) + + err2 := ws.WriteMessage(websocket.TextMessage, msg_str2) + if err2 != nil { + worker.SetRedisWithExpire("user_"+strconv.Itoa(user_id)+"_status_v2", "0", time.Second*3600) + clientsMux.Lock() + delete(clients, ws) + clientsMux.Unlock() + break + } + } + } else { + var msg proto.Message + msg.Type = "check" + msg.Msg = "check" + msg.From_user_id = -1 + //发送心跳包 + res3, _ := json.Marshal(msg) + err2 := ws.WriteMessage(websocket.TextMessage, res3) + if err2 != nil { + worker.SetRedisWithExpire("user_"+strconv.Itoa(user_id)+"_status_v2", "0", time.Second*3600) + worker.SetRedisBitmap("im2_online_users", int64(user_id), 0) + worker.SetRedisSetRemove("im2_online_users_set", strconv.Itoa(user_id)) + clientsMux.Lock() + delete(clients, ws) + clientsMux.Unlock() + break + } + } + worker.SetRedisWithExpire("user_"+strconv.Itoa(user_id)+"_status_v2", "1", time.Second*60) + } +} diff --git a/service/imService.go b/service/imService.go index 60185c3..b402d76 100644 --- a/service/imService.go +++ b/service/imService.go @@ -45,6 +45,8 @@ func CreateGeneralMessageService(from_id, to_id, msg_type, group_id int, content if res == "1" { //在线,存入redis worker.PushRedisListWithExpire("user_"+strconv.Itoa(to_id)+"_msg_ids", strconv.Itoa(int(id)), time.Second*300) + //发布消息 + worker.Publish("user_"+strconv.Itoa(to_id)+"_msg_ids", strconv.Itoa(int(id)), time.Second*300) } //判断接收方是否是机器人 id_str := strconv.Itoa(to_id) @@ -96,6 +98,8 @@ func CreateGeneralMessageService(from_id, to_id, msg_type, group_id int, content } //在线,存入redis worker.PushRedisListWithExpire("user_"+user_id+"_msg_ids", strconv.Itoa(int(id)), time.Second*300) + //发布消息 + worker.Publish("user_"+user_id+"_msg_ids", strconv.Itoa(int(id)), time.Second*300) } case 3: //user := dao.FindUserByID(to_id) From 3a7590ae8d6276c8dc9fb2363350f7a8a0d7c712 Mon Sep 17 00:00:00 2001 From: junleea <354425203@qq.com> Date: Wed, 6 Nov 2024 11:59:53 +0800 Subject: [PATCH 08/17] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=A4=9A=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E6=9F=A5=E7=9C=8B=E5=AE=9E=E6=97=B6=E8=A7=86=E9=A2=91?= =?UTF-8?q?=E4=B8=8D=E6=AD=A3=E5=B8=B8=E5=85=B3=E9=97=AD=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler/device.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/handler/device.go b/handler/device.go index 0bf7d3f..9918ace 100644 --- a/handler/device.go +++ b/handler/device.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "github.com/gin-gonic/gin" + "github.com/google/uuid" "github.com/gorilla/websocket" "net/http" "strconv" @@ -232,6 +233,11 @@ func GetRealTimeImage(c *gin.Context) { func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) { ctx := context.Background() pubsub := worker.RedisClient.Subscribe(ctx, strconv.Itoa(device_id)+"_frames_msgs") + //生成唯一连接uuid + con_id := uuid.New().String() + online_conn_key := "device_" + strconv.Itoa(device_id) + "_online_conn_ids" + //加入设备在线连接集合 + worker.SetRedisSetAdd(online_conn_key, con_id) defer pubsub.Close() defer ws.Close() ch := pubsub.Channel() @@ -263,8 +269,13 @@ func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) { clients[ws] = false clientsMux.Unlock() fmt.Println("send message to client err:", err2) - worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "0", time.Minute*5) + worker.SetRedisSetRemove(online_conn_key, con_id) break } } + //查看是否还有其他连接,没有则设置is_play为0 + if worker.GetRedisSetMembers(online_conn_key) == nil { + worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "0", time.Minute*5) + fmt.Println("device_id:", device_id, " has set is_play to 0") + } } From e651a844f515db8be6360406ecc1f2f9cbdb5091 Mon Sep 17 00:00:00 2001 From: junleea <354425203@qq.com> Date: Wed, 6 Nov 2024 12:04:51 +0800 Subject: [PATCH 09/17] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=A4=9A=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E6=9F=A5=E7=9C=8B=E5=AE=9E=E6=97=B6=E8=A7=86=E9=A2=91?= =?UTF-8?q?=E4=B8=8D=E6=AD=A3=E5=B8=B8=E5=85=B3=E9=97=AD=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler/device.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/handler/device.go b/handler/device.go index 9918ace..565ca06 100644 --- a/handler/device.go +++ b/handler/device.go @@ -274,7 +274,7 @@ func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) { } } //查看是否还有其他连接,没有则设置is_play为0 - if worker.GetRedisSetMembers(online_conn_key) == nil { + if worker.IsContainKey(online_conn_key) == false { worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "0", time.Minute*5) fmt.Println("device_id:", device_id, " has set is_play to 0") } From d9e622e37b4670e6dcbd28021a3492e4f33339a0 Mon Sep 17 00:00:00 2001 From: junleea <354425203@qq.com> Date: Wed, 6 Nov 2024 15:13:06 +0800 Subject: [PATCH 10/17] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=AE=9A=E6=97=B6?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E5=8A=9F=E8=83=BD=EF=BC=8C=E5=8F=8A=E5=AE=9A?= =?UTF-8?q?=E6=97=B6=E4=BB=BB=E5=8A=A1=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dao/cid.go | 22 ++++++++++++++---- go.mod | 1 + go.sum | 2 ++ handler/cid.go | 59 ++++++++++++++++++++++++++++++++++++++++++++--- main.go | 16 +++++++++++++ proto/user_req.go | 6 +++++ 6 files changed, 99 insertions(+), 7 deletions(-) diff --git a/dao/cid.go b/dao/cid.go index f6872ea..6233dc6 100644 --- a/dao/cid.go +++ b/dao/cid.go @@ -10,6 +10,7 @@ type CID struct { Auth_id int `gorm:"column:auth_id"` Name string `gorm:"column:name"` Url string `gorm:"column:url"` + Time int `gorm:"column:time"` // 定时任务,单位秒,大于0表示定时任务 Script string `gorm:"column:script"` Token string `gorm:"column:token"` // 用于外部回调 } @@ -24,8 +25,8 @@ type CIDRunLog struct { } // CreateCID 创建持续集成、部署 -func CreateCID(name, url, script, token string, auth_id int) uint { - cid := CID{Name: name, Url: url, Script: script, Token: token, Auth_id: auth_id} +func CreateCID(name, url, script, token string, time, auth_id int) uint { + cid := CID{Name: name, Url: url, Script: script, Token: token, Auth_id: auth_id, Time: time} result := DB.Debug().Create(&cid) if result.Error != nil { return 0 @@ -57,7 +58,7 @@ func FindCIDByAuthID(auth_id int) []CID { } // UpdateCIDByID 更新持续集成、部署 -func UpdateCIDByID(id, auth_id int, name, url, script, token string) bool { +func UpdateCIDByID(id, auth_id, time int, name, url, script, token string) bool { pd := FindCIDByID(id, auth_id) if pd.ID == 0 { return false @@ -66,7 +67,7 @@ func UpdateCIDByID(id, auth_id int, name, url, script, token string) bool { if token == "" { token = pd.Token } - result := DB.Debug().Model(&CID{}).Where("id = ? and auth_id = ?", id, auth_id).Updates(CID{Name: name, Url: url, Script: script, Token: token}) + result := DB.Debug().Model(&CID{}).Where("id = ? and auth_id = ?", id, auth_id).Updates(CID{Name: name, Url: url, Script: script, Token: token, Time: time}) if result.Error != nil { return false } @@ -106,3 +107,16 @@ func FindCIDByIDAndToken(id int, token string) CID { DB.Debug().Where("id = ? and token = ?", id, token).First(&cid) return cid } + +func FindCIDByTime() []CID { + var cids []CID + DB.Debug().Where("time > 0").Find(&cids) + return cids +} + +// FindCIDByID 查找持续集成、部署 +func FindCIDByCID(id uint) CID { + var cid CID + DB.Debug().Where("id = ? ", id).First(&cid) + return cid +} diff --git a/go.mod b/go.mod index d311974..b324382 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/golang-jwt/jwt v3.2.2+incompatible github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 + github.com/robfig/cron/v3 v3.0.1 gorm.io/driver/mysql v1.5.6 gorm.io/driver/postgres v1.5.9 gorm.io/gorm v1.25.10 diff --git a/go.sum b/go.sum index ff6741f..40ddc6a 100644 --- a/go.sum +++ b/go.sum @@ -86,6 +86,8 @@ github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6 github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/handler/cid.go b/handler/cid.go index 2e5994c..cd1010d 100644 --- a/handler/cid.go +++ b/handler/cid.go @@ -2,6 +2,7 @@ package handler import ( "bytes" + "encoding/json" "fmt" "github.com/gin-gonic/gin" "os/exec" @@ -9,11 +10,13 @@ import ( "strings" "videoplayer/dao" "videoplayer/proto" + "videoplayer/worker" ) type CIDCreateReq struct { Name string `json:"name" form:"name"` Url string `json:"url" form:"url"` + Time int `json:"time" form:"time"` // 定时任务,单位秒,大于0表示定时任务 Script string `json:"script" form:"script"` } @@ -33,6 +36,7 @@ type CIDUpdateReq struct { ID int `json:"id" form:"id"` Name string `json:"name" form:"name"` Url string `json:"url" form:"url"` + Time int `json:"time" form:"time"` // 定时任务,单位秒,大于0表示定时任务 Script string `json:"script" form:"script"` Token string `json:"cidtoken" form:"cidtoken"` } @@ -82,8 +86,7 @@ func CreateCID(c *gin.Context) { id, _ := c.Get("id") authID := int(id.(float64)) token, _ := generateRandomHexString(32) - - res := dao.CreateCID(req.Name, req.Url, req.Script, token, authID) + res := dao.CreateCID(req.Name, req.Url, req.Script, token, req.Time, authID) if res != 0 { c.JSON(200, gin.H{"code": proto.SuccessCode, "message": "success", "data": res}) } else { @@ -118,7 +121,7 @@ func UpdateCID(c *gin.Context) { // 获取用户ID id, _ := c.Get("id") authID := int(id.(float64)) - cid := dao.UpdateCIDByID(req.ID, authID, req.Name, req.Url, req.Script, req.Token) + cid := dao.UpdateCIDByID(req.ID, authID, req.Time, req.Name, req.Url, req.Script, req.Token) if cid == false { c.JSON(200, gin.H{"error": "CID not found", "code": proto.OperationFailed, "message": "failed"}) return @@ -222,3 +225,53 @@ echo "end"` //fmt.Println("bash content:", scriptContent) dao.CreateRunLog(id, authID, scriptContent, out.String(), err3_info) //添加执行日志 } + +// 定时任务处理逻辑 +func RunCron() { + //从redis查看是否有定时任务 + //如果有定时任务,执行定时任务 + //如果没有定时任务,查找数据库是否有定时任务,如果有,则加入redis,如果没有,则不做任何操作 + key := "cron_cid_runs" + res := worker.GetRedis(key) + if res == "" { + cids := dao.FindCIDByTime() + cid_workers := make([]proto.CIDRUN, 0) + for _, v := range cids { + cid_worker := proto.CIDRUN{CID: v.ID, Curr: v.Time, Every: v.Time} + cid_workers = append(cid_workers, cid_worker) + } + if len(cid_workers) > 0 { + //将定时任务加入redis + json_data, err := json.Marshal(cid_workers) + if err != nil { + fmt.Println("json marshal failed") + } + worker.SetRedis(key, string(json_data)) + } + } else { + var cid_workers []proto.CIDRUN + cid_redis_workers := worker.GetRedis(key) + err := json.Unmarshal([]byte(cid_redis_workers), &cid_workers) + if err != nil { + fmt.Println("json unmarshal failed") + } + for _, v := range cid_workers { + //查找定时任务 + if v.Curr-10 <= 0 { + cid := dao.FindCIDByCID(v.CID) + if cid.ID != 0 { + go RunShell("cron", cid.Url, cid.Script, int(cid.ID), cid.Auth_id) + } + } else { + v.Curr = v.Curr - 10 + } + } + //将定时任务加入redis + json_data, err := json.Marshal(cid_workers) + if err != nil { + fmt.Println("json marshal failed") + } + worker.SetRedis(key, string(json_data)) + } + +} diff --git a/main.go b/main.go index 48c4966..c42348e 100644 --- a/main.go +++ b/main.go @@ -3,7 +3,9 @@ package main import ( "github.com/gin-gonic/gin" "github.com/golang-jwt/jwt" + "github.com/robfig/cron/v3" "io" + "log" "os" "strings" "videoplayer/dao" @@ -33,6 +35,14 @@ func main() { handler.SetUpToolGroup(r) // Tool defer dao.Close() defer worker.CloseRedis() + //定时任务 + c := cron.New(cron.WithSeconds()) + // 添加每 10 秒执行一次的任务 + _, err = c.AddFunc("@every 10s", myTask) + if err != nil { + log.Fatal("添加定时任务失败: ", err) + } + c.Start() r.Run(":8083") // listen and serve on 0.0.0.0:8083 } func init() { @@ -138,3 +148,9 @@ func JWTAuthMiddleware() gin.HandlerFunc { c.Next() } } + +func myTask() { + // 定时任务 + //redis中取出数据 + handler.RunCron() +} diff --git a/proto/user_req.go b/proto/user_req.go index be15ea6..2413f7b 100644 --- a/proto/user_req.go +++ b/proto/user_req.go @@ -11,3 +11,9 @@ type UpdateUserInfoReq struct { Run bool `json:"run" form:"run"` //是否运行 Avatar string `json:"avatar" form:"avatar"` //头像 } + +type CIDRUN struct { + CID uint `json:"cid" form:"cid"` //持续集成ID,查找持续集成任务 + Curr int `json:"curr" form:"curr"` //当前剩余时间,每次执行减10s小于等于0则执行 + Every int `json:"every" form:"every"` //每隔多少秒执行一次,小于等于0表示不执行,时间粒度为10s +} From 66ec0ff9479b1a8d5cbec113e016d29acd291f39 Mon Sep 17 00:00:00 2001 From: junleea <354425203@qq.com> Date: Wed, 6 Nov 2024 21:20:12 +0800 Subject: [PATCH 11/17] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=AE=9A=E6=97=B6?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler/cid.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/handler/cid.go b/handler/cid.go index cd1010d..afd4137 100644 --- a/handler/cid.go +++ b/handler/cid.go @@ -233,6 +233,7 @@ func RunCron() { //如果没有定时任务,查找数据库是否有定时任务,如果有,则加入redis,如果没有,则不做任何操作 key := "cron_cid_runs" res := worker.GetRedis(key) + fmt.Println("cid run cron res:", res) if res == "" { cids := dao.FindCIDByTime() cid_workers := make([]proto.CIDRUN, 0) @@ -250,8 +251,7 @@ func RunCron() { } } else { var cid_workers []proto.CIDRUN - cid_redis_workers := worker.GetRedis(key) - err := json.Unmarshal([]byte(cid_redis_workers), &cid_workers) + err := json.Unmarshal([]byte(res), &cid_workers) if err != nil { fmt.Println("json unmarshal failed") } From 69f04c09a65e30f78f6d453df731241432a2f249 Mon Sep 17 00:00:00 2001 From: junleea <354425203@qq.com> Date: Wed, 6 Nov 2024 21:25:57 +0800 Subject: [PATCH 12/17] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=AE=9A=E6=97=B6?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler/cid.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/handler/cid.go b/handler/cid.go index afd4137..53e6e7d 100644 --- a/handler/cid.go +++ b/handler/cid.go @@ -255,15 +255,17 @@ func RunCron() { if err != nil { fmt.Println("json unmarshal failed") } - for _, v := range cid_workers { + fmt.Println("cid_workers:", cid_workers) + for i, v := range cid_workers { //查找定时任务 if v.Curr-10 <= 0 { cid := dao.FindCIDByCID(v.CID) if cid.ID != 0 { go RunShell("cron", cid.Url, cid.Script, int(cid.ID), cid.Auth_id) } + cid_workers[i].Curr = v.Every } else { - v.Curr = v.Curr - 10 + cid_workers[i].Curr = v.Curr - 10 } } //将定时任务加入redis From 54b67296bd51dbedfac2a81ec34ab90c5ab10c6d Mon Sep 17 00:00:00 2001 From: junleea <354425203@qq.com> Date: Wed, 6 Nov 2024 21:41:02 +0800 Subject: [PATCH 13/17] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=A4=9A=E7=94=A8?= =?UTF-8?q?=E6=88=B7=E6=9F=A5=E7=9C=8B=E5=AE=9E=E6=97=B6=E8=A7=86=E9=A2=91?= =?UTF-8?q?=EF=BC=8C=E8=AE=BE=E5=A4=87=E5=85=B3=E9=97=AD=E5=90=8E=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E6=9C=AA=E6=AD=A3=E5=B8=B8=E5=85=B3=E9=97=AD=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler/device.go | 89 ++++++++++++++++++++++++++++------------------- worker/redis.go | 16 +++++++++ 2 files changed, 69 insertions(+), 36 deletions(-) diff --git a/handler/device.go b/handler/device.go index 565ca06..3332c9d 100644 --- a/handler/device.go +++ b/handler/device.go @@ -233,49 +233,66 @@ func GetRealTimeImage(c *gin.Context) { func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) { ctx := context.Background() pubsub := worker.RedisClient.Subscribe(ctx, strconv.Itoa(device_id)+"_frames_msgs") - //生成唯一连接uuid + // 生成唯一连接 uuid con_id := uuid.New().String() online_conn_key := "device_" + strconv.Itoa(device_id) + "_online_conn_ids" - //加入设备在线连接集合 - worker.SetRedisSetAdd(online_conn_key, con_id) + // 加入设备在线连接集合 + worker.SetRedisSetAddWithExpire(online_conn_key, con_id, time.Minute*5) defer pubsub.Close() defer ws.Close() ch := pubsub.Channel() var check_cnt int - for msg := range ch { - var res3 []byte - var msgObj proto.Message - if msg.Payload != "" { - msgObj.Type = "img" - msgObj.Msg = msg.Payload - msgObj.From_user_id = -1 - res3, _ = json.Marshal(msgObj) - } else { - if check_cnt < 5 { - check_cnt++ - time.Sleep(time.Millisecond * 200) - continue + for { + select { + case msg, ok := <-ch: + if !ok { + // 通道关闭或没有消息,每秒尝试检测连接 + for { + err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)) + if err != nil { + clientsMux.Lock() + clients[ws] = false + clientsMux.Unlock() + // 查看是否还有其他连接,没有则设置 is_play 为 0 + if worker.IsContainKey(online_conn_key) == false { + worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "0", time.Minute*5) + fmt.Println("device_id:", device_id, " has set is_play to 0") + } + return + } + time.Sleep(time.Second) + } + } + var res3 []byte + var msgObj proto.Message + if msg.Payload != "" { + msgObj.Type = "img" + msgObj.Msg = msg.Payload + msgObj.From_user_id = -1 + res3, _ = json.Marshal(msgObj) + } else { + if check_cnt < 5 { + check_cnt++ + time.Sleep(time.Millisecond * 200) + continue + } + check_cnt = 0 + msgObj.Type = "check" + msgObj.Msg = "check" + msgObj.From_user_id = -1 + res3, _ = json.Marshal(msgObj) + } + // fmt.Println("send message to client length:", len(res3)) + err2 := ws.WriteMessage(websocket.TextMessage, res3) + if err2 != nil { + clientsMux.Lock() + clients[ws] = false + clientsMux.Unlock() + fmt.Println("send message to client err:", err2) + worker.SetRedisSetRemove(online_conn_key, con_id) + return } - check_cnt = 0 - msgObj.Type = "check" - msgObj.Msg = "check" - msgObj.From_user_id = -1 - res3, _ = json.Marshal(msgObj) - } - //fmt.Println("send message to client length:", len(res3)) - err2 := ws.WriteMessage(websocket.TextMessage, res3) - if err2 != nil { - clientsMux.Lock() - clients[ws] = false - clientsMux.Unlock() - fmt.Println("send message to client err:", err2) - worker.SetRedisSetRemove(online_conn_key, con_id) - break } } - //查看是否还有其他连接,没有则设置is_play为0 - if worker.IsContainKey(online_conn_key) == false { - worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "0", time.Minute*5) - fmt.Println("device_id:", device_id, " has set is_play to 0") - } + } diff --git a/worker/redis.go b/worker/redis.go index 9b032a7..a9957bb 100644 --- a/worker/redis.go +++ b/worker/redis.go @@ -292,6 +292,22 @@ func SetRedisSetAdd(key string, value string) bool { return true } +// 设置set,添加元素 +func SetRedisSetAddWithExpire(key string, value string, expire time.Duration) bool { + ctx := context.Background() + err := RedisClient.SAdd(ctx, key, value).Err() + if err != nil { + fmt.Println("Error setting key: %v", err) + return false + } + err = RedisClient.Expire(ctx, key, expire).Err() + if err != nil { + fmt.Println("Error setting key: %v", err) + return false + } + return true +} + // 设置set,删除元素 func SetRedisSetRemove(key string, value string) bool { ctx := context.Background() From 65ac4802f566cc0932d75d6c3f6f2f6ec4fca6bb Mon Sep 17 00:00:00 2001 From: junleea <354425203@qq.com> Date: Wed, 6 Nov 2024 21:52:57 +0800 Subject: [PATCH 14/17] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=A4=9A=E7=94=A8?= =?UTF-8?q?=E6=88=B7=E6=9F=A5=E7=9C=8B=E5=AE=9E=E6=97=B6=E8=A7=86=E9=A2=91?= =?UTF-8?q?=EF=BC=8C=E8=AE=BE=E5=A4=87=E5=85=B3=E9=97=AD=E5=90=8E=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E6=9C=AA=E6=AD=A3=E5=B8=B8=E5=85=B3=E9=97=AD=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler/device.go | 45 +++++++++++++++++++++++++-------------------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/handler/device.go b/handler/device.go index 3332c9d..a1b1d28 100644 --- a/handler/device.go +++ b/handler/device.go @@ -242,27 +242,10 @@ func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) { defer ws.Close() ch := pubsub.Channel() var check_cnt int + var ticker *time.Ticker for { select { - case msg, ok := <-ch: - if !ok { - // 通道关闭或没有消息,每秒尝试检测连接 - for { - err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)) - if err != nil { - clientsMux.Lock() - clients[ws] = false - clientsMux.Unlock() - // 查看是否还有其他连接,没有则设置 is_play 为 0 - if worker.IsContainKey(online_conn_key) == false { - worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "0", time.Minute*5) - fmt.Println("device_id:", device_id, " has set is_play to 0") - } - return - } - time.Sleep(time.Second) - } - } + case msg, _ := <-ch: var res3 []byte var msgObj proto.Message if msg.Payload != "" { @@ -290,9 +273,31 @@ func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) { clientsMux.Unlock() fmt.Println("send message to client err:", err2) worker.SetRedisSetRemove(online_conn_key, con_id) - return + goto end + } + default: + if ticker == nil { + ticker = time.NewTicker(time.Second) + } + select { + case <-ticker.C: + err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)) + if err != nil { + fmt.Println("Connection check failed:", err) + worker.SetRedisSetRemove(online_conn_key, con_id) + goto end + } + default: + continue } } } +end: + // 查看是否还有其他连接,没有则设置 is_play 为 0 + if worker.IsContainKey(online_conn_key) == false { + worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "0", time.Minute*5) + fmt.Println("device_id:", device_id, " has set is_play to 0") + } + } From eda29d09a4091cff736670b1ab7068da1c45c403 Mon Sep 17 00:00:00 2001 From: junleea <354425203@qq.com> Date: Wed, 6 Nov 2024 21:56:28 +0800 Subject: [PATCH 15/17] =?UTF-8?q?=E5=88=A0=E9=99=A4=E5=AE=9A=E6=97=B6?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E6=97=A5=E5=BF=97=E8=BE=93=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler/cid.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/handler/cid.go b/handler/cid.go index 53e6e7d..d052b7f 100644 --- a/handler/cid.go +++ b/handler/cid.go @@ -233,7 +233,7 @@ func RunCron() { //如果没有定时任务,查找数据库是否有定时任务,如果有,则加入redis,如果没有,则不做任何操作 key := "cron_cid_runs" res := worker.GetRedis(key) - fmt.Println("cid run cron res:", res) + //fmt.Println("cid run cron res:", res) if res == "" { cids := dao.FindCIDByTime() cid_workers := make([]proto.CIDRUN, 0) @@ -255,7 +255,7 @@ func RunCron() { if err != nil { fmt.Println("json unmarshal failed") } - fmt.Println("cid_workers:", cid_workers) + //fmt.Println("cid_workers:", cid_workers) for i, v := range cid_workers { //查找定时任务 if v.Curr-10 <= 0 { From 32b108c3143ac5704da6b0ef559d58827e238cc0 Mon Sep 17 00:00:00 2001 From: junleea <354425203@qq.com> Date: Sat, 23 Nov 2024 17:46:13 +0800 Subject: [PATCH 16/17] =?UTF-8?q?=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1redis?= =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dao/cid.go | 4 +- handler/cid.go | 120 +++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 108 insertions(+), 16 deletions(-) diff --git a/dao/cid.go b/dao/cid.go index 6233dc6..4425e07 100644 --- a/dao/cid.go +++ b/dao/cid.go @@ -93,12 +93,12 @@ func FindRunLogByAuthID(auth_id int) []CIDRunLog { func FindRunLogByID(auth_id, cid_id int) []CIDRunLog { var cidRunLog []CIDRunLog - DB.Debug().Where("cid_id = ? and auth_id = ?", cid_id, auth_id).Order("created_at desc").Find(&cidRunLog) + DB.Debug().Where("cid_id = ? and auth_id = ?", cid_id, auth_id).Order("created_at desc").Find(&cidRunLog).Limit(30) return cidRunLog } func FindRunLogByCIDLogID(id, auth_id int) []CIDRunLog { var cidRunLogs []CIDRunLog - DB.Debug().Where("id = ? and auth_id = ?", id, auth_id).Order("created_at desc").Find(&cidRunLogs) + DB.Debug().Where("id = ? and auth_id = ?", id, auth_id).Order("created_at desc").Find(&cidRunLogs).Limit(30) return cidRunLogs } diff --git a/handler/cid.go b/handler/cid.go index d052b7f..ed5469f 100644 --- a/handler/cid.go +++ b/handler/cid.go @@ -41,6 +41,9 @@ type CIDUpdateReq struct { Token string `json:"cidtoken" form:"cidtoken"` } +// 全局变量,记录是否进行cron定时任务的刷新 +var cron_count int + func SetUpCIDGroup(router *gin.Engine) { cidGroup := router.Group("/cid") //持续集成、部署 cidGroup.POST("/create", CreateCID) @@ -88,6 +91,9 @@ func CreateCID(c *gin.Context) { token, _ := generateRandomHexString(32) res := dao.CreateCID(req.Name, req.Url, req.Script, token, req.Time, authID) if res != 0 { + if req.Time > 0 { + updateCronRedisTime(int(res), req.Time) + } c.JSON(200, gin.H{"code": proto.SuccessCode, "message": "success", "data": res}) } else { c.JSON(200, gin.H{"error": "CreateCID failed", "code": proto.OperationFailed, "message": "failed"}) @@ -126,6 +132,9 @@ func UpdateCID(c *gin.Context) { c.JSON(200, gin.H{"error": "CID not found", "code": proto.OperationFailed, "message": "failed"}) return } else { + if req.Time > 0 { + updateCronRedisTime(req.ID, req.Time) + } c.JSON(200, gin.H{"code": proto.SuccessCode, "message": "success", "data": "success"}) } } else { @@ -228,6 +237,11 @@ echo "end"` // 定时任务处理逻辑 func RunCron() { + cron_count++ + if cron_count > 6 { + updateCronFromDBToRedis() + cron_count = 0 + } //从redis查看是否有定时任务 //如果有定时任务,执行定时任务 //如果没有定时任务,查找数据库是否有定时任务,如果有,则加入redis,如果没有,则不做任何操作 @@ -235,20 +249,7 @@ func RunCron() { res := worker.GetRedis(key) //fmt.Println("cid run cron res:", res) if res == "" { - cids := dao.FindCIDByTime() - cid_workers := make([]proto.CIDRUN, 0) - for _, v := range cids { - cid_worker := proto.CIDRUN{CID: v.ID, Curr: v.Time, Every: v.Time} - cid_workers = append(cid_workers, cid_worker) - } - if len(cid_workers) > 0 { - //将定时任务加入redis - json_data, err := json.Marshal(cid_workers) - if err != nil { - fmt.Println("json marshal failed") - } - worker.SetRedis(key, string(json_data)) - } + readCronFromDBToRedis(key) } else { var cid_workers []proto.CIDRUN err := json.Unmarshal([]byte(res), &cid_workers) @@ -277,3 +278,94 @@ func RunCron() { } } + +// 将数据库中的定时任务加入redis +func readCronFromDBToRedis(key string) { + cids := dao.FindCIDByTime() + cid_workers := make([]proto.CIDRUN, 0) + for _, v := range cids { + cid_worker := proto.CIDRUN{CID: v.ID, Curr: v.Time, Every: v.Time} + cid_workers = append(cid_workers, cid_worker) + } + if len(cid_workers) > 0 { + //将定时任务加入redis + json_data, err := json.Marshal(cid_workers) + if err != nil { + fmt.Println("json marshal failed") + } + worker.SetRedis(key, string(json_data)) + } +} + +// 从数据库更新定时任务到redis +func updateCronFromDBToRedis() { + key := "cron_cid_runs" + cids := dao.FindCIDByTime() + cid_maps := make(map[uint]int) + + //将数据库中的定时任务加入map,key为cid,value为时间,便于后续查找 + for _, v := range cids { + cid_maps[v.ID] = v.Time + } + res := worker.GetRedis(key) + if res == "" { + readCronFromDBToRedis(key) + return + } + var cid_workers_redis []proto.CIDRUN + err := json.Unmarshal([]byte(res), &cid_workers_redis) + if err != nil { + fmt.Println("json unmarshal failed") + return + } + for i, v := range cid_workers_redis { + if time, ok := cid_maps[v.CID]; ok { + if v.Every != time { + cid_workers_redis[i].Every = time + cid_workers_redis[i].Curr = time + } + } + } + //将定时任务加入redis + json_data, err := json.Marshal(cid_workers_redis) + if err != nil { + fmt.Println("json marshal failed") + return + } + worker.SetRedis(key, string(json_data)) +} + +// 查看指定定时任务是否存在,如果存在则更新时间,如果不存在则加入 +func updateCronRedisTime(id int, time int) { + key := "cron_cid_runs" + res := worker.GetRedis(key) + if res == "" { + readCronFromDBToRedis(key) + return + } else { + var cid_workers []proto.CIDRUN + err := json.Unmarshal([]byte(res), &cid_workers) + if err != nil { + fmt.Println("json unmarshal failed") + } + isContain := false + for i, v := range cid_workers { + if v.CID == uint(id) { + //更新时间,不会继续原来的时间 + cid_workers[i].Curr = time + cid_workers[i].Every = time + isContain = true + } + } + if isContain == false { + cid_worker := proto.CIDRUN{CID: uint(id), Curr: time, Every: time} + cid_workers = append(cid_workers, cid_worker) + } + //将定时任务加入redis + json_data, err := json.Marshal(cid_workers) + if err != nil { + fmt.Println("json marshal failed") + } + worker.SetRedis(key, string(json_data)) + } +} From d302535608b20b661112f8c7bb801ecc4f40e744 Mon Sep 17 00:00:00 2001 From: junleea <354425203@qq.com> Date: Sat, 23 Nov 2024 18:14:35 +0800 Subject: [PATCH 17/17] =?UTF-8?q?cid=20log=E6=9F=A5=E6=89=BE=E5=8F=AA?= =?UTF-8?q?=E6=9F=A5=E6=9C=80=E8=BF=9130=E6=9D=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dao/cid.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dao/cid.go b/dao/cid.go index 4425e07..c0b3d46 100644 --- a/dao/cid.go +++ b/dao/cid.go @@ -93,12 +93,12 @@ func FindRunLogByAuthID(auth_id int) []CIDRunLog { func FindRunLogByID(auth_id, cid_id int) []CIDRunLog { var cidRunLog []CIDRunLog - DB.Debug().Where("cid_id = ? and auth_id = ?", cid_id, auth_id).Order("created_at desc").Find(&cidRunLog).Limit(30) + DB.Debug().Where("cid_id = ? and auth_id = ?", cid_id, auth_id).Order("created_at desc").Limit(30).Find(&cidRunLog) return cidRunLog } func FindRunLogByCIDLogID(id, auth_id int) []CIDRunLog { var cidRunLogs []CIDRunLog - DB.Debug().Where("id = ? and auth_id = ?", id, auth_id).Order("created_at desc").Find(&cidRunLogs).Limit(30) + DB.Debug().Where("id = ? and auth_id = ?", id, auth_id).Order("created_at desc").Limit(30).Find(&cidRunLogs) return cidRunLogs }