Add task polling data to task responses #447

Open · wants to merge 2 commits into base: master
30 changes: 30 additions & 0 deletions openapi/openapiv2.json
@@ -8941,6 +8941,10 @@
"retryPolicy": {
"$ref": "#/definitions/v1RetryPolicy",
"description": "This is the retry policy the service uses which may be different from the one provided\n(or not) during activity scheduling. The service can override the provided one if some\nvalues are not specified or exceed configured system limits."
},
"taskResponseData": {
"$ref": "#/definitions/v1TaskResponsePollingData",
"description": "See `TaskResponsePollingData`\nNOTE: Not yet implemented. Update this docstring when released with version."
}
}
},
@@ -8955,6 +8959,10 @@
"request": {
"$ref": "#/definitions/apinexusv1Request",
"description": "Embedded request as translated from the incoming frontend request."
},
"taskResponseData": {
"$ref": "#/definitions/v1TaskResponsePollingData",
"description": "See `TaskResponsePollingData`\nNOTE: Not yet implemented. Update this docstring when released with version."
}
}
},
@@ -9050,6 +9058,10 @@
"$ref": "#/definitions/v1Message"
},
"title": "Protocol messages piggybacking on a WFT as a transport"
},
"taskResponseData": {
"$ref": "#/definitions/v1TaskResponsePollingData",
"description": "See `TaskResponsePollingData`\nNOTE: Not yet implemented. Update this docstring when released with version."
}
}
},
@@ -10896,6 +10908,24 @@
"default": "TASK_REACHABILITY_UNSPECIFIED",
"description": "Specifies which category of tasks may reach a worker on a versioned task queue.\nUsed both in a reachability query and its response.\nDeprecated.\n\n - TASK_REACHABILITY_NEW_WORKFLOWS: There's a possiblity for a worker to receive new workflow tasks. Workers should *not* be retired.\n - TASK_REACHABILITY_EXISTING_WORKFLOWS: There's a possiblity for a worker to receive existing workflow and activity tasks from existing workflows. Workers\nshould *not* be retired.\nThis enum value does not distinguish between open and closed workflows.\n - TASK_REACHABILITY_OPEN_WORKFLOWS: There's a possiblity for a worker to receive existing workflow and activity tasks from open workflows. Workers\nshould *not* be retired.\n - TASK_REACHABILITY_CLOSED_WORKFLOWS: There's a possiblity for a worker to receive existing workflow tasks from closed workflows. Workers may be\nretired dependending on application requirements. For example, if there's no need to query closed workflows."
},
"v1TaskResponsePollingData": {
"type": "object",
"properties": {
"wasSyncMatch": {
"type": "boolean",
"description": "Set to true if the returned task was a result of a sync match."
},
"timePollerWaited": {
"type": "string",
"description": "Set to the amount of time a poll call waited for a task to be available. This value is allowed to be unset\nif the match was not sync. It is useful to help the SDK understand if it may reduce the number of pollers, since\nany substantial wait time here means the backlog is drained, and poll calls are idling waiting for tasks."
},
"stats": {
"$ref": "#/definitions/v1TaskQueueStats",
"description": "See `TaskQueueStats`. This field is only set if the poll hit the root partition, where the data is readily\navailable. SDKs should cache this data, and explicitly refresh it with a call to `DescribeTaskQueue` if some\nTTL has expired."
}
},
"description": "Data attached to successful poll responses (for any task type) which the SDK can use to optimize its polling\nstrategy."
},
"v1TerminateWorkflowExecutionResponse": {
"type": "object"
},
29 changes: 29 additions & 0 deletions openapi/openapiv3.yaml
@@ -6617,6 +6617,12 @@ components:
items:
$ref: '#/components/schemas/Message'
description: Protocol messages piggybacking on a WFT as a transport
taskResponseData:
allOf:
- $ref: '#/components/schemas/TaskResponsePollingData'
description: |-
See `TaskResponsePollingData`
NOTE: Not yet implemented. Update this docstring when released with version.
PollerInfo:
type: object
properties:
@@ -8304,6 +8310,29 @@
who inherit the parent/previous workflow's Build ID but not its Task Queue. In those cases, make
sure to query reachability for the parent/previous workflow's Task Queue as well.
format: enum
TaskResponsePollingData:
type: object
properties:
wasSyncMatch:
type: boolean
description: Set to true if the returned task was a result of a sync match.
timePollerWaited:
pattern: ^-?(?:0|[1-9][0-9]{0,11})(?:\.[0-9]{1,9})?s$
type: string
description: |-
Set to the amount of time a poll call waited for a task to be available. This value is allowed to be unset
if the match was not sync. It is useful to help the SDK understand if it may reduce the number of pollers, since
any substantial wait time here means the backlog is drained, and poll calls are idling waiting for tasks.
stats:
allOf:
- $ref: '#/components/schemas/TaskQueueStats'
description: |-
See `TaskQueueStats`. This field is only set if the poll hit the root partition, where the data is readily
available. SDKs should cache this data, and explicitly refresh it with a call to `DescribeTaskQueue` if some
TTL has expired.
description: |-
Data attached to successful poll responses (for any task type) which the SDK can use to optimize its polling
strategy.
TerminateWorkflowExecutionRequest:
type: object
properties:
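Note the `pattern` on `timePollerWaited` above: in OpenAPI, `google.protobuf.Duration` serializes as a decimal seconds string with an `s` suffix (e.g. `3.000000001s`). Below is a minimal sketch of consuming that string in Go; the helper name is an illustrative assumption, not part of this PR.

```go
// Parsing the JSON string form of google.protobuf.Duration, e.g. "3.000000001s".
// Go's time.ParseDuration accepts this format directly, so no custom parser is
// needed for well-formed values.
package durations

import (
	"fmt"
	"time"
)

func parsePollerWait(s string) (time.Duration, error) {
	d, err := time.ParseDuration(s) // accepts "1.5s", "-2s", "0.000000001s"
	if err != nil {
		return 0, fmt.Errorf("invalid proto duration %q: %w", s, err)
	}
	return d, nil
}
```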
15 changes: 15 additions & 0 deletions temporal/api/taskqueue/v1/message.proto
@@ -257,4 +257,19 @@ message TimestampedBuildIdAssignmentRule {
message TimestampedCompatibleBuildIdRedirectRule {
CompatibleBuildIdRedirectRule rule = 1;
google.protobuf.Timestamp create_time = 2;
}

// Data attached to successful poll responses (for any task type) which the SDK can use to optimize its polling
// strategy.
Member:
Maybe to head off potential confusion, add a comment like "Don't assume the accuracy of this data for any purpose other than poller scaling."

message TaskResponsePollingData {
// Set to true if the returned task was a result of a sync match.
bool was_sync_match = 1;
// Set to the amount of time a poll call waited for a task to be available. This value is allowed to be unset
// if the match was not sync. It is useful to help the SDK understand if it may reduce the number of pollers, since
// any substantial wait time here means the backlog is drained, and poll calls are idling waiting for tasks.
google.protobuf.Duration time_poller_waited = 2;
Member:
I'd call it poll_wait_time or even just wait_time

// See `TaskQueueStats`. This field is only set if the poll hit the root partition, where the data is readily
// available. SDKs should cache this data, and explicitly refresh it with a call to `DescribeTaskQueue` if some
// TTL has expired.
TaskQueueStats stats = 3;
Comment on lines +271 to +274
@cretz (Member), Aug 20, 2024:
Hrmm, I wonder a bit about this. I wasn't expecting SDKs to call describe task queue ever, only users. Can I get clarity on when a worker may implicitly call describe task queue? Is there information in here to drive decisions that we need? If not, I wonder if we should remove this field and ask users that want this info to call describe themselves.

Member Author:
It's going to give us this same stats information, which is used to drive poller scaling decisions. Users aren't going to interact with any of this directly. So it will periodically call it if, by chance, no pollers are hitting the root partition in the last unit of time, which should be relatively rare.

@cretz (Member), Aug 20, 2024:
Hrmm, can the poller scaling decisions be done without changing workers to make new describe task queue calls? Or is it a strong requirement at this point? I saw the first version of this PR didn't need these stats.

@Sushisource (Member Author), Aug 20, 2024:
What's the concern here? Nothing is, strictly speaking, required - we could not do poller scaling at all, but the information is helpful in making those decisions. I already had some discussion with server folks about the load making this call would add, and that's all fine. I would've liked it to be in the task response all the time, ideally, but that is more expensive than this option turns out to be.

Member:
The concern, I think, is adding confusing results to the poll (we already got bit by this with the backlog hint, where it was confusing for everyone what it meant and when the data was accurate). There is also the concern of adding logic burden on the workers to now start making additional, separate out-of-band calls beyond polling in order to support polling. I was hoping the poller scaling could work with the calls the worker already makes and, if it needs more info, put the burden on the centralized part (i.e. the server) to provide it as part of the calls already being made, or acknowledge that we may need to work with limited information.

Member:
I don't think we need to talk about calling DescribeTaskQueue, or call it. If we're polling fairly frequently, then we'll hit the root and get the data. If we're not, then there's no tasks and no reason to scale pollers. Let's just say the SDK can make use of this field when it's provided, and do without it if it's missing.

Member:
That workers won't be expected to call DescribeTaskQueue definitely alleviates one of my concerns. But it alludes to another: it sounds like we don't yet know what we want workers to do. Maybe we can even use this field's proto doc to define worker poller behavior. Or maybe, since it sounds like this is an optional field and poller scaling needs to work well without it, we can do poller scaling first and come back and add this field and associated optimizations.

Member Author:
I've been writing some pseudocode for the algorithm that's nearly complete. I'm quite sure I want this data, and delivering it in the field is cheap. It's likely to be present often enough to be useful. I don't really see the cost of having it here as problematic at all.

More specifically, it's very useful in knowing when to scale down pollers, which fits perfectly with this bit that was mentioned earlier but that David states clearly:

> If we're polling fairly frequently, then we'll hit the root and get the data. If we're not, then there's no tasks and no reason to scale pollers.

So we're going to get that data when we most need it.

@cretz (Member), Aug 27, 2024:
👍 So the docs make clear that it may not appear, but per what @dnr put a few messages up in this thread, can we remove the part about expecting SDKs to call describe task queue to get this info?

@ShahabT - can you expand on what you mean by aggregate-and-cache? How out of date might this value be compared to a regular describe call? Is there a design step needed to confirm how this aggregate-and-cache might work? Or any fears we may learn something during impl that would affect these protos?

Contributor:
For the stats to reflect the whole task queue, and not only the root partition itself, we need to fan out to all partitions and get their stats (via the internal DescribeTaskQueuePartition API), aggregate all of them, and return the aggregated value. This is what Enhanced DescribeTaskQueue does.

Now, if we're to send such stats in the root partition's poll response, this fan-out and calling of the DescribeTaskQueuePartition API would have to happen constantly (as opposed to now, when it happens only when DescribeTaskQueue is called).

To manage the load, the server would have to add caching in the root partition to limit the fan-out frequency (say, to once per second).

It's not something that we cannot manage, but it is not free either.
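
To make the trade-off concrete, here is a minimal sketch of that TTL-bounded aggregate-and-cache, assuming a hypothetical fanOutStats callback and an illustrative TaskQueueStats stub; names and structure are illustrative, not the server's actual code.

```go
// A sketch of root-partition stats caching: refresh the aggregate at most
// once per TTL so constant polling does not mean constant partition fan-out.
package matching

import (
	"sync"
	"time"
)

// TaskQueueStats is a stub standing in for the real message, which has more fields.
type TaskQueueStats struct {
	ApproximateBacklogCount int64
}

type statsCache struct {
	mu        sync.Mutex
	cached    *TaskQueueStats
	fetchedAt time.Time
	ttl       time.Duration // e.g. one second, bounding fan-out frequency
}

// get returns the aggregated stats, refreshing via fanOutStats (which would
// fan out to every partition and aggregate) at most once per TTL.
func (c *statsCache) get(fanOutStats func() *TaskQueueStats) *TaskQueueStats {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.cached == nil || time.Since(c.fetchedAt) > c.ttl {
		c.cached = fanOutStats()
		c.fetchedAt = time.Now()
	}
	return c.cached
}
```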

}
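
For reference, a minimal sketch of the kind of poller-scaling heuristic this thread discusses. The struct mirrors TaskResponsePollingData, but the struct itself, the thresholds, and the scale rules are illustrative assumptions, not any SDK's actual algorithm.

```go
// A sketch of a per-poll scaling decision driven by TaskResponsePollingData.
package pollerscaler

import "time"

type pollingData struct {
	wasSyncMatch     bool          // task was matched directly to a waiting poller
	timePollerWaited time.Duration // zero means unset
}

// suggestDelta returns +1 to add a poller, -1 to remove one, or 0 to hold.
func suggestDelta(d pollingData) int {
	switch {
	case d.timePollerWaited > time.Second:
		// Substantial wait: per the proto doc, the backlog is drained and
		// poll calls are idling, so fewer pollers would suffice.
		return -1
	case d.wasSyncMatch && d.timePollerWaited < 10*time.Millisecond:
		// Near-instant sync matches: tasks are arriving about as fast as
		// pollers can pick them up, so an extra poller may help.
		return +1
	default:
		return 0
	}
}
```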
9 changes: 9 additions & 0 deletions temporal/api/workflowservice/v1/request_response.proto
@@ -315,6 +315,9 @@ message PollWorkflowTaskQueueResponse {
map<string, temporal.api.query.v1.WorkflowQuery> queries = 14;
// Protocol messages piggybacking on a WFT as a transport
repeated temporal.api.protocol.v1.Message messages = 15;
// See `TaskResponsePollingData`
// NOTE: Not yet implemented. Update this docstring when released with version.
temporal.api.taskqueue.v1.TaskResponsePollingData task_response_data = 16;
}

message RespondWorkflowTaskCompletedRequest {
@@ -447,6 +450,9 @@
// (or not) during activity scheduling. The service can override the provided one if some
// values are not specified or exceed configured system limits.
temporal.api.common.v1.RetryPolicy retry_policy = 17;
// See `TaskResponsePollingData`
// NOTE: Not yet implemented. Update this docstring when released with version.
temporal.api.taskqueue.v1.TaskResponsePollingData task_response_data = 18;
}

message RecordActivityTaskHeartbeatRequest {
@@ -1588,6 +1594,9 @@ message PollNexusTaskQueueResponse {
bytes task_token = 1;
// Embedded request as translated from the incoming frontend request.
temporal.api.nexus.v1.Request request = 2;
// See `TaskResponsePollingData`
// NOTE: Not yet implemented. Update this docstring when released with version.
temporal.api.taskqueue.v1.TaskResponsePollingData task_response_data = 3;
}

message RespondNexusTaskCompletedRequest {
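
Since the field is optional and marked not yet implemented, an SDK consuming these responses needs nil-safe handling along the lines dnr describes: use the data when provided, do without it when missing. A minimal sketch, with stand-in types for hypothetical protoc-generated Go bindings:

```go
// Nil-safe extraction of polling data from a poll response.
package worker

import "time"

// Stand-ins for generated bindings; not a released API.
type taskResponsePollingData struct {
	WasSyncMatch     bool
	TimePollerWaited *time.Duration // optional in the proto
}

type pollResponse struct {
	// May be nil: older servers, non-root partitions, or pre-release builds.
	TaskResponseData *taskResponsePollingData
}

// pollerWait reports the observed wait time and whether the server attached it.
func pollerWait(r *pollResponse) (time.Duration, bool) {
	d := r.TaskResponseData
	if d == nil || d.TimePollerWaited == nil {
		return 0, false
	}
	return *d.TimePollerWaited, true
}
```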