Compare commits

..

25 Commits

Author SHA1 Message Date
junleea d6b1788ae5 Merge branch 'refs/heads/master' into release
# Conflicts:
#	dao/cid.go
2024-11-24 14:56:22 +08:00
junleea 4bc84531d4 Merge branch 'refs/heads/feat-cid' 2024-11-23 18:14:41 +08:00
junleea d302535608 cid log查找只查最近30条 2024-11-23 18:14:35 +08:00
junleea cf7d5d7992 Merge branch 'refs/heads/feat-cid' 2024-11-23 17:46:49 +08:00
junleea 32b108c314 定时任务redis更新 2024-11-23 17:46:13 +08:00
junleea 021c475720 Merge branch 'refs/heads/feat-cid' 2024-11-06 21:56:35 +08:00
junleea eda29d09a4 删除定时任务日志输出 2024-11-06 21:56:28 +08:00
junleea 73e3db98db Merge branch 'refs/heads/feat-realvp' 2024-11-06 21:53:03 +08:00
junleea 65ac4802f5 修复多用户查看实时视频,设备关闭后连接未正常关闭问题 2024-11-06 21:52:57 +08:00
junleea 542935cb8f Merge branch 'refs/heads/feat-realvp' 2024-11-06 21:41:13 +08:00
junleea 54b67296bd 修复多用户查看实时视频,设备关闭后连接未正常关闭问题 2024-11-06 21:41:02 +08:00
junleea 69f04c09a6 修复定时任务功能 2024-11-06 21:25:57 +08:00
junleea 66ec0ff947 修复定时任务功能 2024-11-06 21:20:12 +08:00
junleea d9e622e37b 添加定时任务功能,及定时任务处理 2024-11-06 15:13:06 +08:00
junleea 9f21e7c1d5 Merge branch 'refs/heads/feat-realvp' 2024-11-06 12:05:01 +08:00
junleea e651a844f5 修复多连接查看实时视频不正常关闭问题 2024-11-06 12:04:51 +08:00
junleea fa554de402 Merge branch 'refs/heads/feat-realvp' 2024-11-06 12:00:10 +08:00
junleea 3a7590ae8d 修复多连接查看实时视频不正常关闭问题 2024-11-06 11:59:53 +08:00
junleea 6ba661c0f8 离线聊天消息使用redis发布订阅模式 2024-10-29 18:53:08 +08:00
junleea e1bae50a04 修复实时查看redis发布订阅模式 2024-10-28 21:18:54 +08:00
junleea f53af2c097 修复实时查看 2024-10-28 21:11:44 +08:00
junleea 9a234f50fb 修复实时查看 2024-10-28 21:01:23 +08:00
junleea 81e0c2a5bf 修复实时查看 2024-10-28 20:56:26 +08:00
junleea 1ce27b543d 设备实时图片通过redis发布订阅模式实现,修改redis参数 2024-10-28 17:17:23 +08:00
junleea 7770efd21a 修复视频查询初未设置情况 2024-10-25 18:45:04 +08:00
11 changed files with 391 additions and 80 deletions

View File

@ -10,6 +10,7 @@ type CID struct {
Auth_id int `gorm:"column:auth_id"` Auth_id int `gorm:"column:auth_id"`
Name string `gorm:"column:name"` Name string `gorm:"column:name"`
Url string `gorm:"column:url"` Url string `gorm:"column:url"`
Time int `gorm:"column:time"` // 定时任务单位秒大于0表示定时任务
Script string `gorm:"column:script"` Script string `gorm:"column:script"`
Token string `gorm:"column:token"` // 用于外部回调 Token string `gorm:"column:token"` // 用于外部回调
} }
@ -24,8 +25,8 @@ type CIDRunLog struct {
} }
// CreateCID 创建持续集成、部署 // CreateCID 创建持续集成、部署
func CreateCID(name, url, script, token string, auth_id int) uint { func CreateCID(name, url, script, token string, time, auth_id int) uint {
cid := CID{Name: name, Url: url, Script: script, Token: token, Auth_id: auth_id} cid := CID{Name: name, Url: url, Script: script, Token: token, Auth_id: auth_id, Time: time}
result := DB.Create(&cid) result := DB.Create(&cid)
if result.Error != nil { if result.Error != nil {
return 0 return 0
@ -57,7 +58,7 @@ func FindCIDByAuthID(auth_id int) []CID {
} }
// UpdateCIDByID 更新持续集成、部署 // UpdateCIDByID 更新持续集成、部署
func UpdateCIDByID(id, auth_id int, name, url, script, token string) bool { func UpdateCIDByID(id, auth_id, time int, name, url, script, token string) bool {
pd := FindCIDByID(id, auth_id) pd := FindCIDByID(id, auth_id)
if pd.ID == 0 { if pd.ID == 0 {
return false return false
@ -66,7 +67,7 @@ func UpdateCIDByID(id, auth_id int, name, url, script, token string) bool {
if token == "" { if token == "" {
token = pd.Token token = pd.Token
} }
result := DB.Model(&CID{}).Where("id = ? and auth_id = ?", id, auth_id).Updates(CID{Name: name, Url: url, Script: script, Token: token}) result := DB.Model(&CID{}).Where("id = ? and auth_id = ?", id, auth_id).Updates(CID{Name: name, Url: url, Script: script, Token: token, Time: time})
if result.Error != nil { if result.Error != nil {
return false return false
} }
@ -92,12 +93,12 @@ func FindRunLogByAuthID(auth_id int) []CIDRunLog {
func FindRunLogByID(auth_id, cid_id int) []CIDRunLog { func FindRunLogByID(auth_id, cid_id int) []CIDRunLog {
var cidRunLog []CIDRunLog var cidRunLog []CIDRunLog
DB.Where("cid_id = ? and auth_id = ?", cid_id, auth_id).Order("created_at desc").Find(&cidRunLog) DB.Where("cid_id = ? and auth_id = ?", cid_id, auth_id).Order("created_at desc").Limit(30).Find(&cidRunLog)
return cidRunLog return cidRunLog
} }
func FindRunLogByCIDLogID(id, auth_id int) []CIDRunLog { func FindRunLogByCIDLogID(id, auth_id int) []CIDRunLog {
var cidRunLogs []CIDRunLog var cidRunLogs []CIDRunLog
DB.Where("id = ? and auth_id = ?", id, auth_id).Order("created_at desc").Find(&cidRunLogs) DB.Where("id = ? and auth_id = ?", id, auth_id).Order("created_at desc").Limit(30).Find(&cidRunLogs)
return cidRunLogs return cidRunLogs
} }
@ -106,3 +107,16 @@ func FindCIDByIDAndToken(id int, token string) CID {
DB.Where("id = ? and token = ?", id, token).First(&cid) DB.Where("id = ? and token = ?", id, token).First(&cid)
return cid return cid
} }
func FindCIDByTime() []CID {
var cids []CID
DB.Where("time > 0").Find(&cids)
return cids
}
// FindCIDByID 查找持续集成、部署
func FindCIDByCID(id uint) CID {
var cid CID
DB.Debug().Where("id = ? ", id).First(&cid)
return cid
}

1
go.mod
View File

@ -8,6 +8,7 @@ require (
github.com/golang-jwt/jwt v3.2.2+incompatible github.com/golang-jwt/jwt v3.2.2+incompatible
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3 github.com/gorilla/websocket v1.5.3
github.com/robfig/cron/v3 v3.0.1
gorm.io/driver/mysql v1.5.6 gorm.io/driver/mysql v1.5.6
gorm.io/driver/postgres v1.5.9 gorm.io/driver/postgres v1.5.9
gorm.io/gorm v1.25.10 gorm.io/gorm v1.25.10

2
go.sum
View File

@ -86,6 +86,8 @@ github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=

View File

@ -2,6 +2,7 @@ package handler
import ( import (
"bytes" "bytes"
"encoding/json"
"fmt" "fmt"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"os/exec" "os/exec"
@ -9,11 +10,13 @@ import (
"strings" "strings"
"videoplayer/dao" "videoplayer/dao"
"videoplayer/proto" "videoplayer/proto"
"videoplayer/worker"
) )
type CIDCreateReq struct { type CIDCreateReq struct {
Name string `json:"name" form:"name"` Name string `json:"name" form:"name"`
Url string `json:"url" form:"url"` Url string `json:"url" form:"url"`
Time int `json:"time" form:"time"` // 定时任务单位秒大于0表示定时任务
Script string `json:"script" form:"script"` Script string `json:"script" form:"script"`
} }
@ -33,10 +36,14 @@ type CIDUpdateReq struct {
ID int `json:"id" form:"id"` ID int `json:"id" form:"id"`
Name string `json:"name" form:"name"` Name string `json:"name" form:"name"`
Url string `json:"url" form:"url"` Url string `json:"url" form:"url"`
Time int `json:"time" form:"time"` // 定时任务单位秒大于0表示定时任务
Script string `json:"script" form:"script"` Script string `json:"script" form:"script"`
Token string `json:"cidtoken" form:"cidtoken"` Token string `json:"cidtoken" form:"cidtoken"`
} }
// 全局变量记录是否进行cron定时任务的刷新
var cron_count int
func SetUpCIDGroup(router *gin.Engine) { func SetUpCIDGroup(router *gin.Engine) {
cidGroup := router.Group("/cid") //持续集成、部署 cidGroup := router.Group("/cid") //持续集成、部署
cidGroup.POST("/create", CreateCID) cidGroup.POST("/create", CreateCID)
@ -82,9 +89,11 @@ func CreateCID(c *gin.Context) {
id, _ := c.Get("id") id, _ := c.Get("id")
authID := int(id.(float64)) authID := int(id.(float64))
token, _ := generateRandomHexString(32) token, _ := generateRandomHexString(32)
res := dao.CreateCID(req.Name, req.Url, req.Script, token, req.Time, authID)
res := dao.CreateCID(req.Name, req.Url, req.Script, token, authID)
if res != 0 { if res != 0 {
if req.Time > 0 {
updateCronRedisTime(int(res), req.Time)
}
c.JSON(200, gin.H{"code": proto.SuccessCode, "message": "success", "data": res}) c.JSON(200, gin.H{"code": proto.SuccessCode, "message": "success", "data": res})
} else { } else {
c.JSON(200, gin.H{"error": "CreateCID failed", "code": proto.OperationFailed, "message": "failed"}) c.JSON(200, gin.H{"error": "CreateCID failed", "code": proto.OperationFailed, "message": "failed"})
@ -118,11 +127,14 @@ func UpdateCID(c *gin.Context) {
// 获取用户ID // 获取用户ID
id, _ := c.Get("id") id, _ := c.Get("id")
authID := int(id.(float64)) authID := int(id.(float64))
cid := dao.UpdateCIDByID(req.ID, authID, req.Name, req.Url, req.Script, req.Token) cid := dao.UpdateCIDByID(req.ID, authID, req.Time, req.Name, req.Url, req.Script, req.Token)
if cid == false { if cid == false {
c.JSON(200, gin.H{"error": "CID not found", "code": proto.OperationFailed, "message": "failed"}) c.JSON(200, gin.H{"error": "CID not found", "code": proto.OperationFailed, "message": "failed"})
return return
} else { } else {
if req.Time > 0 {
updateCronRedisTime(req.ID, req.Time)
}
c.JSON(200, gin.H{"code": proto.SuccessCode, "message": "success", "data": "success"}) c.JSON(200, gin.H{"code": proto.SuccessCode, "message": "success", "data": "success"})
} }
} else { } else {
@ -222,3 +234,138 @@ echo "end"`
//fmt.Println("bash content:", scriptContent) //fmt.Println("bash content:", scriptContent)
dao.CreateRunLog(id, authID, scriptContent, out.String(), err3_info) //添加执行日志 dao.CreateRunLog(id, authID, scriptContent, out.String(), err3_info) //添加执行日志
} }
// 定时任务处理逻辑
func RunCron() {
cron_count++
if cron_count > 6 {
updateCronFromDBToRedis()
cron_count = 0
}
//从redis查看是否有定时任务
//如果有定时任务,执行定时任务
//如果没有定时任务查找数据库是否有定时任务如果有则加入redis如果没有则不做任何操作
key := "cron_cid_runs"
res := worker.GetRedis(key)
//fmt.Println("cid run cron res:", res)
if res == "" {
readCronFromDBToRedis(key)
} else {
var cid_workers []proto.CIDRUN
err := json.Unmarshal([]byte(res), &cid_workers)
if err != nil {
fmt.Println("json unmarshal failed")
}
//fmt.Println("cid_workers:", cid_workers)
for i, v := range cid_workers {
//查找定时任务
if v.Curr-10 <= 0 {
cid := dao.FindCIDByCID(v.CID)
if cid.ID != 0 {
go RunShell("cron", cid.Url, cid.Script, int(cid.ID), cid.Auth_id)
}
cid_workers[i].Curr = v.Every
} else {
cid_workers[i].Curr = v.Curr - 10
}
}
//将定时任务加入redis
json_data, err := json.Marshal(cid_workers)
if err != nil {
fmt.Println("json marshal failed")
}
worker.SetRedis(key, string(json_data))
}
}
// 将数据库中的定时任务加入redis
func readCronFromDBToRedis(key string) {
cids := dao.FindCIDByTime()
cid_workers := make([]proto.CIDRUN, 0)
for _, v := range cids {
cid_worker := proto.CIDRUN{CID: v.ID, Curr: v.Time, Every: v.Time}
cid_workers = append(cid_workers, cid_worker)
}
if len(cid_workers) > 0 {
//将定时任务加入redis
json_data, err := json.Marshal(cid_workers)
if err != nil {
fmt.Println("json marshal failed")
}
worker.SetRedis(key, string(json_data))
}
}
// 从数据库更新定时任务到redis
func updateCronFromDBToRedis() {
key := "cron_cid_runs"
cids := dao.FindCIDByTime()
cid_maps := make(map[uint]int)
//将数据库中的定时任务加入mapkey为cidvalue为时间便于后续查找
for _, v := range cids {
cid_maps[v.ID] = v.Time
}
res := worker.GetRedis(key)
if res == "" {
readCronFromDBToRedis(key)
return
}
var cid_workers_redis []proto.CIDRUN
err := json.Unmarshal([]byte(res), &cid_workers_redis)
if err != nil {
fmt.Println("json unmarshal failed")
return
}
for i, v := range cid_workers_redis {
if time, ok := cid_maps[v.CID]; ok {
if v.Every != time {
cid_workers_redis[i].Every = time
cid_workers_redis[i].Curr = time
}
}
}
//将定时任务加入redis
json_data, err := json.Marshal(cid_workers_redis)
if err != nil {
fmt.Println("json marshal failed")
return
}
worker.SetRedis(key, string(json_data))
}
// 查看指定定时任务是否存在,如果存在则更新时间,如果不存在则加入
func updateCronRedisTime(id int, time int) {
key := "cron_cid_runs"
res := worker.GetRedis(key)
if res == "" {
readCronFromDBToRedis(key)
return
} else {
var cid_workers []proto.CIDRUN
err := json.Unmarshal([]byte(res), &cid_workers)
if err != nil {
fmt.Println("json unmarshal failed")
}
isContain := false
for i, v := range cid_workers {
if v.CID == uint(id) {
//更新时间,不会继续原来的时间
cid_workers[i].Curr = time
cid_workers[i].Every = time
isContain = true
}
}
if isContain == false {
cid_worker := proto.CIDRUN{CID: uint(id), Curr: time, Every: time}
cid_workers = append(cid_workers, cid_worker)
}
//将定时任务加入redis
json_data, err := json.Marshal(cid_workers)
if err != nil {
fmt.Println("json marshal failed")
}
worker.SetRedis(key, string(json_data))
}
}

View File

@ -1,9 +1,11 @@
package handler package handler
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"net/http" "net/http"
"strconv" "strconv"
@ -206,71 +208,96 @@ func Restart(ip string) bool {
return false return false
} }
// 接收及发送消息 // 发送实时视频流
func GetRealTimeImage(c *gin.Context) { func GetRealTimeImage(c *gin.Context) {
id, _ := c.Get("id") id, _ := c.Get("id")
id1 := int(id.(float64)) id1 := int(id.(float64))
device_id := c.Query("device_id") device_id := c.Query("device_id")
//字符串转int
device_id_int, _ := strconv.Atoi(device_id) device_id_int, _ := strconv.Atoi(device_id)
device := service.GetDevice(device_id_int, id1) device := service.GetDevice(device_id_int, id1)
if device.ID == 0 { if device.ID == 0 {
c.JSON(http.StatusOK, gin.H{"code": proto.DataNotFound, "message": "device not found"}) c.JSON(http.StatusOK, gin.H{"code": proto.DataNotFound, "message": "device not found"})
return return
} }
//建立连接
// 升级HTTP连接为WebSocket连接
ws, err := upgrader.Upgrade(c.Writer, c.Request, nil) ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
clients[ws] = true clients[ws] = true
if err != nil { if err != nil {
// log.Println(err) fmt.Println("connect wss err:", err)
fmt.Println(err)
return return
} }
worker.SetRedisWithExpire(strconv.Itoa(int(device.ID))+"_is_play", "1", time.Minute*5)
fmt.Println("device_id:", device_id_int, " has set is_play to 1")
go subscribeAndHandleMessages(ws, device_id_int)
}
func subscribeAndHandleMessages(ws *websocket.Conn, device_id int) {
ctx := context.Background()
pubsub := worker.RedisClient.Subscribe(ctx, strconv.Itoa(device_id)+"_frames_msgs")
// 生成唯一连接 uuid
con_id := uuid.New().String()
online_conn_key := "device_" + strconv.Itoa(device_id) + "_online_conn_ids"
// 加入设备在线连接集合
worker.SetRedisSetAddWithExpire(online_conn_key, con_id, time.Minute*5)
defer pubsub.Close()
defer ws.Close() defer ws.Close()
worker.SetRedisWithExpire(strconv.Itoa(int(device.ID))+"_is_play", "1", time.Minute*5) //设置播放状态 ch := pubsub.Channel()
// 接收客户端消息并发送到指定用户
go func(ws *websocket.Conn, device_id int) {
}(ws, device_id_int)
var check_cnt int var check_cnt int
var ticker *time.Ticker
for { for {
if v := clients[ws]; v == true { select {
res2 := worker.PopRedisListLeft(device_id + "_frames") case msg, _ := <-ch:
var res3 []byte var res3 []byte
var msg proto.Message var msgObj proto.Message
if res2 != "" { if msg.Payload != "" {
//若有消息则发送消息 msgObj.Type = "img"
msg.Type = "img" msgObj.Msg = msg.Payload
msg.Msg = res2 msgObj.From_user_id = -1
msg.From_user_id = id1 res3, _ = json.Marshal(msgObj)
res3, _ = json.Marshal(msg)
} else { } else {
//若无消息则发送心跳包
if check_cnt < 5 { if check_cnt < 5 {
check_cnt++ check_cnt++
time.Sleep(time.Millisecond * 200) //设置延时200ms time.Sleep(time.Millisecond * 200)
continue continue
} }
check_cnt = 0 check_cnt = 0
msg.Type = "check" msgObj.Type = "check"
msg.Msg = "check" msgObj.Msg = "check"
msg.From_user_id = -1 msgObj.From_user_id = -1
res3, _ = json.Marshal(msg) res3, _ = json.Marshal(msgObj)
} }
// fmt.Println("send message to client length:", len(res3))
err2 := ws.WriteMessage(websocket.TextMessage, res3) err2 := ws.WriteMessage(websocket.TextMessage, res3)
worker.SetRedisWithExpire(strconv.Itoa(int(device.ID))+"_is_play", "1", time.Minute*5) //设置播放状态
if err2 != nil { if err2 != nil {
clientsMux.Lock() clientsMux.Lock()
clients[ws] = false clients[ws] = false
clientsMux.Unlock() clientsMux.Unlock()
//设置ws关闭状态信息 fmt.Println("send message to client err:", err2)
worker.SetRedisWithExpire(strconv.Itoa(int(device.ID))+"_is_play", "0", time.Minute*5) //设置播放状态 worker.SetRedisSetRemove(online_conn_key, con_id)
break goto end
}
default:
if ticker == nil {
ticker = time.NewTicker(time.Second)
}
select {
case <-ticker.C:
err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second))
if err != nil {
fmt.Println("Connection check failed:", err)
worker.SetRedisSetRemove(online_conn_key, con_id)
goto end
}
default:
continue
} }
time.Sleep(time.Millisecond * 200) //设置延时200ms
} }
} }
end:
// 查看是否还有其他连接,没有则设置 is_play 为 0
if worker.IsContainKey(online_conn_key) == false {
worker.SetRedisWithExpire(strconv.Itoa(device_id)+"_is_play", "0", time.Minute*5)
fmt.Println("device_id:", device_id, " has set is_play to 0")
}
} }

View File

@ -1,6 +1,7 @@
package handler package handler
import ( import (
"context"
"crypto/rand" "crypto/rand"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
@ -74,7 +75,7 @@ func SetUpIMGroup(router *gin.Engine) {
imGroup.POST("/get_group", GetGroups) imGroup.POST("/get_group", GetGroups)
imGroup.POST("/get_group_req_user", GetFriendRequest) imGroup.POST("/get_group_req_user", GetFriendRequest)
imGroup.GET("/sse_msg", ServerSendMsg) imGroup.GET("/sse_msg", ServerSendMsg)
imGroup.GET("/ws_v2", ServerSendMsgV2) imGroup.GET("/ws_v2", ServerSendMsgV3)
imGroup.POST("/get_friend_list", GetFriendList) //获取好友列表,包括群聊 imGroup.POST("/get_friend_list", GetFriendList) //获取好友列表,包括群聊
//获取好友请求 //获取好友请求
imGroup.POST("/get_friend_request", GetFriendRequest) imGroup.POST("/get_friend_request", GetFriendRequest)
@ -607,6 +608,28 @@ func ServerSendMsgV2(c *gin.Context) {
time.Sleep(time.Second * 1) time.Sleep(time.Second * 1)
} }
} }
func ServerSendMsgV3(c *gin.Context) {
//ws
id, _ := c.Get("id")
user_id := int(id.(float64))
// 升级HTTP连接为WebSocket连接
ws, err1 := upgrader.Upgrade(c.Writer, c.Request, nil)
clients[ws] = true
if err1 != nil {
// log.Println(err)
fmt.Println(err1)
return
}
//设置用户在线状态
worker.SetRedisWithExpire("user_"+strconv.Itoa(user_id)+"_status_v2", "1", time.Second*60)
worker.SetRedisBitmap("im2_online_users", int64(user_id), 1)
worker.SetRedisSetAdd("im2_online_users_set", strconv.Itoa(user_id))
//发送消息
key := "user_" + strconv.Itoa(user_id) + "_msg_ids"
go subscribeAndHandleIMMessages(ws, key, user_id)
}
func ServerSendMsg(c *gin.Context) { func ServerSendMsg(c *gin.Context) {
//sse //sse
c.Writer.Header().Set("Content-Type", "text/event-stream") c.Writer.Header().Set("Content-Type", "text/event-stream")
@ -654,3 +677,54 @@ func ServerSendMsg(c *gin.Context) {
time.Sleep(time.Second * 1) time.Sleep(time.Second * 1)
} }
} }
func subscribeAndHandleIMMessages(ws *websocket.Conn, chanel string, user_id int) {
ctx := context.Background()
pubsub := worker.RedisClient.Subscribe(ctx, chanel)
defer pubsub.Close()
defer ws.Close()
ch := pubsub.Channel()
for m := range ch {
msg_id := m.Payload //消息id
if msg_id != "" {
msg_id_num, _ := strconv.ParseInt(msg_id, 10, 64)
msgs := dao.FindMessageByID2(uint(msg_id_num))
if len(msgs) > 0 {
msg := msgs[0]
//发送消息
msg_str, _ := json.Marshal(msg)
var msg_ proto.Message
msg_.Type = "msg"
msg_.Msg = string(msg_str)
msg_str2, _ := json.Marshal(msg_)
err2 := ws.WriteMessage(websocket.TextMessage, msg_str2)
if err2 != nil {
worker.SetRedisWithExpire("user_"+strconv.Itoa(user_id)+"_status_v2", "0", time.Second*3600)
clientsMux.Lock()
delete(clients, ws)
clientsMux.Unlock()
break
}
}
} else {
var msg proto.Message
msg.Type = "check"
msg.Msg = "check"
msg.From_user_id = -1
//发送心跳包
res3, _ := json.Marshal(msg)
err2 := ws.WriteMessage(websocket.TextMessage, res3)
if err2 != nil {
worker.SetRedisWithExpire("user_"+strconv.Itoa(user_id)+"_status_v2", "0", time.Second*3600)
worker.SetRedisBitmap("im2_online_users", int64(user_id), 0)
worker.SetRedisSetRemove("im2_online_users_set", strconv.Itoa(user_id))
clientsMux.Lock()
delete(clients, ws)
clientsMux.Unlock()
break
}
}
worker.SetRedisWithExpire("user_"+strconv.Itoa(user_id)+"_status_v2", "1", time.Second*60)
}
}

View File

@ -160,6 +160,10 @@ func GetVideoList(c *gin.Context) {
const layout = "2006-01-02 15:04:05" const layout = "2006-01-02 15:04:05"
tm1 := time.Unix(gvl_req.StartTime, 0).Format(layout) tm1 := time.Unix(gvl_req.StartTime, 0).Format(layout)
tm2 := time.Unix(gvl_req.EndTime, 0).Format(layout) tm2 := time.Unix(gvl_req.EndTime, 0).Format(layout)
if gvl_req.StartTime == 0 || gvl_req.EndTime == 0 {
tm1 = ""
tm2 = ""
}
videos := service.GetVideoList(int(id.(float64)), tm1, tm2, gvl_req.Hour) videos := service.GetVideoList(int(id.(float64)), tm1, tm2, gvl_req.Hour)
c.JSON(http.StatusOK, gin.H{"data": videos, "code": proto.SuccessCode, "message": "success"}) c.JSON(http.StatusOK, gin.H{"data": videos, "code": proto.SuccessCode, "message": "success"})
} else { } else {

16
main.go
View File

@ -3,7 +3,9 @@ package main
import ( import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/golang-jwt/jwt" "github.com/golang-jwt/jwt"
"github.com/robfig/cron/v3"
"io" "io"
"log"
"os" "os"
"strings" "strings"
"videoplayer/dao" "videoplayer/dao"
@ -33,6 +35,14 @@ func main() {
handler.SetUpToolGroup(r) // Tool handler.SetUpToolGroup(r) // Tool
defer dao.Close() defer dao.Close()
defer worker.CloseRedis() defer worker.CloseRedis()
//定时任务
c := cron.New(cron.WithSeconds())
// 添加每 10 秒执行一次的任务
_, err = c.AddFunc("@every 10s", myTask)
if err != nil {
log.Fatal("添加定时任务失败: ", err)
}
c.Start()
r.Run(":8083") // listen and serve on 0.0.0.0:8083 r.Run(":8083") // listen and serve on 0.0.0.0:8083
} }
func init() { func init() {
@ -138,3 +148,9 @@ func JWTAuthMiddleware() gin.HandlerFunc {
c.Next() c.Next()
} }
} }
func myTask() {
// 定时任务
//redis中取出数据
handler.RunCron()
}

View File

@ -11,3 +11,9 @@ type UpdateUserInfoReq struct {
Run bool `json:"run" form:"run"` //是否运行 Run bool `json:"run" form:"run"` //是否运行
Avatar string `json:"avatar" form:"avatar"` //头像 Avatar string `json:"avatar" form:"avatar"` //头像
} }
type CIDRUN struct {
CID uint `json:"cid" form:"cid"` //持续集成ID,查找持续集成任务
Curr int `json:"curr" form:"curr"` //当前剩余时间每次执行减10s小于等于0则执行
Every int `json:"every" form:"every"` //每隔多少秒执行一次,小于等于0表示不执行时间粒度为10s
}

View File

@ -45,6 +45,8 @@ func CreateGeneralMessageService(from_id, to_id, msg_type, group_id int, content
if res == "1" { if res == "1" {
//在线,存入redis //在线,存入redis
worker.PushRedisListWithExpire("user_"+strconv.Itoa(to_id)+"_msg_ids", strconv.Itoa(int(id)), time.Second*300) worker.PushRedisListWithExpire("user_"+strconv.Itoa(to_id)+"_msg_ids", strconv.Itoa(int(id)), time.Second*300)
//发布消息
worker.Publish("user_"+strconv.Itoa(to_id)+"_msg_ids", strconv.Itoa(int(id)), time.Second*300)
} }
//判断接收方是否是机器人 //判断接收方是否是机器人
id_str := strconv.Itoa(to_id) id_str := strconv.Itoa(to_id)
@ -96,6 +98,8 @@ func CreateGeneralMessageService(from_id, to_id, msg_type, group_id int, content
} }
//在线,存入redis //在线,存入redis
worker.PushRedisListWithExpire("user_"+user_id+"_msg_ids", strconv.Itoa(int(id)), time.Second*300) worker.PushRedisListWithExpire("user_"+user_id+"_msg_ids", strconv.Itoa(int(id)), time.Second*300)
//发布消息
worker.Publish("user_"+user_id+"_msg_ids", strconv.Itoa(int(id)), time.Second*300)
} }
case 3: case 3:
//user := dao.FindUserByID(to_id) //user := dao.FindUserByID(to_id)

View File

@ -12,19 +12,19 @@ import (
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
) )
var redisClient *redis.Client // Redis 客户端, 用于连接 Redis 服务器 var RedisClient *redis.Client // Redis 客户端, 用于连接 Redis 服务器
func InitRedis() error { func InitRedis() error {
ctx := context.Background() ctx := context.Background()
if proto.Config.REDIS_User_PW == false { if proto.Config.REDIS_User_PW == false {
// 连接redis // 连接redis
redisClient = redis.NewClient(&redis.Options{ RedisClient = redis.NewClient(&redis.Options{
Addr: proto.Config.REDIS_ADDR, // Redis 服务器地址 Addr: proto.Config.REDIS_ADDR, // Redis 服务器地址
DB: proto.Config.REDIS_DB, // 使用的数据库编号 DB: proto.Config.REDIS_DB, // 使用的数据库编号
}) })
} else { } else {
// 连接redis // 连接redis
redisClient = redis.NewClient(&redis.Options{ RedisClient = redis.NewClient(&redis.Options{
Addr: proto.Config.REDIS_ADDR, // Redis 服务器地址 Addr: proto.Config.REDIS_ADDR, // Redis 服务器地址
Password: proto.Config.REDIS_PASSWORD, // 如果 Redis 设置了密码 Password: proto.Config.REDIS_PASSWORD, // 如果 Redis 设置了密码
DB: proto.Config.REDIS_DB, // 使用的数据库编号 DB: proto.Config.REDIS_DB, // 使用的数据库编号
@ -32,7 +32,7 @@ func InitRedis() error {
} }
// 验证 Redis 客户端是否可以正常工作 // 验证 Redis 客户端是否可以正常工作
_, err := redisClient.Ping(ctx).Result() _, err := RedisClient.Ping(ctx).Result()
if err != nil { if err != nil {
fmt.Println("Error connecting to Redis: %v", err) fmt.Println("Error connecting to Redis: %v", err)
} }
@ -41,14 +41,14 @@ func InitRedis() error {
func CloseRedis() { func CloseRedis() {
// 关闭 Redis 客户端 // 关闭 Redis 客户端
if err := redisClient.Close(); err != nil { if err := RedisClient.Close(); err != nil {
fmt.Println("Error closing Redis client: %v", err) fmt.Println("Error closing Redis client: %v", err)
} }
} }
func IsContainKey(key string) bool { func IsContainKey(key string) bool {
ctx := context.Background() ctx := context.Background()
val, err := redisClient.Exists(ctx, key).Result() // 检查键是否存在, 如果存在则返回 1, 否则返回 0 val, err := RedisClient.Exists(ctx, key).Result() // 检查键是否存在, 如果存在则返回 1, 否则返回 0
if err != nil { if err != nil {
fmt.Println("Error getting key: %v", err) fmt.Println("Error getting key: %v", err)
return false return false
@ -63,7 +63,7 @@ func IsContainKey(key string) bool {
func SetRedis(key string, value string) bool { func SetRedis(key string, value string) bool {
ctx := context.Background() ctx := context.Background()
// 设置键值对, 0 表示不设置过期时间, 如果需要设置过期时间, 可以设置为 time.Second * 10 等 // 设置键值对, 0 表示不设置过期时间, 如果需要设置过期时间, 可以设置为 time.Second * 10 等
err := redisClient.Set(ctx, key, value, time.Minute*30).Err() err := RedisClient.Set(ctx, key, value, time.Minute*30).Err()
if err != nil { if err != nil {
fmt.Println("Error setting key: %v", err) fmt.Println("Error setting key: %v", err)
return false return false
@ -83,12 +83,12 @@ func SetHashWithTime(key string, id int, name, email string, duration time.Durat
} }
// 设置哈希表的字段值, 0 表示不设置过期时间, 如果需要设置过期时间, 可以设置为 time.Second * 10 等 // 设置哈希表的字段值, 0 表示不设置过期时间, 如果需要设置过期时间, 可以设置为 time.Second * 10 等
err := redisClient.HSet(ctx, key, fields).Err() err := RedisClient.HSet(ctx, key, fields).Err()
if err != nil { if err != nil {
fmt.Println("Error setting key: %v", err) fmt.Println("Error setting key: %v", err)
return false return false
} }
err = redisClient.Expire(ctx, key, time.Hour*10).Err() err = RedisClient.Expire(ctx, key, time.Hour*10).Err()
if err != nil { if err != nil {
fmt.Println("Error setting key: %v", err) fmt.Println("Error setting key: %v", err)
return false return false
@ -99,12 +99,12 @@ func SetHashWithTime(key string, id int, name, email string, duration time.Durat
// 设置redis hash设置过期时间 // 设置redis hash设置过期时间
func SetHash(key string, data map[string]interface{}) bool { func SetHash(key string, data map[string]interface{}) bool {
ctx := context.Background() ctx := context.Background()
err := redisClient.HSet(ctx, key, data).Err() err := RedisClient.HSet(ctx, key, data).Err()
if err != nil { if err != nil {
fmt.Println("%v :Error setting hash: %v", key, err) fmt.Println("%v :Error setting hash: %v", key, err)
return false return false
} }
err = redisClient.Expire(ctx, key, time.Minute*30).Err() err = RedisClient.Expire(ctx, key, time.Minute*30).Err()
if err != nil { if err != nil {
fmt.Println("%v :Error setting expire: %v", key, err) fmt.Println("%v :Error setting expire: %v", key, err)
return false return false
@ -114,7 +114,7 @@ func SetHash(key string, data map[string]interface{}) bool {
func SetHashWithField(key string, field string, value string) bool { func SetHashWithField(key string, field string, value string) bool {
ctx := context.Background() ctx := context.Background()
err := redisClient.HSet(ctx, key, field, value).Err() err := RedisClient.HSet(ctx, key, field, value).Err()
if err != nil { if err != nil {
fmt.Println("Error setting key: %v", err) fmt.Println("Error setting key: %v", err)
return false return false
@ -124,7 +124,7 @@ func SetHashWithField(key string, field string, value string) bool {
func GetHash(key string, field string) string { func GetHash(key string, field string) string {
ctx := context.Background() ctx := context.Background()
val, err := redisClient.HGet(ctx, key, field).Result() val, err := RedisClient.HGet(ctx, key, field).Result()
if err != nil { if err != nil {
fmt.Println("Error getting hash: %v", err) fmt.Println("Error getting hash: %v", err)
return "" return ""
@ -134,7 +134,7 @@ func GetHash(key string, field string) string {
func GetHashAll(key string) map[string]string { func GetHashAll(key string) map[string]string {
ctx := context.Background() ctx := context.Background()
val, err := redisClient.HGetAll(ctx, key).Result() val, err := RedisClient.HGetAll(ctx, key).Result()
if err != nil { if err != nil {
fmt.Println("Error getting hash: %v", err) fmt.Println("Error getting hash: %v", err)
return nil return nil
@ -146,7 +146,7 @@ func GetHashAll(key string) map[string]string {
func SetRedisWithExpire(key string, value string, expire time.Duration) bool { // 设置键值对, 0 表示不设置过期时间, 如果需要设置过期时间, 可以设置为 time.Second * 10 等 func SetRedisWithExpire(key string, value string, expire time.Duration) bool { // 设置键值对, 0 表示不设置过期时间, 如果需要设置过期时间, 可以设置为 time.Second * 10 等
ctx := context.Background() ctx := context.Background()
// 设置键值对, 0 表示不设置过期时间, 如果需要设置过期时间, 可以设置为 time.Second * 10 等 // 设置键值对, 0 表示不设置过期时间, 如果需要设置过期时间, 可以设置为 time.Second * 10 等
err := redisClient.Set(ctx, key, value, expire).Err() err := RedisClient.Set(ctx, key, value, expire).Err()
if err != nil { if err != nil {
fmt.Println("Error setting key: %v", err) fmt.Println("Error setting key: %v", err)
return false return false
@ -157,7 +157,7 @@ func SetRedisWithExpire(key string, value string, expire time.Duration) bool { /
// 获取redis // 获取redis
func GetRedis(key string) string { func GetRedis(key string) string {
ctx := context.Background() ctx := context.Background()
val, err := redisClient.Get(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误 val, err := RedisClient.Get(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误
if err != nil { if err != nil {
fmt.Println(key, " Error getting key: %v", err) fmt.Println(key, " Error getting key: %v", err)
return "" return ""
@ -168,7 +168,7 @@ func GetRedis(key string) string {
// pop redis list from right,as stack // pop redis list from right,as stack
func PopRedisList(key string) string { func PopRedisList(key string) string {
ctx := context.Background() ctx := context.Background()
val, err := redisClient.RPop(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误 val, err := RedisClient.RPop(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误
if err != nil { if err != nil {
fmt.Println(key, " Error reading from Redis: %v", err) fmt.Println(key, " Error reading from Redis: %v", err)
return "" return ""
@ -179,7 +179,7 @@ func PopRedisList(key string) string {
// pop redis list from left,as queue // pop redis list from left,as queue
func PopRedisListLeft(key string) string { func PopRedisListLeft(key string) string {
ctx := context.Background() ctx := context.Background()
val, err := redisClient.LPop(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误 val, err := RedisClient.LPop(ctx, key).Result() // 从 Redis 读取键值, 如果键不存在则返回空字符串, 如果出现错误则返回错误
if err != nil { if err != nil {
return "" return ""
} }
@ -188,7 +188,7 @@ func PopRedisListLeft(key string) string {
func DelRedis(key string) { func DelRedis(key string) {
ctx := context.Background() ctx := context.Background()
err := redisClient.Del(ctx, key).Err() err := RedisClient.Del(ctx, key).Err()
if err != nil { if err != nil {
fmt.Println("Error deleting key: %v", err) fmt.Println("Error deleting key: %v", err)
} }
@ -197,7 +197,7 @@ func DelRedis(key string) {
// push redis list from right // push redis list from right
func PushRedisList(key string, value string) bool { func PushRedisList(key string, value string) bool {
ctx := context.Background() ctx := context.Background()
err := redisClient.RPush(ctx, key, value).Err() err := RedisClient.RPush(ctx, key, value).Err()
if err != nil { if err != nil {
fmt.Println("Error setting key: %v", err) fmt.Println("Error setting key: %v", err)
return false return false
@ -207,12 +207,12 @@ func PushRedisList(key string, value string) bool {
func PushRedisListWithExpire(key string, value string, expire time.Duration) bool { func PushRedisListWithExpire(key string, value string, expire time.Duration) bool {
ctx := context.Background() ctx := context.Background()
err := redisClient.RPush(ctx, key, value).Err() err := RedisClient.RPush(ctx, key, value).Err()
if err != nil { if err != nil {
fmt.Println("Error setting key: %v", err) fmt.Println("Error setting key: %v", err)
return false return false
} }
err = redisClient.Expire(ctx, key, expire).Err() err = RedisClient.Expire(ctx, key, expire).Err()
if err != nil { if err != nil {
fmt.Println("Error setting key: %v", err) fmt.Println("Error setting key: %v", err)
return false return false
@ -223,7 +223,7 @@ func PushRedisListWithExpire(key string, value string, expire time.Duration) boo
// delete redis key // delete redis key
func delRedis(key string) { func delRedis(key string) {
ctx := context.Background() ctx := context.Background()
err := redisClient.Del(ctx, key).Err() err := RedisClient.Del(ctx, key).Err()
if err != nil { if err != nil {
fmt.Println("Error setting key: %v", err) fmt.Println("Error setting key: %v", err)
} }
@ -249,7 +249,7 @@ func (u *RUser) toJSONString() string {
// put hash to redis // put hash to redis
func hSetRedis(key string, field string, value string) { func hSetRedis(key string, field string, value string) {
ctx := context.Background() ctx := context.Background()
err := redisClient.HSet(ctx, key, field, value).Err() err := RedisClient.HSet(ctx, key, field, value).Err()
if err != nil { if err != nil {
fmt.Println("Error setting key: %v", err) fmt.Println("Error setting key: %v", err)
} }
@ -258,7 +258,7 @@ func hSetRedis(key string, field string, value string) {
// get hash from redis // get hash from redis
func hGetRedis(key string, field string) string { func hGetRedis(key string, field string) string {
ctx := context.Background() ctx := context.Background()
val, err := redisClient.HGet(ctx, key, field).Result() val, err := RedisClient.HGet(ctx, key, field).Result()
if err != nil { if err != nil {
fmt.Println("Error getting key: %v", err) fmt.Println("Error getting key: %v", err)
} }
@ -268,12 +268,12 @@ func hGetRedis(key string, field string) string {
// 设置set有过期时间 // 设置set有过期时间
func SetRedisSet(key string, values []string, expire time.Duration) bool { func SetRedisSet(key string, values []string, expire time.Duration) bool {
ctx := context.Background() ctx := context.Background()
err := redisClient.SAdd(ctx, key, values).Err() err := RedisClient.SAdd(ctx, key, values).Err()
if err != nil { if err != nil {
fmt.Println("Error setting key: %v", err) fmt.Println("Error setting key: %v", err)
return false return false
} }
err = redisClient.Expire(ctx, key, expire).Err() err = RedisClient.Expire(ctx, key, expire).Err()
if err != nil { if err != nil {
fmt.Println("Error setting key: %v", err) fmt.Println("Error setting key: %v", err)
return false return false
@ -284,7 +284,23 @@ func SetRedisSet(key string, values []string, expire time.Duration) bool {
// 设置set,添加元素 // 设置set,添加元素
func SetRedisSetAdd(key string, value string) bool { func SetRedisSetAdd(key string, value string) bool {
ctx := context.Background() ctx := context.Background()
err := redisClient.SAdd(ctx, key, value).Err() err := RedisClient.SAdd(ctx, key, value).Err()
if err != nil {
fmt.Println("Error setting key: %v", err)
return false
}
return true
}
// 设置set,添加元素
func SetRedisSetAddWithExpire(key string, value string, expire time.Duration) bool {
ctx := context.Background()
err := RedisClient.SAdd(ctx, key, value).Err()
if err != nil {
fmt.Println("Error setting key: %v", err)
return false
}
err = RedisClient.Expire(ctx, key, expire).Err()
if err != nil { if err != nil {
fmt.Println("Error setting key: %v", err) fmt.Println("Error setting key: %v", err)
return false return false
@ -295,7 +311,7 @@ func SetRedisSetAdd(key string, value string) bool {
// 设置set,删除元素 // 设置set,删除元素
func SetRedisSetRemove(key string, value string) bool { func SetRedisSetRemove(key string, value string) bool {
ctx := context.Background() ctx := context.Background()
err := redisClient.SRem(ctx, key, value).Err() err := RedisClient.SRem(ctx, key, value).Err()
if err != nil { if err != nil {
fmt.Println("Error setting key: %v", err) fmt.Println("Error setting key: %v", err)
return false return false
@ -306,7 +322,7 @@ func SetRedisSetRemove(key string, value string) bool {
// 获取两个set的交集 // 获取两个set的交集
func GetRedisSetIntersect(key1 string, key2 string) []string { func GetRedisSetIntersect(key1 string, key2 string) []string {
ctx := context.Background() ctx := context.Background()
val, err := redisClient.SInter(ctx, key1, key2).Result() val, err := RedisClient.SInter(ctx, key1, key2).Result()
if err != nil { if err != nil {
fmt.Println("Error getting key: %v", err) fmt.Println("Error getting key: %v", err)
return nil return nil
@ -317,7 +333,7 @@ func GetRedisSetIntersect(key1 string, key2 string) []string {
// 查看set是否包含元素 // 查看set是否包含元素
func IsContainSet(key string, value string) bool { func IsContainSet(key string, value string) bool {
ctx := context.Background() ctx := context.Background()
val, err := redisClient.SIsMember(ctx, key, value).Result() val, err := RedisClient.SIsMember(ctx, key, value).Result()
if err != nil { if err != nil {
fmt.Println("Error getting key: %v", err) fmt.Println("Error getting key: %v", err)
return false return false
@ -328,7 +344,7 @@ func IsContainSet(key string, value string) bool {
// 查看set的所有元素 // 查看set的所有元素
func GetRedisSetMembers(key string) []string { func GetRedisSetMembers(key string) []string {
ctx := context.Background() ctx := context.Background()
val, err := redisClient.SMembers(ctx, key).Result() val, err := RedisClient.SMembers(ctx, key).Result()
if err != nil { if err != nil {
fmt.Println("Error getting key: %v", err) fmt.Println("Error getting key: %v", err)
return nil return nil
@ -339,7 +355,7 @@ func GetRedisSetMembers(key string) []string {
// BITMAP // BITMAP
func SetRedisBitmap(key string, offset int64, value int) bool { func SetRedisBitmap(key string, offset int64, value int) bool {
ctx := context.Background() ctx := context.Background()
err := redisClient.SetBit(ctx, key, offset, value).Err() err := RedisClient.SetBit(ctx, key, offset, value).Err()
if err != nil { if err != nil {
fmt.Println("Error setting key: %v", err) fmt.Println("Error setting key: %v", err)
return false return false
@ -350,7 +366,7 @@ func SetRedisBitmap(key string, offset int64, value int) bool {
// BITMAP获取 // BITMAP获取
func GetRedisBitmap(key string, offset int64) int { func GetRedisBitmap(key string, offset int64) int {
ctx := context.Background() ctx := context.Background()
val, err := redisClient.GetBit(ctx, key, offset).Result() val, err := RedisClient.GetBit(ctx, key, offset).Result()
if err != nil { if err != nil {
fmt.Println("Error getting key: %v", err) fmt.Println("Error getting key: %v", err)
return 0 return 0
@ -361,11 +377,11 @@ func GetRedisBitmap(key string, offset int64) int {
// 发布订阅者模式-发布消息 // 发布订阅者模式-发布消息
func Publish(channel string, message string, expire time.Duration) { func Publish(channel string, message string, expire time.Duration) {
ctx := context.Background() ctx := context.Background()
err := redisClient.Publish(ctx, channel, message).Err() err := RedisClient.Publish(ctx, channel, message).Err()
if err != nil { if err != nil {
fmt.Println("Error publishing message: %v", err) fmt.Println("Error publishing message: %v", err)
} }
err = redisClient.Expire(ctx, channel, expire).Err() err = RedisClient.Expire(ctx, channel, expire).Err()
if err != nil { if err != nil {
fmt.Println("Error setting key: %v", err) fmt.Println("Error setting key: %v", err)
} }
@ -374,7 +390,7 @@ func Publish(channel string, message string, expire time.Duration) {
// 发布订阅者模式-订阅消息 // 发布订阅者模式-订阅消息
func Subscribe(channel string) []string { func Subscribe(channel string) []string {
ctx := context.Background() ctx := context.Background()
pubsub := redisClient.Subscribe(ctx, channel) pubsub := RedisClient.Subscribe(ctx, channel)
ch := pubsub.Channel() ch := pubsub.Channel()
defer pubsub.Close() defer pubsub.Close()
var messages []string var messages []string