Skip to content

Commit

Permalink
Added logging to task queue user data propagation (#6456)
Browse files Browse the repository at this point in the history
## What changed?
Add more logging around task queue user data propagation RPCs.
Also add a unit test that runs through the basic propagation logic.

## Why?
Add visibility to the process to help debug unusual behavior.

## How did you test it?
Added a unit test.
  • Loading branch information
dnr authored Sep 24, 2024
1 parent d34367b commit 8ab9531
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 55 deletions.
4 changes: 4 additions & 0 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,10 @@ func BuildId(buildId string) ZapTag {
return NewStringTag("build-id", buildId)
}

// UserDataVersion returns the tag built by NewInt64 under the key
// "user-data-version", carrying the version number v.
func UserDataVersion(v int64) ZapTag {
	const key = "user-data-version"
	return NewInt64(key, v)
}

// Cause returns the tag built by NewStringTag under the key "cause",
// carrying the given cause string.
func Cause(cause string) ZapTag {
	const key = "cause"
	return NewStringTag(key, cause)
}
2 changes: 2 additions & 0 deletions service/matching/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ type (

GetUserDataLongPollTimeout dynamicconfig.DurationPropertyFn
GetUserDataMinWaitTime time.Duration
GetUserDataReturnBudget time.Duration

// taskWriter configuration
OutstandingTaskAppendsThreshold func() int
Expand Down Expand Up @@ -298,6 +299,7 @@ func newTaskQueueConfig(tq *tqid.TaskQueue, config *Config, ns namespace.Name) *
},
GetUserDataLongPollTimeout: config.GetUserDataLongPollTimeout,
GetUserDataMinWaitTime: 1 * time.Second,
GetUserDataReturnBudget: returnEmptyTaskTimeBudget,
OutstandingTaskAppendsThreshold: func() int {
return config.OutstandingTaskAppendsThreshold(ns.String(), taskQueueName, taskType)
},
Expand Down
42 changes: 1 addition & 41 deletions service/matching/matching_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1461,51 +1461,11 @@ func (e *matchingEngineImpl) GetTaskQueueUserData(
if err != nil {
return nil, err
}
version := req.GetLastKnownUserDataVersion()
if version < 0 {
return nil, serviceerror.NewInvalidArgument("last_known_user_data_version must not be negative")
}

if req.WaitNewData {
var cancel context.CancelFunc
ctx, cancel = newChildContext(ctx, e.config.GetUserDataLongPollTimeout(), returnEmptyTaskTimeBudget)
defer cancel()
// mark alive so that it doesn't unload while a child partition is doing a long poll
pm.MarkAlive()
}

for {
resp := &matchingservice.GetTaskQueueUserDataResponse{}
userData, userDataChanged, err := pm.GetUserDataManager().GetUserData()
if errors.Is(err, errTaskQueueClosed) {
// If we're closing, return a success with no data, as if the request expired. We shouldn't
// close due to idleness (because of the MarkAlive above), so we're probably closing due to a
// change of ownership. The caller will retry and be redirected to the new owner.
return resp, nil
} else if err != nil {
return nil, err
}
if req.WaitNewData && userData.GetVersion() == version {
// long-poll: wait for data to change/appear
select {
case <-ctx.Done():
return resp, nil
case <-userDataChanged:
continue
}
}
if userData != nil {
if userData.Version > version {
resp.UserData = userData
} else if userData.Version < version {
// This is highly unlikely but may happen due to an edge case during ownership transfer.
// We rely on client retries in this case to let the system eventually self-heal.
return nil, serviceerror.NewInvalidArgument(
"requested task queue user data for version greater than known version")
}
}
return resp, nil
}
return pm.GetUserDataManager().HandleGetUserDataRequest(ctx, req)
}

func (e *matchingEngineImpl) ApplyTaskQueueUserDataReplicationEvent(
Expand Down
6 changes: 6 additions & 0 deletions service/matching/task_queue_partition_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"

"go.temporal.io/server/api/matchingservice/v1"
"go.temporal.io/server/api/matchingservicemock/v1"
"go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/api/taskqueue/v1"
Expand Down Expand Up @@ -588,6 +590,10 @@ func (m *mockUserDataManager) UpdateUserData(_ context.Context, _ UserDataUpdate
return nil
}

// HandleGetUserDataRequest is a stub: it ignores both arguments and always
// panics with "unused". Presumably no test that relies on this mock calls it.
func (m *mockUserDataManager) HandleGetUserDataRequest(
	_ context.Context,
	_ *matchingservice.GetTaskQueueUserDataRequest,
) (*matchingservice.GetTaskQueueUserDataResponse, error) {
	panic("unused")
}

func (m *mockUserDataManager) updateVersioningData(data *persistence.VersioningData) {
m.Lock()
defer m.Unlock()
Expand Down
102 changes: 98 additions & 4 deletions service/matching/user_data_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/clock/hybrid_logical_clock"
"go.temporal.io/server/common/future"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
Expand Down Expand Up @@ -66,6 +67,8 @@ type (
// UpdateUserData updates user data for this task queue and replicates across clusters if necessary.
// Extra care should be taken to avoid mutating the existing data in the update function.
UpdateUserData(ctx context.Context, options UserDataUpdateOptions, updateFn UserDataUpdateFunc) error
// HandleGetUserDataRequest handles the GetUserData RPC, which may be a long poll.
HandleGetUserDataRequest(ctx context.Context, req *matchingservice.GetTaskQueueUserDataRequest) (*matchingservice.GetTaskQueueUserDataResponse, error)
}

UserDataUpdateOptions struct {
Expand Down Expand Up @@ -276,6 +279,10 @@ func (m *userDataManagerImpl) fetchUserData(ctx context.Context) error {
WaitNewData: hasFetchedUserData,
})
if err != nil {
// don't log on context canceled, produces too much log spam at shutdown
if !common.IsContextCanceledErr(err) {
m.logger.Error("error fetching user data from parent", tag.Error(err))
}
var unimplErr *serviceerror.Unimplemented
if errors.As(err, &unimplErr) {
// This might happen during a deployment. The older version couldn't have had any user data,
Expand All @@ -292,6 +299,9 @@ func (m *userDataManagerImpl) fetchUserData(ctx context.Context) error {
// nil inner fields.
if res.GetUserData() != nil {
m.setUserDataForNonOwningPartition(res.GetUserData())
m.logNewUserData("fetched user data from parent", res.GetUserData())
} else {
m.logger.Debug("fetched user data from parent, no change")
}
hasFetchedUserData = true
m.setUserDataState(userDataEnabled, nil)
Expand Down Expand Up @@ -339,6 +349,7 @@ func (m *userDataManagerImpl) loadUserDataFromDB(ctx context.Context) error {
m.lock.Lock()
defer m.lock.Unlock()
m.setUserDataLocked(response.UserData)
m.logNewUserData("loaded user data from db", response.UserData)

return nil
}
Expand All @@ -349,6 +360,9 @@ func (m *userDataManagerImpl) UpdateUserData(ctx context.Context, options UserDa
if m.store == nil {
return errUserDataNoMutateNonRoot
}
if err := m.WaitUntilInitialized(ctx); err != nil {
return err
}
newData, shouldReplicate, err := m.updateUserData(ctx, updateFn, options.KnownVersion, options.TaskQueueLimitPerBuildId)
if err != nil {
return err
Expand Down Expand Up @@ -413,6 +427,7 @@ func (m *userDataManagerImpl) updateUserData(
}
updatedUserData, shouldReplicate, err := updateFn(preUpdateData)
if err != nil {
m.logger.Error("user data update function failed", tag.Error(err))
return nil, false, err
}

Expand Down Expand Up @@ -441,14 +456,86 @@ func (m *userDataManagerImpl) updateUserData(
BuildIdsAdded: added,
BuildIdsRemoved: removed,
})
var updatedVersionedData *persistencespb.VersionedTaskQueueUserData
if err == nil {
updatedVersionedData = &persistencespb.VersionedTaskQueueUserData{Version: preUpdateVersion + 1, Data: updatedUserData}
m.setUserDataLocked(updatedVersionedData)
if err != nil {
m.logger.Error("failed to push new user data to owning matching node for namespace", tag.Error(err))
return nil, false, err
}

updatedVersionedData := &persistencespb.VersionedTaskQueueUserData{Version: preUpdateVersion + 1, Data: updatedUserData}
m.logNewUserData("modified user data", updatedVersionedData)
m.setUserDataLocked(updatedVersionedData)

return updatedVersionedData, shouldReplicate, err
}

// HandleGetUserDataRequest serves a GetTaskQueueUserData request, optionally as a long poll.
//
// A negative last-known version is rejected with InvalidArgument. When req.WaitNewData is set,
// the call blocks (bounded by a child context built from the configured long-poll timeout and
// return budget) until the stored version differs from the caller's last-known version.
//
// The response carries user data only when the stored version is newer than the caller's.
// An empty response with a nil error is returned when the manager reports errTaskQueueClosed,
// when the long poll expires, when there is no user data, or when the versions are equal.
// A stored version older than the caller's yields InvalidArgument.
func (m *userDataManagerImpl) HandleGetUserDataRequest(
	ctx context.Context,
	req *matchingservice.GetTaskQueueUserDataRequest,
) (*matchingservice.GetTaskQueueUserDataResponse, error) {
	knownVersion := req.GetLastKnownUserDataVersion()
	if knownVersion < 0 {
		return nil, serviceerror.NewInvalidArgument("last_known_user_data_version must not be negative")
	}

	longPoll := req.WaitNewData
	if longPoll {
		// Bound the wait so that an empty response can still be returned to the caller.
		pollCtx, cancel := newChildContext(ctx, m.config.GetUserDataLongPollTimeout(), m.config.GetUserDataReturnBudget)
		defer cancel()
		ctx = pollCtx
	}

	for {
		resp := &matchingservice.GetTaskQueueUserDataResponse{}
		current, changed, err := m.GetUserData()
		switch {
		case errors.Is(err, errTaskQueueClosed):
			// We're closing: report success with no data, exactly as if the request had expired.
			// The caller will retry and be redirected to the new owner.
			// NOTE(review): the previous comment credited a "MarkAlive above" for ruling out
			// idle-unload as the cause; there is no MarkAlive call in this method, so presumably
			// the RPC entry point does it before delegating here. Confirm.
			m.logger.Debug("returning empty user data (closing)", tag.NewBoolTag("long-poll", longPoll))
			return resp, nil
		case err != nil:
			return nil, err
		}

		if longPoll && current.GetVersion() == knownVersion {
			// Long poll: nothing new yet, so block until the data changes or we run out of time.
			select {
			case <-ctx.Done():
				m.logger.Debug("returning empty user data (expired)",
					tag.NewBoolTag("long-poll", longPoll),
					tag.NewInt64("request-known-version", knownVersion),
					tag.UserDataVersion(current.GetVersion()),
				)
				return resp, nil
			case <-changed:
				m.logger.Debug("user data changed while blocked in long poll")
				// Re-read the data and re-evaluate from the top of the loop.
				continue
			}
		}

		switch {
		case current == nil:
			m.logger.Debug("returning empty user data (no data)", tag.NewBoolTag("long-poll", longPoll))
		case current.Version > knownVersion:
			resp.UserData = current
			m.logger.Info("returning user data",
				tag.NewBoolTag("long-poll", longPoll),
				tag.NewInt64("request-known-version", knownVersion),
				tag.UserDataVersion(current.Version),
			)
		case current.Version < knownVersion:
			// This is highly unlikely but may happen due to an edge case during ownership transfer.
			// We rely on client retries in this case to let the system eventually self-heal.
			m.logger.Error("requested task queue user data for version greater than known version",
				tag.NewInt64("request-known-version", knownVersion),
				tag.UserDataVersion(current.Version),
			)
			return nil, serviceerror.NewInvalidArgument(
				"requested task queue user data for version greater than known version")
		}
		// Equal versions on a non-long-poll request fall through with an empty response.
		return resp, nil
	}
}

func (m *userDataManagerImpl) setUserDataForNonOwningPartition(userData *persistencespb.VersionedTaskQueueUserData) {
m.lock.Lock()
defer m.lock.Unlock()
Expand All @@ -459,3 +546,10 @@ func (m *userDataManagerImpl) callerInfoContext(ctx context.Context) context.Con
ns, _ := m.namespaceRegistry.GetNamespaceName(namespace.ID(m.partition.NamespaceId()))
return headers.SetCallerInfo(ctx, headers.NewBackgroundCallerInfo(ns.String()))
}

// logNewUserData writes message at info level, tagged with the version of data and with
// the result of hybrid_logical_clock.UTC applied to the clock stored in data. Only the
// nil-tolerant Get* accessors are used to read data.
func (m *userDataManagerImpl) logNewUserData(message string, data *persistencespb.VersionedTaskQueueUserData) {
	version := data.GetVersion()
	clockTime := hybrid_logical_clock.UTC(data.GetData().GetClock())
	m.logger.Info(message, tag.UserDataVersion(version), tag.Timestamp(clockTime))
}
Loading

0 comments on commit 8ab9531

Please sign in to comment.