package svc import ( "context" "encoding/json" "slices" "strings" "sync" "go.etcd.io/etcd/api/v3/mvccpb" "gitea.youtukeji.com.cn/youtu/youtu_grpc/app/admin_service/admin_service" "gitea.youtukeji.com.cn/youtu/youtu_grpc/pkg/config" clientv3 "go.etcd.io/etcd/client/v3" ) // EcpmConfig 定义数据结构 type EcpmConfig struct { AppID string `json:"appId,omitempty"` ECPM float32 `json:"eCPM"` IPU uint32 `json:"IPU"` } // AppData 定义数据结构 type AppData struct { AppID string `json:"appId,omitempty"` AppSecret string `json:"appSecret"` Type int32 `json:"type"` Remark string `json:"remark"` } // MergedAppInfo 合并后的数据结构 type MergedAppInfo struct { AppID string `json:"appId,omitempty"` ECPM float32 `json:"eCPM,omitempty"` IPU uint32 `json:"IPU,omitempty"` AppSecret string `json:"appSecret,omitempty"` Type string `json:"type,omitempty"` Remark string `json:"remark,omitempty"` } type AppDataCache struct { Data []*admin_service.AppInfo lock *sync.RWMutex ecpmData map[string]EcpmConfig appData []AppData client *clientv3.Client } const ( appDataKey = "/youtu/app/account/" ecpmDataKey = "/youtu/app/ecpm/config/" ) func NewAppDataCache(client *clientv3.Client) *AppDataCache { appData := &AppDataCache{ Data: make([]*admin_service.AppInfo, 0), lock: &sync.RWMutex{}, ecpmData: make(map[string]EcpmConfig), appData: make([]AppData, 0), client: client, } go appData.StartWatchAppData(context.Background()) go appData.StartWatchEcpmData(context.Background()) return appData } // StartWatchEcpmData 获取、watch ecpm配置 并存储到ecpmData func (data *AppDataCache) StartWatchEcpmData(ctx context.Context) { ch := make(chan config.WatchKV) go config.EtcdGetOneAndWatch(ctx, data.client, ecpmDataKey, ch) for v := range ch { switch v.Type { case mvccpb.DELETE: data.DeleteEcpmData(strings.TrimPrefix(v.Key, ecpmDataKey)) default: ecpmData := EcpmConfig{} if err := json.Unmarshal(v.Value, &ecpmData); err != nil { panic(err) } ecpmData.AppID = strings.TrimPrefix(v.Key, ecpmDataKey) if d, ok := data.ecpmData[ecpmData.AppID]; ok { if d != ecpmData { data.ecpmData[ecpmData.AppID] = ecpmData } } else { data.ecpmData[ecpmData.AppID] = ecpmData } } data.updateAppData() } } // StartWatchAppData 获取、watch app配置 并k存储到appData func (data *AppDataCache) StartWatchAppData(ctx context.Context) { ch := make(chan config.WatchKV) go config.EtcdGetOneAndWatch(ctx, data.client, appDataKey, ch) for v := range ch { switch v.Type { case mvccpb.DELETE: data.DeleteOne(strings.TrimPrefix(v.Key, appDataKey)) default: appData := AppData{} if err := json.Unmarshal(v.Value, &appData); err != nil { panic(err) } appData.AppID = strings.TrimPrefix(v.Key, appDataKey) data.setAppDate(appData) } data.updateAppData() } } // setAppDate 设置appData func (data *AppDataCache) setAppDate(appData AppData) { data.lock.Lock() defer data.lock.Unlock() for i, app := range data.appData { if app.AppID == appData.AppID { if app != appData { data.appData[i] = appData data.updateAppData() } return } } data.appData = append(data.appData, appData) } // updateAppData 使用ecpmData和appData重新生成appList,使用时需要加锁 func (data *AppDataCache) updateAppData() { clear(data.Data) data.Data = make([]*admin_service.AppInfo, 0, len(data.appData)) for _, app := range data.appData { var ( ecpm float32 ipu uint32 ) if ecpmData, ok := data.ecpmData[app.AppID]; ok { ecpm = ecpmData.ECPM ipu = ecpmData.IPU } data.Data = append(data.Data, &admin_service.AppInfo{ AppId: app.AppID, Secret: app.AppSecret, Type: app.Type, Remark: app.Remark, Ecpm: ecpm, Ipu: ipu, }) } slices.SortFunc(data.Data, func(a, b *admin_service.AppInfo) int { if a.AppId > b.AppId { return 1 } return -1 }) } // GetAppDataCache 获取appDataCache func (data *AppDataCache) GetAppDataCache() []*admin_service.AppInfo { data.lock.RLock() defer data.lock.RUnlock() return data.Data } func (data *AppDataCache) UpdateOne(newData *admin_service.AppInfo) { data.lock.Lock() defer data.lock.Unlock() existingApp, find := data.FindAppById(newData.AppId) var needUpdateAppData, needUpdateEcpmData bool if !find { // 新增 needUpdateAppData = true if newData.Ecpm != 0 || newData.Ipu != 0 { needUpdateEcpmData = true } data.Data = append(data.Data, newData) } else { // 更新 needUpdateAppData = existingApp.Secret != newData.Secret || existingApp.Type != newData.Type || existingApp.Remark != newData.Remark needUpdateEcpmData = existingApp.Ecpm != newData.Ecpm || existingApp.Ipu != newData.Ipu if needUpdateAppData || needUpdateEcpmData { data.Data[data.getIndexById(newData.AppId)] = newData } } // 更新etcd中的appData if needUpdateAppData { appData := &AppData{ AppSecret: newData.Secret, Type: newData.Type, Remark: newData.Remark, } b, _ := json.Marshal(appData) _, _ = data.client.Put(context.Background(), appDataKey+newData.AppId, string(b)) } // 更新etcd中的ecpmData if needUpdateEcpmData { ecpmData := &EcpmConfig{ ECPM: newData.Ecpm, IPU: newData.Ipu, } b, _ := json.Marshal(ecpmData) _, _ = data.client.Put(context.Background(), ecpmDataKey+newData.AppId, string(b)) } } // FindAppById 查找app func (data *AppDataCache) FindAppById(appId string) (*admin_service.AppInfo, bool) { for _, app := range data.Data { if app.AppId == appId { return app, true } } return nil, false } // getIndexById 获取app在data.Data中的索引 func (data *AppDataCache) getIndexById(appId string) int { for i, app := range data.Data { if app.AppId == appId { return i } } return -1 } // DeleteOne 删除一个app func (data *AppDataCache) DeleteOne(appId string) { data.lock.Lock() defer data.lock.Unlock() index := -1 for index = range data.appData { if data.appData[index].AppID == appId { break } } if index == -1 { return } data.appData = slices.Delete(data.appData, index, index+1) data.updateAppData() } // DeleteEcpmData 删除一个ecpmData func (data *AppDataCache) DeleteEcpmData(appId string) { data.lock.Lock() defer data.lock.Unlock() delete(data.ecpmData, appId) } // DeleteFromEtcd 从etcd删除 func (data *AppDataCache) DeleteFromEtcd(ctx context.Context, key string) { _, _ = data.client.Delete(ctx, appDataKey+key) _, _ = data.client.Delete(ctx, ecpmDataKey+key) } // DeleteAllFromEtcd 从etcd删除 func (data *AppDataCache) DeleteAllFromEtcd(ctx context.Context) { _, _ = data.client.Delete(ctx, appDataKey, clientv3.WithPrefix()) _, _ = data.client.Delete(ctx, ecpmDataKey, clientv3.WithPrefix()) } // PutToEtcd 向etcd写入 func (data *AppDataCache) PutToEtcd(ctx context.Context, appInfo *admin_service.AppInfo) { appData := &AppData{ AppSecret: appInfo.Secret, Type: appInfo.Type, Remark: appInfo.Remark, } b, _ := json.Marshal(appData) _, _ = data.client.Put(ctx, appDataKey+appInfo.AppId, string(b)) ecpmData := &EcpmConfig{ ECPM: appInfo.Ecpm, IPU: appInfo.Ipu, } b, _ = json.Marshal(ecpmData) _, _ = data.client.Put(context.Background(), ecpmDataKey+appInfo.AppId, string(b)) }