Skip to content

Commit

Permalink
Create ServerTasksSettings model and child model
Browse files Browse the repository at this point in the history
  • Loading branch information
desertaxle committed Oct 22, 2024
1 parent c214587 commit eaec5a3
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 32 deletions.
27 changes: 0 additions & 27 deletions src/prefect/settings/models/root.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import warnings
from datetime import timedelta
from pathlib import Path
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -130,11 +129,6 @@ class Settings(PrefectBaseSettings):
description="The default limit applied to queries that can return multiple objects, such as `POST /flow_runs/filter`.",
)

api_task_cache_key_max_length: int = Field(
default=2000,
description="The maximum number of characters allowed for a task run cache key.",
)

###########################################################################
# Logging settings

Expand Down Expand Up @@ -218,12 +212,6 @@ class Settings(PrefectBaseSettings):
description="This value sets the default retry delay seconds for all tasks.",
)

task_run_tag_concurrency_slot_wait_seconds: int = Field(
default=30,
ge=0,
description="The number of seconds to wait before retrying when a task run cannot secure a concurrency slot from the server.",
)

sqlalchemy_pool_size: Optional[int] = Field(
default=None,
description="Controls connection pool size when using a PostgreSQL database with the Prefect API. If not set, the default SQLAlchemy pool size will be used.",
Expand Down Expand Up @@ -291,21 +279,6 @@ class Settings(PrefectBaseSettings):
description="Whether or not to delete failed task submissions from the database.",
)

task_scheduling_max_scheduled_queue_size: int = Field(
default=1000,
description="The maximum number of scheduled tasks to queue for submission.",
)

task_scheduling_max_retry_queue_size: int = Field(
default=100,
description="The maximum number of retries to queue for submission.",
)

task_scheduling_pending_task_timeout: timedelta = Field(
default=timedelta(0),
description="How long before a PENDING task are made available to another task worker.",
)

experimental_enable_schedule_concurrency: bool = Field(
default=False,
description="Whether or not to enable concurrency for scheduled tasks.",
Expand Down
5 changes: 5 additions & 0 deletions src/prefect/settings/models/server/root.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from .events import ServerEventsSettings
from .flow_run_graph import ServerFlowRunGraphSettings
from .services import ServerServicesSettings
from .tasks import ServerTasksSettings


class ServerSettings(PrefectBaseSettings):
Expand Down Expand Up @@ -127,3 +128,7 @@ class ServerSettings(PrefectBaseSettings):
default_factory=ServerServicesSettings,
description="Settings for controlling server services behavior",
)
tasks: ServerTasksSettings = Field(
default_factory=ServerTasksSettings,
description="Settings for controlling server tasks behavior",
)
86 changes: 86 additions & 0 deletions src/prefect/settings/models/server/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
from datetime import timedelta

from pydantic import AliasChoices, AliasPath, Field
from pydantic_settings import SettingsConfigDict

from prefect.settings.base import PrefectBaseSettings


class ServerTasksSchedulingSettings(PrefectBaseSettings):
"""
Settings for controlling server-side behavior related to task scheduling
"""

model_config = SettingsConfigDict(
env_file=".env",
env_prefix="PREFECT_SERVER_TASKS_SCHEDULING_",
extra="ignore",
)

max_scheduled_queue_size: int = Field(
default=1000,
description="The maximum number of scheduled tasks to queue for submission.",
validation_alias=AliasChoices(
AliasPath("max_scheduled_queue_size"),
"prefect_server_tasks_scheduling_max_scheduled_queue_size",
"prefect_task_scheduling_max_scheduled_queue_size",
),
)

max_retry_queue_size: int = Field(
default=100,
description="The maximum number of retries to queue for submission.",
validation_alias=AliasChoices(
AliasPath("max_retry_queue_size"),
"prefect_server_tasks_scheduling_max_retry_queue_size",
"prefect_task_scheduling_max_retry_queue_size",
),
)

pending_task_timeout: timedelta = Field(
default=timedelta(0),
description="How long before a PENDING task are made available to another task worker.",
validation_alias=AliasChoices(
AliasPath("pending_task_timeout"),
"prefect_server_tasks_scheduling_pending_task_timeout",
"prefect_task_scheduling_pending_task_timeout",
),
)


class ServerTasksSettings(PrefectBaseSettings):
"""
Settings for controlling server-side behavior related to tasks
"""

model_config = SettingsConfigDict(
env_file=".env",
env_prefix="PREFECT_SERVER_TASKS_",
extra="ignore",
)

tag_concurrency_slot_wait_seconds: float = Field(
default=30,
ge=0,
description="The number of seconds to wait before retrying when a task run cannot secure a concurrency slot from the server.",
validation_alias=AliasChoices(
AliasPath("tag_concurrency_slot_wait_seconds"),
"prefect_server_tasks_tag_concurrency_slot_wait_seconds",
"prefect_task_run_tag_concurrency_slot_wait_seconds",
),
)

max_cache_key_length: int = Field(
default=2000,
description="The maximum number of characters allowed for a task run cache key.",
validation_alias=AliasChoices(
AliasPath("max_cache_key_length"),
"prefect_server_tasks_max_cache_key_length",
"prefect_api_task_cache_key_max_length",
),
)

scheduling: ServerTasksSchedulingSettings = Field(
default_factory=ServerTasksSchedulingSettings,
description="Settings for controlling server-side behavior related to task scheduling",
)
26 changes: 21 additions & 5 deletions tests/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@
},
"PREFECT_API_SERVICES_TRIGGERS_ENABLED": {"test_value": True, "legacy": True},
"PREFECT_API_SSL_CERT_FILE": {"test_value": "/path/to/cert"},
"PREFECT_API_TASK_CACHE_KEY_MAX_LENGTH": {"test_value": 10},
"PREFECT_API_TASK_CACHE_KEY_MAX_LENGTH": {"test_value": 10, "legacy": True},
"PREFECT_API_TLS_INSECURE_SKIP_VERIFY": {"test_value": True},
"PREFECT_API_URL": {"test_value": "https://api.prefect.io"},
"PREFECT_ASYNC_FETCH_STATE_RESULT": {"test_value": True},
Expand Down Expand Up @@ -361,19 +361,35 @@
},
"PREFECT_SERVER_SERVICES_TASK_RUN_RECORDER_ENABLED": {"test_value": True},
"PREFECT_SERVER_SERVICES_TRIGGERS_ENABLED": {"test_value": True},
"PREFECT_SERVER_TASKS_MAX_CACHE_KEY_LENGTH": {"test_value": 10},
"PREFECT_SERVER_TASKS_SCHEDULING_MAX_RETRY_QUEUE_SIZE": {"test_value": 10},
"PREFECT_SERVER_TASKS_SCHEDULING_MAX_SCHEDULED_QUEUE_SIZE": {"test_value": 10},
"PREFECT_SERVER_TASKS_SCHEDULING_PENDING_TASK_TIMEOUT": {
"test_value": timedelta(seconds=10),
},
"PREFECT_SERVER_TASKS_TAG_CONCURRENCY_SLOT_WAIT_SECONDS": {
"test_value": 10.0,
},
"PREFECT_SILENCE_API_URL_MISCONFIGURATION": {"test_value": True},
"PREFECT_SQLALCHEMY_MAX_OVERFLOW": {"test_value": 10},
"PREFECT_SQLALCHEMY_POOL_SIZE": {"test_value": 10},
"PREFECT_TASKS_REFRESH_CACHE": {"test_value": True},
"PREFECT_TASK_DEFAULT_RETRIES": {"test_value": 10},
"PREFECT_TASK_DEFAULT_RETRY_DELAY_SECONDS": {"test_value": 10},
"PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS": {"test_value": 10},
"PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS": {
"test_value": 10,
"legacy": True,
},
"PREFECT_TASK_SCHEDULING_DEFAULT_STORAGE_BLOCK": {"test_value": "block"},
"PREFECT_TASK_SCHEDULING_DELETE_FAILED_SUBMISSIONS": {"test_value": True},
"PREFECT_TASK_SCHEDULING_MAX_RETRY_QUEUE_SIZE": {"test_value": 10},
"PREFECT_TASK_SCHEDULING_MAX_SCHEDULED_QUEUE_SIZE": {"test_value": 10},
"PREFECT_TASK_SCHEDULING_MAX_RETRY_QUEUE_SIZE": {"test_value": 10, "legacy": True},
"PREFECT_TASK_SCHEDULING_MAX_SCHEDULED_QUEUE_SIZE": {
"test_value": 10,
"legacy": True,
},
"PREFECT_TASK_SCHEDULING_PENDING_TASK_TIMEOUT": {
"test_value": timedelta(seconds=10)
"test_value": timedelta(seconds=10),
"legacy": True,
},
"PREFECT_TEST_MODE": {"test_value": True},
"PREFECT_TEST_SETTING": {"test_value": "bar"},
Expand Down

0 comments on commit eaec5a3

Please sign in to comment.