Skip to content

Commit

Permalink
enable client side task run orchestration by default (#14913)
Browse files Browse the repository at this point in the history
  • Loading branch information
jakekaplan authored Aug 15, 2024
1 parent de310ec commit 9b44ef1
Show file tree
Hide file tree
Showing 17 changed files with 505 additions and 1,312 deletions.
5 changes: 0 additions & 5 deletions docs/3.0rc/api-ref/rest-api/server/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -22297,11 +22297,6 @@
"title": "Prefect Api Max Flow Run Graph Artifacts",
"default": 10000
},
"PREFECT_EXPERIMENTAL_ENABLE_CLIENT_SIDE_TASK_ORCHESTRATION": {
"type": "boolean",
"title": "Prefect Experimental Enable Client Side Task Orchestration",
"default": false
},
"PREFECT_RUNNER_PROCESS_LIMIT": {
"type": "integer",
"title": "Prefect Runner Process Limit",
Expand Down
5 changes: 1 addition & 4 deletions src/prefect/server/api/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -570,10 +570,7 @@ async def start_services():
if prefect.settings.PREFECT_API_EVENTS_STREAM_OUT_ENABLED:
service_instances.append(stream.Distributor())

if (
prefect.settings.PREFECT_EXPERIMENTAL_ENABLE_CLIENT_SIDE_TASK_ORCHESTRATION
and prefect.settings.PREFECT_API_SERVICES_TASK_RUN_RECORDER_ENABLED
):
if prefect.settings.PREFECT_API_SERVICES_TASK_RUN_RECORDER_ENABLED:
service_instances.append(TaskRunRecorder())

loop = asyncio.get_running_loop()
Expand Down
8 changes: 0 additions & 8 deletions src/prefect/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -1379,14 +1379,6 @@ def default_cloud_ui_url(settings, value):
The maximum number of artifacts to show on a flow run graph on the v2 API
"""


PREFECT_EXPERIMENTAL_ENABLE_CLIENT_SIDE_TASK_ORCHESTRATION = Setting(
bool, default=False
)
"""
Whether or not to enable experimental client side task run orchestration.
"""

# Prefect Events feature flags

PREFECT_RUNNER_PROCESS_LIMIT = Setting(int, default=5)
Expand Down
293 changes: 92 additions & 201 deletions src/prefect/task_engine.py

Large diffs are not rendered by default.

46 changes: 7 additions & 39 deletions src/prefect/task_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,17 @@
from prefect.client.orchestration import get_client
from prefect.client.schemas.objects import TaskRun
from prefect.client.subscriptions import Subscription
from prefect.exceptions import Abort, PrefectHTTPStatusError
from prefect.logging.loggers import get_logger
from prefect.results import ResultFactory
from prefect.settings import (
PREFECT_API_URL,
PREFECT_EXPERIMENTAL_ENABLE_CLIENT_SIDE_TASK_ORCHESTRATION,
PREFECT_TASK_SCHEDULING_DELETE_FAILED_SUBMISSIONS,
)
from prefect.states import Pending
from prefect.task_engine import run_task_async, run_task_sync
from prefect.utilities.annotations import NotSet
from prefect.utilities.asyncutils import asyncnullcontext, sync_compatible
from prefect.utilities.engine import emit_task_run_state_change_event, propose_state
from prefect.utilities.engine import emit_task_run_state_change_event
from prefect.utilities.processutils import _register_signal
from prefect.utilities.services import start_client_metrics_server
from prefect.utilities.urls import url_for
Expand Down Expand Up @@ -294,42 +292,12 @@ async def _submit_scheduled_task_run(self, task_run: TaskRun):
return

initial_state = task_run.state

if PREFECT_EXPERIMENTAL_ENABLE_CLIENT_SIDE_TASK_ORCHESTRATION:
new_state = Pending()
new_state.state_details.deferred = True
new_state.state_details.task_run_id = task_run.id
new_state.state_details.flow_run_id = task_run.flow_run_id
state = new_state
task_run.state = state
else:
try:
new_state = Pending()
new_state.state_details.deferred = True
state = await propose_state(
client=get_client(), # TODO prove that we cannot use self._client here
state=new_state,
task_run_id=task_run.id,
)
except Abort as exc:
logger.exception(
f"Failed to submit task run {task_run.id!r} to engine", exc_info=exc
)
return
except PrefectHTTPStatusError as exc:
if exc.response.status_code == 404:
logger.warning(
f"Task run {task_run.id!r} not found. It may have been deleted."
)
return
raise

if not state.is_pending():
logger.warning(
f"Cancelling submission of task run {task_run.id!r} -"
f" server returned a non-pending state {state.type.value!r}."
)
return
new_state = Pending()
new_state.state_details.deferred = True
new_state.state_details.task_run_id = task_run.id
new_state.state_details.flow_run_id = task_run.flow_run_id
state = new_state
task_run.state = state

emit_task_run_state_change_event(
task_run=task_run,
Expand Down
7 changes: 0 additions & 7 deletions src/prefect/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
from prefect.logging.loggers import get_logger
from prefect.results import ResultFactory, ResultSerializer, ResultStorage
from prefect.settings import (
PREFECT_EXPERIMENTAL_ENABLE_CLIENT_SIDE_TASK_ORCHESTRATION,
PREFECT_TASK_DEFAULT_RETRIES,
PREFECT_TASK_DEFAULT_RETRY_DELAY_SECONDS,
)
Expand Down Expand Up @@ -815,12 +814,6 @@ async def create_local_run(
extra_task_inputs: Optional[Dict[str, Set[TaskRunInput]]] = None,
deferred: bool = False,
) -> TaskRun:
if not PREFECT_EXPERIMENTAL_ENABLE_CLIENT_SIDE_TASK_ORCHESTRATION:
raise RuntimeError(
"Cannot call `Task.create_local_run` unless "
"PREFECT_EXPERIMENTAL_ENABLE_CLIENT_SIDE_TASK_ORCHESTRATION is True"
)

from prefect.utilities.engine import (
_dynamic_key_for_task_run,
collect_task_run_inputs_sync,
Expand Down
5 changes: 1 addition & 4 deletions src/prefect/utilities/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
)
from prefect.results import BaseResult
from prefect.settings import (
PREFECT_EXPERIMENTAL_ENABLE_CLIENT_SIDE_TASK_ORCHESTRATION,
PREFECT_LOGGING_LOG_PRINTS,
)
from prefect.states import (
Expand Down Expand Up @@ -806,9 +805,7 @@ def emit_task_run_state_change_event(
else ""
),
"prefect.state-type": str(validated_state.type.value),
"prefect.orchestration": "client"
if PREFECT_EXPERIMENTAL_ENABLE_CLIENT_SIDE_TASK_ORCHESTRATION
else "server",
"prefect.orchestration": "client",
},
follows=follows,
)
Expand Down
14 changes: 0 additions & 14 deletions tests/deployment/test_flow_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,9 @@
from prefect import flow
from prefect.context import FlowRunContext
from prefect.deployments import run_deployment
from prefect.events.worker import EventsWorker
from prefect.server.schemas.core import TaskRunResult
from prefect.settings import (
PREFECT_API_URL,
PREFECT_EXPERIMENTAL_ENABLE_CLIENT_SIDE_TASK_ORCHESTRATION,
temporary_settings,
)
from prefect.tasks import task
from prefect.utilities.slugify import slugify
Expand All @@ -25,17 +22,6 @@
from prefect.client.orchestration import PrefectClient


@pytest.fixture(autouse=True, params=[False, True])
def enable_client_side_task_run_orchestration(
request, asserting_events_worker: EventsWorker
):
enabled = request.param
with temporary_settings(
{PREFECT_EXPERIMENTAL_ENABLE_CLIENT_SIDE_TASK_ORCHESTRATION: enabled}
):
yield enabled


class TestRunDeployment:
@pytest.fixture
async def test_deployment(self, prefect_client):
Expand Down
Loading

0 comments on commit 9b44ef1

Please sign in to comment.