Merge branch 'refs/heads/feat-user-sync' into release

This commit is contained in:
junleea 2024-12-16 21:40:07 +08:00
commit de6b65022a
4 changed files with 82 additions and 31 deletions

View File

@ -112,7 +112,11 @@ func UpdateUserByID3(id int, req proto.UpdateUserInfoReq) error {
// 用户数据同步-添加 // 用户数据同步-添加
func AddUserSync(req proto.UserAddOrUpdate) uint { func AddUserSync(req proto.UserAddOrUpdate) uint {
res := DB.Exec("insert into users (id, created_at, updated_at, deleted_at, name, age, email, password,gender,role,redis,run,upload,video_func,device_func,cid_func,avatar,create_time,update_time) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", req.ID, req.CreatedAt, req.UpdatedAt, req.DeletedAt, req.Name, req.Age, req.Email, req.Password, req.Gender, req.Role, req.Redis, req.Run, req.Upload, req.VideoFunc, req.DeviceFunc, req.CIDFunc, req.Avatar, req.CreateTime, req.UpdateTime) res := DB.Exec("insert into users (id, created_at, updated_at, deleted_at, name, age, email, password,gender,role,redis,run,upload,video_func,device_func,cid_func,avatar,create_time,update_time) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", req.ID, req.CreatedAt, req.UpdatedAt, req.DeletedAt, req.Name, req.Age, req.Email, req.Password, req.Gender, req.Role, req.Redis, req.Run, req.Upload, req.VideoFunc, req.DeviceFunc, req.CIDFunc, req.Avatar, req.CreateTime, req.UpdateTime)
if res.Error != nil {
return 0
}
res = DB.Debug().Exec("update users set deleted_at=null where id=?", req.ID)
if res.Error != nil { if res.Error != nil {
return 0 return 0
} }
@ -121,10 +125,15 @@ func AddUserSync(req proto.UserAddOrUpdate) uint {
// 用户数据同步-更新 // 用户数据同步-更新
func UpdateUserSync(req proto.UserAddOrUpdate) uint { func UpdateUserSync(req proto.UserAddOrUpdate) uint {
//事务
res := DB.Exec("update users set created_at=?, updated_at=?, deleted_at=?, name=?, age=?, email=?, password=?,gender=?,role=?,redis=?,run=?,upload=?,video_func=?,device_func=?,cid_func=?,avatar=?,create_time=?,update_time=? where id=?", req.CreatedAt, req.UpdatedAt, req.DeletedAt, req.Name, req.Age, req.Email, req.Password, req.Gender, req.Role, req.Redis, req.Run, req.Upload, req.VideoFunc, req.DeviceFunc, req.CIDFunc, req.Avatar, req.CreateTime, req.UpdateTime, req.ID) res := DB.Exec("update users set created_at=?, updated_at=?, deleted_at=?, name=?, age=?, email=?, password=?,gender=?,role=?,redis=?,run=?,upload=?,video_func=?,device_func=?,cid_func=?,avatar=?,create_time=?,update_time=? where id=?", req.CreatedAt, req.UpdatedAt, req.DeletedAt, req.Name, req.Age, req.Email, req.Password, req.Gender, req.Role, req.Redis, req.Run, req.Upload, req.VideoFunc, req.DeviceFunc, req.CIDFunc, req.Avatar, req.CreateTime, req.UpdateTime, req.ID)
if res.Error != nil { if res.Error != nil {
return 0 return 0
} }
res = DB.Debug().Exec("update users set deleted_at=null where id=?", req.ID)
if res.Error != nil {
return 0
}
return req.ID return req.ID
} }
@ -150,3 +159,9 @@ type UserSyncResp struct {
Add []User `json:"add" form:"add"` //添加用户 Add []User `json:"add" form:"add"` //添加用户
Delete []proto.UserDelID `json:"delete" form:"delete"` //删除用户 Delete []proto.UserDelID `json:"delete" form:"delete"` //删除用户
} }
// 清空用户表
func ClearAllUsers() error {
res := DB.Exec("TRUNCATE TABLE users")
return res.Error
}

17
main.go
View File

@ -9,6 +9,7 @@ import (
"io" "io"
"log" "log"
"os" "os"
"strconv"
"strings" "strings"
"videoplayer/dao" "videoplayer/dao"
"videoplayer/handler" "videoplayer/handler"
@ -142,7 +143,10 @@ func JWTAuthMiddleware() gin.HandlerFunc {
}) })
return return
} }
c.Set("id", s_id) id, _ := strconv.Atoi(s_id)
id_float64 := float64(id)
//查看s_id类型
c.Set("id", id_float64)
c.Next() c.Next()
return return
} }
@ -225,8 +229,8 @@ func ReadConfigToSetSystem() {
} }
} }
var is_exist bool is_exist := false
user_sync_id := -1 user_sync_id := -1 //用户同步任务索引
for i, v := range cron_infos { for i, v := range cron_infos {
if v.Type == 2 { if v.Type == 2 {
is_exist = true is_exist = true
@ -235,6 +239,7 @@ func ReadConfigToSetSystem() {
v.Curr = proto.Config.USER_SYNC_TIME v.Curr = proto.Config.USER_SYNC_TIME
} }
user_sync_id = i user_sync_id = i
cron_infos[i] = v
break break
} }
} }
@ -289,7 +294,7 @@ func RunGeneralCron() {
if err != nil { if err != nil {
fmt.Println("RunGeneralCron Error decoding config,key value is :", res) fmt.Println("RunGeneralCron Error decoding config,key value is :", res)
} }
for _, v := range cron_infos { for i, v := range cron_infos {
//1:日志清理,其他待定 //1:日志清理,其他待定
if v.Type == 1 { if v.Type == 1 {
//日志清理 //日志清理
@ -300,6 +305,8 @@ func RunGeneralCron() {
} else { } else {
v.Curr -= 10 v.Curr -= 10
} }
cron_infos[i] = v
continue
} }
//2 从服务器同步数据 //2 从服务器同步数据
if v.Type == 2 { if v.Type == 2 {
@ -312,6 +319,8 @@ func RunGeneralCron() {
} else { } else {
v.Curr -= 10 v.Curr -= 10
} }
cron_infos[i] = v
continue
} }
} }
//存入redis //存入redis

View File

@ -112,14 +112,27 @@ func UserSyncDataFromMaster() {
req.Token = tokens[0] req.Token = tokens[0]
req.Device = proto.Config.SERVER_NAME req.Device = proto.Config.SERVER_NAME
all := worker.GetRedis("user_sync_all") all := worker.GetRedis("user_sync_all")
is_all := false //是否全量同步
if all == "" || all == "1" { if all == "" || all == "1" {
is_all = true
//清空数据表
err := dao.ClearAllUsers()
if err != nil {
fmt.Println("All ClearAllUsers error:", err)
return
}
worker.SetRedis("user_sync_all", "1")
req.Types = 1 req.Types = 1
} else { } else {
worker.SetRedis("user_sync_all", "2") worker.SetRedis("user_sync_all", "2")
req.Types = 2 req.Types = 2
} }
user_sync_data := worker.SyncDataFromMasterReq2(url, req) user_sync_data, err := worker.SyncDataFromMasterReq2(url, req)
if err != nil {
fmt.Println("UserSyncDataFromMaster error:", err)
return
}
add_users := user_sync_data.Add add_users := user_sync_data.Add
update_users := user_sync_data.Update update_users := user_sync_data.Update
delete_users := user_sync_data.Delete delete_users := user_sync_data.Delete
@ -158,17 +171,21 @@ func UserSyncDataFromMaster() {
} }
//确认同步数据 //确认同步数据
var data proto.UserSyncConfirm if is_all == false {
data.Add = add_confirm var data proto.UserSyncConfirm
data.Update = update_confirm data.Add = add_confirm
data.Delete = delete_confirm data.Update = update_confirm
//确认同步数据请求 data.Delete = delete_confirm
var confirm_req proto.SyncUserReq //确认同步数据请求
confirm_req.Token = tokens[0] var confirm_req proto.SyncUserReq
confirm_req.Device = proto.Config.SERVER_NAME confirm_req.Token = tokens[0]
confirm_req.Types = 3 confirm_req.Device = proto.Config.SERVER_NAME
confirm_req.Confirm = data confirm_req.Types = 3
worker.SyncDataFromMasterReq2(url, confirm_req) confirm_req.Confirm = data
worker.SyncDataFromMasterReq2(url, confirm_req)
} else {
worker.SetRedis("user_sync_all", "2")
}
} }
// 同步数据到主服务器-增删改数据 // 同步数据到主服务器-增删改数据

View File

@ -129,17 +129,29 @@ func SyncDataFromMasterReq(url string, token string) proto.UserSync {
return userSync return userSync
} }
type Response struct {
Code int `json:"code"`
Message string `json:"message"`
Data proto.UserSync `json:"data"`
}
// 获取数据,全量及增量 // 获取数据,全量及增量
func SyncDataFromMasterReq2(url string, data proto.SyncUserReq) proto.UserSync { func SyncDataFromMasterReq2(url string, data proto.SyncUserReq) (proto.UserSync, error) {
defer func() {
if r := recover(); r != nil {
fmt.Println("SyncDataFromMasterReq2 error:", r)
}
}()
var res proto.UserSync var res proto.UserSync
//从接口获取数据 //从接口获取数据
json_data, err := json.Marshal(data) json_data, err := json.Marshal(data)
if err != nil { if err != nil {
return res return res, err
} }
req, err := http.NewRequest("POST", url, bytes.NewBuffer(json_data)) req, err := http.NewRequest("POST", url, bytes.NewBuffer(json_data))
if err != nil { if err != nil {
return res return res, err
} }
req.Header.Set("Content-Type", "application/json") req.Header.Set("Content-Type", "application/json")
//传输数据 //传输数据
@ -149,22 +161,20 @@ func SyncDataFromMasterReq2(url string, data proto.SyncUserReq) proto.UserSync {
//获取数据 //获取数据
resp, err := client.Do(req) resp, err := client.Do(req)
if err != nil { if err != nil {
return res return res, err
} }
defer resp.Body.Close() defer resp.Body.Close()
//解析数据 //解析数据
var m map[string]interface{} responseBod, err := io.ReadAll(resp.Body)
err = json.NewDecoder(resp.Body).Decode(&m)
if err != nil { if err != nil {
return res return res, err
} }
if m["code"].(float64) != 0 { var response Response
return res err = json.Unmarshal(responseBod, &response)
}
err = json.Unmarshal([]byte(m["data"].(string)), &res)
if err != nil { if err != nil {
fmt.Println("SyncDataFromMasterReq2 error decode data:", err) return res, err
return res
} }
return res res = response.Data
fmt.Println("SyncDataFromMasterReq2 result add data:", len(res.Add), "update data:", len(res.Update), "delete data:", len(res.Delete))
return res, nil
} }