From ebdf2be6bcaee556d743776d32da4a21405dd38f Mon Sep 17 00:00:00 2001 From: shibukazu Date: Fri, 23 Aug 2024 22:24:48 +0900 Subject: [PATCH 1/9] impl --- docker-compose.yml | 38 ++++- go/cmd/run/run.go | 94 ++++++++++- go/pkg/appError/serverError.go | 11 +- go/pkg/config/config.go | 14 ++ go/pkg/server/grpc.go | 35 ++-- go/pkg/services/slave/v1/register.go | 16 ++ go/pkg/services/slave/v1/service.go | 17 ++ go/pkg/slave/manager.go | 28 ++++ go/proto/slave/v1/slave.pb.go | 237 +++++++++++++++++++++++++++ go/proto/slave/v1/slave.pb.gw.go | 163 ++++++++++++++++++ go/proto/slave/v1/slave_grpc.pb.go | 110 +++++++++++++ openapi/openapi.swagger.json | 86 ++++++++-- proto/slave/v1/slave.proto | 30 ++++ 13 files changed, 840 insertions(+), 39 deletions(-) create mode 100644 go/pkg/services/slave/v1/register.go create mode 100644 go/pkg/services/slave/v1/service.go create mode 100644 go/pkg/slave/manager.go create mode 100644 go/proto/slave/v1/slave.pb.go create mode 100644 go/proto/slave/v1/slave.pb.gw.go create mode 100644 go/proto/slave/v1/slave_grpc.pb.go create mode 100644 proto/slave/v1/slave.proto diff --git a/docker-compose.yml b/docker-compose.yml index 46a5dfe..f600750 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,10 +9,10 @@ services: networks: - default restart: unless-stopped - server: + master-node: build: context: . - container_name: server + container_name: master-node ports: - "8080:8080" - "9000:9000" @@ -21,6 +21,40 @@ services: depends_on: - redis environment: + - OPEN-VE_MODE=master + - OPEN-VE_HTTP_ADDR= + - OPEN-VE_HTTP_CORS_ALLOWED_ORIGINS= + - OPEN-VE_HTTP_CORS_ALLOWED_HEADERS= + - OPEN-VE_HTTP_TLS_ENABLED= + - OPEN-VE_HTTP_TLS_CERT_PATH= + - OPEN-VE_HTTP_TLS_KEY_PATH= + - OPEN-VE_GRPC_ADDR= + - OPEN-VE_GRPC_TLS_ENABLED= + - OPEN-VE_GRPC_TLS_CERT_PATH= + - OPEN-VE_GRPC_TLS_KEY_PATH= + - OPEN-VE_STORE_ENGINE= + - OPEN-VE_STORE_REDIS_ADDR= + - OPEN-VE_STORE_REDIS_PASSWORD= + - OPEN-VE_STORE_REDIS_DB= + - OPEN-VE_STORE_REDIS_POOL_SIZE= + - OPEN-VE_LOG_LEVEL= + slave-node: + build: + context: . + container_name: slave-node + ports: + - "8090:8080" + - "9010:9000" + networks: + - default + depends_on: + - redis + - master-node + environment: + - OPEN-VE_MODE=slave + - OPEN-VE_SLAVE_ID=slave-node + - OPEN-VE_SLAVE_MASTER_GRPC_ADDR=master-node:9000 + - OPEN-VE_SLAVE_MASTER_TLS_ENABLED= - OPEN-VE_HTTP_ADDR= - OPEN-VE_HTTP_CORS_ALLOWED_ORIGINS= - OPEN-VE_HTTP_CORS_ALLOWED_HEADERS= diff --git a/go/cmd/run/run.go b/go/cmd/run/run.go index 4180313..2a880d1 100644 --- a/go/cmd/run/run.go +++ b/go/cmd/run/run.go @@ -2,22 +2,33 @@ package run import ( "context" + "crypto/tls" + "fmt" + "log" "log/slog" "os" "os/signal" "sync" "syscall" + "time" "github.com/go-redis/redis" + "github.com/morikuni/failure/v2" + "github.com/shibukazu/open-ve/go/pkg/appError" "github.com/shibukazu/open-ve/go/pkg/config" "github.com/shibukazu/open-ve/go/pkg/dsl/reader" "github.com/shibukazu/open-ve/go/pkg/logger" "github.com/shibukazu/open-ve/go/pkg/server" + "github.com/shibukazu/open-ve/go/pkg/slave" storePkg "github.com/shibukazu/open-ve/go/pkg/store" "github.com/shibukazu/open-ve/go/pkg/validator" + pbSlave "github.com/shibukazu/open-ve/go/proto/slave/v1" "github.com/spf13/cobra" "github.com/spf13/pflag" "github.com/spf13/viper" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" ) func NewRunCommand() *cobra.Command { @@ -25,13 +36,45 @@ func NewRunCommand() *cobra.Command { Use: "run", Short: "Run the Open-VE server.", Long: "Run the Open-VE server.", - Run: run, - Args: cobra.NoArgs, + PreRunE: func(cmd *cobra.Command, args []string) error { + mode := viper.GetString("mode") + if mode == "slave" { + id := viper.GetString("slave.id") + if id == "" { + return failure.New(appError.ErrConfigFileSyntaxError, failure.Message("ID of the slave server is required")) + } + masterAddr := viper.GetString("slave.masterGRPCAddr") + if masterAddr == "" { + return failure.New(appError.ErrConfigFileSyntaxError, failure.Message("gRPC address of the master server is required")) + } + } + return nil + }, + Run: run, + Args: cobra.NoArgs, } defaultConfig := config.DefaultConfig() flags := cmd.Flags() + // Mode + flags.String("mode", defaultConfig.Mode, "mode (master, slave)") + MustBindPFlag("mode", flags.Lookup("mode")) + viper.MustBindEnv("mode", "OPEN-VE_MODE") + + // Slave (If mode is slave, this is required) + flags.String("slave-id", defaultConfig.Slave.Id, "ID of the slave server") + MustBindPFlag("slave.id", flags.Lookup("slave-id")) + viper.MustBindEnv("slave.id", "OPEN-VE_SLAVE_ID") + + flags.String("slave-master-grpc-addr", defaultConfig.Slave.MasterGRPCAddr, "gRPC address of the master server") + MustBindPFlag("slave.masterGRPCAddr", flags.Lookup("slave-master-grpc-addr")) + viper.MustBindEnv("slave.masterGRPCAddr", "OPEN-VE_SLAVE_MASTER_GRPC_ADDR") + + flags.Bool("slave-master-grpc-tls-enabled", defaultConfig.Slave.MasterGRPCTLSEnabled, "connect to master server with TLS") + MustBindPFlag("slave.masterGRPCTLSEnabled", flags.Lookup("slave-master-grpc-tls-enabled")) + viper.MustBindEnv("slave.masterGRPCTLSEnabled", "OPEN-VE_SLAVE_MASTER_GRPC_TLS_ENABLED") + // HTTP flags.String("http-addr", defaultConfig.Http.Addr, "HTTP server address") MustBindPFlag("http.addr", flags.Lookup("http-addr")) @@ -152,6 +195,7 @@ func run(cmd *cobra.Command, args []string) { dslReader := reader.NewDSLReader(logger, store) validator := validator.NewValidator(logger, store) + slaveManager := slave.NewSlaveManager(logger) gw := server.NewGateway(&cfg.Http, &cfg.GRPC, logger, dslReader) wg.Add(1) @@ -162,13 +206,55 @@ func run(cmd *cobra.Command, args []string) { gw.Run(ctx, wg) }(wg) - grpc := server.NewGrpc(&cfg.GRPC, logger, validator, dslReader) + grpc := server.NewGrpc(&cfg.GRPC, logger, validator, dslReader, slaveManager) wg.Add(1) go func(wg *sync.WaitGroup) { logger.Info("πŸš€ grpc server: starting..") - grpc.Run(ctx, wg) + grpc.Run(ctx, wg, cfg.Mode) }(wg) + if cfg.Mode == "slave" { + go func() { + registerSlave(ctx, cfg, logger) + }() + } + wg.Wait() logger.Info("πŸ›‘ all server: stopped") } + +func registerSlave(ctx context.Context, cfg config.Config, logger *slog.Logger) { + var opts []grpc.DialOption + + if cfg.Slave.MasterGRPCTLSEnabled { + creds := credentials.NewTLS(&tls.Config{}) + opts = append(opts, grpc.WithTransportCredentials(creds)) + } + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), grpc.WithTimeout(5*time.Second)) + + conn, err := grpc.Dial(cfg.Slave.MasterGRPCAddr, opts...) + if err != nil { + log.Fatalf("Failed to connect: %v", err) + } + defer conn.Close() + + client := pbSlave.NewSlaveServiceClient(conn) + + for { + select { + case <-ctx.Done(): + return + case <-time.Tick(5 * time.Second): + _, err := client.Register(ctx, &pbSlave.RegisterRequest{ + Id: cfg.Slave.Id, + Address: cfg.GRPC.Addr, + ValidationIds: []string{"validation1", "validation2"}, + }) + if err != nil { + logger.Error(failure.Translate(err, appError.ErrSlaveRegistrationFailed, failure.Message("Failed to register to master")).Error()) + } else { + logger.Info(fmt.Sprintf("πŸ““ slave (%s) registration success", cfg.Slave.Id)) + } + } + } +} diff --git a/go/pkg/appError/serverError.go b/go/pkg/appError/serverError.go index bd32663..5cd98b8 100644 --- a/go/pkg/appError/serverError.go +++ b/go/pkg/appError/serverError.go @@ -1,9 +1,10 @@ package appError const ( - ErrServerStartFailed = "ServerStartFailed" - ErrServerShutdownFailed = "ServerShutdownFailed" - ErrServerInternalError = "ServerInternalError" - ErrConfigFileNotFound = "ConfigFileNotFound" - ErrConfigFileSyntaxError = "ConfigFileSyntaxError" + ErrServerStartFailed = "ServerStartFailed" + ErrServerShutdownFailed = "ServerShutdownFailed" + ErrServerInternalError = "ServerInternalError" + ErrConfigFileNotFound = "ConfigFileNotFound" + ErrConfigFileSyntaxError = "ConfigFileSyntaxError" + ErrSlaveRegistrationFailed = "SlaveRegistrationFailed" ) diff --git a/go/pkg/config/config.go b/go/pkg/config/config.go index 495b7bb..6d54ff5 100644 --- a/go/pkg/config/config.go +++ b/go/pkg/config/config.go @@ -1,12 +1,20 @@ package config type Config struct { + Mode string `yaml:"mode"` + Slave SlaveConfig `yaml:"slave"` Http HttpConfig `yaml:"http"` GRPC GRPCConfig `yaml:"grpc"` Store StoreConfig `yaml:"store"` Log LogConfig `yaml:"log"` } +type SlaveConfig struct { + Id string `yaml:"id"` + MasterGRPCTLSEnabled bool `yaml:"masterGRPCTLSEnabled"` + MasterGRPCAddr string `yaml:"masterGRPCAddr"` +} + type HttpConfig struct { Addr string `yaml:"addr"` CORSAllowedOrigins []string `yaml:"corsAllowedOrigins"` @@ -43,6 +51,12 @@ type TLSConfig struct { func DefaultConfig() *Config { return &Config{ + Mode: "master", + Slave: SlaveConfig{ + Id: "", + MasterGRPCAddr: "", + MasterGRPCTLSEnabled: false, + }, Http: HttpConfig{ Addr: ":8080", CORSAllowedOrigins: []string{"*"}, diff --git a/go/pkg/server/grpc.go b/go/pkg/server/grpc.go index 2b8d955..e76c05f 100644 --- a/go/pkg/server/grpc.go +++ b/go/pkg/server/grpc.go @@ -13,37 +13,45 @@ import ( "github.com/shibukazu/open-ve/go/pkg/config" "github.com/shibukazu/open-ve/go/pkg/dsl/reader" svcDSL "github.com/shibukazu/open-ve/go/pkg/services/dsl/v1" + svcSlave "github.com/shibukazu/open-ve/go/pkg/services/slave/v1" svcValidate "github.com/shibukazu/open-ve/go/pkg/services/validate/v1" + "github.com/shibukazu/open-ve/go/pkg/slave" "github.com/shibukazu/open-ve/go/pkg/validator" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/reflection" pbDSL "github.com/shibukazu/open-ve/go/proto/dsl/v1" + pbSlave "github.com/shibukazu/open-ve/go/proto/slave/v1" pbValidate "github.com/shibukazu/open-ve/go/proto/validate/v1" ) type GRPC struct { - dslReader *reader.DSLReader - validator *validator.Validator - gRPCConfig *config.GRPCConfig - logger *slog.Logger - server *grpc.Server + dslReader *reader.DSLReader + validator *validator.Validator + slaveManager *slave.SlaveManager + gRPCConfig *config.GRPCConfig + logger *slog.Logger + server *grpc.Server } func NewGrpc( gRPCConfig *config.GRPCConfig, logger *slog.Logger, - validator *validator.Validator, dslReader *reader.DSLReader) *GRPC { + validator *validator.Validator, + dslReader *reader.DSLReader, + slaveManager *slave.SlaveManager, +) *GRPC { return &GRPC{ - validator: validator, - dslReader: dslReader, - gRPCConfig: gRPCConfig, - logger: logger, + validator: validator, + dslReader: dslReader, + slaveManager: slaveManager, + gRPCConfig: gRPCConfig, + logger: logger, } } -func (g *GRPC) Run(ctx context.Context, wg *sync.WaitGroup) { +func (g *GRPC) Run(ctx context.Context, wg *sync.WaitGroup, mode string) { listen, err := net.Listen("tcp", g.gRPCConfig.Addr) if err != nil { @@ -71,6 +79,11 @@ func (g *GRPC) Run(ctx context.Context, wg *sync.WaitGroup) { dslService := svcDSL.NewService(ctx, g.dslReader) pbDSL.RegisterDSLServiceServer(g.server, dslService) + if mode == "master" { + slaveService := svcSlave.NewService(ctx, g.slaveManager) + pbSlave.RegisterSlaveServiceServer(g.server, slaveService) + } + reflection.Register(g.server) go func() { diff --git a/go/pkg/services/slave/v1/register.go b/go/pkg/services/slave/v1/register.go new file mode 100644 index 0000000..4fbcd1a --- /dev/null +++ b/go/pkg/services/slave/v1/register.go @@ -0,0 +1,16 @@ +package slavev1 + +import ( + "context" + + pb "github.com/shibukazu/open-ve/go/proto/slave/v1" +) + +func (s *Service) Register(ctx context.Context, req *pb.RegisterRequest) (*pb.RegisterResponse, error) { + s.slaveManager.RegisterSlave( + req.Id, + req.Address, + req.ValidationIds, + ) + return &pb.RegisterResponse{}, nil +} diff --git a/go/pkg/services/slave/v1/service.go b/go/pkg/services/slave/v1/service.go new file mode 100644 index 0000000..a2b4bd5 --- /dev/null +++ b/go/pkg/services/slave/v1/service.go @@ -0,0 +1,17 @@ +package slavev1 + +import ( + "context" + + "github.com/shibukazu/open-ve/go/pkg/slave" + pb "github.com/shibukazu/open-ve/go/proto/slave/v1" +) + +type Service struct { + pb.UnimplementedSlaveServiceServer + slaveManager *slave.SlaveManager +} + +func NewService(ctx context.Context, slaveManager *slave.SlaveManager) *Service { + return &Service{slaveManager: slaveManager} +} diff --git a/go/pkg/slave/manager.go b/go/pkg/slave/manager.go new file mode 100644 index 0000000..d488f1d --- /dev/null +++ b/go/pkg/slave/manager.go @@ -0,0 +1,28 @@ +package slave + +import ( + "log/slog" + "sync" +) + +type SlaveManager struct { + Slaves map[string]*Slave + logger *slog.Logger + mu sync.RWMutex +} + +type Slave struct { + Id string + Addr string + ValidationIds []string +} + +func NewSlaveManager(logger *slog.Logger) *SlaveManager { + return &SlaveManager{Slaves: map[string]*Slave{}, logger: logger} +} + +func (m *SlaveManager) RegisterSlave(id, addr string, validationIds []string) { + m.mu.Lock() + m.Slaves[id] = &Slave{Id: id, Addr: addr, ValidationIds: validationIds} + m.mu.Unlock() +} diff --git a/go/proto/slave/v1/slave.pb.go b/go/proto/slave/v1/slave.pb.go new file mode 100644 index 0000000..4bb0794 --- /dev/null +++ b/go/proto/slave/v1/slave.pb.go @@ -0,0 +1,237 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.34.1 +// protoc (unknown) +// source: proto/slave/v1/slave.proto + +package v1 + +import ( + _ "github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-openapiv2/options" + _ "google.golang.org/genproto/googleapis/api/annotations" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type RegisterRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Address string `protobuf:"bytes,2,opt,name=address,proto3" json:"address,omitempty"` + ValidationIds []string `protobuf:"bytes,3,rep,name=validation_ids,json=validationIds,proto3" json:"validation_ids,omitempty"` +} + +func (x *RegisterRequest) Reset() { + *x = RegisterRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_slave_v1_slave_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RegisterRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RegisterRequest) ProtoMessage() {} + +func (x *RegisterRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_slave_v1_slave_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RegisterRequest.ProtoReflect.Descriptor instead. +func (*RegisterRequest) Descriptor() ([]byte, []int) { + return file_proto_slave_v1_slave_proto_rawDescGZIP(), []int{0} +} + +func (x *RegisterRequest) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *RegisterRequest) GetAddress() string { + if x != nil { + return x.Address + } + return "" +} + +func (x *RegisterRequest) GetValidationIds() []string { + if x != nil { + return x.ValidationIds + } + return nil +} + +type RegisterResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *RegisterResponse) Reset() { + *x = RegisterResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_slave_v1_slave_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RegisterResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RegisterResponse) ProtoMessage() {} + +func (x *RegisterResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_slave_v1_slave_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RegisterResponse.ProtoReflect.Descriptor instead. +func (*RegisterResponse) Descriptor() ([]byte, []int) { + return file_proto_slave_v1_slave_proto_rawDescGZIP(), []int{1} +} + +var File_proto_slave_v1_slave_proto protoreflect.FileDescriptor + +var file_proto_slave_v1_slave_proto_rawDesc = []byte{ + 0x0a, 0x1a, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x6c, 0x61, 0x76, 0x65, 0x2f, 0x76, 0x31, + 0x2f, 0x73, 0x6c, 0x61, 0x76, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x73, 0x6c, + 0x61, 0x76, 0x65, 0x2e, 0x76, 0x31, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x61, + 0x70, 0x69, 0x2f, 0x61, 0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x61, 0x70, 0x69, + 0x2f, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x5f, 0x62, 0x65, 0x68, 0x61, 0x76, 0x69, 0x6f, 0x72, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x2d, 0x67, 0x65, + 0x6e, 0x2d, 0x6f, 0x70, 0x65, 0x6e, 0x61, 0x70, 0x69, 0x76, 0x32, 0x2f, 0x6f, 0x70, 0x74, 0x69, + 0x6f, 0x6e, 0x73, 0x2f, 0x61, 0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x71, 0x0a, 0x0f, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, + 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x13, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x42, 0x03, 0xe0, 0x41, 0x02, 0x52, 0x02, 0x69, 0x64, 0x12, 0x1d, 0x0a, + 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x42, 0x03, + 0xe0, 0x41, 0x02, 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x2a, 0x0a, 0x0e, + 0x76, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x73, 0x18, 0x03, + 0x20, 0x03, 0x28, 0x09, 0x42, 0x03, 0xe0, 0x41, 0x02, 0x52, 0x0d, 0x76, 0x61, 0x6c, 0x69, 0x64, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x73, 0x22, 0x12, 0x0a, 0x10, 0x52, 0x65, 0x67, 0x69, + 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x9a, 0x01, 0x0a, + 0x0c, 0x53, 0x6c, 0x61, 0x76, 0x65, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x89, 0x01, + 0x0a, 0x08, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x12, 0x19, 0x2e, 0x73, 0x6c, 0x61, + 0x76, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x73, 0x6c, 0x61, 0x76, 0x65, 0x2e, 0x76, 0x31, + 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x46, 0x92, 0x41, 0x26, 0x0a, 0x05, 0x53, 0x6c, 0x61, 0x76, 0x65, 0x12, 0x13, 0x52, + 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x20, 0x53, 0x6c, 0x61, 0x76, 0x65, 0x20, 0x4e, 0x6f, + 0x64, 0x65, 0x2a, 0x08, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x82, 0xd3, 0xe4, 0x93, + 0x02, 0x17, 0x3a, 0x01, 0x2a, 0x22, 0x12, 0x2f, 0x76, 0x31, 0x2f, 0x73, 0x6c, 0x61, 0x76, 0x65, + 0x2f, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x42, 0x10, 0x5a, 0x0e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2f, 0x73, 0x6c, 0x61, 0x76, 0x65, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, +} + +var ( + file_proto_slave_v1_slave_proto_rawDescOnce sync.Once + file_proto_slave_v1_slave_proto_rawDescData = file_proto_slave_v1_slave_proto_rawDesc +) + +func file_proto_slave_v1_slave_proto_rawDescGZIP() []byte { + file_proto_slave_v1_slave_proto_rawDescOnce.Do(func() { + file_proto_slave_v1_slave_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_slave_v1_slave_proto_rawDescData) + }) + return file_proto_slave_v1_slave_proto_rawDescData +} + +var file_proto_slave_v1_slave_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_proto_slave_v1_slave_proto_goTypes = []interface{}{ + (*RegisterRequest)(nil), // 0: slave.v1.RegisterRequest + (*RegisterResponse)(nil), // 1: slave.v1.RegisterResponse +} +var file_proto_slave_v1_slave_proto_depIdxs = []int32{ + 0, // 0: slave.v1.SlaveService.Register:input_type -> slave.v1.RegisterRequest + 1, // 1: slave.v1.SlaveService.Register:output_type -> slave.v1.RegisterResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_proto_slave_v1_slave_proto_init() } +func file_proto_slave_v1_slave_proto_init() { + if File_proto_slave_v1_slave_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_proto_slave_v1_slave_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RegisterRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_slave_v1_slave_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RegisterResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_proto_slave_v1_slave_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_proto_slave_v1_slave_proto_goTypes, + DependencyIndexes: file_proto_slave_v1_slave_proto_depIdxs, + MessageInfos: file_proto_slave_v1_slave_proto_msgTypes, + }.Build() + File_proto_slave_v1_slave_proto = out.File + file_proto_slave_v1_slave_proto_rawDesc = nil + file_proto_slave_v1_slave_proto_goTypes = nil + file_proto_slave_v1_slave_proto_depIdxs = nil +} diff --git a/go/proto/slave/v1/slave.pb.gw.go b/go/proto/slave/v1/slave.pb.gw.go new file mode 100644 index 0000000..81c2cc8 --- /dev/null +++ b/go/proto/slave/v1/slave.pb.gw.go @@ -0,0 +1,163 @@ +// Code generated by protoc-gen-grpc-gateway. DO NOT EDIT. +// source: proto/slave/v1/slave.proto + +/* +Package v1 is a reverse proxy. + +It translates gRPC into RESTful JSON APIs. +*/ +package v1 + +import ( + "context" + "io" + "net/http" + + "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" + "github.com/grpc-ecosystem/grpc-gateway/v2/utilities" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" +) + +// Suppress "imported and not used" errors +var _ codes.Code +var _ io.Reader +var _ status.Status +var _ = runtime.String +var _ = utilities.NewDoubleArray +var _ = metadata.Join + +func request_SlaveService_Register_0(ctx context.Context, marshaler runtime.Marshaler, client SlaveServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq RegisterRequest + var metadata runtime.ServerMetadata + + if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + + msg, err := client.Register(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err + +} + +func local_request_SlaveService_Register_0(ctx context.Context, marshaler runtime.Marshaler, server SlaveServiceServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq RegisterRequest + var metadata runtime.ServerMetadata + + if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + + msg, err := server.Register(ctx, &protoReq) + return msg, metadata, err + +} + +// RegisterSlaveServiceHandlerServer registers the http handlers for service SlaveService to "mux". +// UnaryRPC :call SlaveServiceServer directly. +// StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906. +// Note that using this registration option will cause many gRPC library features to stop working. Consider using RegisterSlaveServiceHandlerFromEndpoint instead. +func RegisterSlaveServiceHandlerServer(ctx context.Context, mux *runtime.ServeMux, server SlaveServiceServer) error { + + mux.Handle("POST", pattern_SlaveService_Register_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + var err error + var annotatedContext context.Context + annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/slave.v1.SlaveService/Register", runtime.WithHTTPPathPattern("/v1/slave/register")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := local_request_SlaveService_Register_0(annotatedContext, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + + forward_SlaveService_Register_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + + return nil +} + +// RegisterSlaveServiceHandlerFromEndpoint is same as RegisterSlaveServiceHandler but +// automatically dials to "endpoint" and closes the connection when "ctx" gets done. +func RegisterSlaveServiceHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) { + conn, err := grpc.NewClient(endpoint, opts...) + if err != nil { + return err + } + defer func() { + if err != nil { + if cerr := conn.Close(); cerr != nil { + grpclog.Errorf("Failed to close conn to %s: %v", endpoint, cerr) + } + return + } + go func() { + <-ctx.Done() + if cerr := conn.Close(); cerr != nil { + grpclog.Errorf("Failed to close conn to %s: %v", endpoint, cerr) + } + }() + }() + + return RegisterSlaveServiceHandler(ctx, mux, conn) +} + +// RegisterSlaveServiceHandler registers the http handlers for service SlaveService to "mux". +// The handlers forward requests to the grpc endpoint over "conn". +func RegisterSlaveServiceHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error { + return RegisterSlaveServiceHandlerClient(ctx, mux, NewSlaveServiceClient(conn)) +} + +// RegisterSlaveServiceHandlerClient registers the http handlers for service SlaveService +// to "mux". The handlers forward requests to the grpc endpoint over the given implementation of "SlaveServiceClient". +// Note: the gRPC framework executes interceptors within the gRPC handler. If the passed in "SlaveServiceClient" +// doesn't go through the normal gRPC flow (creating a gRPC client etc.) then it will be up to the passed in +// "SlaveServiceClient" to call the correct interceptors. +func RegisterSlaveServiceHandlerClient(ctx context.Context, mux *runtime.ServeMux, client SlaveServiceClient) error { + + mux.Handle("POST", pattern_SlaveService_Register_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + var err error + var annotatedContext context.Context + annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/slave.v1.SlaveService/Register", runtime.WithHTTPPathPattern("/v1/slave/register")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_SlaveService_Register_0(annotatedContext, inboundMarshaler, client, req, pathParams) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + + forward_SlaveService_Register_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + + return nil +} + +var ( + pattern_SlaveService_Register_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v1", "slave", "register"}, "")) +) + +var ( + forward_SlaveService_Register_0 = runtime.ForwardResponseMessage +) diff --git a/go/proto/slave/v1/slave_grpc.pb.go b/go/proto/slave/v1/slave_grpc.pb.go new file mode 100644 index 0000000..a47f89e --- /dev/null +++ b/go/proto/slave/v1/slave_grpc.pb.go @@ -0,0 +1,110 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.4.0 +// - protoc (unknown) +// source: proto/slave/v1/slave.proto + +package v1 + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.62.0 or later. +const _ = grpc.SupportPackageIsVersion8 + +const ( + SlaveService_Register_FullMethodName = "/slave.v1.SlaveService/Register" +) + +// SlaveServiceClient is the client API for SlaveService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type SlaveServiceClient interface { + Register(ctx context.Context, in *RegisterRequest, opts ...grpc.CallOption) (*RegisterResponse, error) +} + +type slaveServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewSlaveServiceClient(cc grpc.ClientConnInterface) SlaveServiceClient { + return &slaveServiceClient{cc} +} + +func (c *slaveServiceClient) Register(ctx context.Context, in *RegisterRequest, opts ...grpc.CallOption) (*RegisterResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(RegisterResponse) + err := c.cc.Invoke(ctx, SlaveService_Register_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +// SlaveServiceServer is the server API for SlaveService service. +// All implementations must embed UnimplementedSlaveServiceServer +// for forward compatibility +type SlaveServiceServer interface { + Register(context.Context, *RegisterRequest) (*RegisterResponse, error) + mustEmbedUnimplementedSlaveServiceServer() +} + +// UnimplementedSlaveServiceServer must be embedded to have forward compatible implementations. +type UnimplementedSlaveServiceServer struct { +} + +func (UnimplementedSlaveServiceServer) Register(context.Context, *RegisterRequest) (*RegisterResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Register not implemented") +} +func (UnimplementedSlaveServiceServer) mustEmbedUnimplementedSlaveServiceServer() {} + +// UnsafeSlaveServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to SlaveServiceServer will +// result in compilation errors. +type UnsafeSlaveServiceServer interface { + mustEmbedUnimplementedSlaveServiceServer() +} + +func RegisterSlaveServiceServer(s grpc.ServiceRegistrar, srv SlaveServiceServer) { + s.RegisterService(&SlaveService_ServiceDesc, srv) +} + +func _SlaveService_Register_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(RegisterRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SlaveServiceServer).Register(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: SlaveService_Register_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SlaveServiceServer).Register(ctx, req.(*RegisterRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// SlaveService_ServiceDesc is the grpc.ServiceDesc for SlaveService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var SlaveService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "slave.v1.SlaveService", + HandlerType: (*SlaveServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Register", + Handler: _SlaveService_Register_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "proto/slave/v1/slave.proto", +} diff --git a/openapi/openapi.swagger.json b/openapi/openapi.swagger.json index ddb32f3..0ea2563 100644 --- a/openapi/openapi.swagger.json +++ b/openapi/openapi.swagger.json @@ -69,7 +69,7 @@ "200": { "description": "A successful response.", "schema": { - "$ref": "#/definitions/RegisterResponse" + "$ref": "#/definitions/dsl.v1.RegisterResponse" } } }, @@ -79,7 +79,7 @@ "in": "body", "required": true, "schema": { - "$ref": "#/definitions/RegisterRequest" + "$ref": "#/definitions/dsl.v1.RegisterRequest" } } ], @@ -87,6 +87,33 @@ "DSL" ] } + }, + "/v1/slave/register": { + "post": { + "summary": "Register Slave Node", + "operationId": "Register", + "responses": { + "200": { + "description": "A successful response.", + "schema": { + "$ref": "#/definitions/slave.v1.RegisterResponse" + } + } + }, + "parameters": [ + { + "name": "body", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/slave.v1.RegisterRequest" + } + } + ], + "tags": [ + "Slave" + ] + } } }, "definitions": { @@ -159,21 +186,6 @@ } } }, - "RegisterRequest": { - "type": "object", - "properties": { - "validations": { - "type": "array", - "items": { - "type": "object", - "$ref": "#/definitions/dsl.v1.Validation" - } - } - } - }, - "RegisterResponse": { - "type": "object" - }, "ValidationResult": { "type": "object", "properties": { @@ -208,6 +220,21 @@ "type" ] }, + "dsl.v1.RegisterRequest": { + "type": "object", + "properties": { + "validations": { + "type": "array", + "items": { + "type": "object", + "$ref": "#/definitions/dsl.v1.Validation" + } + } + } + }, + "dsl.v1.RegisterResponse": { + "type": "object" + }, "dsl.v1.Validation": { "type": "object", "properties": { @@ -243,6 +270,31 @@ "variables" ] }, + "slave.v1.RegisterRequest": { + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "address": { + "type": "string" + }, + "validationIds": { + "type": "array", + "items": { + "type": "string" + } + } + }, + "required": [ + "id", + "address", + "validationIds" + ] + }, + "slave.v1.RegisterResponse": { + "type": "object" + }, "validate.v1.Validation": { "type": "object", "properties": { diff --git a/proto/slave/v1/slave.proto b/proto/slave/v1/slave.proto new file mode 100644 index 0000000..58c4ed6 --- /dev/null +++ b/proto/slave/v1/slave.proto @@ -0,0 +1,30 @@ +syntax = "proto3"; +package slave.v1; + +import "google/api/annotations.proto"; +import "google/api/field_behavior.proto"; +import "protoc-gen-openapiv2/options/annotations.proto"; + +option go_package = "proto/slave/v1"; + +message RegisterRequest { + string id = 1 [(google.api.field_behavior) = REQUIRED]; + string address = 2 [(google.api.field_behavior) = REQUIRED]; + repeated string validation_ids = 3 [(google.api.field_behavior) = REQUIRED]; +} +message RegisterResponse {} + +service SlaveService { + rpc Register(RegisterRequest) returns (RegisterResponse) { + option (google.api.http) = { + post: "/v1/slave/register" + body: "*" + }; + + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + summary: "Register Slave Node" + tags: ["Slave"] + operation_id: "Register" + }; + } +} From dccb558d7a0e88696ca659172a9c69e5d2524749 Mon Sep 17 00:00:00 2001 From: shibukazu Date: Fri, 23 Aug 2024 23:22:48 +0900 Subject: [PATCH 2/9] fix --- docker-compose.yml | 4 +- go/cmd/run/run.go | 55 ++-------------- go/pkg/services/slave/v1/register.go | 1 + go/pkg/slave/manager.go | 5 +- go/pkg/slave/registrar.go | 96 ++++++++++++++++++++++++++++ go/proto/slave/v1/slave.pb.go | 54 +++++++++------- openapi/openapi.swagger.json | 3 + proto/slave/v1/slave.proto | 3 +- 8 files changed, 146 insertions(+), 75 deletions(-) create mode 100644 go/pkg/slave/registrar.go diff --git a/docker-compose.yml b/docker-compose.yml index f600750..2c9a1e0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -32,7 +32,7 @@ services: - OPEN-VE_GRPC_TLS_ENABLED= - OPEN-VE_GRPC_TLS_CERT_PATH= - OPEN-VE_GRPC_TLS_KEY_PATH= - - OPEN-VE_STORE_ENGINE= + - OPEN-VE_STORE_ENGINE=redis - OPEN-VE_STORE_REDIS_ADDR= - OPEN-VE_STORE_REDIS_PASSWORD= - OPEN-VE_STORE_REDIS_DB= @@ -65,7 +65,7 @@ services: - OPEN-VE_GRPC_TLS_ENABLED= - OPEN-VE_GRPC_TLS_CERT_PATH= - OPEN-VE_GRPC_TLS_KEY_PATH= - - OPEN-VE_STORE_ENGINE= + - OPEN-VE_STORE_ENGINE=redis - OPEN-VE_STORE_REDIS_ADDR= - OPEN-VE_STORE_REDIS_PASSWORD= - OPEN-VE_STORE_REDIS_DB= diff --git a/go/cmd/run/run.go b/go/cmd/run/run.go index 2a880d1..c316144 100644 --- a/go/cmd/run/run.go +++ b/go/cmd/run/run.go @@ -2,15 +2,11 @@ package run import ( "context" - "crypto/tls" - "fmt" - "log" "log/slog" "os" "os/signal" "sync" "syscall" - "time" "github.com/go-redis/redis" "github.com/morikuni/failure/v2" @@ -22,13 +18,9 @@ import ( "github.com/shibukazu/open-ve/go/pkg/slave" storePkg "github.com/shibukazu/open-ve/go/pkg/store" "github.com/shibukazu/open-ve/go/pkg/validator" - pbSlave "github.com/shibukazu/open-ve/go/proto/slave/v1" "github.com/spf13/cobra" "github.com/spf13/pflag" "github.com/spf13/viper" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/credentials/insecure" ) func NewRunCommand() *cobra.Command { @@ -214,47 +206,14 @@ func run(cmd *cobra.Command, args []string) { }(wg) if cfg.Mode == "slave" { - go func() { - registerSlave(ctx, cfg, logger) - }() + wg.Add(1) + slaveRegistrar := slave.NewSlaveRegistrar(cfg.Slave.Id, cfg.GRPC.Addr, cfg.GRPC.TLS.Enabled, cfg.Slave.MasterGRPCAddr, cfg.Slave.MasterGRPCTLSEnabled, dslReader, logger) + go func(wg *sync.WaitGroup) { + logger.Info("πŸš€ slave registration timer: starting..") + slaveRegistrar.RegisterTimer(ctx, wg) + }(wg) } wg.Wait() - logger.Info("πŸ›‘ all server: stopped") -} - -func registerSlave(ctx context.Context, cfg config.Config, logger *slog.Logger) { - var opts []grpc.DialOption - - if cfg.Slave.MasterGRPCTLSEnabled { - creds := credentials.NewTLS(&tls.Config{}) - opts = append(opts, grpc.WithTransportCredentials(creds)) - } - opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), grpc.WithTimeout(5*time.Second)) - - conn, err := grpc.Dial(cfg.Slave.MasterGRPCAddr, opts...) - if err != nil { - log.Fatalf("Failed to connect: %v", err) - } - defer conn.Close() - - client := pbSlave.NewSlaveServiceClient(conn) - - for { - select { - case <-ctx.Done(): - return - case <-time.Tick(5 * time.Second): - _, err := client.Register(ctx, &pbSlave.RegisterRequest{ - Id: cfg.Slave.Id, - Address: cfg.GRPC.Addr, - ValidationIds: []string{"validation1", "validation2"}, - }) - if err != nil { - logger.Error(failure.Translate(err, appError.ErrSlaveRegistrationFailed, failure.Message("Failed to register to master")).Error()) - } else { - logger.Info(fmt.Sprintf("πŸ““ slave (%s) registration success", cfg.Slave.Id)) - } - } - } + logger.Info("πŸ›‘ all server and timer: stopped") } diff --git a/go/pkg/services/slave/v1/register.go b/go/pkg/services/slave/v1/register.go index 4fbcd1a..6578163 100644 --- a/go/pkg/services/slave/v1/register.go +++ b/go/pkg/services/slave/v1/register.go @@ -10,6 +10,7 @@ func (s *Service) Register(ctx context.Context, req *pb.RegisterRequest) (*pb.Re s.slaveManager.RegisterSlave( req.Id, req.Address, + req.TlsEnabled, req.ValidationIds, ) return &pb.RegisterResponse{}, nil diff --git a/go/pkg/slave/manager.go b/go/pkg/slave/manager.go index d488f1d..93ab843 100644 --- a/go/pkg/slave/manager.go +++ b/go/pkg/slave/manager.go @@ -14,6 +14,7 @@ type SlaveManager struct { type Slave struct { Id string Addr string + TLSEnabled bool ValidationIds []string } @@ -21,8 +22,8 @@ func NewSlaveManager(logger *slog.Logger) *SlaveManager { return &SlaveManager{Slaves: map[string]*Slave{}, logger: logger} } -func (m *SlaveManager) RegisterSlave(id, addr string, validationIds []string) { +func (m *SlaveManager) RegisterSlave(id, addr string, tlsEnabled bool, validationIds []string) { m.mu.Lock() - m.Slaves[id] = &Slave{Id: id, Addr: addr, ValidationIds: validationIds} + m.Slaves[id] = &Slave{Id: id, Addr: addr, ValidationIds: validationIds, TLSEnabled: tlsEnabled} m.mu.Unlock() } diff --git a/go/pkg/slave/registrar.go b/go/pkg/slave/registrar.go new file mode 100644 index 0000000..8b9d266 --- /dev/null +++ b/go/pkg/slave/registrar.go @@ -0,0 +1,96 @@ +package slave + +import ( + "context" + "crypto/tls" + "log" + "log/slog" + "sync" + "time" + + "github.com/morikuni/failure/v2" + "github.com/shibukazu/open-ve/go/pkg/appError" + "github.com/shibukazu/open-ve/go/pkg/dsl/reader" + pb "github.com/shibukazu/open-ve/go/proto/slave/v1" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" +) + +type SlaveRegistrar struct { + Id string + Address string + TLSEnabled bool + dslReader *reader.DSLReader + gRPCClient pb.SlaveServiceClient + gRPCConn *grpc.ClientConn + logger *slog.Logger +} + +func NewSlaveRegistrar(id, address string, tlsEnabled bool, masterAddress string, masterTLSEnabled bool, dslReader *reader.DSLReader, logger *slog.Logger) *SlaveRegistrar { + var opts []grpc.DialOption + + if masterTLSEnabled { + creds := credentials.NewTLS(&tls.Config{}) + opts = append(opts, grpc.WithTransportCredentials(creds)) + } + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), grpc.WithTimeout(5*time.Second)) + + conn, err := grpc.Dial(masterAddress, opts...) + if err != nil { + log.Fatalf("Failed to connect: %v", err) + } + + gRPCClient := pb.NewSlaveServiceClient(conn) + + return &SlaveRegistrar{ + Id: id, + Address: address, + TLSEnabled: tlsEnabled, + dslReader: dslReader, + gRPCClient: gRPCClient, + gRPCConn: conn, + logger: logger, + } +} + +func (s *SlaveRegistrar) RegisterTimer(ctx context.Context, wg *sync.WaitGroup) { + s.logger.Info("🟒 slave registration timer started") + s.register(ctx) + ticker := time.NewTicker(30 * time.Second) + for { + select { + case <-ctx.Done(): + ticker.Stop() + s.gRPCConn.Close() + s.logger.Info("πŸ›‘ slave registration timer stopped") + wg.Done() + return + case <-ticker.C: + s.register(ctx) + } + } +} + +func (s *SlaveRegistrar) register(ctx context.Context) { + dsl, err := s.dslReader.Read(ctx) + if err != nil { + s.logger.Error(err.Error()) + return + } + validationIds := make([]string, len(dsl.Validations)) + for i, validation := range dsl.Validations { + validationIds[i] = validation.ID + } + _, err = s.gRPCClient.Register(ctx, &pb.RegisterRequest{ + Id: s.Id, + Address: s.Address, + TlsEnabled: s.TLSEnabled, + ValidationIds: validationIds, + }) + if err != nil { + s.logger.Error(failure.Translate(err, appError.ErrSlaveRegistrationFailed, failure.Message("Failed to register to master")).Error()) + } else { + s.logger.Info("πŸ““ slave registration success") + } +} diff --git a/go/proto/slave/v1/slave.pb.go b/go/proto/slave/v1/slave.pb.go index 4bb0794..af14b49 100644 --- a/go/proto/slave/v1/slave.pb.go +++ b/go/proto/slave/v1/slave.pb.go @@ -29,7 +29,8 @@ type RegisterRequest struct { Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` Address string `protobuf:"bytes,2,opt,name=address,proto3" json:"address,omitempty"` - ValidationIds []string `protobuf:"bytes,3,rep,name=validation_ids,json=validationIds,proto3" json:"validation_ids,omitempty"` + TlsEnabled bool `protobuf:"varint,3,opt,name=tls_enabled,json=tlsEnabled,proto3" json:"tls_enabled,omitempty"` + ValidationIds []string `protobuf:"bytes,4,rep,name=validation_ids,json=validationIds,proto3" json:"validation_ids,omitempty"` } func (x *RegisterRequest) Reset() { @@ -78,6 +79,13 @@ func (x *RegisterRequest) GetAddress() string { return "" } +func (x *RegisterRequest) GetTlsEnabled() bool { + if x != nil { + return x.TlsEnabled + } + return false +} + func (x *RegisterRequest) GetValidationIds() []string { if x != nil { return x.ValidationIds @@ -135,27 +143,29 @@ var file_proto_slave_v1_slave_proto_rawDesc = []byte{ 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x2d, 0x67, 0x65, 0x6e, 0x2d, 0x6f, 0x70, 0x65, 0x6e, 0x61, 0x70, 0x69, 0x76, 0x32, 0x2f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2f, 0x61, 0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x71, 0x0a, 0x0f, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, - 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x13, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x42, 0x03, 0xe0, 0x41, 0x02, 0x52, 0x02, 0x69, 0x64, 0x12, 0x1d, 0x0a, - 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x42, 0x03, - 0xe0, 0x41, 0x02, 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x2a, 0x0a, 0x0e, - 0x76, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x73, 0x18, 0x03, - 0x20, 0x03, 0x28, 0x09, 0x42, 0x03, 0xe0, 0x41, 0x02, 0x52, 0x0d, 0x76, 0x61, 0x6c, 0x69, 0x64, - 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x73, 0x22, 0x12, 0x0a, 0x10, 0x52, 0x65, 0x67, 0x69, - 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x9a, 0x01, 0x0a, - 0x0c, 0x53, 0x6c, 0x61, 0x76, 0x65, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x89, 0x01, - 0x0a, 0x08, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x12, 0x19, 0x2e, 0x73, 0x6c, 0x61, - 0x76, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x73, 0x6c, 0x61, 0x76, 0x65, 0x2e, 0x76, 0x31, - 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x46, 0x92, 0x41, 0x26, 0x0a, 0x05, 0x53, 0x6c, 0x61, 0x76, 0x65, 0x12, 0x13, 0x52, - 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x20, 0x53, 0x6c, 0x61, 0x76, 0x65, 0x20, 0x4e, 0x6f, - 0x64, 0x65, 0x2a, 0x08, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x82, 0xd3, 0xe4, 0x93, - 0x02, 0x17, 0x3a, 0x01, 0x2a, 0x22, 0x12, 0x2f, 0x76, 0x31, 0x2f, 0x73, 0x6c, 0x61, 0x76, 0x65, - 0x2f, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x42, 0x10, 0x5a, 0x0e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x2f, 0x73, 0x6c, 0x61, 0x76, 0x65, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x33, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x92, 0x01, 0x0a, 0x0f, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, + 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x13, 0x0a, 0x02, 0x69, 0x64, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x42, 0x03, 0xe0, 0x41, 0x02, 0x52, 0x02, 0x69, 0x64, 0x12, 0x1d, + 0x0a, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x42, + 0x03, 0xe0, 0x41, 0x02, 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x1f, 0x0a, + 0x0b, 0x74, 0x6c, 0x73, 0x5f, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x08, 0x52, 0x0a, 0x74, 0x6c, 0x73, 0x45, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x12, 0x2a, + 0x0a, 0x0e, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x73, + 0x18, 0x04, 0x20, 0x03, 0x28, 0x09, 0x42, 0x03, 0xe0, 0x41, 0x02, 0x52, 0x0d, 0x76, 0x61, 0x6c, + 0x69, 0x64, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x73, 0x22, 0x12, 0x0a, 0x10, 0x52, 0x65, + 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x9a, + 0x01, 0x0a, 0x0c, 0x53, 0x6c, 0x61, 0x76, 0x65, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, + 0x89, 0x01, 0x0a, 0x08, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x12, 0x19, 0x2e, 0x73, + 0x6c, 0x61, 0x76, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x73, 0x6c, 0x61, 0x76, 0x65, 0x2e, + 0x76, 0x31, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x22, 0x46, 0x92, 0x41, 0x26, 0x0a, 0x05, 0x53, 0x6c, 0x61, 0x76, 0x65, 0x12, + 0x13, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x20, 0x53, 0x6c, 0x61, 0x76, 0x65, 0x20, + 0x4e, 0x6f, 0x64, 0x65, 0x2a, 0x08, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x82, 0xd3, + 0xe4, 0x93, 0x02, 0x17, 0x3a, 0x01, 0x2a, 0x22, 0x12, 0x2f, 0x76, 0x31, 0x2f, 0x73, 0x6c, 0x61, + 0x76, 0x65, 0x2f, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x42, 0x10, 0x5a, 0x0e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x6c, 0x61, 0x76, 0x65, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/openapi/openapi.swagger.json b/openapi/openapi.swagger.json index 0ea2563..06d4249 100644 --- a/openapi/openapi.swagger.json +++ b/openapi/openapi.swagger.json @@ -279,6 +279,9 @@ "address": { "type": "string" }, + "tlsEnabled": { + "type": "boolean" + }, "validationIds": { "type": "array", "items": { diff --git a/proto/slave/v1/slave.proto b/proto/slave/v1/slave.proto index 58c4ed6..b28706c 100644 --- a/proto/slave/v1/slave.proto +++ b/proto/slave/v1/slave.proto @@ -10,7 +10,8 @@ option go_package = "proto/slave/v1"; message RegisterRequest { string id = 1 [(google.api.field_behavior) = REQUIRED]; string address = 2 [(google.api.field_behavior) = REQUIRED]; - repeated string validation_ids = 3 [(google.api.field_behavior) = REQUIRED]; + bool tls_enabled = 3; + repeated string validation_ids = 4 [(google.api.field_behavior) = REQUIRED]; } message RegisterResponse {} From 57b3368d5cc469a6163fad1e1b4ec0c9cb15c0da Mon Sep 17 00:00:00 2001 From: shibukazu Date: Fri, 23 Aug 2024 23:34:02 +0900 Subject: [PATCH 3/9] fix store --- go/cmd/run/run.go | 11 +++++++++-- go/pkg/store/memory.go | 17 +++++++++-------- go/pkg/store/redis.go | 19 +++++++++---------- go/pkg/store/util.go | 8 ++++---- 4 files changed, 31 insertions(+), 24 deletions(-) diff --git a/go/cmd/run/run.go b/go/cmd/run/run.go index c316144..a5c0628 100644 --- a/go/cmd/run/run.go +++ b/go/cmd/run/run.go @@ -169,6 +169,13 @@ func run(cmd *cobra.Command, args []string) { ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM, os.Kill) defer cancel() + var nodeId string + if cfg.Mode == "master" { + nodeId = "master" + } else { + nodeId = cfg.Slave.Id + } + var store storePkg.Store switch cfg.Store.Engine { case "redis": @@ -178,9 +185,9 @@ func run(cmd *cobra.Command, args []string) { DB: cfg.Store.Redis.DB, PoolSize: cfg.Store.Redis.PoolSize, }) - store = storePkg.NewRedisStore(redis) + store = storePkg.NewRedisStore(nodeId, redis) case "memory": - store = storePkg.NewMemoryStore() + store = storePkg.NewMemoryStore(nodeId) default: panic("invalid store engine") } diff --git a/go/pkg/store/memory.go b/go/pkg/store/memory.go index 2050367..8f85f33 100644 --- a/go/pkg/store/memory.go +++ b/go/pkg/store/memory.go @@ -11,13 +11,14 @@ import ( ) type MemoryStore struct { + id string memory map[string][]byte mu sync.RWMutex } -func NewMemoryStore() *MemoryStore { +func NewMemoryStore(id string) *MemoryStore { mamory := make(map[string][]byte) - return &MemoryStore{memory: mamory} + return &MemoryStore{id: id, memory: mamory} } func (s *MemoryStore) Reset() error { @@ -35,7 +36,7 @@ func (s *MemoryStore) WriteSchema(dsl *dsl.DSL) error { return failure.Translate(err, appError.ErrDSLSyntaxError) } s.mu.Lock() - s.memory["schema"] = dslJson.Bytes() + s.memory[s.id+":schema"] = dslJson.Bytes() s.mu.Unlock() return nil @@ -44,7 +45,7 @@ func (s *MemoryStore) WriteSchema(dsl *dsl.DSL) error { func (s *MemoryStore) ReadSchema() (*dsl.DSL, error) { dsl := &dsl.DSL{} s.mu.RLock() - dslJSON, ok := s.memory["schema"] + dslJSON, ok := s.memory[s.id+":schema"] s.mu.RUnlock() if !ok { return nil, failure.New(appError.ErrRedisOperationFailed) @@ -62,14 +63,14 @@ func (s *MemoryStore) WriteVariables(id string, variables []dsl.Variable) error return failure.Translate(err, appError.ErrDSLSyntaxError) } s.mu.Lock() - s.memory[getVariablesID(id)] = variablesJson + s.memory[getVariablesID(s.id, id)] = variablesJson s.mu.Unlock() return nil } func (s *MemoryStore) ReadVariables(id string) ([]dsl.Variable, error) { s.mu.RLock() - variablesJSON, ok := s.memory[getVariablesID(id)] + variablesJSON, ok := s.memory[getVariablesID(s.id, id)] s.mu.RUnlock() if !ok { return nil, failure.New(appError.ErrRedisOperationFailed) @@ -91,14 +92,14 @@ func (s *MemoryStore) WriteAllEncodedAST(id string, allEncodedAST [][]byte) erro return err } s.mu.Lock() - s.memory[getAstID(id)] = jsonEncodedAllEncodedAST + s.memory[getAstID(s.id, id)] = jsonEncodedAllEncodedAST s.mu.Unlock() return nil } func (s *MemoryStore) ReadAllEncodedAST(id string) ([][]byte, error) { s.mu.RLock() - jsonEncodedAllEncodedAST, ok := s.memory[getAstID(id)] + jsonEncodedAllEncodedAST, ok := s.memory[getAstID(s.id, id)] s.mu.RUnlock() if !ok { return nil, failure.New(appError.ErrRedisOperationFailed) diff --git a/go/pkg/store/redis.go b/go/pkg/store/redis.go index 34fd21e..96dd0d9 100644 --- a/go/pkg/store/redis.go +++ b/go/pkg/store/redis.go @@ -11,11 +11,12 @@ import ( ) type RedisStore struct { + id string redisClient *redis.Client } -func NewRedisStore(redisClient *redis.Client) *RedisStore { - return &RedisStore{redisClient: redisClient} +func NewRedisStore(id string, redisClient *redis.Client) *RedisStore { + return &RedisStore{id: id, redisClient: redisClient} } func (s *RedisStore) Reset() error { @@ -32,7 +33,7 @@ func (s *RedisStore) WriteSchema(dsl *dsl.DSL) error { if err := enc.Encode(dsl); err != nil { return failure.Translate(err, appError.ErrDSLSyntaxError) } - if err := s.redisClient.Set("schema", dslJson.String(), 0).Err(); err != nil { + if err := s.redisClient.Set(s.id+":schema", dslJson.String(), 0).Err(); err != nil { return failure.Translate(err, appError.ErrRedisOperationFailed) } return nil @@ -40,7 +41,7 @@ func (s *RedisStore) WriteSchema(dsl *dsl.DSL) error { func (s *RedisStore) ReadSchema() (*dsl.DSL, error) { dsl := &dsl.DSL{} - dslJSON, err := s.redisClient.Get("schema").Bytes() + dslJSON, err := s.redisClient.Get(s.id + ":schema").Bytes() if err != nil { return nil, failure.Translate(err, appError.ErrRedisOperationFailed) } @@ -56,14 +57,14 @@ func (s *RedisStore) WriteVariables(id string, variables []dsl.Variable) error { if err != nil { return failure.Translate(err, appError.ErrDSLSyntaxError) } - if err := s.redisClient.Set(getVariablesID(id), variablesJson, 0).Err(); err != nil { + if err := s.redisClient.Set(getVariablesID(s.id, id), variablesJson, 0).Err(); err != nil { return failure.Translate(err, appError.ErrRedisOperationFailed) } return nil } func (s *RedisStore) ReadVariables(id string) ([]dsl.Variable, error) { - variablesJson, err := s.redisClient.Get(getVariablesID(id)).Bytes() + variablesJson, err := s.redisClient.Get(getVariablesID(s.id, id)).Bytes() if err != nil { return nil, failure.Translate(err, appError.ErrRedisOperationFailed) } @@ -85,14 +86,14 @@ func (s *RedisStore) WriteAllEncodedAST(id string, allEncodedAST [][]byte) error return err } - if err := s.redisClient.Set(getAstID(id), jsonEncodedAllEncodedAST, 0).Err(); err != nil { + if err := s.redisClient.Set(getAstID(s.id, id), jsonEncodedAllEncodedAST, 0).Err(); err != nil { return failure.Translate(err, appError.ErrRedisOperationFailed) } return nil } func (s *RedisStore) ReadAllEncodedAST(id string) ([][]byte, error) { - jsonEncodedAllEncodedAST, err := s.redisClient.Get(getAstID(id)).Bytes() + jsonEncodedAllEncodedAST, err := s.redisClient.Get(getAstID(s.id, id)).Bytes() if err != nil { return nil, failure.Translate(err, appError.ErrRedisOperationFailed) } @@ -102,5 +103,3 @@ func (s *RedisStore) ReadAllEncodedAST(id string) ([][]byte, error) { } return allEncodedAST, nil } - - diff --git a/go/pkg/store/util.go b/go/pkg/store/util.go index e6b086b..68a67ad 100644 --- a/go/pkg/store/util.go +++ b/go/pkg/store/util.go @@ -8,12 +8,12 @@ import ( "github.com/shibukazu/open-ve/go/pkg/appError" ) -func getVariablesID(id string) string { - return "variables:" + id +func getVariablesID(nodeId string, id string) string { + return nodeId + ":variables:" + id } -func getAstID(id string) string { - return "ast:" + id +func getAstID(nodeId string, id string) string { + return nodeId + ":ast:" + id } func jsonEncodeAllEncodedAST(allEncodedAST [][]byte) ([]byte, error) { From db3aa53cf903b7d13afd8ecdd19d036644b770be Mon Sep 17 00:00:00 2001 From: shibukazu Date: Fri, 23 Aug 2024 23:51:51 +0900 Subject: [PATCH 4/9] fix address --- docker-compose.yml | 9 +++++---- go/cmd/run/run.go | 22 +++++++++++++++------- go/pkg/config/config.go | 10 ++++++---- go/pkg/server/gateway.go | 6 +++--- go/pkg/server/grpc.go | 2 +- go/pkg/slave/registrar.go | 6 +++--- 6 files changed, 33 insertions(+), 22 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 2c9a1e0..0a8e15b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -22,13 +22,13 @@ services: - redis environment: - OPEN-VE_MODE=master - - OPEN-VE_HTTP_ADDR= + - OPEN-VE_HTTP_PORT= - OPEN-VE_HTTP_CORS_ALLOWED_ORIGINS= - OPEN-VE_HTTP_CORS_ALLOWED_HEADERS= - OPEN-VE_HTTP_TLS_ENABLED= - OPEN-VE_HTTP_TLS_CERT_PATH= - OPEN-VE_HTTP_TLS_KEY_PATH= - - OPEN-VE_GRPC_ADDR= + - OPEN-VE_GRPC_PORT= - OPEN-VE_GRPC_TLS_ENABLED= - OPEN-VE_GRPC_TLS_CERT_PATH= - OPEN-VE_GRPC_TLS_KEY_PATH= @@ -53,15 +53,16 @@ services: environment: - OPEN-VE_MODE=slave - OPEN-VE_SLAVE_ID=slave-node + - OPEN-VE_SLAVE_SLAVE_GRPC_ADDR=slave-node:9000 - OPEN-VE_SLAVE_MASTER_GRPC_ADDR=master-node:9000 - OPEN-VE_SLAVE_MASTER_TLS_ENABLED= - - OPEN-VE_HTTP_ADDR= + - OPEN-VE_HTTP_PORT= - OPEN-VE_HTTP_CORS_ALLOWED_ORIGINS= - OPEN-VE_HTTP_CORS_ALLOWED_HEADERS= - OPEN-VE_HTTP_TLS_ENABLED= - OPEN-VE_HTTP_TLS_CERT_PATH= - OPEN-VE_HTTP_TLS_KEY_PATH= - - OPEN-VE_GRPC_ADDR= + - OPEN-VE_GRPC_PORT= - OPEN-VE_GRPC_TLS_ENABLED= - OPEN-VE_GRPC_TLS_CERT_PATH= - OPEN-VE_GRPC_TLS_KEY_PATH= diff --git a/go/cmd/run/run.go b/go/cmd/run/run.go index a5c0628..24b909c 100644 --- a/go/cmd/run/run.go +++ b/go/cmd/run/run.go @@ -35,6 +35,10 @@ func NewRunCommand() *cobra.Command { if id == "" { return failure.New(appError.ErrConfigFileSyntaxError, failure.Message("ID of the slave server is required")) } + slaveAddr := viper.GetString("slave.slaveGRPCAddr") + if slaveAddr == "" { + return failure.New(appError.ErrConfigFileSyntaxError, failure.Message("gRPC address of the slave server is required")) + } masterAddr := viper.GetString("slave.masterGRPCAddr") if masterAddr == "" { return failure.New(appError.ErrConfigFileSyntaxError, failure.Message("gRPC address of the master server is required")) @@ -59,6 +63,10 @@ func NewRunCommand() *cobra.Command { MustBindPFlag("slave.id", flags.Lookup("slave-id")) viper.MustBindEnv("slave.id", "OPEN-VE_SLAVE_ID") + flags.String("slave-slave-grpc-addr", defaultConfig.Slave.SlaveGRPCAddr, "gRPC address of the slave server") + MustBindPFlag("slave.slaveGRPCAddr", flags.Lookup("slave-slave-grpc-addr")) + viper.MustBindEnv("slave.slaveGRPCAddr", "OPEN-VE_SLAVE_SLAVE_GRPC_ADDR") + flags.String("slave-master-grpc-addr", defaultConfig.Slave.MasterGRPCAddr, "gRPC address of the master server") MustBindPFlag("slave.masterGRPCAddr", flags.Lookup("slave-master-grpc-addr")) viper.MustBindEnv("slave.masterGRPCAddr", "OPEN-VE_SLAVE_MASTER_GRPC_ADDR") @@ -68,9 +76,9 @@ func NewRunCommand() *cobra.Command { viper.MustBindEnv("slave.masterGRPCTLSEnabled", "OPEN-VE_SLAVE_MASTER_GRPC_TLS_ENABLED") // HTTP - flags.String("http-addr", defaultConfig.Http.Addr, "HTTP server address") - MustBindPFlag("http.addr", flags.Lookup("http-addr")) - viper.MustBindEnv("http.addr", "OPEN-VE_HTTP_ADDR") + flags.String("http-port", defaultConfig.Http.Port, "HTTP server port") + MustBindPFlag("http.port", flags.Lookup("http-port")) + viper.MustBindEnv("http.port", "OPEN-VE_HTTP_PORT") flags.StringSlice("http-cors-allowed-origins", defaultConfig.Http.CORSAllowedOrigins, "CORS allowed origins") MustBindPFlag("http.corsAllowedOrigins", flags.Lookup("http-cors-allowed-origins")) @@ -92,9 +100,9 @@ func NewRunCommand() *cobra.Command { MustBindPFlag("http.tls.keyPath", flags.Lookup("http-tls-key-path")) viper.MustBindEnv("http.tls.keyPath", "OPEN-VE_HTTP_TLS_KEY_PATH") // GRPC - flags.String("grpc-addr", defaultConfig.GRPC.Addr, "gRPC server address") - MustBindPFlag("grpc.addr", flags.Lookup("grpc-addr")) - viper.MustBindEnv("grpc.addr", "OPEN-VE_GRPC_ADDR") + flags.String("grpc-port", defaultConfig.GRPC.Port, "gRPC server port") + MustBindPFlag("grpc.port", flags.Lookup("grpc-port")) + viper.MustBindEnv("grpc.port", "OPEN-VE_GRPC_PORT") flags.Bool("grpc-tls-enabled", defaultConfig.GRPC.TLS.Enabled, "gRPC server TLS enabled") MustBindPFlag("grpc.tls.enabled", flags.Lookup("grpc-tls-enabled")) @@ -214,7 +222,7 @@ func run(cmd *cobra.Command, args []string) { if cfg.Mode == "slave" { wg.Add(1) - slaveRegistrar := slave.NewSlaveRegistrar(cfg.Slave.Id, cfg.GRPC.Addr, cfg.GRPC.TLS.Enabled, cfg.Slave.MasterGRPCAddr, cfg.Slave.MasterGRPCTLSEnabled, dslReader, logger) + slaveRegistrar := slave.NewSlaveRegistrar(cfg.Slave.Id, cfg.Slave.SlaveGRPCAddr, cfg.GRPC.TLS.Enabled, cfg.Slave.MasterGRPCAddr, cfg.Slave.MasterGRPCTLSEnabled, dslReader, logger) go func(wg *sync.WaitGroup) { logger.Info("πŸš€ slave registration timer: starting..") slaveRegistrar.RegisterTimer(ctx, wg) diff --git a/go/pkg/config/config.go b/go/pkg/config/config.go index 6d54ff5..4c5ffd5 100644 --- a/go/pkg/config/config.go +++ b/go/pkg/config/config.go @@ -11,19 +11,20 @@ type Config struct { type SlaveConfig struct { Id string `yaml:"id"` + SlaveGRPCAddr string `yaml:"slaveAddr"` MasterGRPCTLSEnabled bool `yaml:"masterGRPCTLSEnabled"` MasterGRPCAddr string `yaml:"masterGRPCAddr"` } type HttpConfig struct { - Addr string `yaml:"addr"` + Port string `yaml:"port"` CORSAllowedOrigins []string `yaml:"corsAllowedOrigins"` CORSAllowedHeaders []string `yaml:"corsAllowedHeaders"` TLS TLSConfig `yaml:"tls"` } type GRPCConfig struct { - Addr string `yaml:"addr"` + Port string `yaml:"port"` TLS TLSConfig `yaml:"tls"` } @@ -54,11 +55,12 @@ func DefaultConfig() *Config { Mode: "master", Slave: SlaveConfig{ Id: "", + SlaveGRPCAddr: "", MasterGRPCAddr: "", MasterGRPCTLSEnabled: false, }, Http: HttpConfig{ - Addr: ":8080", + Port: "8080", CORSAllowedOrigins: []string{"*"}, CORSAllowedHeaders: []string{"*"}, TLS: TLSConfig{ @@ -66,7 +68,7 @@ func DefaultConfig() *Config { }, }, GRPC: GRPCConfig{ - Addr: ":9000", + Port: "9000", TLS: TLSConfig{ Enabled: false, }, diff --git a/go/pkg/server/gateway.go b/go/pkg/server/gateway.go index 0057ec8..41e8dd9 100644 --- a/go/pkg/server/gateway.go +++ b/go/pkg/server/gateway.go @@ -63,11 +63,11 @@ func (g *Gateway) Run(ctx context.Context, wg *sync.WaitGroup) { dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials())) } - if err := pbValidate.RegisterValidateServiceHandlerFromEndpoint(ctx, grpcGateway, g.gRPCConfig.Addr, dialOpts); err != nil { + if err := pbValidate.RegisterValidateServiceHandlerFromEndpoint(ctx, grpcGateway, ":"+g.gRPCConfig.Port, dialOpts); err != nil { panic(failure.Translate(err, appError.ErrServerStartFailed, failure.Messagef("failed to register validate service on gateway"))) } - if err := pbDSL.RegisterDSLServiceHandlerFromEndpoint(ctx, grpcGateway, g.gRPCConfig.Addr, dialOpts); err != nil { + if err := pbDSL.RegisterDSLServiceHandlerFromEndpoint(ctx, grpcGateway, ":"+g.gRPCConfig.Port, dialOpts); err != nil { panic(failure.Translate(err, appError.ErrServerStartFailed, failure.Messagef("failed to register dsl service on gateway"))) } @@ -82,7 +82,7 @@ func (g *Gateway) Run(ctx context.Context, wg *sync.WaitGroup) { }).Handler(withMiddleware) g.server = &http.Server{ - Addr: g.httpConfig.Addr, + Addr: ":" + g.httpConfig.Port, Handler: withCors, } diff --git a/go/pkg/server/grpc.go b/go/pkg/server/grpc.go index e76c05f..14eaa49 100644 --- a/go/pkg/server/grpc.go +++ b/go/pkg/server/grpc.go @@ -53,7 +53,7 @@ func NewGrpc( func (g *GRPC) Run(ctx context.Context, wg *sync.WaitGroup, mode string) { - listen, err := net.Listen("tcp", g.gRPCConfig.Addr) + listen, err := net.Listen("tcp", ":"+g.gRPCConfig.Port) if err != nil { panic(failure.Translate(err, appError.ErrServerStartFailed)) } diff --git a/go/pkg/slave/registrar.go b/go/pkg/slave/registrar.go index 8b9d266..d926be4 100644 --- a/go/pkg/slave/registrar.go +++ b/go/pkg/slave/registrar.go @@ -27,7 +27,7 @@ type SlaveRegistrar struct { logger *slog.Logger } -func NewSlaveRegistrar(id, address string, tlsEnabled bool, masterAddress string, masterTLSEnabled bool, dslReader *reader.DSLReader, logger *slog.Logger) *SlaveRegistrar { +func NewSlaveRegistrar(id, slaveAddress string, slaveTLSEnabled bool, masterAddress string, masterTLSEnabled bool, dslReader *reader.DSLReader, logger *slog.Logger) *SlaveRegistrar { var opts []grpc.DialOption if masterTLSEnabled { @@ -45,8 +45,8 @@ func NewSlaveRegistrar(id, address string, tlsEnabled bool, masterAddress string return &SlaveRegistrar{ Id: id, - Address: address, - TLSEnabled: tlsEnabled, + Address: slaveAddress, + TLSEnabled: slaveTLSEnabled, dslReader: dslReader, gRPCClient: gRPCClient, gRPCConn: conn, From 29dd8020548a849d17b2eec6759889cf3a0811e5 Mon Sep 17 00:00:00 2001 From: shibukazu Date: Sat, 24 Aug 2024 03:31:49 +0900 Subject: [PATCH 5/9] impl --- docker-compose.yml | 4 +- go/cmd/run/run.go | 34 +++--- go/pkg/appError/serviceError.go | 3 +- go/pkg/config/config.go | 12 +- go/pkg/server/gateway.go | 210 ++++++++++++++++++++++++++++++-- go/pkg/server/grpc.go | 7 +- go/pkg/slave/manager.go | 15 +++ go/pkg/slave/registrar.go | 93 ++++++++------ 8 files changed, 300 insertions(+), 78 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 0a8e15b..4c5a3af 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -53,8 +53,8 @@ services: environment: - OPEN-VE_MODE=slave - OPEN-VE_SLAVE_ID=slave-node - - OPEN-VE_SLAVE_SLAVE_GRPC_ADDR=slave-node:9000 - - OPEN-VE_SLAVE_MASTER_GRPC_ADDR=master-node:9000 + - OPEN-VE_SLAVE_SLAVE_HTTP_ADDR=http://slave-node:8080 + - OPEN-VE_SLAVE_MASTER_HTTP_ADDR=http://master-node:8080 - OPEN-VE_SLAVE_MASTER_TLS_ENABLED= - OPEN-VE_HTTP_PORT= - OPEN-VE_HTTP_CORS_ALLOWED_ORIGINS= diff --git a/go/cmd/run/run.go b/go/cmd/run/run.go index 24b909c..02f8753 100644 --- a/go/cmd/run/run.go +++ b/go/cmd/run/run.go @@ -35,13 +35,13 @@ func NewRunCommand() *cobra.Command { if id == "" { return failure.New(appError.ErrConfigFileSyntaxError, failure.Message("ID of the slave server is required")) } - slaveAddr := viper.GetString("slave.slaveGRPCAddr") - if slaveAddr == "" { - return failure.New(appError.ErrConfigFileSyntaxError, failure.Message("gRPC address of the slave server is required")) + slaveHTTPAddr := viper.GetString("slave.slaveHTTPAddr") + if slaveHTTPAddr == "" { + return failure.New(appError.ErrConfigFileSyntaxError, failure.Message("HTTP address of the slave server is required")) } - masterAddr := viper.GetString("slave.masterGRPCAddr") - if masterAddr == "" { - return failure.New(appError.ErrConfigFileSyntaxError, failure.Message("gRPC address of the master server is required")) + masterHTTPAddr := viper.GetString("slave.masterHTTPAddr") + if masterHTTPAddr == "" { + return failure.New(appError.ErrConfigFileSyntaxError, failure.Message("HTTP address of the master server is required")) } } return nil @@ -63,17 +63,17 @@ func NewRunCommand() *cobra.Command { MustBindPFlag("slave.id", flags.Lookup("slave-id")) viper.MustBindEnv("slave.id", "OPEN-VE_SLAVE_ID") - flags.String("slave-slave-grpc-addr", defaultConfig.Slave.SlaveGRPCAddr, "gRPC address of the slave server") - MustBindPFlag("slave.slaveGRPCAddr", flags.Lookup("slave-slave-grpc-addr")) - viper.MustBindEnv("slave.slaveGRPCAddr", "OPEN-VE_SLAVE_SLAVE_GRPC_ADDR") + flags.String("slave-slave-http-addr", defaultConfig.Slave.SlaveHTTPAddr, "HTTP address of the slave server") + MustBindPFlag("slave.slaveHTTPAddr", flags.Lookup("slave-slave-http-addr")) + viper.MustBindEnv("slave.slaveHTTPAddr", "OPEN-VE_SLAVE_SLAVE_HTTP_ADDR") - flags.String("slave-master-grpc-addr", defaultConfig.Slave.MasterGRPCAddr, "gRPC address of the master server") - MustBindPFlag("slave.masterGRPCAddr", flags.Lookup("slave-master-grpc-addr")) - viper.MustBindEnv("slave.masterGRPCAddr", "OPEN-VE_SLAVE_MASTER_GRPC_ADDR") + flags.String("slave-master-http-addr", defaultConfig.Slave.MasterHTTPAddr, "HTTP address of the master server") + MustBindPFlag("slave.masterHTTPAddr", flags.Lookup("slave-master-http-addr")) + viper.MustBindEnv("slave.masterHTTPAddr", "OPEN-VE_SLAVE_MASTER_HTTP_ADDR") - flags.Bool("slave-master-grpc-tls-enabled", defaultConfig.Slave.MasterGRPCTLSEnabled, "connect to master server with TLS") - MustBindPFlag("slave.masterGRPCTLSEnabled", flags.Lookup("slave-master-grpc-tls-enabled")) - viper.MustBindEnv("slave.masterGRPCTLSEnabled", "OPEN-VE_SLAVE_MASTER_GRPC_TLS_ENABLED") + flags.Bool("slave-master-http-tls-enabled", defaultConfig.Slave.MasterHTTPTLSEnabled, "connect to master server with TLS") + MustBindPFlag("slave.masterHTTPTLSEnabled", flags.Lookup("slave-master-http-tls-enabled")) + viper.MustBindEnv("slave.masterHTTPTLSEnabled", "OPEN-VE_SLAVE_MASTER_HTTP_TLS_ENABLED") // HTTP flags.String("http-port", defaultConfig.Http.Port, "HTTP server port") @@ -204,7 +204,7 @@ func run(cmd *cobra.Command, args []string) { validator := validator.NewValidator(logger, store) slaveManager := slave.NewSlaveManager(logger) - gw := server.NewGateway(&cfg.Http, &cfg.GRPC, logger, dslReader) + gw := server.NewGateway(cfg.Mode, &cfg.Http, &cfg.GRPC, logger, dslReader, slaveManager) wg.Add(1) logger.Info("πŸš€ Open-VE: starting...", slog.Any("config", cfg)) @@ -222,7 +222,7 @@ func run(cmd *cobra.Command, args []string) { if cfg.Mode == "slave" { wg.Add(1) - slaveRegistrar := slave.NewSlaveRegistrar(cfg.Slave.Id, cfg.Slave.SlaveGRPCAddr, cfg.GRPC.TLS.Enabled, cfg.Slave.MasterGRPCAddr, cfg.Slave.MasterGRPCTLSEnabled, dslReader, logger) + slaveRegistrar := slave.NewSlaveRegistrar(cfg.Slave.Id, cfg.Slave.SlaveHTTPAddr, cfg.GRPC.TLS.Enabled, cfg.Slave.MasterHTTPAddr, cfg.Slave.MasterHTTPTLSEnabled, dslReader, logger) go func(wg *sync.WaitGroup) { logger.Info("πŸš€ slave registration timer: starting..") slaveRegistrar.RegisterTimer(ctx, wg) diff --git a/go/pkg/appError/serviceError.go b/go/pkg/appError/serviceError.go index 8508f75..4f9a19c 100644 --- a/go/pkg/appError/serviceError.go +++ b/go/pkg/appError/serviceError.go @@ -16,7 +16,8 @@ const ( ErrRequestParameterInvalid = "RequestParameterInvalid" - ErrValidateServiceIDNotFound = "ValidateServiceIDNotFound" + ErrValidateServiceIDNotFound = "ValidateServiceIDNotFound" + ErrValidateServiceForwardFailed = "ValidateServiceForwardFailed" ErrDSLServiceDSLSyntaxError = "DSLServiceDSLSyntaxError" diff --git a/go/pkg/config/config.go b/go/pkg/config/config.go index 4c5ffd5..39f3f6b 100644 --- a/go/pkg/config/config.go +++ b/go/pkg/config/config.go @@ -11,9 +11,9 @@ type Config struct { type SlaveConfig struct { Id string `yaml:"id"` - SlaveGRPCAddr string `yaml:"slaveAddr"` - MasterGRPCTLSEnabled bool `yaml:"masterGRPCTLSEnabled"` - MasterGRPCAddr string `yaml:"masterGRPCAddr"` + SlaveHTTPAddr string `yaml:"slaveHTTPAddr"` + MasterHTTPTLSEnabled bool `yaml:"masterHTTPTLSEnabled"` + MasterHTTPAddr string `yaml:"masterHTTPAddr"` } type HttpConfig struct { @@ -55,9 +55,9 @@ func DefaultConfig() *Config { Mode: "master", Slave: SlaveConfig{ Id: "", - SlaveGRPCAddr: "", - MasterGRPCAddr: "", - MasterGRPCTLSEnabled: false, + SlaveHTTPAddr: "", + MasterHTTPAddr: "", + MasterHTTPTLSEnabled: false, }, Http: HttpConfig{ Port: "8080", diff --git a/go/pkg/server/gateway.go b/go/pkg/server/gateway.go index 41e8dd9..47f86f8 100644 --- a/go/pkg/server/gateway.go +++ b/go/pkg/server/gateway.go @@ -3,7 +3,9 @@ package server import ( "bytes" "context" + "crypto/tls" "encoding/json" + "fmt" "io" "log/slog" "net/http" @@ -16,7 +18,9 @@ import ( "github.com/shibukazu/open-ve/go/pkg/appError" "github.com/shibukazu/open-ve/go/pkg/config" "github.com/shibukazu/open-ve/go/pkg/dsl/reader" + "github.com/shibukazu/open-ve/go/pkg/slave" pbDSL "github.com/shibukazu/open-ve/go/proto/dsl/v1" + pbSlave "github.com/shibukazu/open-ve/go/proto/slave/v1" pbValidate "github.com/shibukazu/open-ve/go/proto/validate/v1" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -24,24 +28,30 @@ import ( ) type Gateway struct { - httpConfig *config.HttpConfig - gRPCConfig *config.GRPCConfig - logger *slog.Logger - dslReader *reader.DSLReader - server *http.Server + mode string + httpConfig *config.HttpConfig + gRPCConfig *config.GRPCConfig + logger *slog.Logger + dslReader *reader.DSLReader + slaveManager *slave.SlaveManager + server *http.Server } func NewGateway( + mode string, httpConfig *config.HttpConfig, gRPCConfig *config.GRPCConfig, logger *slog.Logger, dslReader *reader.DSLReader, + slaveManager *slave.SlaveManager, ) *Gateway { return &Gateway{ - httpConfig: httpConfig, - gRPCConfig: gRPCConfig, - logger: logger, - dslReader: dslReader, + mode: mode, + httpConfig: httpConfig, + gRPCConfig: gRPCConfig, + logger: logger, + dslReader: dslReader, + slaveManager: slaveManager, } } @@ -71,7 +81,13 @@ func (g *Gateway) Run(ctx context.Context, wg *sync.WaitGroup) { panic(failure.Translate(err, appError.ErrServerStartFailed, failure.Messagef("failed to register dsl service on gateway"))) } - withMiddleware := g.validateRequestTypeConvertMiddleware(grpcGateway) + if g.mode == "master" { + if err := pbSlave.RegisterSlaveServiceHandlerFromEndpoint(ctx, grpcGateway, ":"+g.gRPCConfig.Port, dialOpts); err != nil { + panic(failure.Translate(err, appError.ErrServerStartFailed, failure.Messagef("failed to register slave service on gateway"))) + } + } + + withMiddleware := g.forwardCheckRequestMiddleware(g.validateRequestTypeConvertMiddleware(grpcGateway)) withCors := cors.New(cors.Options{ AllowedOrigins: g.httpConfig.CORSAllowedOrigins, @@ -122,6 +138,180 @@ func (g *Gateway) shutdown(ctx context.Context) { g.logger.Info("πŸ›‘ gateway server is stopped") } +type responseRecorder struct { + http.ResponseWriter + statusCode int + body *bytes.Buffer +} + +func (rec *responseRecorder) WriteHeader(code int) { + rec.statusCode = code +} + +func (rec *responseRecorder) Write(b []byte) (int, error) { + return rec.body.Write(b) +} + +func (g *Gateway) forwardCheckRequestMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if g.mode == "master" && r.URL.Path == "/v1/check" && r.Method == "POST" { + ctx := context.Background() + modifiedRequestValidations := make([]interface{}, 0) + validationResults := make([]interface{}, 0) + + var reqBody map[string]interface{} + var resBody map[string]interface{} + if err := json.NewDecoder(r.Body).Decode(&reqBody); err != nil { + http.Error(w, failure.Translate(err, appError.ErrRequestParameterInvalid).Error(), http.StatusBadRequest) + return + } + + validations, ok := reqBody["validations"].([]interface{}) + if !ok { + http.Error(w, failure.New(appError.ErrRequestParameterInvalid, failure.Messagef("validations field is invalid")).Error(), http.StatusBadRequest) + return + } + + dslFound := false + dsl, err := g.dslReader.Read(ctx) + if err == nil { + dslFound = true + } + // TODO: ε„ε‡¦η†γ‚’δΈ¦εˆ—εŒ–γ™γ‚‹ + for _, validation := range validations { + validation, ok := validation.(map[string]interface{}) + if !ok { + http.Error(w, failure.New(appError.ErrRequestParameterInvalid, failure.Messagef("validation field is invalid")).Error(), http.StatusBadRequest) + return + } + id, ok := validation["id"].(string) + if !ok { + http.Error(w, failure.New(appError.ErrRequestParameterInvalid, failure.Messagef("id field is invalid")).Error(), http.StatusBadRequest) + return + } + + // Check if the request forward is needed + isForwardNeed := false + if !dslFound { + isForwardNeed = true + } else { + for _, validation := range dsl.Validations { + if validation.ID == id { + isForwardNeed = true + break + } + } + } + + if isForwardNeed { + // Find the slave node that can handle validation ID + slaveNode, err := g.slaveManager.FindSlave(id) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + var client *http.Client + if slaveNode.TLSEnabled { + transport := &http.Transport{ + TLSClientConfig: &tls.Config{}, + } + client = &http.Client{Transport: transport} + } else { + client = &http.Client{} + } + client.Timeout = 5 * time.Second + + reqBody := map[string]interface{}{ + "validations": []interface{}{validation}, + } + body, err := json.Marshal(reqBody) + if err != nil { + http.Error(w, failure.Translate(err, appError.ErrValidateServiceForwardFailed).Error(), http.StatusInternalServerError) + return + } + req, err := http.NewRequest("POST", slaveNode.Addr+"/v1/check", bytes.NewBuffer(body)) + if err != nil { + http.Error(w, failure.Translate(err, appError.ErrValidateServiceForwardFailed).Error(), http.StatusInternalServerError) + return + } + req.Header.Set("Content-Type", "application/json") + + resp, err := client.Do(req) + if err != nil { + http.Error(w, failure.Translate(err, appError.ErrValidateServiceForwardFailed).Error(), http.StatusInternalServerError) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + http.Error(w, failure.New(appError.ErrValidateServiceForwardFailed, failure.Messagef("Failed to forward the validate request to slave: %d", resp.StatusCode)).Error(), http.StatusInternalServerError) + return + } + + var respBody map[string]interface{} + if err := json.NewDecoder(resp.Body).Decode(&respBody); err != nil { + http.Error(w, failure.Translate(err, appError.ErrValidateServiceForwardFailed).Error(), http.StatusInternalServerError) + return + } + results, ok := respBody["results"].([]interface{}) + if !ok { + http.Error(w, failure.New(appError.ErrValidateServiceForwardFailed, failure.Messagef("results field is invalid")).Error(), http.StatusInternalServerError) + return + } + validationResults = append(validationResults, results...) + + g.logger.Info(fmt.Sprintf("⚽️ Request (id:%s) Forwarded to Slave %s", id, slaveNode.Id)) + } else { + modifiedRequestValidations = append(modifiedRequestValidations, validation) + } + } + + reqBody["validations"] = modifiedRequestValidations + modifiedReqBody, err := json.Marshal(reqBody) + if err != nil { + http.Error(w, failure.Translate(err, appError.ErrRequestParameterInvalid).Error(), http.StatusInternalServerError) + return + } + r.Body = io.NopCloser(bytes.NewBuffer(modifiedReqBody)) + r.ContentLength = int64(len(modifiedReqBody)) + + rec := &responseRecorder{ + ResponseWriter: w, + body: &bytes.Buffer{}, + } + next.ServeHTTP(rec, r) + + // Concat the validation results + if err := json.Unmarshal(rec.body.Bytes(), &resBody); err != nil { + http.Error(w, failure.Translate(err, appError.ErrRequestParameterInvalid).Error(), http.StatusInternalServerError) + return + } + originalValidationResults, ok := resBody["results"].([]interface{}) + if !ok { + http.Error(w, failure.New(appError.ErrRequestParameterInvalid, failure.Messagef("results field is invalid")).Error(), http.StatusInternalServerError) + return + } + resBody["results"] = append(originalValidationResults, validationResults...) + resBodyJson, err := json.Marshal(resBody) + if err != nil { + http.Error(w, failure.Translate(err, appError.ErrRequestParameterInvalid).Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Content-Length", fmt.Sprint(len(resBodyJson))) + w.WriteHeader(http.StatusOK) + _, err = w.Write(resBodyJson) + if err != nil { + g.logger.Error(failure.Translate(err, appError.ErrServerInternalError).Error()) + } + } else { + next.ServeHTTP(w, r) + } + }) +} + func (g *Gateway) validateRequestTypeConvertMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.URL.Path == "/v1/check" && r.Method == "POST" { diff --git a/go/pkg/server/grpc.go b/go/pkg/server/grpc.go index 14eaa49..bc7cac3 100644 --- a/go/pkg/server/grpc.go +++ b/go/pkg/server/grpc.go @@ -59,7 +59,7 @@ func (g *GRPC) Run(ctx context.Context, wg *sync.WaitGroup, mode string) { } grpcServerOpts := []grpc.ServerOption{} - grpcServerOpts = append(grpcServerOpts, grpc.UnaryInterceptor(g.accessLogInterceptor())) + grpcServerOpts = append(grpcServerOpts, grpc.UnaryInterceptor(g.interceptor())) if g.gRPCConfig.TLS.Enabled { if g.gRPCConfig.TLS.CertPath == "" || g.gRPCConfig.TLS.KeyPath == "" { panic(failure.New(appError.ErrServerStartFailed, failure.Message("certPath and keyPath must be set"))) @@ -122,11 +122,12 @@ func (g *GRPC) shutdown(ctx context.Context) { } } -func (g *GRPC) accessLogInterceptor() grpc.UnaryServerInterceptor { +func (g *GRPC) interceptor() grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + // Log the request g.logger.Info("πŸ” Access Log", slog.String("Method", info.FullMethod), slog.String("Request", fmt.Sprintf("%+v", req))) - resp, err := handler(ctx, req) + resp, err := handler(ctx, req) return resp, err } } diff --git a/go/pkg/slave/manager.go b/go/pkg/slave/manager.go index 93ab843..42b7c89 100644 --- a/go/pkg/slave/manager.go +++ b/go/pkg/slave/manager.go @@ -3,6 +3,8 @@ package slave import ( "log/slog" "sync" + + "github.com/morikuni/failure/v2" ) type SlaveManager struct { @@ -27,3 +29,16 @@ func (m *SlaveManager) RegisterSlave(id, addr string, tlsEnabled bool, validatio m.Slaves[id] = &Slave{Id: id, Addr: addr, ValidationIds: validationIds, TLSEnabled: tlsEnabled} m.mu.Unlock() } + +func (m *SlaveManager) FindSlave(validationId string) (*Slave, error) { + m.mu.RLock() + defer m.mu.RUnlock() + for _, slave := range m.Slaves { + for _, id := range slave.ValidationIds { + if id == validationId { + return slave, nil + } + } + } + return nil, failure.New("slave node that can handle the validation ID is not found") +} diff --git a/go/pkg/slave/registrar.go b/go/pkg/slave/registrar.go index d926be4..913a477 100644 --- a/go/pkg/slave/registrar.go +++ b/go/pkg/slave/registrar.go @@ -1,56 +1,51 @@ package slave import ( + "bytes" "context" "crypto/tls" - "log" + "encoding/json" "log/slog" + "net/http" "sync" "time" "github.com/morikuni/failure/v2" "github.com/shibukazu/open-ve/go/pkg/appError" "github.com/shibukazu/open-ve/go/pkg/dsl/reader" - pb "github.com/shibukazu/open-ve/go/proto/slave/v1" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/credentials/insecure" ) type SlaveRegistrar struct { - Id string - Address string - TLSEnabled bool - dslReader *reader.DSLReader - gRPCClient pb.SlaveServiceClient - gRPCConn *grpc.ClientConn - logger *slog.Logger + Id string + SlaveHTTPAddress string + SlaveTLSEnabled bool + MasterHTTPAddress string + dslReader *reader.DSLReader + httpClient *http.Client + logger *slog.Logger } -func NewSlaveRegistrar(id, slaveAddress string, slaveTLSEnabled bool, masterAddress string, masterTLSEnabled bool, dslReader *reader.DSLReader, logger *slog.Logger) *SlaveRegistrar { - var opts []grpc.DialOption +func NewSlaveRegistrar(id, slaveHTTPAddress string, slaveTLSEnabled bool, masterHTTPAddress string, masterTLSEnabled bool, dslReader *reader.DSLReader, logger *slog.Logger) *SlaveRegistrar { + var client *http.Client if masterTLSEnabled { - creds := credentials.NewTLS(&tls.Config{}) - opts = append(opts, grpc.WithTransportCredentials(creds)) - } - opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), grpc.WithTimeout(5*time.Second)) - - conn, err := grpc.Dial(masterAddress, opts...) - if err != nil { - log.Fatalf("Failed to connect: %v", err) + transport := &http.Transport{ + TLSClientConfig: &tls.Config{}, + } + client = &http.Client{Transport: transport} + } else { + client = &http.Client{} } - - gRPCClient := pb.NewSlaveServiceClient(conn) + client.Timeout = 5 * time.Second return &SlaveRegistrar{ - Id: id, - Address: slaveAddress, - TLSEnabled: slaveTLSEnabled, - dslReader: dslReader, - gRPCClient: gRPCClient, - gRPCConn: conn, - logger: logger, + Id: id, + SlaveHTTPAddress: slaveHTTPAddress, + SlaveTLSEnabled: slaveTLSEnabled, + MasterHTTPAddress: masterHTTPAddress, + dslReader: dslReader, + httpClient: client, + logger: logger, } } @@ -62,7 +57,6 @@ func (s *SlaveRegistrar) RegisterTimer(ctx context.Context, wg *sync.WaitGroup) select { case <-ctx.Done(): ticker.Stop() - s.gRPCConn.Close() s.logger.Info("πŸ›‘ slave registration timer stopped") wg.Done() return @@ -82,14 +76,35 @@ func (s *SlaveRegistrar) register(ctx context.Context) { for i, validation := range dsl.Validations { validationIds[i] = validation.ID } - _, err = s.gRPCClient.Register(ctx, &pb.RegisterRequest{ - Id: s.Id, - Address: s.Address, - TlsEnabled: s.TLSEnabled, - ValidationIds: validationIds, - }) + reqBody := map[string]interface{}{ + "id": s.Id, + "address": s.SlaveHTTPAddress, + "tls_enabled": s.SlaveTLSEnabled, + "validation_ids": validationIds, + } + body, err := json.Marshal(reqBody) if err != nil { - s.logger.Error(failure.Translate(err, appError.ErrSlaveRegistrationFailed, failure.Message("Failed to register to master")).Error()) + s.logger.Error(failure.Translate(err, appError.ErrSlaveRegistrationFailed, failure.Message("Failed to marshal request body")).Error()) + return + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.MasterHTTPAddress+"/v1/slave/register", bytes.NewReader(body)) + if err != nil { + s.logger.Error(failure.Translate(err, appError.ErrSlaveRegistrationFailed, failure.Message("Failed to create request")).Error()) + return + } + req.Header.Set("Content-Type", "application/json") + + resp, err := s.httpClient.Do(req) + if err != nil { + s.logger.Error(failure.Translate(err, appError.ErrSlaveRegistrationFailed, failure.Message("Failed to send request")).Error()) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + s.logger.Error(failure.New(appError.ErrSlaveRegistrationFailed, failure.Messagef("Failed to register to master: %d", resp.StatusCode)).Error()) + return } else { s.logger.Info("πŸ““ slave registration success") } From e8089ce6cdeae9d0a881e2b69c585ee5dc6b6713 Mon Sep 17 00:00:00 2001 From: shibukazu Date: Sat, 24 Aug 2024 03:51:26 +0900 Subject: [PATCH 6/9] fix --- go/pkg/server/gateway.go | 8 +++----- go/pkg/slave/manager.go | 3 ++- go/pkg/store/memory.go | 7 ++++++- go/pkg/store/redis.go | 6 +++++- 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/go/pkg/server/gateway.go b/go/pkg/server/gateway.go index 47f86f8..e97c1a4 100644 --- a/go/pkg/server/gateway.go +++ b/go/pkg/server/gateway.go @@ -191,13 +191,11 @@ func (g *Gateway) forwardCheckRequestMiddleware(next http.Handler) http.Handler } // Check if the request forward is needed - isForwardNeed := false - if !dslFound { - isForwardNeed = true - } else { + isForwardNeed := true + if dslFound { for _, validation := range dsl.Validations { if validation.ID == id { - isForwardNeed = true + isForwardNeed = false break } } diff --git a/go/pkg/slave/manager.go b/go/pkg/slave/manager.go index 42b7c89..1dc10b6 100644 --- a/go/pkg/slave/manager.go +++ b/go/pkg/slave/manager.go @@ -1,6 +1,7 @@ package slave import ( + "fmt" "log/slog" "sync" @@ -40,5 +41,5 @@ func (m *SlaveManager) FindSlave(validationId string) (*Slave, error) { } } } - return nil, failure.New("slave node that can handle the validation ID is not found") + return nil, failure.New(fmt.Sprintf("slave node that can handle the validation ID (%s) is not found", validationId)) } diff --git a/go/pkg/store/memory.go b/go/pkg/store/memory.go index 8f85f33..ccb5afc 100644 --- a/go/pkg/store/memory.go +++ b/go/pkg/store/memory.go @@ -3,6 +3,7 @@ package store import ( "bytes" "encoding/json" + "strings" "sync" "github.com/morikuni/failure/v2" @@ -23,7 +24,11 @@ func NewMemoryStore(id string) *MemoryStore { func (s *MemoryStore) Reset() error { s.mu.Lock() - s.memory = make(map[string][]byte) + for k := range s.memory { + if strings.HasPrefix(k, s.id+":") { + delete(s.memory, k) + } + } s.mu.Unlock() return nil } diff --git a/go/pkg/store/redis.go b/go/pkg/store/redis.go index 96dd0d9..f91e184 100644 --- a/go/pkg/store/redis.go +++ b/go/pkg/store/redis.go @@ -20,7 +20,11 @@ func NewRedisStore(id string, redisClient *redis.Client) *RedisStore { } func (s *RedisStore) Reset() error { - if err := s.redisClient.FlushDB().Err(); err != nil { + keys, _ := s.redisClient.Scan(0, s.id+":*", 0).Val() + if len(keys) == 0 { + return nil + } + if err := s.redisClient.Del(keys...).Err(); err != nil { return failure.Translate(err, appError.ErrRedisOperationFailed) } return nil From ec826578bc35c8812f0e098529b87c89f2faf994 Mon Sep 17 00:00:00 2001 From: shibukazu Date: Sat, 24 Aug 2024 04:16:31 +0900 Subject: [PATCH 7/9] make operation parallel --- go/pkg/server/gateway.go | 123 ++++++++++++++++++++++----------------- 1 file changed, 70 insertions(+), 53 deletions(-) diff --git a/go/pkg/server/gateway.go b/go/pkg/server/gateway.go index e97c1a4..4e89b4c 100644 --- a/go/pkg/server/gateway.go +++ b/go/pkg/server/gateway.go @@ -177,7 +177,10 @@ func (g *Gateway) forwardCheckRequestMiddleware(next http.Handler) http.Handler if err == nil { dslFound = true } - // TODO: ε„ε‡¦η†γ‚’δΈ¦εˆ—εŒ–γ™γ‚‹ + + ch := make(chan []interface{}) + errCh := make(chan error) + numForwarded := 0 for _, validation := range validations { validation, ok := validation.(map[string]interface{}) if !ok { @@ -202,69 +205,83 @@ func (g *Gateway) forwardCheckRequestMiddleware(next http.Handler) http.Handler } if isForwardNeed { - // Find the slave node that can handle validation ID - slaveNode, err := g.slaveManager.FindSlave(id) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - var client *http.Client - if slaveNode.TLSEnabled { - transport := &http.Transport{ - TLSClientConfig: &tls.Config{}, + numForwarded++ + go func(id string, ch chan []interface{}) { + // Find the slave node that can handle validation ID + slaveNode, err := g.slaveManager.FindSlave(id) + if err != nil { + errCh <- err + return } - client = &http.Client{Transport: transport} - } else { - client = &http.Client{} - } - client.Timeout = 5 * time.Second - reqBody := map[string]interface{}{ - "validations": []interface{}{validation}, - } - body, err := json.Marshal(reqBody) - if err != nil { - http.Error(w, failure.Translate(err, appError.ErrValidateServiceForwardFailed).Error(), http.StatusInternalServerError) - return - } - req, err := http.NewRequest("POST", slaveNode.Addr+"/v1/check", bytes.NewBuffer(body)) - if err != nil { - http.Error(w, failure.Translate(err, appError.ErrValidateServiceForwardFailed).Error(), http.StatusInternalServerError) - return - } - req.Header.Set("Content-Type", "application/json") + var client *http.Client + if slaveNode.TLSEnabled { + transport := &http.Transport{ + TLSClientConfig: &tls.Config{}, + } + client = &http.Client{Transport: transport} + } else { + client = &http.Client{} + } + client.Timeout = 5 * time.Second - resp, err := client.Do(req) - if err != nil { - http.Error(w, failure.Translate(err, appError.ErrValidateServiceForwardFailed).Error(), http.StatusInternalServerError) - return - } - defer resp.Body.Close() + reqBody := map[string]interface{}{ + "validations": []interface{}{validation}, + } + body, err := json.Marshal(reqBody) + if err != nil { + errCh <- failure.Translate(err, appError.ErrValidateServiceForwardFailed) + return + } + req, err := http.NewRequest("POST", slaveNode.Addr+"/v1/check", bytes.NewBuffer(body)) + if err != nil { + errCh <- failure.Translate(err, appError.ErrValidateServiceForwardFailed) + return + } + req.Header.Set("Content-Type", "application/json") - if resp.StatusCode != http.StatusOK { - http.Error(w, failure.New(appError.ErrValidateServiceForwardFailed, failure.Messagef("Failed to forward the validate request to slave: %d", resp.StatusCode)).Error(), http.StatusInternalServerError) - return - } + resp, err := client.Do(req) + if err != nil { + errCh <- failure.Translate(err, appError.ErrValidateServiceForwardFailed) + return + } + defer resp.Body.Close() - var respBody map[string]interface{} - if err := json.NewDecoder(resp.Body).Decode(&respBody); err != nil { - http.Error(w, failure.Translate(err, appError.ErrValidateServiceForwardFailed).Error(), http.StatusInternalServerError) - return - } - results, ok := respBody["results"].([]interface{}) - if !ok { - http.Error(w, failure.New(appError.ErrValidateServiceForwardFailed, failure.Messagef("results field is invalid")).Error(), http.StatusInternalServerError) - return - } - validationResults = append(validationResults, results...) + if resp.StatusCode != http.StatusOK { + errCh <- failure.New(appError.ErrValidateServiceForwardFailed, failure.Messagef("Failed to forward the validate request to slave: %d", resp.StatusCode)) + return + } - g.logger.Info(fmt.Sprintf("⚽️ Request (id:%s) Forwarded to Slave %s", id, slaveNode.Id)) + var respBody map[string]interface{} + if err := json.NewDecoder(resp.Body).Decode(&respBody); err != nil { + errCh <- failure.Translate(err, appError.ErrValidateServiceForwardFailed) + return + } + results, ok := respBody["results"].([]interface{}) + if !ok { + errCh <- failure.New(appError.ErrRequestParameterInvalid, failure.Messagef("results field is invalid")) + return + } + ch <- results + g.logger.Info(fmt.Sprintf("⚽️ Request (id:%s) Forwarded to Slave %s", id, slaveNode.Id)) + }(id, ch) } else { modifiedRequestValidations = append(modifiedRequestValidations, validation) } } + for i := 0; i < numForwarded; i++ { + select { + case err := <-errCh: + http.Error(w, err.Error(), http.StatusInternalServerError) + return + case results := <-ch: + validationResults = append(validationResults, results...) + case <-time.After(30 * time.Second): + http.Error(w, failure.New(appError.ErrValidateServiceForwardFailed, failure.Message("Timeout")).Error(), http.StatusInternalServerError) + } + } + reqBody["validations"] = modifiedRequestValidations modifiedReqBody, err := json.Marshal(reqBody) if err != nil { From 01648d5d3cea168adee3f37679edd7c2a364c681 Mon Sep 17 00:00:00 2001 From: shibukazu Date: Sat, 24 Aug 2024 05:07:18 +0900 Subject: [PATCH 8/9] add readme --- README.md | 203 ++++++++--------------------------- docs/Master-Slave-Example.md | 181 +++++++++++++++++++++++++++++++ docs/Monolithic-Example.md | 133 +++++++++++++++++++++++ 3 files changed, 358 insertions(+), 159 deletions(-) create mode 100644 docs/Master-Slave-Example.md create mode 100644 docs/Monolithic-Example.md diff --git a/README.md b/README.md index b0da5f7..6e217d6 100644 --- a/README.md +++ b/README.md @@ -18,24 +18,29 @@ Open-VE offers an HTTP API and a gRPC API. We will provide a client SDK in the f #### 1. CLI Flags or Environment Variables -| CLI Args | Env | Default | Desc | -| ----------------------------- | ----------------------------------- | ------------ | --------------------------- | -| `--http-addr` | `OPEN-VE_HTTP_ADDR` | `:8080` | HTTP server address | -| `--http-cors-allowed-origins` | `OPEN-VE_HTTP_CORS_ALLOWED_ORIGINS` | `["*"]` | CORS allowed origins | -| `--http-cors-allowed-headers` | `OPEN-VE_HTTP_CORS_ALLOWED_HEADERS` | `["*"]` | CORS allowed headers | -| `--http-tls-enabled` | `OPEN-VE_HTTP_TLS_ENABLED` | `false` | HTTP server TLS enabled | -| `--http-tls-cert-path` | `OPEN-VE_HTTP_TLS_CERT_PATH` | `""` | HTTP server TLS cert path | -| `--http-tls-key-path` | `OPEN-VE_HTTP_TLS_KEY_PATH` | `""` | HTTP server TLS key path | -| `--grpc-addr` | `OPEN-VE_GRPC_ADDR` | `:9000` | gRPC server address | -| `--grpc-tls-enabled` | `OPEN-VE_GRPC_TLS_ENABLED` | `false` | gRPC server TLS enabled | -| `--grpc-tls-cert-path` | `OPEN-VE_GRPC_TLS_CERT_PATH` | `""` | gRPC server TLS cert path | -| `--grpc-tls-key-path` | `OPEN-VE_GRPC_TLS_KEY_PATH` | `""` | gRPC server TLS key path | -| `--store-engine` | `OPEN-VE_STORE_ENGINE` | `memory` | store engine (redis/memory) | -| `--store-redis-addr` | `OPEN-VE_STORE_REDIS_ADDR` | `redis:6379` | Redis address | -| `--store-redis-password` | `OPEN-VE_STORE_REDIS_PASSWORD` | `""` | Redis password | -| `--store-redis-db` | `OPEN-VE_STORE_REDIS_DB` | `0` | Redis DB | -| `--store-redis-pool-size` | `OPEN-VE_STORE_REDIS_POOL_SIZE` | `1000` | Redis pool size | -| `--log-level` | `OPEN-VE_LOG_LEVEL` | `info` | Log level | +| CLI Args | Env | Default | Desc | +| ----------------------------- | ----------------------------------- | ------------ | ---------------------------------------------------------------------- | +| `--mode` | `OPEN-VE_MODE` | `master` | master or slave | +| `--slave-id` | `OPEN-VE_SLAVE_ID` | | Unique slave ID (if mode is slave, this is required) | +| `--slave-slave-http-addr` | `OPEN-VE_SLAVE_SLAVE_HTTP_ADDR` | | HTTP server address (if mode is slave, this is required) | +| `--slave-master-http-addr` | `OPEN-VE_SLAVE_MASTER_HTTP_ADDR` | | Master HTTP server address (if mode is slave, this is required) | +| `--slave-master-tsl-enabled` | `OPEN-VE_SLAVE_MASTER_TLS_ENABLED` | `false` | Connect to master server with TLS (if mode is slave, this is required) | +| `--http-port` | `OPEN-VE_HTTP_PORT` | `8080` | HTTP server port number | +| `--http-cors-allowed-origins` | `OPEN-VE_HTTP_CORS_ALLOWED_ORIGINS` | `["*"]` | CORS allowed origins | +| `--http-cors-allowed-headers` | `OPEN-VE_HTTP_CORS_ALLOWED_HEADERS` | `["*"]` | CORS allowed headers | +| `--http-tls-enabled` | `OPEN-VE_HTTP_TLS_ENABLED` | `false` | HTTP server TLS enabled | +| `--http-tls-cert-path` | `OPEN-VE_HTTP_TLS_CERT_PATH` | `""` | HTTP server TLS cert path | +| `--http-tls-key-path` | `OPEN-VE_HTTP_TLS_KEY_PATH` | `""` | HTTP server TLS key path | +| `--grpc-port` | `OPEN-VE_GRPC_ADDR` | `:9000` | gRPC server port number | +| `--grpc-tls-enabled` | `OPEN-VE_GRPC_TLS_ENABLED` | `false` | gRPC server TLS enabled | +| `--grpc-tls-cert-path` | `OPEN-VE_GRPC_TLS_CERT_PATH` | `""` | gRPC server TLS cert path | +| `--grpc-tls-key-path` | `OPEN-VE_GRPC_TLS_KEY_PATH` | `""` | gRPC server TLS key path | +| `--store-engine` | `OPEN-VE_STORE_ENGINE` | `memory` | store engine (redis/memory) | +| `--store-redis-addr` | `OPEN-VE_STORE_REDIS_ADDR` | `redis:6379` | Redis address | +| `--store-redis-password` | `OPEN-VE_STORE_REDIS_PASSWORD` | `""` | Redis password | +| `--store-redis-db` | `OPEN-VE_STORE_REDIS_DB` | `0` | Redis DB | +| `--store-redis-pool-size` | `OPEN-VE_STORE_REDIS_POOL_SIZE` | `1000` | Redis pool size | +| `--log-level` | `OPEN-VE_LOG_LEVEL` | `info` | Log level | #### 2. Config File @@ -44,8 +49,9 @@ You can also use a config file in YAML format. Place the `config.yaml` in the same directory or `$HOME/.open-ve/config.yaml`. ```yaml +mode: "master" http: - addr: ":8080" + port: "8080" corsAllowedOrigins: ["*"] corsAllowedHeaders: ["*"] tls: @@ -53,7 +59,7 @@ http: certPath: "" keyPath: "" grpc: - addr: ":9000" + poer: "9000" tls: enabled: false certPath: "" @@ -91,7 +97,21 @@ go build -o open-ve go/cmd/open-ve/main.go docker-compose up ``` -## CEL +## System Design + +### Master-Slave Architecture + +Open-VE supports a master-slave architecture designed for scalability and compatibility with microservice environments. + +In slave mode, Open-VE connects to the master server and syncs validation rules every 30 seconds. + +When a validation check request is made to the master server, it distributes the request across the connected slave servers. + +Additionally, you can directly request validation checks from the slave servers. + +![micro-validator](https://github.com/user-attachments/assets/e248d40c-bcc7-4219-a65a-5b243e101000) + +### CEL We use [CEL](https://github.com/google/cel-spec/blob/master/doc/langdef.md) as the expression language for validation rules. @@ -111,143 +131,8 @@ Supported types: | message names | | ❓ | | `type` | | ❓ | -## Example (HTTP API) - -### Register Validation Rules - -Request: - -```bash -curl --request POST \ - --url http://localhost:8080/v1/dsl \ - --header 'Content-Type: application/json' \ - --data '{ - "validations": [ - { - "id": "item", - "cels": [ - "price > 0", # price must be greater than 0 - "size(image) < 360" # image size must be less than 360 bytes - ], - "variables": [ - { - "name": "price", - "type": "int" - }, - { - "name": "image", - "type": "bytes" - } - ] - }, - { - "id": "user", - "cels": [ - "size(name) < 20" # name length must be less than 20 - ], - "variables": [ - { - "name": "name", - "type": "string" - } - ] - } - ] - }' -``` - -Response: - -```json -{} -``` - -### Get Current Validation Rules - -Request: - -```bash -curl --request GET \ - --url http://localhost:8080/v1/dsl \ - --header 'Content-Type: application/json' -``` - -Response: - -```json -{ - "validations": [ - { - "id": "item", - "cels": ["price > 0", "size(image) < 360"], - "variables": [ - { - "name": "price", - "type": "int" - }, - { - "name": "image", - "type": "bytes" - } - ] - }, - { - "id": "user", - "cels": ["size(name) < 20"], - "variables": [ - { - "name": "name", - "type": "string" - } - ] - } - ] -} -``` - -### Validate +## Examples -Request: - -```bash -curl --request POST \ - --url 'http://localhost:8080/v1/check' \ - --header 'Content-Type: application/json' \ - --data '{ - "validations": [ - { - "id": "item", - "variables": { - "price": -100, - "image": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAIAAACQd1PeAAAADElEQVR4nGO4unY2AAR4Ah51j5XwAAAAAElFTkSuQmCC" # send base64 encoded image - } - }, - { - "id": "user", - "variables": { - "name": "longlonglonglongname" - } - } - ] - }' +- [Example of Master Slave Architecture](docs/Master-Slave-Example.md) -``` - -Response: - -```json -{ - "results": [ - { - "id": "item", - "isValid": false, - "message": "failed validations: price > 0" - }, - { - "id": "user", - "isValid": false, - "message": "failed validations: size(name) < 20" - } - ] -} -``` +- [Example of Monolithic Architecture](docs/Monolithic-Example.md) diff --git a/docs/Master-Slave-Example.md b/docs/Master-Slave-Example.md new file mode 100644 index 0000000..5a7a889 --- /dev/null +++ b/docs/Master-Slave-Example.md @@ -0,0 +1,181 @@ +# Master-SlaveExample + +This example demonstrates how to run the Open-VE as master-slave architecture. + +Note: In this example, the master node is hosted at `localhost:8081`, and the slave node is hosted at `localhost:8082`. + +## Run + +### Master Node + +```bash +OPEN-VE_MODE=master +OPEN-VE_HTTP_PORT=8081 +OPEN-VE_GRPC_PORT=9001 + +open-ve run +``` + +### Slave Node + +```bash +OPEN-VE_MODE=slave +OPEN-VE_SLAVE_ID=slave-node-id +OPEN-VE_SLAVE_MASTER_HTTP_ADDR=http://localhost:8081 +OPEN-VE_SLAVE_SLAVE_HTTP_ADDR=http://localhost:8082 +OPEN-VE_HTTP_PORT=8082 +OPEN-VE_GRPC_PORT=9002 + +open-ve run +``` + +## Scinario + +### 1. Register a Set of Validation Rules to Master Node + +```bash +curl --request POST \ + --url http://localhost:8081/v1/dsl \ + --header 'Content-Type: application/json' \ + --data '{ + "validations": [ + { + "id": "user", + "cels": [ + "size(name) < 20" + ], + "variables": [ + { + "name": "name", + "type": "string" + } + ] + } + ] +}' +``` + +### 2. Register a Set of Validation Rules to Slave Node + +```bash +curl --request POST \ + --url http://localhost:8082/v1/dsl \ + --header 'Content-Type: application/json' \ + --data '{ + "validations": [ + { + "id": "item", + "cels": [ + "price > 0", + "size(image) < 360" + ], + "variables": [ + { + "name": "price", + "type": "int" + }, + { + "name": "image", + "type": "bytes" + } + ] + } + ] +}' +``` + +### 3. Check the Current Validation Rules + +```bash +curl --request GET \ + --url http://localhost:8081/v1/dsl \ + --header 'Content-Type: application/json' +``` + +```json +{ + "validations": [ + { + "id": "user", + "cels": ["size(name) < 20"], + "variables": [ + { + "name": "name", + "type": "string" + } + ] + } + ] +} +``` + +```bash +curl --request GET \ + --url http://localhost:8082/v1/dsl \ + --header 'Content-Type: application/json' +``` + +```json +{ + "validations": [ + { + "id": "item", + "cels": ["price > 0", "size(image) < 360"], + "variables": [ + { + "name": "price", + "type": "int" + }, + { + "name": "image", + "type": "bytes" + } + ] + } + ] +} +``` + +### 4. Request Validation Check to Master Node + +Although only part of the validation rules are registered with the master node, you can request validation for all rules, including those on the slave nodes. + +```bash +curl --request POST \ + --url http://localhost:8081/v1/check \ + --header 'Content-Type: application/json' \ + --data '{ + "validations": [ + { + "id": "item", + "variables": { + "price": -100, + "image": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAIAAACQd1PeAAAADElEQVR4nGO4unY2AAR4Ah51j5XwAAAAAElFTkSuQmCC" + } + }, + { + "id": "user", + "variables": { + "name": "longlonglonglongname" + } + } + ] +}' +``` + +```json +{ + "results": [ + { + "id": "user", + "isValid": false, + "message": "failed validations: size(name) < 20" + }, + { + "id": "item", + "isValid": false, + "message": "failed validations: price > 0" + } + ] +} +``` diff --git a/docs/Monolithic-Example.md b/docs/Monolithic-Example.md new file mode 100644 index 0000000..cf75e84 --- /dev/null +++ b/docs/Monolithic-Example.md @@ -0,0 +1,133 @@ +# Monolithic Example + +This example demonstrates how to run the Open-VE as monolithic architecture. + +## Run + +```bash +open-ve run +``` + +## Scenario + +### 1. Register Validation Rules + +```bash +curl --request POST \ + --url http://localhost:8080/v1/dsl \ + --header 'Content-Type: application/json' \ + --data '{ + "validations": [ + { + "id": "user", + "cels": [ + "size(name) < 20" + ], + "variables": [ + { + "name": "name", + "type": "string" + } + ] + } + { + "id": "item", + "cels": [ + "price > 0", + "size(image) < 360" + ], + "variables": [ + { + "name": "price", + "type": "int" + }, + { + "name": "image", + "type": "bytes" + } + ] + } + ] +}' +``` + +### 2. Check the Current Validation Rules + +```bash +curl --request GET \ + --url http://localhost:8080/v1/dsl \ + --header 'Content-Type: application/json' +``` + +```json +{ + "validations": [ + { + "id": "user", + "cels": ["size(name) < 20"], + "variables": [ + { + "name": "name", + "type": "string" + } + ] + } + { + "id": "item", + "cels": ["price > 0", "size(image) < 360"], + "variables": [ + { + "name": "price", + "type": "int" + }, + { + "name": "image", + "type": "bytes" + } + ] + } + ] +} +``` + +### 3. Request Validation Check + +```bash +curl --request POST \ + --url http://localhost:8080/v1/check \ + --header 'Content-Type: application/json' \ + --data '{ + "validations": [ + { + "id": "item", + "variables": { + "price": -100, + "image": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAIAAACQd1PeAAAADElEQVR4nGO4unY2AAR4Ah51j5XwAAAAAElFTkSuQmCC" + } + }, + { + "id": "user", + "variables": { + "name": "longlonglonglongname" + } + } + ] +}' +``` + +```json +{ + "results": [ + { + "id": "user", + "isValid": false, + "message": "failed validations: size(name) < 20" + }, + { + "id": "item", + "isValid": false, + "message": "failed validations: price > 0" + } + ] +} +``` From 80c1cc0aed4153ba7a0f4cfbfc4b4d9712d1c4bb Mon Sep 17 00:00:00 2001 From: shibukazu Date: Sat, 24 Aug 2024 05:14:58 +0900 Subject: [PATCH 9/9] fix --- .github/workflows/test-runn.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test-runn.yml b/.github/workflows/test-runn.yml index f992cf4..fd5b008 100644 --- a/.github/workflows/test-runn.yml +++ b/.github/workflows/test-runn.yml @@ -25,7 +25,7 @@ jobs: go install github.com/k1LoW/runn/cmd/runn@latest - name: "Start Server" run: | - go run go/cmd/open-ve/main.go run --http-addr=:8080 & + go run go/cmd/open-ve/main.go run --http-port=8080 & - name: "Run runn" run: | export ENDPOINT=http://0.0.0.0:8080