From 3571772bb1d04d5137b4f1bae0884fd25b8080b8 Mon Sep 17 00:00:00 2001 From: junleea <354425203@qq.com> Date: Fri, 13 Dec 2024 20:21:05 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E7=94=A8=E6=88=B7=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=90=8C=E6=AD=A5=E7=A1=AE=E8=AE=A4=E9=83=A8=E5=88=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler/user.go | 14 +++++++--- proto/user_req.go | 11 ++++++++ service/userService.go | 61 ++++++++++++++++++++++++++++++++++++++++++ worker/redis.go | 50 ++++++++++++++++++++++++++++++++++ 4 files changed, 133 insertions(+), 3 deletions(-) diff --git a/handler/user.go b/handler/user.go index 26cfd5c..eb06568 100644 --- a/handler/user.go +++ b/handler/user.go @@ -371,9 +371,10 @@ func registerHandler(c *gin.Context) { } type SyncUserReq struct { - Token string `json:"token" form:"token"` - Types int `json:"type" form:"type"` // 1为全量同步 2为增量同步 - Device string `json:"device" form:"device"` + Token string `json:"token" form:"token"` + Types int `json:"type" form:"type"` // 1为全量同步 2为增量同步 + Device string `json:"device" form:"device"` + Confirm proto.UserSyncConfirm `json:"confirm" form:"confirm"` } func GetSyncUserInfo(c *gin.Context) { @@ -397,6 +398,13 @@ func GetSyncUserInfo(c *gin.Context) { } res := service.GetUserSyncData(req_data.Device) c.JSON(200, gin.H{"code": proto.SuccessCode, "message": "success", "data": res}) + } else if req_data.Types == 3 { //3为确认同步数据 + res := service.ConfirmSyncUserData(req_data.Device, req_data.Confirm) + if res == nil { + c.JSON(200, gin.H{"code": proto.SuccessCode, "message": "success", "data": "success"}) + } else { + c.JSON(200, gin.H{"code": proto.OperationFailed, "message": "failed", "data": "failed"}) + } } else { c.JSON(200, gin.H{"code": proto.ParameterError, "message": "type is error", "data": proto.UserSyncResp{}}) } diff --git a/proto/user_req.go b/proto/user_req.go index 1aa18e8..6711302 100644 --- a/proto/user_req.go +++ b/proto/user_req.go @@ -48,6 +48,17 @@ type UserSync struct { Delete []UserDelID `json:"delete" form:"delete"` //删除用户 } +// 用户数据同步确认 +type UserSyncConfirm struct { + Add []UserConfirmID `json:"add" form:"add"` //添加用户 + Update []UserConfirmID `json:"update" form:"update"` //更新用户 + Delete []UserConfirmID `json:"delete" form:"delete"` //删除用户 +} + +type UserConfirmID struct { + ID uint `json:"id" form:"id"` //用户id +} + type UserDelID struct { ID uint `json:"ID" form:"ID"` //用户id } diff --git a/service/userService.go b/service/userService.go index 10da872..0a7985e 100644 --- a/service/userService.go +++ b/service/userService.go @@ -4,6 +4,7 @@ import ( "errors" "regexp" "strconv" + "time" "videoplayer/dao" "videoplayer/proto" "videoplayer/worker" @@ -141,15 +142,27 @@ func GetUserSyncData(device string) proto.UserSyncResp { user := dao.FindUserByUserID(id) add_users = append(add_users, user) } + for _, v := range update_user_ids { id, _ := strconv.Atoi(v) user := dao.FindUserByUserID(id) update_users = append(update_users, user) } + for _, v := range delete_user_ids { id, _ := strconv.Atoi(v) delete_users = append(delete_users, proto.UserDelID{ID: uint(id)}) } + //将id存入暂存集合,清空原集合 + add_temp_key := device + "_sync_user_ids_add_confirm_temp" + update_temp_key := device + "_sync_user_ids_update_confirm_temp" + delete_temp_key := device + "_sync_user_ids_delete_confirm_temp" + worker.SetRedisSetUnionAndStore(add_temp_key, key+"_add") + worker.SetRedisSetClear(key + "_add") + worker.SetRedisSetUnionAndStore(update_temp_key, key+"_update") + worker.SetRedisSetClear(key + "_update") + worker.SetRedisSetUnionAndStore(delete_temp_key, key+"_delete") + worker.SetRedisSetClear(key + "_delete") return proto.UserSyncResp{Add: add_users, Update: update_users, Delete: delete_users} } @@ -173,3 +186,51 @@ func setSyncUserDataSet(t string, id int) error { } return err } + +// 确认同步数据 +func ConfirmSyncUserData(device string, data proto.UserSyncConfirm) error { + + var ids_add []string + var err error + for _, v := range data.Add { + ids_add = append(ids_add, strconv.Itoa(int(v.ID))) + } + add_key := device + "_sync_user_ids_add_confirm" + isSuccess := worker.SetRedisSetAddBatchWithExpire(add_key, ids_add, time.Second*30) + if !isSuccess { + err = errors.New("set confirm error") + } + var ids_update []string + for _, v := range data.Update { + ids_update = append(ids_update, strconv.Itoa(int(v.ID))) + } + update_key := device + "_sync_user_ids_update_confirm" + isSuccess = worker.SetRedisSetAddBatchWithExpire(update_key, ids_update, time.Second*30) + if !isSuccess { + err = errors.New("set confirm error") + } + + var ids_delete []string + for _, v := range data.Delete { + ids_delete = append(ids_delete, strconv.Itoa(int(v.ID))) + } + del_key := device + "_sync_user_ids_delete_confirm" + isSuccess = worker.SetRedisSetAddBatchWithExpire(del_key, ids_delete, time.Second*30) + if !isSuccess { + err = errors.New("set confirm error") + } + + //待确认集合暂存 + ids_add_confirm_temp := device + "_sync_user_ids_add_confirm_temp" + ids_update_confirm_temp := device + "_sync_user_ids_update_confirm_temp" + ids_delete_confirm_temp := device + "_sync_user_ids_delete_confirm_temp" + + //取差集 + add_diff := worker.SetRedisSetDiffAndStore(ids_add_confirm_temp, add_key) + update_diff := worker.SetRedisSetDiffAndStore(ids_update_confirm_temp, update_key) + delete_diff := worker.SetRedisSetDiffAndStore(ids_delete_confirm_temp, del_key) + if add_diff != true || update_diff != true || delete_diff != true { + err = errors.New("confirm error") + } + return err +} diff --git a/worker/redis.go b/worker/redis.go index a9957bb..84bc598 100644 --- a/worker/redis.go +++ b/worker/redis.go @@ -292,6 +292,23 @@ func SetRedisSetAdd(key string, value string) bool { return true } +// 批量添加元素 +func SetRedisSetAddBatchWithExpire(key string, values []string, expire time.Duration) bool { + ctx := context.Background() + err := RedisClient.SAdd(ctx, key, values).Err() + if err != nil { + fmt.Println("Error setting key: %v", err) + return false + } + err = RedisClient.Expire(ctx, key, expire).Err() + if err != nil { + fmt.Println("Error setting key: %v", err) + return false + } + return true + +} + // 设置set,添加元素 func SetRedisSetAddWithExpire(key string, value string, expire time.Duration) bool { ctx := context.Background() @@ -399,3 +416,36 @@ func Subscribe(channel string) []string { } return messages } + +// redis两个set求差集存入第一个set +func SetRedisSetDiffAndStore(key1 string, key2 string) bool { + ctx := context.Background() + err := RedisClient.SDiffStore(ctx, key1, key1, key2).Err() //将key1和key2的差集存入key1 + if err != nil { + fmt.Println("Error setting key: %v", err) + return false + } + return true +} + +// redis将第二个set存入第一个set +func SetRedisSetUnionAndStore(key1 string, key2 string) bool { + ctx := context.Background() + err := RedisClient.SUnionStore(ctx, key1, key1, key2).Err() //将key1和key2的并集存入key1 + if err != nil { + fmt.Println("Error setting key: %v", err) + return false + } + return true +} + +// redis 清空set +func ClearRedisSet(key string) bool { + ctx := context.Background() + err := RedisClient.Del(ctx, key).Err() + if err != nil { + fmt.Println("Error setting key: %v", err) + return false + } + return true +}