Skip to content

Commit

Permalink
Merge branch 'main' into ppv/dlqCount
Browse files Browse the repository at this point in the history
  • Loading branch information
prathyushpv authored Oct 11, 2024
2 parents 7513378 + c20f6a1 commit ec24641
Show file tree
Hide file tree
Showing 16 changed files with 3,888 additions and 3,111 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ jobs:
run: make functional-test-ndc-coverage

test-status:
if: ${{ inputs.run_single_functional_test != true && inputs.run_single_unit_test != true }}
if: always()
name: Test Status
needs:
- unit-test
Expand Down
37 changes: 37 additions & 0 deletions api/historyservice/v1/request_response.go-helpers.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6,087 changes: 3,189 additions & 2,898 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

84 changes: 4 additions & 80 deletions client/history/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ package history
import (
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"

"go.temporal.io/api/serviceerror"
Expand Down Expand Up @@ -64,11 +64,6 @@ type clientImpl struct {
redirector redirector
timeout time.Duration
tokenSerializer common.TaskTokenSerializer
// shardIndex is incremented every time a shard-agnostic API is invoked. It is used to load balance requests
// across hosts by picking an essentially random host. We use an index here so that we don't need to inject any
// random number generator in order to make tests deterministic. We use a uint instead of an int because we
// don't want this to become negative if we ever overflow.
shardIndex atomic.Uint32
}

// NewClient creates a new history service gRPC client
Expand Down Expand Up @@ -269,81 +264,10 @@ func (c *clientImpl) StreamWorkflowReplicationMessages(
return streamClient, nil
}

// GetDLQTasks doesn't need redirects or routing because DLQ tasks are not sharded, so it just picks any available host
// in the connection pool (or creates one) and forwards the request to it.
func (c *clientImpl) GetDLQTasks(
ctx context.Context,
in *historyservice.GetDLQTasksRequest,
opts ...grpc.CallOption,
) (*historyservice.GetDLQTasksResponse, error) {
historyClient, err := c.getAnyClient("GetDLQTasks")
if err != nil {
return nil, err
}
return historyClient.GetDLQTasks(ctx, in, opts...)
}

func (c *clientImpl) DeleteDLQTasks(
ctx context.Context,
in *historyservice.DeleteDLQTasksRequest,
opts ...grpc.CallOption,
) (*historyservice.DeleteDLQTasksResponse, error) {
historyClient, err := c.getAnyClient("DeleteDLQTasks")
if err != nil {
return nil, err
}
return historyClient.DeleteDLQTasks(ctx, in, opts...)
}

func (c *clientImpl) ListQueues(
ctx context.Context,
in *historyservice.ListQueuesRequest,
opts ...grpc.CallOption,
) (*historyservice.ListQueuesResponse, error) {
historyClient, err := c.getAnyClient("ListQueues")
if err != nil {
return nil, err
}
return historyClient.ListQueues(ctx, in, opts...)
}

func (c *clientImpl) ListTasks(
ctx context.Context,
in *historyservice.ListTasksRequest,
opts ...grpc.CallOption,
) (*historyservice.ListTasksResponse, error) {
// Depth of the shardId field is 2 which is not supported by the genrpcwrapper generator.
// Simply changing the maxDepth for ShardId field in the genrpcwrapper generator will
// cause the generation logic for other methods to find more than one routing fields.

shardID := in.Request.GetShardId()
var response *historyservice.ListTasksResponse
op := func(ctx context.Context, client historyservice.HistoryServiceClient) error {
var err error
ctx, cancel := c.createContext(ctx)
defer cancel()
response, err = client.ListTasks(ctx, in, opts...)
return err
}
if err := c.executeWithRedirect(ctx, shardID, op); err != nil {
return nil, err
}
return response, nil
}

// getAnyClient returns an arbitrary client by looking up a client by a sequentially increasing shard ID. This is useful
// for history APIs that are shard-agnostic (e.g. namespace or DLQ v2 APIs).
func (c *clientImpl) getAnyClient(apiName string) (historyservice.HistoryServiceClient, error) {
// Subtract 1 so that the first index is 0 because Add returns the new value.
shardIndex := c.shardIndex.Add(1) - 1
// getRandomShard returns a random shard ID for history APIs that are shard-agnostic (e.g. namespace or DLQ v2 APIs).
func (c *clientImpl) getRandomShard() int32 {
// Add 1 at the end because shard IDs are 1-indexed.
shardID := shardIndex%uint32(c.numberOfShards) + 1
client, err := c.redirector.clientForShardID(int32(shardID))
if err != nil {
msg := fmt.Sprintf("can't find history host to serve API: %q, err: %v", apiName, err)
return nil, serviceerror.NewUnavailable(msg)
}
return client, nil
return int32(rand.Intn(int(c.numberOfShards)) + 1)
}

func (c *clientImpl) createContext(parent context.Context) (context.Context, context.CancelFunc) {
Expand Down
100 changes: 90 additions & 10 deletions client/history/client_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit ec24641

Please sign in to comment.