Skip to content

Commit

Permalink
get events from XDCCache (#6608)
Browse files Browse the repository at this point in the history
## What changed?
Get events from XDCCache for sync state.

## Why?
To reduce db read.

## How did you test it?
unit tests.

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
  • Loading branch information
hai719 authored Oct 10, 2024
1 parent 96eda3a commit 447b827
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 27 deletions.
1 change: 1 addition & 0 deletions service/history/history_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func NewEngineWithShardContext(
shard,
workflowCache,
workflowConsistencyChecker,
eventBlobCache,
shard.GetLogger(),
)

Expand Down
69 changes: 62 additions & 7 deletions service/history/replication/sync_state_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type (
shardContext shard.Context
workflowCache wcache.Cache
workflowConsistencyChecker api.WorkflowConsistencyChecker
eventBlobCache persistence.XDCCache
logger log.Logger
}
lastUpdatedStateTransitionGetter interface {
Expand All @@ -97,12 +98,14 @@ func NewSyncStateRetriever(
shardContext shard.Context,
workflowCache wcache.Cache,
workflowConsistencyChecker api.WorkflowConsistencyChecker,
eventBlobCache persistence.XDCCache,
logger log.Logger,
) *SyncStateRetrieverImpl {
return &SyncStateRetrieverImpl{
shardContext: shardContext,
workflowCache: workflowCache,
workflowConsistencyChecker: workflowConsistencyChecker,
eventBlobCache: eventBlobCache,
logger: logger,
}
}
Expand Down Expand Up @@ -235,7 +238,12 @@ func (s *SyncStateRetrieverImpl) getSyncStateResult(
versionedTransitionArtifact.NewRunInfo = newRunInfo
}

events, err := s.getSyncStateEvents(ctx, targetVersionHistories, sourceVersionHistories)
wfKey := definition.WorkflowKey{
NamespaceID: namespaceID,
WorkflowID: execution.WorkflowId,
RunID: execution.RunId,
}
events, err := s.getSyncStateEvents(ctx, wfKey, targetVersionHistories, sourceVersionHistories)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -296,7 +304,12 @@ func (s *SyncStateRetrieverImpl) getNewRunInfo(ctx context.Context, namespaceId
versionHistory = versionhistory.CopyVersionHistory(versionHistory)
releaseFunc(nil)
releaseFunc = nil
newRunEvents, err := s.getEventsBlob(ctx, versionHistory.BranchToken, common.FirstEventID, common.FirstEventID+1)
wfKey := definition.WorkflowKey{
NamespaceID: namespaceId.String(),
WorkflowID: execution.WorkflowId,
RunID: newRunId,
}
newRunEvents, err := s.getEventsBlob(ctx, wfKey, versionHistory, common.FirstEventID, common.FirstEventID+1, true)
switch err.(type) {
case nil:
case *serviceerror.NotFound:
Expand Down Expand Up @@ -376,9 +389,49 @@ func (s *SyncStateRetrieverImpl) getSnapshot(mutableState workflow.MutableState)
return mutableStateProto, nil
}

func (s *SyncStateRetrieverImpl) getEventsBlob(ctx context.Context, branchToken []byte, startEventId int64, endEventId int64) ([]*commonpb.DataBlob, error) {
func (s *SyncStateRetrieverImpl) getEventsBlob(
ctx context.Context,
workflowKey definition.WorkflowKey,
versionHistory *history.VersionHistory,
startEventId int64,
endEventId int64,
isNewRun bool,
) ([]*commonpb.DataBlob, error) {
var eventBlobs []*commonpb.DataBlob

if s.eventBlobCache != nil {
for {
eventVersion, err := versionhistory.GetVersionHistoryEventVersion(versionHistory, startEventId)
if err != nil {
return nil, err
}
xdcCacheValue, ok := s.eventBlobCache.Get(persistence.NewXDCCacheKey(workflowKey, startEventId, eventVersion))
if !ok {
break
}
left := endEventId - startEventId
if !isNewRun && int64(len(xdcCacheValue.EventBlobs)) >= left {
s.logger.Error(
fmt.Sprintf("xdc cached events are truncated, want [%d, %d), got [%d, %d) from cache",
startEventId, endEventId, startEventId, xdcCacheValue.NextEventID),
tag.FirstEventVersion(eventVersion),
tag.WorkflowNamespaceID(workflowKey.NamespaceID),
tag.WorkflowID(workflowKey.WorkflowID),
tag.WorkflowRunID(workflowKey.RunID),
)
eventBlobs = append(eventBlobs, xdcCacheValue.EventBlobs[:left]...)
return eventBlobs, nil
}
eventBlobs = append(eventBlobs, xdcCacheValue.EventBlobs...)
startEventId = xdcCacheValue.NextEventID
if startEventId >= endEventId {
return eventBlobs, nil
}
}
}

rawHistoryResponse, err := s.shardContext.GetExecutionManager().ReadRawHistoryBranch(ctx, &persistence.ReadHistoryBranchRequest{
BranchToken: branchToken,
BranchToken: versionHistory.BranchToken,
MinEventID: startEventId,
MaxEventID: endEventId,
PageSize: defaultPageSize,
Expand All @@ -387,10 +440,12 @@ func (s *SyncStateRetrieverImpl) getEventsBlob(ctx context.Context, branchToken
if err != nil {
return nil, err
}
return rawHistoryResponse.HistoryEventBlobs, nil

eventBlobs = append(eventBlobs, rawHistoryResponse.HistoryEventBlobs...)
return eventBlobs, nil
}

func (s *SyncStateRetrieverImpl) getSyncStateEvents(ctx context.Context, targetVersionHistories [][]*history.VersionHistoryItem, sourceVersionHistories *history.VersionHistories) ([]*commonpb.DataBlob, error) {
func (s *SyncStateRetrieverImpl) getSyncStateEvents(ctx context.Context, workflowKey definition.WorkflowKey, targetVersionHistories [][]*history.VersionHistoryItem, sourceVersionHistories *history.VersionHistories) ([]*commonpb.DataBlob, error) {
if targetVersionHistories == nil {
// return nil, so target will retrieve the missing events from source
return nil, nil
Expand All @@ -412,7 +467,7 @@ func (s *SyncStateRetrieverImpl) getSyncStateEvents(ctx context.Context, targetV
}
startEventId := lcaItem.GetEventId() + 1

return s.getEventsBlob(ctx, sourceHistory.BranchToken, startEventId, sourceLastItem.GetEventId()+1)
return s.getEventsBlob(ctx, workflowKey, sourceHistory, startEventId, sourceLastItem.GetEventId()+1, false)
}

func isInfoUpdated(subStateMachine lastUpdatedStateTransitionGetter, versionedTransition *persistencepb.VersionedTransition) bool {
Expand Down
121 changes: 101 additions & 20 deletions service/history/replication/sync_state_retriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@ package replication
import (
"context"
"testing"
"time"

"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.temporal.io/api/common/v1"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/server/api/history/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/locks"
"go.temporal.io/server/common/log"
Expand All @@ -55,15 +57,17 @@ type (
suite.Suite
*require.Assertions
workflowCache *wcache.MockCache
eventBlobCache persistence.XDCCache
logger log.Logger
mockShard *shard.ContextTest
controller *gomock.Controller
releaseFunc func(err error)
workflowContext *workflow.MockContext
newRunWorkflowContext *workflow.MockContext
namespaceID string
execution *common.WorkflowExecution
execution *commonpb.WorkflowExecution
newRunId string
workflowKey definition.WorkflowKey
syncStateRetriever *SyncStateRetrieverImpl
workflowConsistencyChecker *api.MockWorkflowConsistencyChecker
}
Expand Down Expand Up @@ -92,20 +96,29 @@ func (s *syncWorkflowStateSuite) SetupSuite() {
s.workflowCache = wcache.NewMockCache(s.controller)
s.logger = s.mockShard.GetLogger()
s.namespaceID = tests.NamespaceID.String()
s.execution = &common.WorkflowExecution{
s.execution = &commonpb.WorkflowExecution{
WorkflowId: uuid.New(),
RunId: uuid.New(),
}
s.newRunId = uuid.New()
s.workflowKey = definition.WorkflowKey{
NamespaceID: s.namespaceID,
WorkflowID: s.execution.WorkflowId,
RunID: s.execution.RunId,
}
s.workflowConsistencyChecker = api.NewMockWorkflowConsistencyChecker(s.controller)
s.syncStateRetriever = NewSyncStateRetriever(s.mockShard, s.workflowCache, s.workflowConsistencyChecker, s.logger)
}

func (s *syncWorkflowStateSuite) TearDownSuite() {
}

func (s *syncWorkflowStateSuite) SetupTest() {

s.eventBlobCache = persistence.NewEventsBlobCache(
1024*1024,
20*time.Second,
s.logger,
)
s.syncStateRetriever = NewSyncStateRetriever(s.mockShard, s.workflowCache, s.workflowConsistencyChecker, s.eventBlobCache, s.logger)
}

func (s *syncWorkflowStateSuite) TearDownTest() {
Expand Down Expand Up @@ -295,7 +308,7 @@ func (s *syncWorkflowStateSuite) TestGetNewRunInfo() {
NewExecutionRunId: s.newRunId,
}
mu.EXPECT().GetExecutionInfo().Return(executionInfo).AnyTimes()
s.workflowCache.EXPECT().GetOrCreateWorkflowExecution(gomock.Any(), s.mockShard, namespace.ID(s.namespaceID), &common.WorkflowExecution{
s.workflowCache.EXPECT().GetOrCreateWorkflowExecution(gomock.Any(), s.mockShard, namespace.ID(s.namespaceID), &commonpb.WorkflowExecution{
WorkflowId: s.execution.WorkflowId,
RunId: s.newRunId,
}, locks.PriorityLow).Return(s.newRunWorkflowContext, s.releaseFunc, nil)
Expand All @@ -309,7 +322,7 @@ func (s *syncWorkflowStateSuite) TestGetNewRunInfo() {
ShardID: s.mockShard.GetShardID(),
PageSize: defaultPageSize,
}).Return(&persistence.ReadRawHistoryBranchResponse{
HistoryEventBlobs: []*common.DataBlob{
HistoryEventBlobs: []*commonpb.DataBlob{
{Data: []byte("event1")}},
}, nil)
newRunInfo, err := s.syncStateRetriever.getNewRunInfo(context.Background(), namespace.ID(s.namespaceID), s.execution, s.newRunId)
Expand Down Expand Up @@ -345,7 +358,7 @@ func (s *syncWorkflowStateSuite) TestGetNewRunInfo_NotFound() {
NewExecutionRunId: s.newRunId,
}
mu.EXPECT().GetExecutionInfo().Return(executionInfo).AnyTimes()
s.workflowCache.EXPECT().GetOrCreateWorkflowExecution(gomock.Any(), s.mockShard, namespace.ID(s.namespaceID), &common.WorkflowExecution{
s.workflowCache.EXPECT().GetOrCreateWorkflowExecution(gomock.Any(), s.mockShard, namespace.ID(s.namespaceID), &commonpb.WorkflowExecution{
WorkflowId: s.execution.WorkflowId,
RunId: s.newRunId,
}, locks.PriorityLow).Return(s.newRunWorkflowContext, s.releaseFunc, nil)
Expand All @@ -357,6 +370,28 @@ func (s *syncWorkflowStateSuite) TestGetNewRunInfo_NotFound() {
s.Nil(newRunInfo)
}

func (s *syncWorkflowStateSuite) addXDCCache(minEventID int64, version int64, nextEventID int64, eventBlobs []*commonpb.DataBlob, versionHistoryItems []*history.VersionHistoryItem) {
s.eventBlobCache.Put(persistence.NewXDCCacheKey(
s.workflowKey,
minEventID,
version,
), persistence.NewXDCCacheValue(
nil,
versionHistoryItems,
eventBlobs,
nextEventID,
))
}

func (s *syncWorkflowStateSuite) getEventBlobs(firstEventID, nextEventID int64) []*commonpb.DataBlob {
eventBlob := &commonpb.DataBlob{Data: []byte("event1")}
eventBlobs := make([]*commonpb.DataBlob, nextEventID-firstEventID)
for i := 0; i < int(nextEventID-firstEventID); i++ {
eventBlobs[i] = eventBlob
}
return eventBlobs
}

func (s *syncWorkflowStateSuite) TestGetSyncStateEvents() {
targetVersionHistoriesItems := [][]*history.VersionHistoryItem{
{
Expand All @@ -367,33 +402,79 @@ func (s *syncWorkflowStateSuite) TestGetSyncStateEvents() {
{EventId: 10, Version: 10},
},
}
versionHistoryItems := []*history.VersionHistoryItem{
{EventId: 1, Version: 10},
{EventId: 30, Version: 13},
}
sourceVersionHistories := &history.VersionHistories{
CurrentVersionHistoryIndex: 0,
Histories: []*history.VersionHistory{
{
BranchToken: []byte("source branchToken1"),
Items: []*history.VersionHistoryItem{
{EventId: 1, Version: 10},
{EventId: 20, Version: 13},
},
Items: versionHistoryItems,
},
},
}

// get [19, 31) from DB
s.mockShard.Resource.ExecutionMgr.EXPECT().ReadRawHistoryBranch(gomock.Any(), &persistence.ReadHistoryBranchRequest{
BranchToken: sourceVersionHistories.Histories[0].GetBranchToken(),
MinEventID: 19,
MaxEventID: 21,
MaxEventID: 31,
ShardID: s.mockShard.GetShardID(),
PageSize: defaultPageSize,
}).Return(&persistence.ReadRawHistoryBranchResponse{
HistoryEventBlobs: []*common.DataBlob{
{Data: []byte("event1")}},
}, nil)
}).Return(&persistence.ReadRawHistoryBranchResponse{HistoryEventBlobs: s.getEventBlobs(19, 31)}, nil)

events, err := s.syncStateRetriever.getSyncStateEvents(context.Background(), targetVersionHistoriesItems, sourceVersionHistories)
events, err := s.syncStateRetriever.getSyncStateEvents(context.Background(), s.workflowKey, targetVersionHistoriesItems, sourceVersionHistories)
s.NoError(err)
s.Len(events, 31-19)

// get [19,21) from cache, [21, 31) from DB
s.mockShard.Resource.ExecutionMgr.EXPECT().ReadRawHistoryBranch(gomock.Any(), &persistence.ReadHistoryBranchRequest{
BranchToken: sourceVersionHistories.Histories[0].GetBranchToken(),
MinEventID: 21,
MaxEventID: 31,
ShardID: s.mockShard.GetShardID(),
PageSize: defaultPageSize,
}).Return(&persistence.ReadRawHistoryBranchResponse{HistoryEventBlobs: s.getEventBlobs(21, 31)}, nil)

s.addXDCCache(19, 13, 21, s.getEventBlobs(19, 21), versionHistoryItems)
events, err = s.syncStateRetriever.getSyncStateEvents(context.Background(), s.workflowKey, targetVersionHistoriesItems, sourceVersionHistories)
s.NoError(err)
s.Len(events, 31-19)

// get [19,31) from cache
s.addXDCCache(21, 13, 41, s.getEventBlobs(21, 41), versionHistoryItems)
events, err = s.syncStateRetriever.getSyncStateEvents(context.Background(), s.workflowKey, targetVersionHistoriesItems, sourceVersionHistories)
s.NoError(err)
s.Len(events, 31-19)
}

func (s *syncWorkflowStateSuite) TestGetEventsBlob_NewRun() {
versionHistory := &history.VersionHistory{
BranchToken: []byte("branchToken1"),
Items: []*history.VersionHistoryItem{
{EventId: 1, Version: 1},
},
}

// get [1,4) from DB
s.mockShard.Resource.ExecutionMgr.EXPECT().ReadRawHistoryBranch(gomock.Any(), &persistence.ReadHistoryBranchRequest{
BranchToken: versionHistory.BranchToken,
MinEventID: common.FirstEventID,
MaxEventID: common.FirstEventID + 1,
ShardID: s.mockShard.GetShardID(),
PageSize: defaultPageSize,
}).Return(&persistence.ReadRawHistoryBranchResponse{HistoryEventBlobs: s.getEventBlobs(1, 4)}, nil)
events, err := s.syncStateRetriever.getEventsBlob(context.Background(), s.workflowKey, versionHistory, common.FirstEventID, common.FirstEventID+1, true)
s.NoError(err)
s.Len(events, 4-1)

// get [1,4) from cache
s.addXDCCache(1, 1, 4, s.getEventBlobs(1, 4), versionHistory.Items)
events, err = s.syncStateRetriever.getEventsBlob(context.Background(), s.workflowKey, versionHistory, common.FirstEventID, common.FirstEventID+1, true)
s.NoError(err)
s.NotNil(events)
s.Len(events, 4-1)
}

func (s *syncWorkflowStateSuite) TestGetSyncStateEvents_EventsUpToDate_ReturnNothing() {
Expand All @@ -416,7 +497,7 @@ func (s *syncWorkflowStateSuite) TestGetSyncStateEvents_EventsUpToDate_ReturnNot
},
}

events, err := s.syncStateRetriever.getSyncStateEvents(context.Background(), targetVersionHistoriesItems, sourceVersionHistories)
events, err := s.syncStateRetriever.getSyncStateEvents(context.Background(), s.workflowKey, targetVersionHistoriesItems, sourceVersionHistories)

s.NoError(err)
s.Nil(events)
Expand Down

0 comments on commit 447b827

Please sign in to comment.