diff --git a/cmd/client/app.go b/cmd/client/app.go index 412a95f7..927d2415 100644 --- a/cmd/client/app.go +++ b/cmd/client/app.go @@ -44,5 +44,6 @@ func init() { \/__/ \/__/ \/__/ \/__/ `}, "\r\n")) }) + App.OnClosing(close) register(App) } diff --git a/cmd/client/cli/flydb-client.go b/cmd/client/cli/flydb-client.go index 0bef82cd..a8336cdf 100644 --- a/cmd/client/cli/flydb-client.go +++ b/cmd/client/cli/flydb-client.go @@ -2,9 +2,10 @@ package main import ( "fmt" + "os" + "github.com/ByteStorage/FlyDB/cmd/client" "github.com/desertbit/grumble" - "os" ) func main() { diff --git a/cmd/client/client.go b/cmd/client/client.go new file mode 100644 index 00000000..81b9a636 --- /dev/null +++ b/cmd/client/client.go @@ -0,0 +1,39 @@ +package client + +import ( + "fmt" + + "github.com/ByteStorage/FlyDB/db/grpc/client" +) + +var ( + Addr string + cliClient *client.Client +) + +func newClient() *client.Client { + var err error + if cliClient != nil { + return cliClient + } + + if cliClient, err = client.NewClient(Addr); err != nil { + fmt.Println("new client error: ", err) + } + + return cliClient +} + +func close() error { + if cliClient == nil { + return nil + } + + if err := cliClient.Close(); err != nil { + return err + } + + cliClient = nil + + return nil +} diff --git a/cmd/client/consts.go b/cmd/client/consts.go new file mode 100644 index 00000000..457874de --- /dev/null +++ b/cmd/client/consts.go @@ -0,0 +1,37 @@ +package client + +import "math" + +const ( + CommonKeyArg = "key" + CommonMemberArg = "member" + CommonMembersArg = "members" + CommonValueArg = "value" + + CommonDefaultEmptyString = "" + + ZSetScoreArg = "score" + ZSetStartArg = "start" + ZSetEndArg = "end" + ZSetIncrByArg = "incrBy" + ZSetMinArg = "min" + ZSetMaxArg = "max" + + ZSetDefaultScore = 0 + ZSetDefaultRangeStart = 0 + ZSetDefaultRangeEnd = math.MaxInt + ZSetDefaultIncrBy = 0 + ZSetDefaultMin = 0 + ZSetDefaultMax = math.MaxInt + + ZSetDefaultKeyHelp = "the key of the zset" + ZSetDefaultMemberHelp = "the member of the zset" + ZSetDefaultMembersHelp = "the members of the zset, separated by space, e.g. member1 member2 member3" + ZSetDefaultValueHelp = "the value of the zset" + ZSetDefaultScoreHelp = "the score of the zset" + ZSetDefaultStartHelp = "the start index of the zset" + ZSetDefaultEndHelp = "the end index of the zset" + ZSetDefaultIncrByHelp = "the increment value of the zset" + ZSetDefaultMinHelp = "the min score of the zset" + ZSetDefaultMaxHelp = "the max score of the zset" +) diff --git a/cmd/client/hash.go b/cmd/client/hash.go index 54ddbdc1..210cfcf7 100644 --- a/cmd/client/hash.go +++ b/cmd/client/hash.go @@ -2,6 +2,7 @@ package client import ( "fmt" + "github.com/desertbit/grumble" "github.com/pkg/errors" ) diff --git a/cmd/client/list.go b/cmd/client/list.go index 7eca0e96..958bd9e4 100644 --- a/cmd/client/list.go +++ b/cmd/client/list.go @@ -2,6 +2,7 @@ package client import ( "fmt" + "github.com/desertbit/grumble" ) diff --git a/cmd/client/root.go b/cmd/client/root.go index 1c1c74a8..1cf04d28 100644 --- a/cmd/client/root.go +++ b/cmd/client/root.go @@ -1,6 +1,8 @@ package client -import "github.com/desertbit/grumble" +import ( + "github.com/desertbit/grumble" +) func register(app *grumble.App) { app.AddCommand(&grumble.Command{ @@ -545,4 +547,118 @@ func register(app *grumble.App) { }, }) + app.AddCommand(&grumble.Command{ + Name: "ZAdd", + Help: "Add a member to a sorted set, or update its score if it already exists", + Run: ZSetAdd, + Args: func(a *grumble.Args) { + a.String(CommonKeyArg, ZSetDefaultKeyHelp, grumble.Default(CommonDefaultEmptyString)) + a.String(CommonMemberArg, ZSetDefaultMemberHelp, grumble.Default(CommonDefaultEmptyString)) + a.Int(ZSetScoreArg, ZSetDefaultScoreHelp, grumble.Default(ZSetDefaultScore)) + a.String(CommonValueArg, ZSetDefaultValueHelp, grumble.Default(CommonDefaultEmptyString)) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "ZRem", + Help: "Remove a member from a sorted set", + Run: ZSetRem, + Args: func(a *grumble.Args) { + a.String(CommonKeyArg, ZSetDefaultKeyHelp, grumble.Default(CommonDefaultEmptyString)) + a.String(CommonMemberArg, ZSetDefaultMemberHelp, grumble.Default(CommonDefaultEmptyString)) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "ZRems", + Help: "Remove multiple members from a sorted set", + Run: ZSetRems, + Args: func(a *grumble.Args) { + a.String(CommonKeyArg, ZSetDefaultKeyHelp, grumble.Default(CommonDefaultEmptyString)) + a.StringList(CommonMembersArg, ZSetDefaultMembersHelp, grumble.Default(CommonDefaultEmptyString)) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "ZScore", + Help: "Get the score of a member in a sorted set", + Run: ZSetScore, + Args: func(a *grumble.Args) { + a.String(CommonKeyArg, ZSetDefaultKeyHelp, grumble.Default(CommonDefaultEmptyString)) + a.String(CommonMemberArg, ZSetDefaultMemberHelp, grumble.Default(CommonDefaultEmptyString)) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "ZRank", + Help: "Get the rank of a member in a sorted set", + Run: ZSetRank, + Args: func(a *grumble.Args) { + a.String(CommonKeyArg, ZSetDefaultKeyHelp, grumble.Default(CommonDefaultEmptyString)) + a.String(CommonMemberArg, ZSetDefaultMemberHelp, grumble.Default(CommonDefaultEmptyString)) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "ZRevRank", + Help: "Get the reverse rank of a member in a sorted set", + Run: ZSetRevRank, + Args: func(a *grumble.Args) { + a.String(CommonKeyArg, ZSetDefaultKeyHelp, grumble.Default(CommonDefaultEmptyString)) + a.String(CommonMemberArg, ZSetDefaultMemberHelp, grumble.Default(CommonDefaultEmptyString)) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "ZRange", + Help: "Get a range of members in a sorted set", + Run: ZSetRange, + Args: func(a *grumble.Args) { + a.String(CommonKeyArg, ZSetDefaultKeyHelp, grumble.Default(CommonDefaultEmptyString)) + a.Int(ZSetStartArg, ZSetDefaultStartHelp, grumble.Default(ZSetDefaultRangeStart)) + a.Int(ZSetEndArg, ZSetDefaultEndHelp, grumble.Default(ZSetDefaultRangeEnd)) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "ZCount", + Help: "Count the members in a sorted set with scores within the given range", + Run: ZSetCount, + Args: func(a *grumble.Args) { + a.String(CommonKeyArg, ZSetDefaultKeyHelp, grumble.Default(CommonDefaultEmptyString)) + a.Int(ZSetMinArg, ZSetDefaultMinHelp, grumble.Default(ZSetDefaultMin)) + a.Int(ZSetMaxArg, ZSetDefaultMaxHelp, grumble.Default(ZSetDefaultMax)) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "ZRevRange", + Help: "Get a range of members in a sorted set, in reverse order", + Run: ZSetRevRange, + Args: func(a *grumble.Args) { + a.String(CommonKeyArg, ZSetDefaultKeyHelp, grumble.Default(CommonDefaultEmptyString)) + a.Int(ZSetStartArg, ZSetDefaultStartHelp, grumble.Default(ZSetDefaultRangeStart)) + a.Int(ZSetEndArg, ZSetDefaultEndHelp, grumble.Default(ZSetDefaultRangeEnd)) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "ZCard", + Help: "Get the number of members in a sorted set", + Run: ZSetCard, + Args: func(a *grumble.Args) { + a.String(CommonKeyArg, ZSetDefaultKeyHelp, grumble.Default(CommonDefaultEmptyString)) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "ZIncrBy", + Help: "Increment the score of a member in a sorted set", + Run: ZSetIncrBy, + Args: func(a *grumble.Args) { + a.String(CommonKeyArg, ZSetDefaultKeyHelp, grumble.Default(CommonDefaultEmptyString)) + a.String(CommonMemberArg, ZSetDefaultMemberHelp, grumble.Default(CommonDefaultEmptyString)) + a.Int(ZSetIncrByArg, ZSetDefaultIncrByHelp, grumble.Default(ZSetDefaultIncrBy)) + }, + }) } diff --git a/cmd/client/set.go b/cmd/client/set.go index 6524af7b..6805fd95 100644 --- a/cmd/client/set.go +++ b/cmd/client/set.go @@ -2,6 +2,7 @@ package client import ( "fmt" + "github.com/desertbit/grumble" ) diff --git a/cmd/client/string.go b/cmd/client/string.go index ef33e820..8795a06e 100644 --- a/cmd/client/string.go +++ b/cmd/client/string.go @@ -3,18 +3,10 @@ package client import ( "errors" "fmt" - "github.com/ByteStorage/FlyDB/db/grpc/client" + "github.com/desertbit/grumble" ) -var Addr string - -func newClient() *client.Client { - return &client.Client{ - Addr: Addr, - } -} - func stringPutData(c *grumble.Context) error { key := c.Args.String("key") value := c.Args.String("value") diff --git a/cmd/client/zset.go b/cmd/client/zset.go new file mode 100644 index 00000000..b5430eac --- /dev/null +++ b/cmd/client/zset.go @@ -0,0 +1,330 @@ +package client + +import ( + "fmt" + "math" + + "github.com/desertbit/grumble" + pbany "github.com/golang/protobuf/ptypes/any" + + "github.com/ByteStorage/FlyDB/db/grpc/client" + "github.com/ByteStorage/FlyDB/lib/proto/gzset" +) + +func checkIsEmpty(key string, value string) bool { + if value == "" { + fmt.Printf("%s is empty\n", key) + return true + } + + return false +} + +func checkIsInt32(key string, value int) bool { + if value < math.MinInt32 || value > math.MaxInt32 { + fmt.Printf("%s is not int32\n", key) + return true + } + + return false +} + +func string2Any(value string) *pbany.Any { + return &pbany.Any{ + Value: []byte(value), + } +} + +func checkRange(start int, end int) bool { + if start > end { + fmt.Println("Start is greater than end") + return true + } + + return false +} + +func ZSetAdd(ctx *grumble.Context) error { + var ( + key = ctx.Args.String(CommonKeyArg) + member = ctx.Args.String(CommonMemberArg) + score = ctx.Args.Int(ZSetScoreArg) + value = ctx.Args.String(CommonValueArg) + zSetValue = &gzset.ZSetValue{} + err error + ) + + if checkIsEmpty(CommonKeyArg, key) || checkIsEmpty(CommonMemberArg, member) || checkIsEmpty(CommonValueArg, value) || checkIsInt32(ZSetScoreArg, score) { + return nil + } + + zSetValue.Member = member + zSetValue.Score = int32(score) + zSetValue.Value = string2Any(value) + + if _, err = newClient().ZAdd(key, zSetValue); err != nil { + fmt.Println("ZAdd data error: ", err) + + return err + } + + fmt.Println("ZAdd data success") + + return nil +} + +func ZSetRem(ctx *grumble.Context) error { + var ( + key = ctx.Args.String(CommonKeyArg) + member = ctx.Args.String(CommonMemberArg) + err error + ) + + if checkIsEmpty(CommonKeyArg, key) || checkIsEmpty(CommonMemberArg, member) { + return nil + } + + if _, err = newClient().ZRem(key, member); err != nil { + fmt.Println("ZRem data error: ", err) + + return err + } + + fmt.Println("ZRem data success") + + return nil +} + +func ZSetRems(ctx *grumble.Context) error { + var ( + key = ctx.Args.String(CommonKeyArg) + memberList = ctx.Args.StringList(CommonMembersArg) + err error + ) + + if checkIsEmpty(CommonKeyArg, key) { + return nil + } + + if _, err = newClient().ZRems(key, memberList); err != nil { + fmt.Println("ZRems data error: ", err) + + return err + } + + fmt.Println("ZRems data success") + + return nil +} + +func ZSetScore(ctx *grumble.Context) error { + var ( + key = ctx.Args.String(CommonKeyArg) + member = ctx.Args.String(CommonMemberArg) + response *gzset.ZScoreResponse + err error + ) + + if checkIsEmpty(CommonKeyArg, key) || checkIsEmpty(CommonMemberArg, member) { + return nil + } + + if response, err = newClient().ZScore(key, member); err != nil { + fmt.Println("ZScore data error: ", err) + + return err + } + + fmt.Printf("ZScore data success, score: %d\n", response.Score) + + return nil +} + +func ZSetRank(ctx *grumble.Context) error { + var ( + key = ctx.Args.String(CommonKeyArg) + member = ctx.Args.String(CommonMemberArg) + response *gzset.ZRankResponse + err error + ) + + if checkIsEmpty(CommonKeyArg, key) || checkIsEmpty(CommonMemberArg, member) { + return nil + } + + if response, err = newClient().ZRank(key, member); err != nil { + fmt.Println("ZRank data error: ", err) + + return err + } + + fmt.Printf("ZRank data success, rank: %d\n", response.Rank) + + return nil +} + +func ZSetRevRank(ctx *grumble.Context) error { + var ( + key = ctx.Args.String(CommonKeyArg) + member = ctx.Args.String(CommonMemberArg) + response *gzset.ZRevRankResponse + err error + ) + + if checkIsEmpty(CommonKeyArg, key) || checkIsEmpty(CommonMemberArg, member) { + return nil + } + + if response, err = newClient().ZRevRank(key, member); err != nil { + fmt.Println("ZRevRank data error: ", err) + + return err + } + + fmt.Printf("ZRevRank data success, rank: %d\n", response.Rank) + + return nil +} + +func printZSetResponse(memberList []*gzset.ZSetValue) { + for _, member := range memberList { + fmt.Printf("Member: %s, Score: %d, Value: %s\n", member.Member, member.Score, member.Value.Value) + } +} + +func ZSetRange(ctx *grumble.Context) error { + var ( + key = ctx.Args.String(CommonKeyArg) + start = ctx.Args.Int(ZSetStartArg) + end = ctx.Args.Int(ZSetEndArg) + requestRange = &client.Range{} + response *gzset.ZRangeResponse + err error + ) + + if checkIsEmpty(CommonKeyArg, key) || checkIsInt32(ZSetStartArg, start) || checkIsInt32(ZSetEndArg, end) || checkRange(start, end) { + return nil + } + + requestRange.Start = int32(start) + requestRange.End = int32(end) + + if response, err = newClient().ZRange(key, requestRange); err != nil { + fmt.Println("ZRange data error: ", err) + + return err + } + + fmt.Println("ZRange data success") + printZSetResponse(response.Members) + + return nil +} + +func ZSetCount(ctx *grumble.Context) error { + var ( + key = ctx.Args.String(CommonKeyArg) + minArg = ctx.Args.Int(ZSetMinArg) + maxArg = ctx.Args.Int(ZSetMaxArg) + requestRange = &client.Range{} + response *gzset.ZCountResponse + err error + ) + + if checkIsEmpty(CommonKeyArg, key) || checkIsInt32(ZSetStartArg, minArg) || checkIsInt32(ZSetEndArg, maxArg) || checkRange(minArg, maxArg) { + return nil + } + + requestRange.Start = int32(minArg) + requestRange.End = int32(maxArg) + + if response, err = newClient().ZCount(key, requestRange); err != nil { + fmt.Println("ZCount data error: ", err) + + return err + } + + fmt.Printf("ZCount data success, count: %d\n", response.Count) + + return nil +} + +func ZSetRevRange(ctx *grumble.Context) error { + var ( + key = ctx.Args.String(CommonKeyArg) + start = ctx.Args.Int(ZSetStartArg) + end = ctx.Args.Int(ZSetEndArg) + requestRange = &client.Range{} + response *gzset.ZRevRangeResponse + err error + ) + + if checkIsEmpty(CommonKeyArg, key) || checkIsInt32(ZSetStartArg, start) || checkIsInt32(ZSetEndArg, end) || checkRange(start, end) { + return nil + } + + requestRange.Start = int32(start) + requestRange.End = int32(end) + + if response, err = newClient().ZRevRange(key, requestRange); err != nil { + fmt.Println("ZRevRange data error: ", err) + + return err + } + + fmt.Println("ZRevRange data success") + printZSetResponse(response.Members) + + return nil + +} + +func ZSetCard(ctx *grumble.Context) error { + var ( + key = ctx.Args.String(CommonKeyArg) + response *gzset.ZCardResponse + err error + ) + + if checkIsEmpty(CommonKeyArg, key) { + return nil + } + + if response, err = newClient().ZCard(key); err != nil { + fmt.Println("ZCard data error: ", err) + + return err + } + + fmt.Printf("ZCard data success, count: %d\n", response.Count) + + return nil +} + +func ZSetIncrBy(ctx *grumble.Context) error { + var ( + key = ctx.Args.String(CommonKeyArg) + member = ctx.Args.String(CommonMemberArg) + incrBy = ctx.Args.Int(ZSetIncrByArg) + incr = &client.Incr{} + response *gzset.ZIncrByResponse + err error + ) + + if checkIsEmpty(CommonKeyArg, key) || checkIsEmpty(CommonMemberArg, member) || checkIsInt32(ZSetIncrByArg, incrBy) { + return nil + } + + incr.Member = member + incr.Inc = int32(incrBy) + + if response, err = newClient().ZIncrBy(key, incr); err != nil { + fmt.Println("ZIncrBy data error: ", err) + + return err + } + + fmt.Printf("ZIncrBy data success, new score: %d\n", response.NewScore) + + return nil +} diff --git a/db/grpc/base.go b/db/grpc/base.go index 728e745b..73f70f5e 100644 --- a/db/grpc/base.go +++ b/db/grpc/base.go @@ -2,20 +2,23 @@ package base import ( "fmt" + "net" + "os" + "os/signal" + "syscall" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" + "github.com/ByteStorage/FlyDB/config" "github.com/ByteStorage/FlyDB/db/grpc/service" "github.com/ByteStorage/FlyDB/lib/proto/ghash" "github.com/ByteStorage/FlyDB/lib/proto/glist" "github.com/ByteStorage/FlyDB/lib/proto/gset" "github.com/ByteStorage/FlyDB/lib/proto/gstring" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/health" - "google.golang.org/grpc/health/grpc_health_v1" - "net" - "os" - "os/signal" - "syscall" + "github.com/ByteStorage/FlyDB/lib/proto/gzset" ) type Base interface { @@ -69,6 +72,13 @@ func NewService(options config.Options, addr string) (Base, error) { baseService.RegisterService(setService) gset.RegisterGSetServiceServer(baseService.server, setService) + zsetService, err := service.NewZSetService(options) + if err != nil { + return nil, err + } + baseService.RegisterService(zsetService) + gzset.RegisterGZSetServiceServer(baseService.server, zsetService) + return baseService, nil } diff --git a/db/grpc/client/client.go b/db/grpc/client/client.go index e9e99f92..42c5f5fe 100644 --- a/db/grpc/client/client.go +++ b/db/grpc/client/client.go @@ -1,52 +1,158 @@ package client import ( + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "github.com/ByteStorage/FlyDB/lib/proto/ghash" "github.com/ByteStorage/FlyDB/lib/proto/glist" "github.com/ByteStorage/FlyDB/lib/proto/gset" "github.com/ByteStorage/FlyDB/lib/proto/gstring" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" + "github.com/ByteStorage/FlyDB/lib/proto/gzset" ) // Client is a grpc client type Client struct { - Addr string // db server address + Addr string // db server address + conn *grpc.ClientConn + gStringServiceClient gstring.GStringServiceClient + gHashServiceClient ghash.GHashServiceClient + gListServiceClient glist.GListServiceClient + gSetServiceClient gset.GSetServiceClient + gZSetServiceClient gzset.GZSetServiceClient +} + +func NewClient(addr string) (*Client, error) { + var ( + conn *grpc.ClientConn + err error + ) + + if conn, err = grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())); err != nil { + return nil, err + } + + return &Client{ + Addr: addr, + conn: conn, + }, nil +} + +func (c *Client) getGrpcConn() (*grpc.ClientConn, error) { + if c.conn == nil { + conn, err := grpc.Dial(c.Addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return nil, err + } + + c.conn = conn + } + + return c.conn, nil } // newGrpcClient returns a grpc client -func newGrpcClient(addr string) (gstring.GStringServiceClient, error) { - conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { +func (c *Client) newGrpcClient() (gstring.GStringServiceClient, error) { + var ( + conn *grpc.ClientConn + err error + ) + + if c.gStringServiceClient != nil { + return c.gStringServiceClient, nil + } + + if conn, err = c.getGrpcConn(); err != nil { return nil, err } - client := gstring.NewGStringServiceClient(conn) - return client, nil + + c.gStringServiceClient = gstring.NewGStringServiceClient(conn) + + return c.gStringServiceClient, nil } -func newHashGrpcClient(addr string) (ghash.GHashServiceClient, error) { - conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { +func (c *Client) newHashGrpcClient() (ghash.GHashServiceClient, error) { + var ( + conn *grpc.ClientConn + err error + ) + + if c.gHashServiceClient != nil { + return c.gHashServiceClient, nil + } + + if conn, err = c.getGrpcConn(); err != nil { return nil, err } - client := ghash.NewGHashServiceClient(conn) - return client, nil + + c.gHashServiceClient = ghash.NewGHashServiceClient(conn) + + return c.gHashServiceClient, nil +} + +func (c *Client) newListGrpcClient() (glist.GListServiceClient, error) { + var ( + conn *grpc.ClientConn + err error + ) + + if c.gListServiceClient != nil { + return c.gListServiceClient, nil + } + + if conn, err = c.getGrpcConn(); err != nil { + return nil, err + } + + c.gListServiceClient = glist.NewGListServiceClient(conn) + + return c.gListServiceClient, nil } -func newListGrpcClient(addr string) (glist.GListServiceClient, error) { - conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { +func (c *Client) newSetGrpcClient() (gset.GSetServiceClient, error) { + var ( + conn *grpc.ClientConn + err error + ) + + if c.gSetServiceClient != nil { + return c.gSetServiceClient, nil + } + + if conn, err = c.getGrpcConn(); err != nil { return nil, err } - client := glist.NewGListServiceClient(conn) - return client, nil + + c.gSetServiceClient = gset.NewGSetServiceClient(conn) + + return c.gSetServiceClient, nil } -func newSetGrpcClient(addr string) (gset.GSetServiceClient, error) { - conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { +func (c *Client) newZSetGrpcClient() (gzset.GZSetServiceClient, error) { + var ( + conn *grpc.ClientConn + err error + ) + + if c.gZSetServiceClient != nil { + return c.gZSetServiceClient, nil + } + + if conn, err = c.getGrpcConn(); err != nil { return nil, err } - client := gset.NewGSetServiceClient(conn) - return client, nil + + c.gZSetServiceClient = gzset.NewGZSetServiceClient(conn) + + return c.gZSetServiceClient, nil +} + +func (c *Client) Close() error { + if c.conn != nil { + if err := c.conn.Close(); err != nil { + return err + } + } + + return nil } diff --git a/db/grpc/client/hash.go b/db/grpc/client/hash.go index db7dbc6e..49b7f402 100644 --- a/db/grpc/client/hash.go +++ b/db/grpc/client/hash.go @@ -3,12 +3,13 @@ package client import ( "context" "errors" + "github.com/ByteStorage/FlyDB/lib/proto/ghash" ) // HSet puts a key-value pair into the db by client api func (c *Client) HSet(key, field string, value interface{}) error { - client, err := newHashGrpcClient(c.Addr) + client, err := c.newHashGrpcClient() if err != nil { return errors.New("new grpc client error: " + err.Error()) } @@ -30,7 +31,7 @@ func (c *Client) HSet(key, field string, value interface{}) error { // HGet gets a value by key from the db by client api func (c *Client) HGet(key, field string) (interface{}, error) { - client, err := newHashGrpcClient(c.Addr) + client, err := c.newHashGrpcClient() if err != nil { return nil, err } @@ -48,7 +49,7 @@ func (c *Client) HGet(key, field string) (interface{}, error) { // HDel deletes a key-value pair from the db by client api func (c *Client) HDel(key, field string) error { - client, err := newHashGrpcClient(c.Addr) + client, err := c.newHashGrpcClient() if err != nil { return err } @@ -108,7 +109,7 @@ func getValue(resp *ghash.GHashGetResponse) (interface{}, error) { } func (c *Client) HExists(key, field string) (bool, error) { - client, err := newHashGrpcClient(c.Addr) + client, err := c.newHashGrpcClient() if err != nil { return false, err } @@ -123,7 +124,7 @@ func (c *Client) HExists(key, field string) (bool, error) { } func (c *Client) HLen(key string) (int64, error) { - client, err := newHashGrpcClient(c.Addr) + client, err := c.newHashGrpcClient() if err != nil { return 0, err } @@ -138,7 +139,7 @@ func (c *Client) HLen(key string) (int64, error) { } func (c *Client) HUpdate(key, field string, value interface{}) error { - client, err := newHashGrpcClient(c.Addr) + client, err := c.newHashGrpcClient() if err != nil { return err } @@ -182,7 +183,7 @@ func updateValue(req *ghash.GHashUpdateRequest, value interface{}) error { } func (c *Client) HIncrBy(key, field string, value int64) (int64, error) { - client, err := newHashGrpcClient(c.Addr) + client, err := c.newHashGrpcClient() if err != nil { return 0, err } @@ -197,7 +198,7 @@ func (c *Client) HIncrBy(key, field string, value int64) (int64, error) { } func (c *Client) HIncrByFloat(key, field string, value float64) (float64, error) { - client, err := newHashGrpcClient(c.Addr) + client, err := c.newHashGrpcClient() if err != nil { return 0.0, err } @@ -212,7 +213,7 @@ func (c *Client) HIncrByFloat(key, field string, value float64) (float64, error) } func (c *Client) HDecrBy(key, field string, value int64) (int64, error) { - client, err := newHashGrpcClient(c.Addr) + client, err := c.newHashGrpcClient() if err != nil { return 0, err } @@ -227,7 +228,7 @@ func (c *Client) HDecrBy(key, field string, value int64) (int64, error) { } func (c *Client) HStrLen(key, field string) (int64, error) { - client, err := newHashGrpcClient(c.Addr) + client, err := c.newHashGrpcClient() if err != nil { return 0, err } @@ -242,7 +243,7 @@ func (c *Client) HStrLen(key, field string) (int64, error) { } func (c *Client) HMove(key, dest, field string) error { - client, err := newHashGrpcClient(c.Addr) + client, err := c.newHashGrpcClient() if err != nil { return err } @@ -257,7 +258,7 @@ func (c *Client) HMove(key, dest, field string) error { } func (c *Client) HSetNX(key, field string, value interface{}) error { - client, err := newHashGrpcClient(c.Addr) + client, err := c.newHashGrpcClient() if err != nil { return err } @@ -300,7 +301,7 @@ func setNXValue(req *ghash.GHashSetNXRequest, value interface{}) error { } func (c *Client) HType(key, field string) (string, error) { - client, err := newHashGrpcClient(c.Addr) + client, err := c.newHashGrpcClient() if err != nil { return "", err } diff --git a/db/grpc/client/list.go b/db/grpc/client/list.go index 7d743293..03785dd2 100644 --- a/db/grpc/client/list.go +++ b/db/grpc/client/list.go @@ -3,11 +3,12 @@ package client import ( "context" "errors" + "github.com/ByteStorage/FlyDB/lib/proto/glist" ) func (c *Client) LPush(key string, value interface{}) error { - client, err := newListGrpcClient(c.Addr) + client, err := c.newListGrpcClient() if err != nil { return errors.New("new grpc client error: " + err.Error()) } @@ -43,7 +44,7 @@ func (c *Client) LPush(key string, value interface{}) error { } func (c *Client) LPushs(key string, values []interface{}) error { - client, err := newListGrpcClient(c.Addr) + client, err := c.newListGrpcClient() if err != nil { return errors.New("new grpc client error: " + err.Error()) } @@ -81,7 +82,7 @@ func (c *Client) LPushs(key string, values []interface{}) error { } func (c *Client) RPush(key string, value interface{}) error { - client, err := newListGrpcClient(c.Addr) + client, err := c.newListGrpcClient() if err != nil { return errors.New("new grpc client error: " + err.Error()) } @@ -118,7 +119,7 @@ func (c *Client) RPush(key string, value interface{}) error { } func (c *Client) RPushs(key string, values []interface{}) error { - client, err := newListGrpcClient(c.Addr) + client, err := c.newListGrpcClient() if err != nil { return errors.New("new grpc client error: " + err.Error()) } @@ -157,7 +158,7 @@ func (c *Client) RPushs(key string, values []interface{}) error { } func (c *Client) LPop(key string) (interface{}, error) { - client, err := newListGrpcClient(c.Addr) + client, err := c.newListGrpcClient() if err != nil { return nil, errors.New("new grpc client error: " + err.Error()) } @@ -190,7 +191,7 @@ func (c *Client) LPop(key string) (interface{}, error) { } func (c *Client) RPop(key string) (interface{}, error) { - client, err := newListGrpcClient(c.Addr) + client, err := c.newListGrpcClient() if err != nil { return nil, errors.New("new grpc client error: " + err.Error()) } @@ -223,7 +224,7 @@ func (c *Client) RPop(key string) (interface{}, error) { } func (c *Client) LRange(key string, start, stop int) ([]interface{}, error) { - client, err := newListGrpcClient(c.Addr) + client, err := c.newListGrpcClient() if err != nil { return nil, errors.New("new grpc client error: " + err.Error()) } @@ -266,7 +267,7 @@ func (c *Client) LRange(key string, start, stop int) ([]interface{}, error) { } func (c *Client) LLen(key string) (int32, error) { - client, err := newListGrpcClient(c.Addr) + client, err := c.newListGrpcClient() if err != nil { return 0, errors.New("new grpc client error: " + err.Error()) } @@ -281,7 +282,7 @@ func (c *Client) LLen(key string) (int32, error) { } func (c *Client) LRem(key string, count int32, value interface{}) error { - client, err := newListGrpcClient(c.Addr) + client, err := c.newListGrpcClient() if err != nil { return errors.New("new grpc client error: " + err.Error()) } @@ -318,7 +319,7 @@ func (c *Client) LRem(key string, count int32, value interface{}) error { } func (c *Client) LIndex(key string, index int) (interface{}, error) { - client, err := newListGrpcClient(c.Addr) + client, err := c.newListGrpcClient() if err != nil { return nil, errors.New("new grpc client error: " + err.Error()) } @@ -354,7 +355,7 @@ func (c *Client) LIndex(key string, index int) (interface{}, error) { } func (c *Client) LSet(key string, index int, value interface{}) error { - client, err := newListGrpcClient(c.Addr) + client, err := c.newListGrpcClient() if err != nil { return errors.New("new grpc client error: " + err.Error()) } @@ -395,7 +396,7 @@ func (c *Client) LSet(key string, index int, value interface{}) error { } func (c *Client) LTrim(key string, start, stop int) error { - client, err := newListGrpcClient(c.Addr) + client, err := c.newListGrpcClient() if err != nil { return errors.New("new grpc client error: " + err.Error()) } diff --git a/db/grpc/client/set.go b/db/grpc/client/set.go index 061ba93e..a0314982 100644 --- a/db/grpc/client/set.go +++ b/db/grpc/client/set.go @@ -3,11 +3,12 @@ package client import ( "context" "errors" + "github.com/ByteStorage/FlyDB/lib/proto/gset" ) func (c *Client) SAdd(key, member string) error { - client, err := newSetGrpcClient(c.Addr) + client, err := c.newSetGrpcClient() if err != nil { return errors.New("new grpc client error: " + err.Error()) } @@ -23,7 +24,7 @@ func (c *Client) SAdd(key, member string) error { } func (c *Client) SAdds(key string, members []string) error { - client, err := newSetGrpcClient(c.Addr) + client, err := c.newSetGrpcClient() if err != nil { return errors.New("new grpc client error: " + err.Error()) } @@ -39,7 +40,7 @@ func (c *Client) SAdds(key string, members []string) error { } func (c *Client) SRem(key, member string) error { - client, err := newSetGrpcClient(c.Addr) + client, err := c.newSetGrpcClient() if err != nil { return errors.New("new grpc client error: " + err.Error()) } @@ -55,7 +56,7 @@ func (c *Client) SRem(key, member string) error { } func (c *Client) SRems(key string, members []string) error { - client, err := newSetGrpcClient(c.Addr) + client, err := c.newSetGrpcClient() if err != nil { return errors.New("new grpc client error: " + err.Error()) } @@ -71,7 +72,7 @@ func (c *Client) SRems(key string, members []string) error { } func (c *Client) SCard(key string) (int32, error) { - client, err := newSetGrpcClient(c.Addr) + client, err := c.newSetGrpcClient() if err != nil { return 0, errors.New("new grpc client error: " + err.Error()) } @@ -84,7 +85,7 @@ func (c *Client) SCard(key string) (int32, error) { } func (c *Client) SMembers(key string) ([]string, error) { - client, err := newSetGrpcClient(c.Addr) + client, err := c.newSetGrpcClient() if err != nil { return nil, errors.New("new grpc client error: " + err.Error()) } @@ -97,7 +98,7 @@ func (c *Client) SMembers(key string) ([]string, error) { } func (c *Client) SIsMember(key, member string) (bool, error) { - client, err := newSetGrpcClient(c.Addr) + client, err := c.newSetGrpcClient() if err != nil { return false, errors.New("new grpc client error: " + err.Error()) } @@ -110,7 +111,7 @@ func (c *Client) SIsMember(key, member string) (bool, error) { } func (c *Client) SUnion(keys []string) ([]string, error) { - client, err := newSetGrpcClient(c.Addr) + client, err := c.newSetGrpcClient() if err != nil { return nil, errors.New("new grpc client error: " + err.Error()) } @@ -123,7 +124,7 @@ func (c *Client) SUnion(keys []string) ([]string, error) { } func (c *Client) SInter(keys []string) ([]string, error) { - client, err := newSetGrpcClient(c.Addr) + client, err := c.newSetGrpcClient() if err != nil { return nil, errors.New("new grpc client error: " + err.Error()) } @@ -136,7 +137,7 @@ func (c *Client) SInter(keys []string) ([]string, error) { } func (c *Client) SDiff(keys []string) ([]string, error) { - client, err := newSetGrpcClient(c.Addr) + client, err := c.newSetGrpcClient() if err != nil { return nil, errors.New("new grpc client error: " + err.Error()) } @@ -149,7 +150,7 @@ func (c *Client) SDiff(keys []string) ([]string, error) { } func (c *Client) SUnionStore(destination string, keys []string) error { - client, err := newSetGrpcClient(c.Addr) + client, err := c.newSetGrpcClient() if err != nil { return errors.New("new grpc client error: " + err.Error()) } @@ -165,7 +166,7 @@ func (c *Client) SUnionStore(destination string, keys []string) error { } func (c *Client) SInterStore(destinationKey string, keys []string) error { - client, err := newSetGrpcClient(c.Addr) + client, err := c.newSetGrpcClient() if err != nil { return errors.New("new grpc client error: " + err.Error()) } diff --git a/db/grpc/client/string.go b/db/grpc/client/string.go index 2d570a20..3d0bb2c0 100644 --- a/db/grpc/client/string.go +++ b/db/grpc/client/string.go @@ -4,12 +4,13 @@ import ( "context" "errors" "fmt" + "github.com/ByteStorage/FlyDB/lib/proto/gstring" ) // Put puts a key-value pair into the db by client api func (c *Client) Put(key string, value interface{}) error { - client, err := newGrpcClient(c.Addr) + client, err := c.newGrpcClient() if err != nil { return errors.New("new grpc client error: " + err.Error()) } @@ -44,7 +45,7 @@ func (c *Client) Put(key string, value interface{}) error { // Get gets a value by key from the db by client api func (c *Client) Get(key string) (interface{}, error) { - client, err := newGrpcClient(c.Addr) + client, err := c.newGrpcClient() if err != nil { return nil, err } @@ -74,7 +75,7 @@ func (c *Client) Get(key string) (interface{}, error) { // Del deletes a key-value pair from the db by client api func (c *Client) Del(key string) error { - client, err := newGrpcClient(c.Addr) + client, err := c.newGrpcClient() if err != nil { return err } @@ -89,7 +90,7 @@ func (c *Client) Del(key string) error { } func (c *Client) StrLen(key string) (int32, error) { - client, err := newGrpcClient(c.Addr) + client, err := c.newGrpcClient() if err != nil { return 0, nil } @@ -101,7 +102,7 @@ func (c *Client) StrLen(key string) (int32, error) { } func (c *Client) Type(key string) (string, error) { - client, err := newGrpcClient(c.Addr) + client, err := c.newGrpcClient() if err != nil { return "", err } @@ -115,7 +116,7 @@ func (c *Client) Type(key string) (string, error) { } func (c *Client) Append(key string, value string) error { - client, err := newGrpcClient(c.Addr) + client, err := c.newGrpcClient() if err != nil { return err } @@ -130,7 +131,7 @@ func (c *Client) Append(key string, value string) error { } func (c *Client) GetSet(key string, value interface{}) (interface{}, error) { - client, err := newGrpcClient(c.Addr) + client, err := c.newGrpcClient() if err != nil { return err, nil } @@ -183,7 +184,7 @@ func (c *Client) GetSet(key string, value interface{}) (interface{}, error) { } func (c *Client) Incr(key string) error { - client, err := newGrpcClient(c.Addr) + client, err := c.newGrpcClient() if err != nil { return err } @@ -196,7 +197,7 @@ func (c *Client) Incr(key string) error { } func (c *Client) IncrBy(key string, amount int64) error { - client, err := newGrpcClient(c.Addr) + client, err := c.newGrpcClient() if err != nil { return err } @@ -210,7 +211,7 @@ func (c *Client) IncrBy(key string, amount int64) error { } func (c *Client) IncrByFloat(key string, amount float64) error { - client, err := newGrpcClient(c.Addr) + client, err := c.newGrpcClient() if err != nil { return err } @@ -225,7 +226,7 @@ func (c *Client) IncrByFloat(key string, amount float64) error { } func (c *Client) Decr(key string) error { - client, err := newGrpcClient(c.Addr) + client, err := c.newGrpcClient() if err != nil { return err } @@ -240,7 +241,7 @@ func (c *Client) Decr(key string) error { } func (c *Client) DecrBy(key string, amount int64) error { - client, err := newGrpcClient(c.Addr) + client, err := c.newGrpcClient() if err != nil { return err } @@ -255,7 +256,7 @@ func (c *Client) DecrBy(key string, amount int64) error { } func (c *Client) Exists(key string) (bool, error) { - client, err := newGrpcClient(c.Addr) + client, err := c.newGrpcClient() if err != nil { return false, nil } @@ -267,7 +268,7 @@ func (c *Client) Exists(key string) (bool, error) { } func (c *Client) Expire(key string, ttl int64) error { - client, err := newGrpcClient(c.Addr) + client, err := c.newGrpcClient() if err != nil { return err } @@ -282,7 +283,7 @@ func (c *Client) Expire(key string, ttl int64) error { } func (c *Client) Persist(key string) error { - client, err := newGrpcClient(c.Addr) + client, err := c.newGrpcClient() if err != nil { return err } @@ -297,7 +298,7 @@ func (c *Client) Persist(key string) error { } func (c *Client) MGet(keys []string) ([]interface{}, error) { - client, err := newGrpcClient(c.Addr) + client, err := c.newGrpcClient() if err != nil { return nil, err } @@ -329,7 +330,7 @@ func (c *Client) MGet(keys []string) ([]interface{}, error) { } func (c *Client) MSet(pairs ...interface{}) error { - client, err := newGrpcClient(c.Addr) + client, err := c.newGrpcClient() if err != nil { return err } @@ -376,7 +377,7 @@ func (c *Client) MSet(pairs ...interface{}) error { } func (c *Client) MSetNX(pairs ...interface{}) error { - client, err := newGrpcClient(c.Addr) + client, err := c.newGrpcClient() if err != nil { return err } diff --git a/db/grpc/client/zset.go b/db/grpc/client/zset.go new file mode 100644 index 00000000..aebfc4ce --- /dev/null +++ b/db/grpc/client/zset.go @@ -0,0 +1,246 @@ +package client + +import ( + "context" + "errors" + + "github.com/ByteStorage/FlyDB/lib/proto/gzset" +) + +type Range struct { + Start int32 + End int32 +} + +type Incr struct { + Member string + Inc int32 +} + +func (c *Client) ZAdd(key string, member *gzset.ZSetValue) (*gzset.ZAddResponse, error) { + var ( + client gzset.GZSetServiceClient + request = &gzset.ZAddRequest{Key: key, Member: member} + response *gzset.ZAddResponse + err error + ) + + if client, err = c.newZSetGrpcClient(); err != nil { + return nil, errors.New("new grpc client error: " + err.Error()) + } + + if response, err = client.ZAdd(context.Background(), request); err != nil { + return nil, err + } + + return response, nil +} + +func (c *Client) ZAdds(key string, member []*gzset.ZSetValue) (*gzset.ZAddsResponse, error) { + var ( + client gzset.GZSetServiceClient + request = &gzset.ZAddsRequest{Key: key, Members: member} + response *gzset.ZAddsResponse + err error + ) + + if client, err = c.newZSetGrpcClient(); err != nil { + return nil, errors.New("new grpc client error: " + err.Error()) + } + + if response, err = client.ZAdds(context.Background(), request); err != nil { + return nil, err + } + + return response, nil +} + +func (c *Client) ZRem(key string, member string) (*gzset.ZRemResponse, error) { + var ( + client gzset.GZSetServiceClient + request = &gzset.ZRemRequest{Key: key, Member: member} + response *gzset.ZRemResponse + err error + ) + + if client, err = c.newZSetGrpcClient(); err != nil { + return nil, errors.New("new grpc client error: " + err.Error()) + } + + if response, err = client.ZRem(context.Background(), request); err != nil { + return nil, err + } + + return response, nil +} + +func (c *Client) ZRems(key string, members []string) (*gzset.ZRemsResponse, error) { + var ( + client gzset.GZSetServiceClient + request = &gzset.ZRemsRequest{Key: key, Members: members} + response *gzset.ZRemsResponse + err error + ) + + if client, err = c.newZSetGrpcClient(); err != nil { + return nil, errors.New("new grpc client error: " + err.Error()) + } + + if response, err = client.ZRems(context.Background(), request); err != nil { + return nil, err + } + + return response, nil +} + +func (c *Client) ZScore(key string, member string) (*gzset.ZScoreResponse, error) { + var ( + client gzset.GZSetServiceClient + request = &gzset.ZScoreRequest{Key: key, Member: member} + response *gzset.ZScoreResponse + err error + ) + + if client, err = c.newZSetGrpcClient(); err != nil { + return nil, errors.New("new grpc client error: " + err.Error()) + } + + if response, err = client.ZScore(context.Background(), request); err != nil { + return nil, err + } + + return response, nil +} + +func (c *Client) ZRank(key string, member string) (*gzset.ZRankResponse, error) { + var ( + client gzset.GZSetServiceClient + request = &gzset.ZRankRequest{Key: key, Member: member} + response *gzset.ZRankResponse + err error + ) + + if client, err = c.newZSetGrpcClient(); err != nil { + return nil, errors.New("new grpc client error: " + err.Error()) + } + + if response, err = client.ZRank(context.Background(), request); err != nil { + return nil, err + } + + return response, nil +} + +func (c *Client) ZRevRank(key string, member string) (*gzset.ZRevRankResponse, error) { + var ( + client gzset.GZSetServiceClient + request = &gzset.ZRevRankRequest{Key: key, Member: member} + response *gzset.ZRevRankResponse + err error + ) + + if client, err = c.newZSetGrpcClient(); err != nil { + return nil, errors.New("new grpc client error: " + err.Error()) + } + + if response, err = client.ZRevRank(context.Background(), request); err != nil { + return nil, err + } + + return response, nil +} + +func (c *Client) ZRange(key string, requestRange *Range) (*gzset.ZRangeResponse, error) { + var ( + client gzset.GZSetServiceClient + request = &gzset.ZRangeRequest{Key: key, Start: requestRange.Start, End: requestRange.End} + response *gzset.ZRangeResponse + err error + ) + + if client, err = c.newZSetGrpcClient(); err != nil { + return nil, errors.New("new grpc client error: " + err.Error()) + } + + if response, err = client.ZRange(context.Background(), request); err != nil { + return nil, err + } + + return response, nil +} + +func (c *Client) ZCount(key string, requestRange *Range) (*gzset.ZCountResponse, error) { + var ( + client gzset.GZSetServiceClient + request = &gzset.ZCountRequest{Key: key, Min: requestRange.Start, Max: requestRange.End} + response *gzset.ZCountResponse + err error + ) + + if client, err = c.newZSetGrpcClient(); err != nil { + return nil, errors.New("new grpc client error: " + err.Error()) + } + + if response, err = client.ZCount(context.Background(), request); err != nil { + return nil, err + } + + return response, nil +} + +func (c *Client) ZRevRange(key string, requestRange *Range) (*gzset.ZRevRangeResponse, error) { + var ( + client gzset.GZSetServiceClient + request = &gzset.ZRevRangeRequest{Key: key, StartRank: requestRange.Start, EndRank: requestRange.End} + response *gzset.ZRevRangeResponse + err error + ) + + if client, err = c.newZSetGrpcClient(); err != nil { + return nil, errors.New("new grpc client error: " + err.Error()) + } + + if response, err = client.ZRevRange(context.Background(), request); err != nil { + return nil, err + } + + return response, nil +} + +func (c *Client) ZCard(key string) (*gzset.ZCardResponse, error) { + var ( + client gzset.GZSetServiceClient + request = &gzset.ZCardRequest{Key: key} + response *gzset.ZCardResponse + err error + ) + + if client, err = c.newZSetGrpcClient(); err != nil { + return nil, errors.New("new grpc client error: " + err.Error()) + } + + if response, err = client.ZCard(context.Background(), request); err != nil { + return nil, err + } + + return response, nil +} + +func (c *Client) ZIncrBy(key string, incr *Incr) (*gzset.ZIncrByResponse, error) { + var ( + client gzset.GZSetServiceClient + request = &gzset.ZIncrByRequest{Key: key, Member: incr.Member, IncBy: incr.Inc} + response *gzset.ZIncrByResponse + err error + ) + + if client, err = c.newZSetGrpcClient(); err != nil { + return nil, errors.New("new grpc client error: " + err.Error()) + } + + if response, err = client.ZIncrBy(context.Background(), request); err != nil { + return nil, err + } + + return response, nil +} diff --git a/db/grpc/service/zset.go b/db/grpc/service/zset.go new file mode 100644 index 00000000..6940f51b --- /dev/null +++ b/db/grpc/service/zset.go @@ -0,0 +1,258 @@ +package service + +import ( + "context" + "errors" + "math" + + pbany "github.com/golang/protobuf/ptypes/any" + + "github.com/ByteStorage/FlyDB/config" + "github.com/ByteStorage/FlyDB/lib/encoding" + "github.com/ByteStorage/FlyDB/lib/proto/gzset" + "github.com/ByteStorage/FlyDB/structure" +) + +type ZSetService interface { + Base + gzset.GZSetServiceServer +} + +var _ ZSetService = &zSet{} + +type zSet struct { + dbs *structure.ZSetStructure + gzset.GZSetServiceServer +} + +func NewZSetService(options config.Options) (ZSetService, error) { + zSetStructure, err := structure.NewZSetStructure(options) + if err != nil { + return nil, err + } + return &zSet{dbs: zSetStructure}, nil +} + +func (z *zSet) CloseDb() error { + return z.dbs.Stop() +} + +func (z *zSet) checkScoreIntIsInt32(score int) bool { + return score >= math.MinInt32 && score <= math.MaxInt32 +} + +func (z *zSet) ZAdd(_ context.Context, request *gzset.ZAddRequest) (*gzset.ZAddResponse, error) { + var ( + err error + ) + + if err = z.dbs.ZAdd(request.Key, int(request.Member.Score), request.Member.Member, + string(request.Member.Value.Value)); err != nil { + return &gzset.ZAddResponse{Success: false}, err + } + + return &gzset.ZAddResponse{Success: true}, nil +} + +func (z *zSet) ZAdds(_ context.Context, request *gzset.ZAddsRequest) (*gzset.ZAddsResponse, error) { + for _, member := range request.Members { + var ( + err error + ) + + if !z.checkScoreIntIsInt32(int(member.Score)) { + return &gzset.ZAddsResponse{Success: false}, nil + } + + if err = z.dbs.ZAdd(request.Key, int(member.Score), member.Member, string(member.Value.GetValue())); err != nil { + return &gzset.ZAddsResponse{Success: false}, err + } + + } + + return &gzset.ZAddsResponse{Success: true}, nil +} + +func (z *zSet) ZRem(_ context.Context, request *gzset.ZRemRequest) (*gzset.ZRemResponse, error) { + if err := z.dbs.ZRem(request.Key, request.Member); err != nil { + return &gzset.ZRemResponse{Success: false}, err + } + + return &gzset.ZRemResponse{Success: true}, nil +} + +func (z *zSet) ZRems(_ context.Context, request *gzset.ZRemsRequest) (*gzset.ZRemsResponse, error) { + if err := z.dbs.ZRems(request.Key, request.Members...); err != nil { + return &gzset.ZRemsResponse{Success: false}, err + } + + return &gzset.ZRemsResponse{Success: true}, nil +} + +func (z *zSet) ZScore(_ context.Context, request *gzset.ZScoreRequest) (*gzset.ZScoreResponse, error) { + var ( + err error + score int + ) + + if score, err = z.dbs.ZScore(request.Key, request.Member); err != nil { + return &gzset.ZScoreResponse{}, err + } + + return &gzset.ZScoreResponse{Score: int32(score), Exists: true}, nil +} + +func (z *zSet) ZRank(_ context.Context, request *gzset.ZRankRequest) (*gzset.ZRankResponse, error) { + var ( + err error + rank int + ) + + if rank, err = z.dbs.ZRank(request.Key, request.Member); err != nil { + return &gzset.ZRankResponse{}, err + } + + return &gzset.ZRankResponse{Rank: int32(rank), Exists: true}, nil +} + +func (z *zSet) ZRevRank(_ context.Context, request *gzset.ZRevRankRequest) (*gzset.ZRevRankResponse, error) { + var ( + err error + rank int + ) + + if rank, err = z.dbs.ZRevRank(request.Key, request.Member); err != nil { + return &gzset.ZRevRankResponse{}, err + } + + return &gzset.ZRevRankResponse{Rank: int32(rank), Exists: true}, nil +} + +func (z *zSet) structureZSetValue2GZSetValue(structureZSetValue structure.ZSetValue) (*gzset.ZSetValue, error) { + var ( + structureValueBytes []byte + err error + gzSetValue = &gzset.ZSetValue{} + value []byte + dec *encoding.MessagePackCodecDecoder + ) + + structureValueBytes, _ = structureZSetValue.MarshalBinary() + + dec = encoding.NewMessagePackDecoder(structureValueBytes) + + if err = dec.Decode(&gzSetValue.Member); err != nil { + return gzSetValue, err + } + + if err = dec.Decode(&gzSetValue.Score); err != nil { + return gzSetValue, err + } + + err = dec.Decode(&value) + gzSetValue.Value = &pbany.Any{Value: value} + return gzSetValue, err +} + +func (z *zSet) structureZSetValueList2GZSetValueList(structureZSetValueList []structure.ZSetValue) ([]*gzset. + ZSetValue, error) { + var ( + err error + responseMembers = make([]*gzset.ZSetValue, 0, len(structureZSetValueList)) + ) + + for _, member := range structureZSetValueList { + var ( + gzSetValue *gzset.ZSetValue + ) + if gzSetValue, err = z.structureZSetValue2GZSetValue(member); err != nil { + return make([]*gzset.ZSetValue, 0), err + } + responseMembers = append(responseMembers, gzSetValue) + } + + return responseMembers, nil +} + +func (z *zSet) ZRange(_ context.Context, request *gzset.ZRangeRequest) (*gzset.ZRangeResponse, error) { + var ( + err error + members []structure.ZSetValue + responseMembers []*gzset.ZSetValue + ) + + if members, err = z.dbs.ZRange(request.Key, int(request.Start), int(request.End)); err != nil { + return &gzset.ZRangeResponse{}, err + } + + if responseMembers, err = z.structureZSetValueList2GZSetValueList(members); err != nil { + return &gzset.ZRangeResponse{}, err + } + + return &gzset.ZRangeResponse{Members: responseMembers}, nil +} + +func (z *zSet) ZCount(_ context.Context, request *gzset.ZCountRequest) (*gzset.ZCountResponse, error) { + var ( + err error + count int + ) + + if count, err = z.dbs.ZCount(request.Key, int(request.Min), int(request.Max)); err != nil { + return &gzset.ZCountResponse{}, err + } + + return &gzset.ZCountResponse{Count: int32(count)}, nil +} + +func (z *zSet) ZRevRange(_ context.Context, request *gzset.ZRevRangeRequest) (*gzset.ZRevRangeResponse, error) { + var ( + err error + members []structure.ZSetValue + responseMembers []*gzset.ZSetValue + ) + + if members, err = z.dbs.ZRevRange(request.Key, int(request.StartRank), int(request.EndRank)); err != nil { + return &gzset.ZRevRangeResponse{}, err + } + + if responseMembers, err = z.structureZSetValueList2GZSetValueList(members); err != nil { + return &gzset.ZRevRangeResponse{}, err + } + + return &gzset.ZRevRangeResponse{Members: responseMembers}, nil +} + +func (z *zSet) ZCard(_ context.Context, request *gzset.ZCardRequest) (*gzset.ZCardResponse, error) { + var ( + err error + count int + ) + + if count, err = z.dbs.ZCard(request.Key); err != nil { + return &gzset.ZCardResponse{}, err + } + + return &gzset.ZCardResponse{Count: int32(count)}, nil +} + +func (z *zSet) ZIncrBy(_ context.Context, request *gzset.ZIncrByRequest) (*gzset.ZIncrByResponse, error) { + var ( + err error + newScore int + ) + + if err = z.dbs.ZIncrBy(request.Key, request.Member, int(request.IncBy)); err != nil { + return &gzset.ZIncrByResponse{}, err + } + + if newScore, err = z.dbs.ZScore(request.Key, request.Member); err != nil { + return &gzset.ZIncrByResponse{}, err + } + + if !z.checkScoreIntIsInt32(newScore) { + return &gzset.ZIncrByResponse{NewScore: 0, Exists: true}, errors.New("new score is not int32") + } + + return &gzset.ZIncrByResponse{NewScore: int32(newScore), Exists: true}, nil +}