From e13b3dfa8ca4ab750eed46eadee6d0a3fb10d633 Mon Sep 17 00:00:00 2001 From: xiabin Date: Fri, 7 Feb 2025 15:17:01 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84ecpm=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../etc/douyin_ecpm_calculation_service.yaml | 4 +- .../internal/logic/get_ecpm_logic.go | 163 +++++++++++++++++- .../internal/logic/ping_logic.go | 4 +- .../internal/svc/ecpm_config.go | 51 ++++++ .../internal/svc/service_context.go | 78 ++++++++- go.mod | 13 +- go.sum | 28 ++- pkg/gorm/init_db.go | 24 +++ pkg/gorm/redis/cacher.go | 73 ++++++++ 9 files changed, 418 insertions(+), 20 deletions(-) create mode 100644 douyin_ecpm_calculation_service/internal/svc/ecpm_config.go create mode 100644 pkg/gorm/init_db.go create mode 100644 pkg/gorm/redis/cacher.go diff --git a/douyin_ecpm_calculation_service/etc/douyin_ecpm_calculation_service.yaml b/douyin_ecpm_calculation_service/etc/douyin_ecpm_calculation_service.yaml index e4b5aaa..bb133de 100644 --- a/douyin_ecpm_calculation_service/etc/douyin_ecpm_calculation_service.yaml +++ b/douyin_ecpm_calculation_service/etc/douyin_ecpm_calculation_service.yaml @@ -1,6 +1,6 @@ -Name: douyinecpmcalculationservice.rpc +Name: douyin_ecpm_calculation_service.rpc ListenOn: 0.0.0.0:8080 Etcd: Hosts: - 127.0.0.1:2379 - Key: douyinecpmcalculationservice.rpc + Key: douyin_ecpm_calculation_service.rpc diff --git a/douyin_ecpm_calculation_service/internal/logic/get_ecpm_logic.go b/douyin_ecpm_calculation_service/internal/logic/get_ecpm_logic.go index 204f82d..3d218fd 100644 --- a/douyin_ecpm_calculation_service/internal/logic/get_ecpm_logic.go +++ b/douyin_ecpm_calculation_service/internal/logic/get_ecpm_logic.go @@ -2,6 +2,14 @@ package logic import ( "context" + "encoding/json" + "errors" + "fmt" + "gitea.youtukeji.com.cn/xiabin/youtu_grpc/auth_service/auth_service" + "io" + "net/http" + "strconv" + "time" "gitea.youtukeji.com.cn/xiabin/youtu_grpc/douyin_ecpm_calculation_service/douyin_ecpm_calculation_service" "gitea.youtukeji.com.cn/xiabin/youtu_grpc/douyin_ecpm_calculation_service/internal/svc" @@ -23,8 +31,157 @@ func NewGetEcpmLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetEcpmLo } } -func (l *GetEcpmLogic) GetEcpm(in *douyin_ecpm_calculation_service.GetEcpmRequest) (*douyin_ecpm_calculation_service.GetEcpmResponse, error) { - // todo: add your logic here and delete this line +func (l *GetEcpmLogic) GetEcpm(in *douyin_ecpm_calculation_service.GetEcpmRequest) (response *douyin_ecpm_calculation_service.GetEcpmResponse, err error) { + //获取抖音accessToken + res, err := l.svcCtx.AuthServiceClient.GetAccessToken(l.ctx, &auth_service.GetAccessTokenRequest{ + AppId: in.AppId, + }) + if err != nil { + return nil, err + } - return &douyin_ecpm_calculation_service.GetEcpmResponse{}, nil + //获取抖音ECPM数据 + list, err := GetEcpmData(res.AccessToken, in.AppId, in.OpenId, time.Now().Format(time.DateOnly)) + if err != nil { + return nil, err + } + + //计算ECPM具体值 + ecpm := CalcEcpm(list) + + //获取后端配置的ECPM值 + config, ok := l.svcCtx.EcpmConfig.Get(in.AppId) + if !ok { + return nil, errors.New("未找到小程序配置") + } + + //判断是否满足条件 + if ecpm > config.Value && len(list) > config.View { + response.Ok = true + } + + return +} + +// 计算ECPM值 +func CalcEcpm(res []Record) (ecpm float64) { + // 计算 ECPM + totalRecords := len(res) + + // 如果没有记录,则返回 0 + if totalRecords == 0 { + return + } + + totalCost := 0 + + for _, record := range res { + totalCost += record.Cost + } + + // 总 cost / 100000 * 1000 / 总记录数 + ecpm = float64(totalCost) / 100000 * 1000 / float64(totalRecords) + return +} + +// GetEcpmData 获取ECPM数据 +// appId: 小程序id +// openId: 抖音openId +// dateHour: 日期 +func GetEcpmData(accessToken, appId, openId, dateHour string) (list []Record, err error) { + list, err = GetEcpm(GetEcpmParams{ + AppId: appId, + OpenId: openId, + AccessToken: accessToken, + DateHour: dateHour, + PageSize: 500, + PageNo: 1, + }) + return +} + +const getEcpm = "https://minigame.zijieapi.com/mgplatform/api/apps/data/get_ecpm" // 获取ECPM + +// GetEcpm 获取ECPM +// https://bytedance.larkoffice.com/docx/Vg4yd0RDSovZINxJDyIc6THhnod +// 根据分页大小循环获取,聚合总数返回 +func GetEcpm(params GetEcpmParams) (list []Record, err error) { + fullURL := fmt.Sprintf("%s?open_id=%s&mp_id=%s&access_token=%s&date_hour=%s&page_size=500&page_no=", getEcpm, params.OpenId, params.AppId, params.AccessToken, params.DateHour) + for { + fullURL += strconv.Itoa(params.PageNo) + var responseBody []byte + func() { + // 调用抖音 API + var resp *http.Response + resp, err = http.Get(fullURL) + if err != nil { + err = errors.New("调用抖音 API 失败") + return + } + defer resp.Body.Close() + // 读取抖音 API 响应数据 + responseBody, err = io.ReadAll(resp.Body) + if err != nil { + err = errors.New("读取抖音 API 响应失败") + return + } + }() + + // 解析抖音 API 响应数据 + var apiResponse GetEcpmResponseData + err = json.Unmarshal(responseBody, &apiResponse) + if err != nil { + err = errors.New("解析 API 响应数据失败") + return + } + + // 检查 API 是否返回错误 + if apiResponse.ErrNo != 0 { + err = fmt.Errorf("抖音 API 返回错误: %s (错误码: %d)", apiResponse.ErrMsg, apiResponse.ErrNo) + return + } + + list = append(list, apiResponse.Data.Records...) + + // 当页数据小于总页数时,无更多数据,返回 + if len(apiResponse.Data.Records) <= params.PageSize { + return + } + + params.PageNo++ + } +} + +// GetEcpmParams 获取ECPM参数 +type GetEcpmParams struct { + AppId string `json:"app_id" form:"app_id"` + OpenId string `json:"open_id" form:"open_id"` + AccessToken string `json:"access_token" form:"access_token"` + DateHour string `json:"date_hour" form:"date_hour"` + PageSize int `json:"page_size" form:"page_size"` + PageNo int `json:"page_no" form:"page_no"` +} + +type GetEcpmResponseData struct { + BaseResp struct { + StatusCode int `json:"StatusCode"` + StatusMessage string `json:"StatusMessage"` + } `json:"BaseResp"` + Data struct { + Records []Record `json:"records"` + Total int `json:"total"` + } `json:"data"` + ErrMsg string `json:"err_msg"` + ErrNo int `json:"err_no"` + LogID string `json:"log_id"` +} + +type Record struct { + Aid string `json:"aid"` + Cost int `json:"cost"` + Did string `json:"did"` + EventName string `json:"event_name"` + EventTime string `json:"event_time"` + OpenID string `json:"open_id"` + ID int `json:"id"` } diff --git a/douyin_ecpm_calculation_service/internal/logic/ping_logic.go b/douyin_ecpm_calculation_service/internal/logic/ping_logic.go index 57d2c32..89e9fb8 100644 --- a/douyin_ecpm_calculation_service/internal/logic/ping_logic.go +++ b/douyin_ecpm_calculation_service/internal/logic/ping_logic.go @@ -24,7 +24,5 @@ func NewPingLogic(ctx context.Context, svcCtx *svc.ServiceContext) *PingLogic { } func (l *PingLogic) Ping(in *douyin_ecpm_calculation_service.Request) (*douyin_ecpm_calculation_service.Response, error) { - // todo: add your logic here and delete this line - - return &douyin_ecpm_calculation_service.Response{}, nil + return &douyin_ecpm_calculation_service.Response{Pong: in.Ping}, nil } diff --git a/douyin_ecpm_calculation_service/internal/svc/ecpm_config.go b/douyin_ecpm_calculation_service/internal/svc/ecpm_config.go new file mode 100644 index 0000000..db04995 --- /dev/null +++ b/douyin_ecpm_calculation_service/internal/svc/ecpm_config.go @@ -0,0 +1,51 @@ +package svc + +import ( + "encoding/json" + "sync" +) + +type EcpmConfigCli struct { + *sync.Map +} + +func NewEcpmConfig() *EcpmConfigCli { + return &EcpmConfigCli{ + Map: &sync.Map{}, + } +} + +type Ecpm struct { + AppId string `json:"app_id"` + Value float64 `json:"value"` + View int `json:"view"` +} + +func (c *EcpmConfigCli) Get(appId string) (*Ecpm, bool) { + v, ok := c.Map.Load(appId) + if !ok { + return nil, false + } + return v.(*Ecpm), true +} + +func (c *EcpmConfigCli) Set(appId string, ecpmConfig *Ecpm) { + c.Map.Store(appId, ecpmConfig) +} + +func (c *EcpmConfigCli) SetAll(list []Ecpm) { + c.Map.Clear() + for _, v := range list { + c.Map.Store(v.AppId, &v) + } +} + +func (c *EcpmConfigCli) SetAllByJson(data []byte) { + var list []Ecpm + err := json.Unmarshal(data, &list) + if err != nil { + return + } + + c.SetAll(list) +} diff --git a/douyin_ecpm_calculation_service/internal/svc/service_context.go b/douyin_ecpm_calculation_service/internal/svc/service_context.go index 4768d0c..bb660ed 100644 --- a/douyin_ecpm_calculation_service/internal/svc/service_context.go +++ b/douyin_ecpm_calculation_service/internal/svc/service_context.go @@ -1,13 +1,85 @@ package svc -import "gitea.youtukeji.com.cn/xiabin/youtu_grpc/douyin_ecpm_calculation_service/internal/config" +import ( + "context" + "gitea.youtukeji.com.cn/xiabin/youtu_grpc/auth_service/auth_service" + "gitea.youtukeji.com.cn/xiabin/youtu_grpc/douyin_ecpm_calculation_service/internal/config" + "github.com/zeromicro/go-zero/core/conf" + "github.com/zeromicro/go-zero/core/discov" + "github.com/zeromicro/go-zero/zrpc" + clientv3 "go.etcd.io/etcd/client/v3" +) type ServiceContext struct { - Config config.Config + Config config.Config + AuthServiceClient auth_service.AuthServiceClient + etcdCli *clientv3.Client + EcpmConfig *EcpmConfigCli } func NewServiceContext(c config.Config) *ServiceContext { - return &ServiceContext{ + svc := &ServiceContext{ Config: c, } + + //初始化etcd客户端 + svc.initEtcd() + + //初始化auth_service客户端 + svc.initAuthServiceClient() + + return svc +} + +const EcpmConfigWatchKey = "ecpm_config" + +func (svc *ServiceContext) initEtcd() { + + //初始化etcd客户端 + cli, err := clientv3.NewFromURL(svc.Config.Etcd.Hosts[0]) + if err != nil { + panic(err) + } + + //获取ecpm配置 + res, err := cli.Get(context.Background(), EcpmConfigWatchKey) + if err != nil { + panic(err) + } + + //设置ecpm配置 + for _, kv := range res.Kvs { + svc.EcpmConfig.SetAllByJson(kv.Value) + } + + //监听etcd + go func() { + ch := cli.Watch(context.Background(), EcpmConfigWatchKey) + //从通道中尝试取值(监视的信息) + for res := range ch { + for _, evt := range res.Events { + svc.EcpmConfig.SetAllByJson(evt.Kv.Value) + } + } + }() + + svc.etcdCli = cli + svc.EcpmConfig = NewEcpmConfig() +} + +// initAuthServiceClient inits the AuthServiceClient. +func (svc *ServiceContext) initAuthServiceClient() { + clientConf := zrpc.RpcClientConf{} + clientConf.Token = "auth_service.rpc.key" + clientConf.App = "auth_service.rpc" + err := conf.FillDefault(&clientConf) // 填充默认值,比如 trace 透传等,参考服务配置说明 + if err != nil { + panic(err) + } + clientConf.Etcd = discov.EtcdConf{ // 通过 etcd 服务发现 + Hosts: svc.Config.Etcd.Hosts, + Key: "auth_service.rpc", + } + + svc.AuthServiceClient = auth_service.NewAuthServiceClient(zrpc.MustNewClient(clientConf).Conn()) } diff --git a/go.mod b/go.mod index 8aae854..9be9604 100644 --- a/go.mod +++ b/go.mod @@ -6,13 +6,18 @@ toolchain go1.23.6 require ( gitea.youtukeji.com.cn/youtu/openapi-helper v0.0.0-20250123094517-b5871b3f4784 + github.com/go-gorm/caches/v4 v4.0.5 github.com/silenceper/wechat/v2 v2.1.7 github.com/zeromicro/go-zero v1.8.0 + go.etcd.io/etcd/client/v3 v3.5.18 google.golang.org/grpc v1.70.0 google.golang.org/protobuf v1.36.4 + gorm.io/driver/mysql v1.5.7 + gorm.io/gorm v1.25.12 ) require ( + filippo.io/edwards25519 v1.1.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect @@ -29,6 +34,7 @@ require ( github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/swag v0.22.4 // indirect github.com/go-redis/redis/v8 v8.11.5 // indirect + github.com/go-sql-driver/mysql v1.8.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/mock v1.6.0 // indirect github.com/golang/protobuf v1.5.4 // indirect @@ -37,6 +43,8 @@ require ( github.com/google/gofuzz v1.2.0 // indirect github.com/google/uuid v1.6.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.5 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.9 // indirect @@ -54,9 +62,8 @@ require ( github.com/prometheus/procfs v0.15.1 // indirect github.com/redis/go-redis/v9 v9.7.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect - go.etcd.io/etcd/api/v3 v3.5.15 // indirect - go.etcd.io/etcd/client/pkg/v3 v3.5.15 // indirect - go.etcd.io/etcd/client/v3 v3.5.15 // indirect + go.etcd.io/etcd/api/v3 v3.5.18 // indirect + go.etcd.io/etcd/client/pkg/v3 v3.5.18 // indirect go.opentelemetry.io/otel v1.32.0 // indirect go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 // indirect diff --git a/go.sum b/go.sum index 1f99c51..aca8d03 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= gitea.youtukeji.com.cn/youtu/openapi-helper v0.0.0-20250123094517-b5871b3f4784 h1:Ey/s51gRB3rpN/TCXY1rAqZulktSlkdXrxyfBtnEtAc= gitea.youtukeji.com.cn/youtu/openapi-helper v0.0.0-20250123094517-b5871b3f4784/go.mod h1:o3XiYjUmxptrwcYPbTwNc2SQSOeOj7qbQPdNNVU0H5w= github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= @@ -43,6 +45,8 @@ github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/go-gorm/caches/v4 v4.0.5 h1:Sdj9vxbEM0sCmv5+s5o6GzoVMuraWF0bjJJvUU+7c1U= +github.com/go-gorm/caches/v4 v4.0.5/go.mod h1:Ms8LnWVoW4GkTofpDzFH8OfDGNTjLxQDyxBmRN67Ujw= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= @@ -58,6 +62,9 @@ github.com/go-openapi/swag v0.22.4 h1:QLMzNJnMGPRNDCbySlcj1x01tzU8/9LTTL9hZZZogB github.com/go-openapi/swag v0.22.4/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= @@ -100,6 +107,10 @@ github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 h1:2VTzZjLZBgl62/EtslC github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= +github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= @@ -206,12 +217,12 @@ github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= github.com/zeromicro/go-zero v1.8.0 h1:4g/8VW+fOyM51HZYPeI3mXIZdEX+Fl6SsdYX2H5PYw4= github.com/zeromicro/go-zero v1.8.0/go.mod h1:xDBF+/iDzj30zPvu6HNUIbpz1J6+/g3Sx9D/DytJfss= -go.etcd.io/etcd/api/v3 v3.5.15 h1:3KpLJir1ZEBrYuV2v+Twaa/e2MdDCEZ/70H+lzEiwsk= -go.etcd.io/etcd/api/v3 v3.5.15/go.mod h1:N9EhGzXq58WuMllgH9ZvnEr7SI9pS0k0+DHZezGp7jM= -go.etcd.io/etcd/client/pkg/v3 v3.5.15 h1:fo0HpWz/KlHGMCC+YejpiCmyWDEuIpnTDzpJLB5fWlA= -go.etcd.io/etcd/client/pkg/v3 v3.5.15/go.mod h1:mXDI4NAOwEiszrHCb0aqfAYNCrZP4e9hRca3d1YK8EU= -go.etcd.io/etcd/client/v3 v3.5.15 h1:23M0eY4Fd/inNv1ZfU3AxrbbOdW79r9V9Rl62Nm6ip4= -go.etcd.io/etcd/client/v3 v3.5.15/go.mod h1:CLSJxrYjvLtHsrPKsy7LmZEE+DK2ktfd2bN4RhBMwlU= +go.etcd.io/etcd/api/v3 v3.5.18 h1:Q4oDAKnmwqTo5lafvB+afbgCDF7E35E4EYV2g+FNGhs= +go.etcd.io/etcd/api/v3 v3.5.18/go.mod h1:uY03Ob2H50077J7Qq0DeehjM/A9S8PhVfbQ1mSaMopU= +go.etcd.io/etcd/client/pkg/v3 v3.5.18 h1:mZPOYw4h8rTk7TeJ5+3udUkfVGBqc+GCjOJYd68QgNM= +go.etcd.io/etcd/client/pkg/v3 v3.5.18/go.mod h1:BxVf2o5wXG9ZJV+/Cu7QNUiJYk4A29sAhoI5tIRsCu4= +go.etcd.io/etcd/client/v3 v3.5.18 h1:nvvYmNHGumkDjZhTHgVU36A9pykGa2K4lAJ0yY7hcXA= +go.etcd.io/etcd/client/v3 v3.5.18/go.mod h1:kmemwOsPU9broExyhYsBxX4spCTDX3yLgPMWtpBXG6E= go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= go.opentelemetry.io/otel/exporters/jaeger v1.17.0 h1:D7UpUy2Xc2wsi1Ras6V40q806WM07rqoCWzXu7Sqy+4= @@ -352,6 +363,11 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/mysql v1.5.7 h1:MndhOPYOfEp2rHKgkZIhJ16eVUIRf2HmzgoPmh7FCWo= +gorm.io/driver/mysql v1.5.7/go.mod h1:sEtPWMiqiN1N1cMXoXmBbd8C6/l+TESwriotuRRpkDM= +gorm.io/gorm v1.25.7/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8= +gorm.io/gorm v1.25.12 h1:I0u8i2hWQItBq1WfE0o2+WuL9+8L21K9e2HHSTE/0f8= +gorm.io/gorm v1.25.12/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ= k8s.io/api v0.29.3 h1:2ORfZ7+bGC3YJqGpV0KSDDEVf8hdGQ6A03/50vj8pmw= k8s.io/api v0.29.3/go.mod h1:y2yg2NTyHUUkIoTC+phinTnEa3KFM6RZ3szxt014a80= k8s.io/apimachinery v0.29.4 h1:RaFdJiDmuKs/8cm1M6Dh1Kvyh59YQFDcFuFTSmXes6Q= diff --git a/pkg/gorm/init_db.go b/pkg/gorm/init_db.go new file mode 100644 index 0000000..c3405a9 --- /dev/null +++ b/pkg/gorm/init_db.go @@ -0,0 +1,24 @@ +package gorm + +import ( + redisCacher "gitea.youtukeji.com.cn/xiabin/youtu_grpc/pkg/gorm/redis" + "github.com/go-gorm/caches/v4" + "github.com/redis/go-redis/v9" + "gorm.io/gorm" +) + +func NewDBWithCache(dataSource gorm.Dialector, config *gorm.Config, redisClient *redis.Client) (db *gorm.DB, err error) { + db, err = gorm.Open(dataSource, config) + if err != nil { + return + } + err = UseCache(db, redisClient) + return +} + +func UseCache(db *gorm.DB, redisClient *redis.Client) (err error) { + return db.Use(&caches.Caches{Conf: &caches.Config{ + Easer: true, + Cacher: redisCacher.New(redisClient), + }}) +} diff --git a/pkg/gorm/redis/cacher.go b/pkg/gorm/redis/cacher.go new file mode 100644 index 0000000..aff32d1 --- /dev/null +++ b/pkg/gorm/redis/cacher.go @@ -0,0 +1,73 @@ +package redis + +import ( + "context" + "errors" + "fmt" + "github.com/go-gorm/caches/v4" + "github.com/redis/go-redis/v9" + "time" +) + +type Cacher struct { + rdb *redis.Client +} + +func New(rdb *redis.Client) *Cacher { + return &Cacher{rdb: rdb} +} + +func (c *Cacher) Get(ctx context.Context, key string, q *caches.Query[any]) (*caches.Query[any], error) { + res, err := c.rdb.Get(ctx, key).Result() + if errors.Is(err, redis.Nil) { + return nil, nil + } + + if err != nil { + return nil, err + } + + if err := q.Unmarshal([]byte(res)); err != nil { + return nil, err + } + + return q, nil +} + +func (c *Cacher) Store(ctx context.Context, key string, val *caches.Query[any]) error { + res, err := val.Marshal() + if err != nil { + return err + } + + c.rdb.Set(ctx, key, res, 300*time.Second) // Set proper cache time + return nil +} + +func (c *Cacher) Invalidate(ctx context.Context) error { + var ( + cursor uint64 + keys []string + ) + for { + var ( + k []string + err error + ) + k, cursor, err = c.rdb.Scan(ctx, cursor, fmt.Sprintf("%s*", caches.IdentifierPrefix), 0).Result() + if err != nil { + return err + } + keys = append(keys, k...) + if cursor == 0 { + break + } + } + + if len(keys) > 0 { + if _, err := c.rdb.Del(ctx, keys...).Result(); err != nil { + return err + } + } + return nil +}