admin - ecpm

This commit is contained in:
xiabin 2025-02-24 18:47:27 +08:00
parent 0f88992db9
commit ae0fd91158
13 changed files with 825 additions and 149 deletions

View File

@ -2,6 +2,7 @@ package main
import (
"fmt"
"gitea.youtukeji.com.cn/youtu/youtu_grpc/pkg/config"
"gitea.youtukeji.com.cn/youtu/youtu_grpc/app/admin_service/admin_service"

View File

@ -30,8 +30,33 @@ message AppInfo {
uint32 ipu = 7;
}
message CommonResponse {
bool success = 1;
}
message DeleteAppRequest {
string app_id = 1;
}
message AddAppRequest {
AppInfo app_info = 1;
}
message AddAppResponse {
bool success = 1;
}
service admin_service {
rpc Ping(Request) returns(Response);
// app列表
rpc GetAppList(GetAppListRequest) returns(GetAppListResponse);
// UpdateOne app数据
rpc UpdateOne(AppInfo) returns(CommonResponse);
// DeleteApp app
rpc DeleteApp(DeleteAppRequest) returns(CommonResponse);
// AddApp app
rpc AddApp(AddAppRequest) returns(AddAppResponse);
}

View File

@ -281,6 +281,182 @@ func (x *AppInfo) GetIpu() uint32 {
return 0
}
type CommonResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
Success bool `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *CommonResponse) Reset() {
*x = CommonResponse{}
mi := &file_admin_service_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *CommonResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*CommonResponse) ProtoMessage() {}
func (x *CommonResponse) ProtoReflect() protoreflect.Message {
mi := &file_admin_service_proto_msgTypes[5]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use CommonResponse.ProtoReflect.Descriptor instead.
func (*CommonResponse) Descriptor() ([]byte, []int) {
return file_admin_service_proto_rawDescGZIP(), []int{5}
}
func (x *CommonResponse) GetSuccess() bool {
if x != nil {
return x.Success
}
return false
}
type DeleteAppRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
AppId string `protobuf:"bytes,1,opt,name=app_id,json=appId,proto3" json:"app_id,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *DeleteAppRequest) Reset() {
*x = DeleteAppRequest{}
mi := &file_admin_service_proto_msgTypes[6]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *DeleteAppRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*DeleteAppRequest) ProtoMessage() {}
func (x *DeleteAppRequest) ProtoReflect() protoreflect.Message {
mi := &file_admin_service_proto_msgTypes[6]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use DeleteAppRequest.ProtoReflect.Descriptor instead.
func (*DeleteAppRequest) Descriptor() ([]byte, []int) {
return file_admin_service_proto_rawDescGZIP(), []int{6}
}
func (x *DeleteAppRequest) GetAppId() string {
if x != nil {
return x.AppId
}
return ""
}
type AddAppRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
AppInfo *AppInfo `protobuf:"bytes,1,opt,name=app_info,json=appInfo,proto3" json:"app_info,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *AddAppRequest) Reset() {
*x = AddAppRequest{}
mi := &file_admin_service_proto_msgTypes[7]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *AddAppRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*AddAppRequest) ProtoMessage() {}
func (x *AddAppRequest) ProtoReflect() protoreflect.Message {
mi := &file_admin_service_proto_msgTypes[7]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use AddAppRequest.ProtoReflect.Descriptor instead.
func (*AddAppRequest) Descriptor() ([]byte, []int) {
return file_admin_service_proto_rawDescGZIP(), []int{7}
}
func (x *AddAppRequest) GetAppInfo() *AppInfo {
if x != nil {
return x.AppInfo
}
return nil
}
type AddAppResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
Success bool `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *AddAppResponse) Reset() {
*x = AddAppResponse{}
mi := &file_admin_service_proto_msgTypes[8]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *AddAppResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*AddAppResponse) ProtoMessage() {}
func (x *AddAppResponse) ProtoReflect() protoreflect.Message {
mi := &file_admin_service_proto_msgTypes[8]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use AddAppResponse.ProtoReflect.Descriptor instead.
func (*AddAppResponse) Descriptor() ([]byte, []int) {
return file_admin_service_proto_rawDescGZIP(), []int{8}
}
func (x *AddAppResponse) GetSuccess() bool {
if x != nil {
return x.Success
}
return false
}
var File_admin_service_proto protoreflect.FileDescriptor
var file_admin_service_proto_rawDesc = string([]byte{
@ -305,19 +481,45 @@ var file_admin_service_proto_rawDesc = string([]byte{
0x12, 0x16, 0x0a, 0x06, 0x72, 0x65, 0x6d, 0x61, 0x72, 0x6b, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09,
0x52, 0x06, 0x72, 0x65, 0x6d, 0x61, 0x72, 0x6b, 0x12, 0x12, 0x0a, 0x04, 0x65, 0x63, 0x70, 0x6d,
0x18, 0x06, 0x20, 0x01, 0x28, 0x02, 0x52, 0x04, 0x65, 0x63, 0x70, 0x6d, 0x12, 0x10, 0x0a, 0x03,
0x69, 0x70, 0x75, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x03, 0x69, 0x70, 0x75, 0x32, 0x9b,
0x01, 0x0a, 0x0d, 0x61, 0x64, 0x6d, 0x69, 0x6e, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65,
0x12, 0x37, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x16, 0x2e, 0x61, 0x64, 0x6d, 0x69, 0x6e,
0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x1a, 0x17, 0x2e, 0x61, 0x64, 0x6d, 0x69, 0x6e, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65,
0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x51, 0x0a, 0x0a, 0x47, 0x65, 0x74,
0x41, 0x70, 0x70, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x20, 0x2e, 0x61, 0x64, 0x6d, 0x69, 0x6e, 0x5f,
0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x47, 0x65, 0x74, 0x41, 0x70, 0x70, 0x4c, 0x69,
0x73, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x61, 0x64, 0x6d, 0x69,
0x6e, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x47, 0x65, 0x74, 0x41, 0x70, 0x70,
0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x11, 0x5a, 0x0f,
0x2e, 0x2f, 0x61, 0x64, 0x6d, 0x69, 0x6e, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x62,
0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x69, 0x70, 0x75, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x03, 0x69, 0x70, 0x75, 0x22, 0x2a,
0x0a, 0x0e, 0x43, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28,
0x08, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x22, 0x29, 0x0a, 0x10, 0x44, 0x65,
0x6c, 0x65, 0x74, 0x65, 0x41, 0x70, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x15,
0x0a, 0x06, 0x61, 0x70, 0x70, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05,
0x61, 0x70, 0x70, 0x49, 0x64, 0x22, 0x42, 0x0a, 0x0d, 0x41, 0x64, 0x64, 0x41, 0x70, 0x70, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x31, 0x0a, 0x08, 0x61, 0x70, 0x70, 0x5f, 0x69, 0x6e,
0x66, 0x6f, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x61, 0x64, 0x6d, 0x69, 0x6e,
0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x41, 0x70, 0x70, 0x49, 0x6e, 0x66, 0x6f,
0x52, 0x07, 0x61, 0x70, 0x70, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x2a, 0x0a, 0x0e, 0x41, 0x64, 0x64,
0x41, 0x70, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x73,
0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75,
0x63, 0x63, 0x65, 0x73, 0x73, 0x32, 0xf3, 0x02, 0x0a, 0x0d, 0x61, 0x64, 0x6d, 0x69, 0x6e, 0x5f,
0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x37, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12,
0x16, 0x2e, 0x61, 0x64, 0x6d, 0x69, 0x6e, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x61, 0x64, 0x6d, 0x69, 0x6e, 0x5f,
0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x12, 0x51, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x41, 0x70, 0x70, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x20,
0x2e, 0x61, 0x64, 0x6d, 0x69, 0x6e, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x47,
0x65, 0x74, 0x41, 0x70, 0x70, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x1a, 0x21, 0x2e, 0x61, 0x64, 0x6d, 0x69, 0x6e, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65,
0x2e, 0x47, 0x65, 0x74, 0x41, 0x70, 0x70, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x12, 0x42, 0x0a, 0x09, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x4f, 0x6e, 0x65,
0x12, 0x16, 0x2e, 0x61, 0x64, 0x6d, 0x69, 0x6e, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65,
0x2e, 0x41, 0x70, 0x70, 0x49, 0x6e, 0x66, 0x6f, 0x1a, 0x1d, 0x2e, 0x61, 0x64, 0x6d, 0x69, 0x6e,
0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x52,
0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4b, 0x0a, 0x09, 0x44, 0x65, 0x6c, 0x65, 0x74,
0x65, 0x41, 0x70, 0x70, 0x12, 0x1f, 0x2e, 0x61, 0x64, 0x6d, 0x69, 0x6e, 0x5f, 0x73, 0x65, 0x72,
0x76, 0x69, 0x63, 0x65, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x41, 0x70, 0x70, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x61, 0x64, 0x6d, 0x69, 0x6e, 0x5f, 0x73, 0x65,
0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70,
0x6f, 0x6e, 0x73, 0x65, 0x12, 0x45, 0x0a, 0x06, 0x41, 0x64, 0x64, 0x41, 0x70, 0x70, 0x12, 0x1c,
0x2e, 0x61, 0x64, 0x6d, 0x69, 0x6e, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x41,
0x64, 0x64, 0x41, 0x70, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x61,
0x64, 0x6d, 0x69, 0x6e, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x41, 0x64, 0x64,
0x41, 0x70, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x11, 0x5a, 0x0f, 0x2e,
0x2f, 0x61, 0x64, 0x6d, 0x69, 0x6e, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x62, 0x06,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
})
var (
@ -332,25 +534,36 @@ func file_admin_service_proto_rawDescGZIP() []byte {
return file_admin_service_proto_rawDescData
}
var file_admin_service_proto_msgTypes = make([]protoimpl.MessageInfo, 5)
var file_admin_service_proto_msgTypes = make([]protoimpl.MessageInfo, 9)
var file_admin_service_proto_goTypes = []any{
(*Request)(nil), // 0: admin_service.Request
(*Response)(nil), // 1: admin_service.Response
(*GetAppListRequest)(nil), // 2: admin_service.GetAppListRequest
(*GetAppListResponse)(nil), // 3: admin_service.GetAppListResponse
(*AppInfo)(nil), // 4: admin_service.AppInfo
(*CommonResponse)(nil), // 5: admin_service.CommonResponse
(*DeleteAppRequest)(nil), // 6: admin_service.DeleteAppRequest
(*AddAppRequest)(nil), // 7: admin_service.AddAppRequest
(*AddAppResponse)(nil), // 8: admin_service.AddAppResponse
}
var file_admin_service_proto_depIdxs = []int32{
4, // 0: admin_service.GetAppListResponse.app_list:type_name -> admin_service.AppInfo
0, // 1: admin_service.admin_service.Ping:input_type -> admin_service.Request
2, // 2: admin_service.admin_service.GetAppList:input_type -> admin_service.GetAppListRequest
1, // 3: admin_service.admin_service.Ping:output_type -> admin_service.Response
3, // 4: admin_service.admin_service.GetAppList:output_type -> admin_service.GetAppListResponse
3, // [3:5] is the sub-list for method output_type
1, // [1:3] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
4, // 1: admin_service.AddAppRequest.app_info:type_name -> admin_service.AppInfo
0, // 2: admin_service.admin_service.Ping:input_type -> admin_service.Request
2, // 3: admin_service.admin_service.GetAppList:input_type -> admin_service.GetAppListRequest
4, // 4: admin_service.admin_service.UpdateOne:input_type -> admin_service.AppInfo
6, // 5: admin_service.admin_service.DeleteApp:input_type -> admin_service.DeleteAppRequest
7, // 6: admin_service.admin_service.AddApp:input_type -> admin_service.AddAppRequest
1, // 7: admin_service.admin_service.Ping:output_type -> admin_service.Response
3, // 8: admin_service.admin_service.GetAppList:output_type -> admin_service.GetAppListResponse
5, // 9: admin_service.admin_service.UpdateOne:output_type -> admin_service.CommonResponse
5, // 10: admin_service.admin_service.DeleteApp:output_type -> admin_service.CommonResponse
8, // 11: admin_service.admin_service.AddApp:output_type -> admin_service.AddAppResponse
7, // [7:12] is the sub-list for method output_type
2, // [2:7] is the sub-list for method input_type
2, // [2:2] is the sub-list for extension type_name
2, // [2:2] is the sub-list for extension extendee
0, // [0:2] is the sub-list for field type_name
}
func init() { file_admin_service_proto_init() }
@ -364,7 +577,7 @@ func file_admin_service_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_admin_service_proto_rawDesc), len(file_admin_service_proto_rawDesc)),
NumEnums: 0,
NumMessages: 5,
NumMessages: 9,
NumExtensions: 0,
NumServices: 1,
},

View File

@ -21,6 +21,9 @@ const _ = grpc.SupportPackageIsVersion9
const (
AdminService_Ping_FullMethodName = "/admin_service.admin_service/Ping"
AdminService_GetAppList_FullMethodName = "/admin_service.admin_service/GetAppList"
AdminService_UpdateOne_FullMethodName = "/admin_service.admin_service/UpdateOne"
AdminService_DeleteApp_FullMethodName = "/admin_service.admin_service/DeleteApp"
AdminService_AddApp_FullMethodName = "/admin_service.admin_service/AddApp"
)
// AdminServiceClient is the client API for AdminService service.
@ -30,6 +33,12 @@ type AdminServiceClient interface {
Ping(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error)
// 获取app列表
GetAppList(ctx context.Context, in *GetAppListRequest, opts ...grpc.CallOption) (*GetAppListResponse, error)
// 修改app列表
UpdateOne(ctx context.Context, in *AppInfo, opts ...grpc.CallOption) (*CommonResponse, error)
// 删除app
DeleteApp(ctx context.Context, in *DeleteAppRequest, opts ...grpc.CallOption) (*CommonResponse, error)
// 添加app
AddApp(ctx context.Context, in *AddAppRequest, opts ...grpc.CallOption) (*AddAppResponse, error)
}
type adminServiceClient struct {
@ -60,6 +69,36 @@ func (c *adminServiceClient) GetAppList(ctx context.Context, in *GetAppListReque
return out, nil
}
func (c *adminServiceClient) UpdateOne(ctx context.Context, in *AppInfo, opts ...grpc.CallOption) (*CommonResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(CommonResponse)
err := c.cc.Invoke(ctx, AdminService_UpdateOne_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *adminServiceClient) DeleteApp(ctx context.Context, in *DeleteAppRequest, opts ...grpc.CallOption) (*CommonResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(CommonResponse)
err := c.cc.Invoke(ctx, AdminService_DeleteApp_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *adminServiceClient) AddApp(ctx context.Context, in *AddAppRequest, opts ...grpc.CallOption) (*AddAppResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(AddAppResponse)
err := c.cc.Invoke(ctx, AdminService_AddApp_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
// AdminServiceServer is the server API for AdminService service.
// All implementations must embed UnimplementedAdminServiceServer
// for forward compatibility.
@ -67,6 +106,12 @@ type AdminServiceServer interface {
Ping(context.Context, *Request) (*Response, error)
// 获取app列表
GetAppList(context.Context, *GetAppListRequest) (*GetAppListResponse, error)
// 修改app列表
UpdateOne(context.Context, *AppInfo) (*CommonResponse, error)
// 删除app
DeleteApp(context.Context, *DeleteAppRequest) (*CommonResponse, error)
// 添加app
AddApp(context.Context, *AddAppRequest) (*AddAppResponse, error)
mustEmbedUnimplementedAdminServiceServer()
}
@ -83,6 +128,15 @@ func (UnimplementedAdminServiceServer) Ping(context.Context, *Request) (*Respons
func (UnimplementedAdminServiceServer) GetAppList(context.Context, *GetAppListRequest) (*GetAppListResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetAppList not implemented")
}
func (UnimplementedAdminServiceServer) UpdateOne(context.Context, *AppInfo) (*CommonResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method UpdateOne not implemented")
}
func (UnimplementedAdminServiceServer) DeleteApp(context.Context, *DeleteAppRequest) (*CommonResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method DeleteApp not implemented")
}
func (UnimplementedAdminServiceServer) AddApp(context.Context, *AddAppRequest) (*AddAppResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method AddApp not implemented")
}
func (UnimplementedAdminServiceServer) mustEmbedUnimplementedAdminServiceServer() {}
func (UnimplementedAdminServiceServer) testEmbeddedByValue() {}
@ -140,6 +194,60 @@ func _AdminService_GetAppList_Handler(srv interface{}, ctx context.Context, dec
return interceptor(ctx, in, info, handler)
}
func _AdminService_UpdateOne_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(AppInfo)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(AdminServiceServer).UpdateOne(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: AdminService_UpdateOne_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(AdminServiceServer).UpdateOne(ctx, req.(*AppInfo))
}
return interceptor(ctx, in, info, handler)
}
func _AdminService_DeleteApp_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(DeleteAppRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(AdminServiceServer).DeleteApp(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: AdminService_DeleteApp_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(AdminServiceServer).DeleteApp(ctx, req.(*DeleteAppRequest))
}
return interceptor(ctx, in, info, handler)
}
func _AdminService_AddApp_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(AddAppRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(AdminServiceServer).AddApp(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: AdminService_AddApp_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(AdminServiceServer).AddApp(ctx, req.(*AddAppRequest))
}
return interceptor(ctx, in, info, handler)
}
// AdminService_ServiceDesc is the grpc.ServiceDesc for AdminService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@ -155,6 +263,18 @@ var AdminService_ServiceDesc = grpc.ServiceDesc{
MethodName: "GetAppList",
Handler: _AdminService_GetAppList_Handler,
},
{
MethodName: "UpdateOne",
Handler: _AdminService_UpdateOne_Handler,
},
{
MethodName: "DeleteApp",
Handler: _AdminService_DeleteApp_Handler,
},
{
MethodName: "AddApp",
Handler: _AdminService_AddApp_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "admin_service.proto",

View File

@ -14,7 +14,11 @@ import (
)
type (
AddAppRequest = admin_service.AddAppRequest
AddAppResponse = admin_service.AddAppResponse
AppInfo = admin_service.AppInfo
CommonResponse = admin_service.CommonResponse
DeleteAppRequest = admin_service.DeleteAppRequest
GetAppListRequest = admin_service.GetAppListRequest
GetAppListResponse = admin_service.GetAppListResponse
Request = admin_service.Request
@ -24,6 +28,12 @@ type (
Ping(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error)
// 获取app列表
GetAppList(ctx context.Context, in *GetAppListRequest, opts ...grpc.CallOption) (*GetAppListResponse, error)
// 修改app列表
UpdateOne(ctx context.Context, in *AppInfo, opts ...grpc.CallOption) (*CommonResponse, error)
// 删除app
DeleteApp(ctx context.Context, in *DeleteAppRequest, opts ...grpc.CallOption) (*CommonResponse, error)
// 添加app
AddApp(ctx context.Context, in *AddAppRequest, opts ...grpc.CallOption) (*AddAppResponse, error)
}
defaultAdminService struct {
@ -47,3 +57,21 @@ func (m *defaultAdminService) GetAppList(ctx context.Context, in *GetAppListRequ
client := admin_service.NewAdminServiceClient(m.cli.Conn())
return client.GetAppList(ctx, in, opts...)
}
// 修改app列表
func (m *defaultAdminService) UpdateOne(ctx context.Context, in *AppInfo, opts ...grpc.CallOption) (*CommonResponse, error) {
client := admin_service.NewAdminServiceClient(m.cli.Conn())
return client.UpdateOne(ctx, in, opts...)
}
// 删除app
func (m *defaultAdminService) DeleteApp(ctx context.Context, in *DeleteAppRequest, opts ...grpc.CallOption) (*CommonResponse, error) {
client := admin_service.NewAdminServiceClient(m.cli.Conn())
return client.DeleteApp(ctx, in, opts...)
}
// 添加app
func (m *defaultAdminService) AddApp(ctx context.Context, in *AddAppRequest, opts ...grpc.CallOption) (*AddAppResponse, error) {
client := admin_service.NewAdminServiceClient(m.cli.Conn())
return client.AddApp(ctx, in, opts...)
}

View File

@ -0,0 +1,38 @@
package logic
import (
"context"
"gitea.youtukeji.com.cn/youtu/youtu_grpc/app/admin_service/admin_service"
"gitea.youtukeji.com.cn/youtu/youtu_grpc/app/admin_service/internal/svc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/zeromicro/go-zero/core/logx"
)
type AddAppLogic struct {
ctx context.Context
svcCtx *svc.ServiceContext
logx.Logger
}
func NewAddAppLogic(ctx context.Context, svcCtx *svc.ServiceContext) *AddAppLogic {
return &AddAppLogic{
ctx: ctx,
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
}
}
// AddApp 添加app
func (l *AddAppLogic) AddApp(in *admin_service.AddAppRequest) (*admin_service.AddAppResponse, error) {
// 检查app是否存在
if _, find := l.svcCtx.Cached.FindAppById(in.AppInfo.AppId); find {
return nil, status.Errorf(codes.AlreadyExists, "app %s already exists", in.AppInfo.AppId)
}
// 添加app
l.svcCtx.Cached.PutToEtcd(l.ctx, in.AppInfo)
return &admin_service.AddAppResponse{}, nil
}

View File

@ -0,0 +1,30 @@
package logic
import (
"context"
"gitea.youtukeji.com.cn/youtu/youtu_grpc/app/admin_service/admin_service"
"gitea.youtukeji.com.cn/youtu/youtu_grpc/app/admin_service/internal/svc"
"github.com/zeromicro/go-zero/core/logx"
)
type DeleteAppLogic struct {
ctx context.Context
svcCtx *svc.ServiceContext
logx.Logger
}
func NewDeleteAppLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DeleteAppLogic {
return &DeleteAppLogic{
ctx: ctx,
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
}
}
// DeleteApp 删除app
func (l *DeleteAppLogic) DeleteApp(in *admin_service.DeleteAppRequest) (*admin_service.CommonResponse, error) {
l.svcCtx.Cached.DeleteFromEtcd(l.ctx, in.AppId)
return &admin_service.CommonResponse{Success: true}, nil
}

View File

@ -2,15 +2,8 @@ package logic
import (
"context"
"encoding/json"
"fmt"
"gitea.youtukeji.com.cn/youtu/youtu_grpc/app/admin_service/admin_service"
"slices"
"strings"
"gitea.youtukeji.com.cn/youtu/youtu_grpc/app/admin_service/internal/svc"
clientv3 "go.etcd.io/etcd/client/v3"
"github.com/zeromicro/go-zero/core/logx"
)
@ -28,121 +21,9 @@ func NewGetAppListLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetApp
}
}
// EcpmConfig 定义数据结构
type EcpmConfig struct {
AppID string `json:"appId"`
ECPM float32 `json:"eCPM"`
IPU uint32 `json:"IPU"`
}
type AppData struct {
AppID string `json:"appId"`
AppSecret string `json:"appSecret"`
Type string `json:"type"`
Remark string `json:"remark"`
}
type MergedAppInfo struct {
AppID string `json:"appId"`
ECPM float32 `json:"eCPM,omitempty"`
IPU uint32 `json:"IPU,omitempty"`
AppSecret string `json:"appSecret,omitempty"`
Type string `json:"type,omitempty"`
Remark string `json:"remark,omitempty"`
}
// GetAppList 获取app列表
func (l *GetAppListLogic) GetAppList(_ *admin_service.GetAppListRequest) (res *admin_service.GetAppListResponse, err error) {
// 从ETCD获取ecpm配置
ecpmResp, err := l.svcCtx.EtcdClient.Get(l.ctx, "/youtu/app/ecpm/config", clientv3.WithPrefix())
if err != nil {
return nil, fmt.Errorf("获取ecpm配置失败: %v", err)
}
// 解析ecpm配置
ecpmConfigs := make([]EcpmConfig, 0, len(ecpmResp.Kvs))
for _, kv := range ecpmResp.Kvs {
cfg := EcpmConfig{}
if err := json.Unmarshal(kv.Value, &cfg); err != nil {
return nil, fmt.Errorf("解析ecpm配置失败: %v", err)
}
cfg.AppID = strings.TrimPrefix(string(kv.Key), "/youtu/app/ecpm/config/")
ecpmConfigs = append(ecpmConfigs, cfg)
}
// 从ETCD获取app数据
appDataResp, err := l.svcCtx.EtcdClient.Get(l.ctx, "/youtu/app/account", clientv3.WithPrefix())
if err != nil {
return nil, fmt.Errorf("获取app数据失败: %v", err)
}
appDatas := make([]AppData, 0, len(appDataResp.Kvs))
for _, kv := range appDataResp.Kvs {
appData := AppData{}
if err := json.Unmarshal(kv.Value, &appData); err != nil {
return nil, fmt.Errorf("解析app数据失败: %v", err)
}
appData.AppID = strings.TrimPrefix(string(kv.Key), "/youtu/app/account/")
appDatas = append(appDatas, appData)
}
// 创建合并映射
merged := make(map[string]*MergedAppInfo)
// 合并ecpm数据
for _, ecpm := range ecpmConfigs {
merged[ecpm.AppID] = &MergedAppInfo{
AppID: ecpm.AppID,
ECPM: ecpm.ECPM,
IPU: ecpm.IPU,
}
}
// 合并app数据
for _, app := range appDatas {
if info, exists := merged[app.AppID]; exists {
info.AppSecret = app.AppSecret
info.Type = app.Type
info.Remark = app.Remark
} else {
merged[app.AppID] = &MergedAppInfo{
AppID: app.AppID,
AppSecret: app.AppSecret,
Type: app.Type,
Remark: app.Remark,
}
}
}
// 构建返回结果
res = &admin_service.GetAppListResponse{
AppList: make([]*admin_service.AppInfo, 0, len(merged)),
}
// 将合并后的数据转换为响应格式
for _, info := range merged {
appInfo := &admin_service.AppInfo{
AppId: info.AppID,
Secret: info.AppSecret,
Ecpm: info.ECPM,
Ipu: info.IPU,
Type: info.Type,
Remark: info.Remark,
}
res.AppList = append(res.AppList, appInfo)
}
slices.SortFunc(res.AppList, func(i, j *admin_service.AppInfo) int {
if i.AppId < j.AppId {
return -1
}
return 1
})
for i, info := range res.AppList {
info.Id = int32(i + 1)
}
return res, nil
return &admin_service.GetAppListResponse{
AppList: l.svcCtx.Cached.GetAppDataCache(),
}, nil
}

View File

@ -0,0 +1,31 @@
package logic
import (
"context"
"gitea.youtukeji.com.cn/youtu/youtu_grpc/app/admin_service/admin_service"
"gitea.youtukeji.com.cn/youtu/youtu_grpc/app/admin_service/internal/svc"
"github.com/zeromicro/go-zero/core/logx"
)
type UpdateOneLogic struct {
ctx context.Context
svcCtx *svc.ServiceContext
logx.Logger
}
func NewUpdateOneLogic(ctx context.Context, svcCtx *svc.ServiceContext) *UpdateOneLogic {
return &UpdateOneLogic{
ctx: ctx,
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
}
}
// UpdateOne 修改app数据
func (l *UpdateOneLogic) UpdateOne(in *admin_service.AppInfo) (*admin_service.CommonResponse, error) {
l.svcCtx.Cached.UpdateOne(in)
return &admin_service.CommonResponse{}, nil
}

View File

@ -33,3 +33,21 @@ func (s *AdminServiceServer) GetAppList(ctx context.Context, in *admin_service.G
l := logic.NewGetAppListLogic(ctx, s.svcCtx)
return l.GetAppList(in)
}
// 修改app列表
func (s *AdminServiceServer) UpdateOne(ctx context.Context, in *admin_service.AppInfo) (*admin_service.CommonResponse, error) {
l := logic.NewUpdateOneLogic(ctx, s.svcCtx)
return l.UpdateOne(in)
}
// 删除app
func (s *AdminServiceServer) DeleteApp(ctx context.Context, in *admin_service.DeleteAppRequest) (*admin_service.CommonResponse, error) {
l := logic.NewDeleteAppLogic(ctx, s.svcCtx)
return l.DeleteApp(in)
}
// 添加app
func (s *AdminServiceServer) AddApp(ctx context.Context, in *admin_service.AddAppRequest) (*admin_service.AddAppResponse, error) {
l := logic.NewAddAppLogic(ctx, s.svcCtx)
return l.AddApp(in)
}

View File

@ -0,0 +1,285 @@
package svc
import (
"context"
"encoding/json"
"slices"
"strings"
"sync"
"go.etcd.io/etcd/api/v3/mvccpb"
"gitea.youtukeji.com.cn/youtu/youtu_grpc/app/admin_service/admin_service"
"gitea.youtukeji.com.cn/youtu/youtu_grpc/pkg/config"
clientv3 "go.etcd.io/etcd/client/v3"
)
// EcpmConfig 定义数据结构
type EcpmConfig struct {
AppID string `json:"appId"`
ECPM float32 `json:"eCPM"`
IPU uint32 `json:"IPU"`
}
// AppData 定义数据结构
type AppData struct {
AppID string `json:"appId,omitempty"`
AppSecret string `json:"appSecret"`
Type string `json:"type"`
Remark string `json:"remark"`
}
// MergedAppInfo 合并后的数据结构
type MergedAppInfo struct {
AppID string `json:"appId,omitempty"`
ECPM float32 `json:"eCPM,omitempty"`
IPU uint32 `json:"IPU,omitempty"`
AppSecret string `json:"appSecret,omitempty"`
Type string `json:"type,omitempty"`
Remark string `json:"remark,omitempty"`
}
type AppDataCache struct {
Data []*admin_service.AppInfo
lock *sync.RWMutex
ecpmData map[string]EcpmConfig
appData []AppData
client *clientv3.Client
}
const (
appDataKey = "/youtu/app/account/"
ecpmDataKey = "/youtu/app/ecpm/config/"
)
func NewAppDataCache(client *clientv3.Client) *AppDataCache {
appData := &AppDataCache{
Data: make([]*admin_service.AppInfo, 0),
lock: &sync.RWMutex{},
ecpmData: make(map[string]EcpmConfig),
appData: make([]AppData, 0),
client: client,
}
go appData.StartWatchAppData(context.Background())
go appData.StartWatchEcpmData(context.Background())
return appData
}
// StartWatchEcpmData 获取、watch ecpm配置 并存储到ecpmData
func (data *AppDataCache) StartWatchEcpmData(ctx context.Context) {
ch := make(chan config.WatchKV)
go config.EtcdGetOneAndWatch(ctx, data.client, ecpmDataKey, ch)
for v := range ch {
switch v.Type {
case mvccpb.DELETE:
data.DeleteEcpmData(strings.TrimPrefix(v.Key, ecpmDataKey))
default:
ecpmData := EcpmConfig{}
if err := json.Unmarshal(v.Value, &ecpmData); err != nil {
panic(err)
}
ecpmData.AppID = strings.TrimPrefix(v.Key, ecpmDataKey)
if d, ok := data.ecpmData[ecpmData.AppID]; ok {
if d != ecpmData {
data.ecpmData[ecpmData.AppID] = ecpmData
}
} else {
data.ecpmData[ecpmData.AppID] = ecpmData
}
}
data.updateAppData()
}
}
// StartWatchAppData 获取、watch app配置 并k存储到appData
func (data *AppDataCache) StartWatchAppData(ctx context.Context) {
ch := make(chan config.WatchKV)
go config.EtcdGetOneAndWatch(ctx, data.client, appDataKey, ch)
for v := range ch {
switch v.Type {
case mvccpb.DELETE:
data.DeleteOne(strings.TrimPrefix(v.Key, appDataKey))
default:
appData := AppData{}
if err := json.Unmarshal(v.Value, &appData); err != nil {
panic(err)
}
appData.AppID = strings.TrimPrefix(v.Key, appDataKey)
data.setAppDate(appData)
}
data.updateAppData()
}
}
// setAppDate 设置appData
func (data *AppDataCache) setAppDate(appData AppData) {
data.lock.Lock()
defer data.lock.Unlock()
for i, app := range data.appData {
if app.AppID == appData.AppID {
if app != appData {
data.appData[i] = appData
data.updateAppData()
}
return
}
}
data.appData = append(data.appData, appData)
}
// updateAppData 使用ecpmData和appData重新生成appList,使用时需要加锁
func (data *AppDataCache) updateAppData() {
clear(data.Data)
data.Data = make([]*admin_service.AppInfo, 0, len(data.appData))
for _, app := range data.appData {
var (
ecpm float32
ipu uint32
)
if ecpmData, ok := data.ecpmData[app.AppID]; ok {
ecpm = ecpmData.ECPM
ipu = ecpmData.IPU
}
data.Data = append(data.Data, &admin_service.AppInfo{
AppId: app.AppID,
Secret: app.AppSecret,
Type: app.Type,
Remark: app.Remark,
Ecpm: ecpm,
Ipu: ipu,
})
}
slices.SortFunc(data.Data, func(a, b *admin_service.AppInfo) int {
if a.AppId > b.AppId {
return 1
}
return -1
})
}
// GetAppDataCache 获取appDataCache
func (data *AppDataCache) GetAppDataCache() []*admin_service.AppInfo {
data.lock.RLock()
defer data.lock.RUnlock()
return data.Data
}
func (data *AppDataCache) UpdateOne(newData *admin_service.AppInfo) {
data.lock.Lock()
defer data.lock.Unlock()
existingApp, find := data.FindAppById(newData.AppId)
var needUpdateAppData, needUpdateEcpmData bool
if !find { // 新增
needUpdateAppData = true
if newData.Ecpm != 0 || newData.Ipu != 0 {
needUpdateEcpmData = true
}
data.Data = append(data.Data, newData)
} else { // 更新
needUpdateAppData = existingApp.Secret != newData.Secret || existingApp.Type != newData.Type || existingApp.Remark != newData.Remark
needUpdateEcpmData = existingApp.Ecpm != newData.Ecpm || existingApp.Ipu != newData.Ipu
if needUpdateAppData || needUpdateEcpmData {
data.Data[data.getIndexById(newData.AppId)] = newData
}
}
// 更新etcd中的appData
if needUpdateAppData {
appData := &AppData{
AppSecret: newData.Secret,
Type: newData.Type,
Remark: newData.Remark,
}
b, _ := json.Marshal(appData)
_, _ = data.client.Put(context.Background(), appDataKey+newData.AppId, string(b))
}
// 更新etcd中的ecpmData
if needUpdateEcpmData {
ecpmData := &EcpmConfig{
ECPM: newData.Ecpm,
IPU: newData.Ipu,
}
b, _ := json.Marshal(ecpmData)
_, _ = data.client.Put(context.Background(), ecpmDataKey+newData.AppId, string(b))
}
}
// FindAppById 查找app
func (data *AppDataCache) FindAppById(appId string) (*admin_service.AppInfo, bool) {
for _, app := range data.Data {
if app.AppId == appId {
return app, true
}
}
return nil, false
}
// getIndexById 获取app在data.Data中的索引
func (data *AppDataCache) getIndexById(appId string) int {
for i, app := range data.Data {
if app.AppId == appId {
return i
}
}
return -1
}
// DeleteOne 删除一个app
func (data *AppDataCache) DeleteOne(appId string) {
data.lock.Lock()
defer data.lock.Unlock()
index := -1
for index = range data.appData {
if data.appData[index].AppID == appId {
break
}
}
if index == -1 {
return
}
data.appData = slices.Delete(data.appData, index, index+1)
data.updateAppData()
}
// DeleteEcpmData 删除一个ecpmData
func (data *AppDataCache) DeleteEcpmData(appId string) {
data.lock.Lock()
defer data.lock.Unlock()
delete(data.ecpmData, appId)
}
// DeleteFromEtcd 从etcd删除
func (data *AppDataCache) DeleteFromEtcd(ctx context.Context, key string) {
_, _ = data.client.Delete(ctx, appDataKey+key)
_, _ = data.client.Delete(ctx, ecpmDataKey+key)
}
// PutToEtcd 向etcd写入
func (data *AppDataCache) PutToEtcd(ctx context.Context, appInfo *admin_service.AppInfo) {
appData := &AppData{
AppSecret: appInfo.Secret,
Type: appInfo.Type,
Remark: appInfo.Remark,
}
b, _ := json.Marshal(appData)
_, _ = data.client.Put(ctx, appDataKey+appInfo.AppId, string(b))
ecpmData := &EcpmConfig{
ECPM: appInfo.Ecpm,
IPU: appInfo.Ipu,
}
b, _ = json.Marshal(ecpmData)
_, _ = data.client.Put(context.Background(), ecpmDataKey+appInfo.AppId, string(b))
}

View File

@ -11,6 +11,7 @@ import (
type ServiceContext struct {
Config config.Config
EtcdClient *clientv3.Client
Cached *AppDataCache
}
func NewServiceContext(c config.Config) *ServiceContext {
@ -22,8 +23,11 @@ func NewServiceContext(c config.Config) *ServiceContext {
log.Fatal(err)
}
return &ServiceContext{
svc := &ServiceContext{
Config: c,
EtcdClient: etcdClient,
Cached: NewAppDataCache(etcdClient),
}
return svc
}

View File

@ -6,6 +6,7 @@ import (
"github.com/spf13/viper"
_ "github.com/spf13/viper/remote"
"github.com/zeromicro/go-zero/zrpc"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"os"
)
@ -80,6 +81,7 @@ func SetConfig(b, serverName string) (err error) {
type WatchKV struct {
Key string
Value []byte
Type mvccpb.Event_EventType
}
// EtcdGetOneAndWatch 获取etcd配置并监听对应的key
@ -96,7 +98,7 @@ func EtcdGetOneAndWatch(ctx context.Context, cli *clientv3.Client, key string, c
resCh := cli.Watch(ctx, key, clientv3.WithPrefix())
for res := range resCh {
for _, event := range res.Events {
ch <- WatchKV{Key: string(event.Kv.Key), Value: event.Kv.Value}
ch <- WatchKV{Key: string(event.Kv.Key), Value: event.Kv.Value, Type: event.Type}
}
}