diff --git a/src/prefect/settings/models/root.py b/src/prefect/settings/models/root.py index bf2efe7aa376..116b9dbfa989 100644 --- a/src/prefect/settings/models/root.py +++ b/src/prefect/settings/models/root.py @@ -1,5 +1,4 @@ import warnings -from datetime import timedelta from pathlib import Path from typing import ( TYPE_CHECKING, @@ -130,11 +129,6 @@ class Settings(PrefectBaseSettings): description="The default limit applied to queries that can return multiple objects, such as `POST /flow_runs/filter`.", ) - api_task_cache_key_max_length: int = Field( - default=2000, - description="The maximum number of characters allowed for a task run cache key.", - ) - ########################################################################### # Logging settings @@ -218,12 +212,6 @@ class Settings(PrefectBaseSettings): description="This value sets the default retry delay seconds for all tasks.", ) - task_run_tag_concurrency_slot_wait_seconds: int = Field( - default=30, - ge=0, - description="The number of seconds to wait before retrying when a task run cannot secure a concurrency slot from the server.", - ) - sqlalchemy_pool_size: Optional[int] = Field( default=None, description="Controls connection pool size when using a PostgreSQL database with the Prefect API. If not set, the default SQLAlchemy pool size will be used.", @@ -291,21 +279,6 @@ class Settings(PrefectBaseSettings): description="Whether or not to delete failed task submissions from the database.", ) - task_scheduling_max_scheduled_queue_size: int = Field( - default=1000, - description="The maximum number of scheduled tasks to queue for submission.", - ) - - task_scheduling_max_retry_queue_size: int = Field( - default=100, - description="The maximum number of retries to queue for submission.", - ) - - task_scheduling_pending_task_timeout: timedelta = Field( - default=timedelta(0), - description="How long before a PENDING task are made available to another task worker.", - ) - experimental_enable_schedule_concurrency: bool = Field( default=False, description="Whether or not to enable concurrency for scheduled tasks.", diff --git a/src/prefect/settings/models/server/root.py b/src/prefect/settings/models/server/root.py index 2fc32c7ae878..dcb3d4d94b98 100644 --- a/src/prefect/settings/models/server/root.py +++ b/src/prefect/settings/models/server/root.py @@ -13,6 +13,7 @@ from .events import ServerEventsSettings from .flow_run_graph import ServerFlowRunGraphSettings from .services import ServerServicesSettings +from .tasks import ServerTasksSettings class ServerSettings(PrefectBaseSettings): @@ -127,3 +128,7 @@ class ServerSettings(PrefectBaseSettings): default_factory=ServerServicesSettings, description="Settings for controlling server services behavior", ) + tasks: ServerTasksSettings = Field( + default_factory=ServerTasksSettings, + description="Settings for controlling server tasks behavior", + ) diff --git a/src/prefect/settings/models/server/tasks.py b/src/prefect/settings/models/server/tasks.py new file mode 100644 index 000000000000..2b5821af7d86 --- /dev/null +++ b/src/prefect/settings/models/server/tasks.py @@ -0,0 +1,86 @@ +from datetime import timedelta + +from pydantic import AliasChoices, AliasPath, Field +from pydantic_settings import SettingsConfigDict + +from prefect.settings.base import PrefectBaseSettings + + +class ServerTasksSchedulingSettings(PrefectBaseSettings): + """ + Settings for controlling server-side behavior related to task scheduling + """ + + model_config = SettingsConfigDict( + env_file=".env", + env_prefix="PREFECT_SERVER_TASKS_SCHEDULING_", + extra="ignore", + ) + + max_scheduled_queue_size: int = Field( + default=1000, + description="The maximum number of scheduled tasks to queue for submission.", + validation_alias=AliasChoices( + AliasPath("max_scheduled_queue_size"), + "prefect_server_tasks_scheduling_max_scheduled_queue_size", + "prefect_task_scheduling_max_scheduled_queue_size", + ), + ) + + max_retry_queue_size: int = Field( + default=100, + description="The maximum number of retries to queue for submission.", + validation_alias=AliasChoices( + AliasPath("max_retry_queue_size"), + "prefect_server_tasks_scheduling_max_retry_queue_size", + "prefect_task_scheduling_max_retry_queue_size", + ), + ) + + pending_task_timeout: timedelta = Field( + default=timedelta(0), + description="How long before a PENDING task are made available to another task worker.", + validation_alias=AliasChoices( + AliasPath("pending_task_timeout"), + "prefect_server_tasks_scheduling_pending_task_timeout", + "prefect_task_scheduling_pending_task_timeout", + ), + ) + + +class ServerTasksSettings(PrefectBaseSettings): + """ + Settings for controlling server-side behavior related to tasks + """ + + model_config = SettingsConfigDict( + env_file=".env", + env_prefix="PREFECT_SERVER_TASKS_", + extra="ignore", + ) + + tag_concurrency_slot_wait_seconds: float = Field( + default=30, + ge=0, + description="The number of seconds to wait before retrying when a task run cannot secure a concurrency slot from the server.", + validation_alias=AliasChoices( + AliasPath("tag_concurrency_slot_wait_seconds"), + "prefect_server_tasks_tag_concurrency_slot_wait_seconds", + "prefect_task_run_tag_concurrency_slot_wait_seconds", + ), + ) + + max_cache_key_length: int = Field( + default=2000, + description="The maximum number of characters allowed for a task run cache key.", + validation_alias=AliasChoices( + AliasPath("max_cache_key_length"), + "prefect_server_tasks_max_cache_key_length", + "prefect_api_task_cache_key_max_length", + ), + ) + + scheduling: ServerTasksSchedulingSettings = Field( + default_factory=ServerTasksSchedulingSettings, + description="Settings for controlling server-side behavior related to task scheduling", + ) diff --git a/tests/test_settings.py b/tests/test_settings.py index 10100986d9a3..b06a601fd671 100644 --- a/tests/test_settings.py +++ b/tests/test_settings.py @@ -171,7 +171,7 @@ }, "PREFECT_API_SERVICES_TRIGGERS_ENABLED": {"test_value": True, "legacy": True}, "PREFECT_API_SSL_CERT_FILE": {"test_value": "/path/to/cert"}, - "PREFECT_API_TASK_CACHE_KEY_MAX_LENGTH": {"test_value": 10}, + "PREFECT_API_TASK_CACHE_KEY_MAX_LENGTH": {"test_value": 10, "legacy": True}, "PREFECT_API_TLS_INSECURE_SKIP_VERIFY": {"test_value": True}, "PREFECT_API_URL": {"test_value": "https://api.prefect.io"}, "PREFECT_ASYNC_FETCH_STATE_RESULT": {"test_value": True}, @@ -361,19 +361,35 @@ }, "PREFECT_SERVER_SERVICES_TASK_RUN_RECORDER_ENABLED": {"test_value": True}, "PREFECT_SERVER_SERVICES_TRIGGERS_ENABLED": {"test_value": True}, + "PREFECT_SERVER_TASKS_MAX_CACHE_KEY_LENGTH": {"test_value": 10}, + "PREFECT_SERVER_TASKS_SCHEDULING_MAX_RETRY_QUEUE_SIZE": {"test_value": 10}, + "PREFECT_SERVER_TASKS_SCHEDULING_MAX_SCHEDULED_QUEUE_SIZE": {"test_value": 10}, + "PREFECT_SERVER_TASKS_SCHEDULING_PENDING_TASK_TIMEOUT": { + "test_value": timedelta(seconds=10), + }, + "PREFECT_SERVER_TASKS_TAG_CONCURRENCY_SLOT_WAIT_SECONDS": { + "test_value": 10.0, + }, "PREFECT_SILENCE_API_URL_MISCONFIGURATION": {"test_value": True}, "PREFECT_SQLALCHEMY_MAX_OVERFLOW": {"test_value": 10}, "PREFECT_SQLALCHEMY_POOL_SIZE": {"test_value": 10}, "PREFECT_TASKS_REFRESH_CACHE": {"test_value": True}, "PREFECT_TASK_DEFAULT_RETRIES": {"test_value": 10}, "PREFECT_TASK_DEFAULT_RETRY_DELAY_SECONDS": {"test_value": 10}, - "PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS": {"test_value": 10}, + "PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS": { + "test_value": 10, + "legacy": True, + }, "PREFECT_TASK_SCHEDULING_DEFAULT_STORAGE_BLOCK": {"test_value": "block"}, "PREFECT_TASK_SCHEDULING_DELETE_FAILED_SUBMISSIONS": {"test_value": True}, - "PREFECT_TASK_SCHEDULING_MAX_RETRY_QUEUE_SIZE": {"test_value": 10}, - "PREFECT_TASK_SCHEDULING_MAX_SCHEDULED_QUEUE_SIZE": {"test_value": 10}, + "PREFECT_TASK_SCHEDULING_MAX_RETRY_QUEUE_SIZE": {"test_value": 10, "legacy": True}, + "PREFECT_TASK_SCHEDULING_MAX_SCHEDULED_QUEUE_SIZE": { + "test_value": 10, + "legacy": True, + }, "PREFECT_TASK_SCHEDULING_PENDING_TASK_TIMEOUT": { - "test_value": timedelta(seconds=10) + "test_value": timedelta(seconds=10), + "legacy": True, }, "PREFECT_TEST_MODE": {"test_value": True}, "PREFECT_TEST_SETTING": {"test_value": "bar"},