From e11b67b756eacb4a4dc780fe28af473068d09729 Mon Sep 17 00:00:00 2001 From: Jean Luciano Date: Thu, 15 Aug 2024 11:22:28 -0500 Subject: [PATCH] Add a deployment concurrency limit field (#14929) --- .../3.0rc/api-ref/rest-api/server/schema.json | 93 +++++++++++++++++++ src/prefect/client/schemas/actions.py | 8 ++ src/prefect/client/schemas/filters.py | 20 ++++ src/prefect/client/schemas/objects.py | 3 + src/prefect/client/schemas/responses.py | 3 + src/prefect/client/schemas/sorting.py | 2 + .../database/migrations/MIGRATION-NOTES.md | 4 + ...16795e_add_deployment_concurrency_limit.py | 25 +++++ ...39f022_add_deployment_concurrency_limit.py | 25 +++++ src/prefect/server/database/orm_models.py | 3 + src/prefect/server/schemas/actions.py | 6 ++ src/prefect/server/schemas/core.py | 3 + src/prefect/server/schemas/filters.py | 39 +++++++- src/prefect/server/schemas/responses.py | 4 + tests/server/models/test_deployments.py | 1 + 15 files changed, 237 insertions(+), 2 deletions(-) create mode 100644 src/prefect/server/database/migrations/versions/postgresql/2024_08_14_150111_97429116795e_add_deployment_concurrency_limit.py create mode 100644 src/prefect/server/database/migrations/versions/sqlite/2024_08_14_145052_f93e1439f022_add_deployment_concurrency_limit.py diff --git a/docs/3.0rc/api-ref/rest-api/server/schema.json b/docs/3.0rc/api-ref/rest-api/server/schema.json index 11b2f8167e87..0391b56c0f74 100644 --- a/docs/3.0rc/api-ref/rest-api/server/schema.json +++ b/docs/3.0rc/api-ref/rest-api/server/schema.json @@ -15457,6 +15457,19 @@ "title": "Schedules", "description": "A list of schedules for the deployment." }, + "concurrency_limit": { + "anyOf": [ + { + "type": "integer", + "minimum": 0.0 + }, + { + "type": "null" + } + ], + "title": "Concurrency Limit", + "description": "The deployment's concurrency limit." + }, "enforce_parameter_schema": { "type": "boolean", "title": "Enforce Parameter Schema", @@ -15693,6 +15706,17 @@ } ], "description": "Filter criteria for `Deployment.work_queue_name`" + }, + "concurrency_limit": { + "anyOf": [ + { + "$ref": "#/components/schemas/DeploymentFilterConcurrencyLimit" + }, + { + "type": "null" + } + ], + "description": "Filter criteria for `Deployment.concurrency`" } }, "additionalProperties": false, @@ -15700,6 +15724,50 @@ "title": "DeploymentFilter", "description": "Filter for deployments. Only deployments matching all criteria will be returned." }, + "DeploymentFilterConcurrencyLimit": { + "properties": { + "ge_": { + "anyOf": [ + { + "type": "integer" + }, + { + "type": "null" + } + ], + "title": "Ge ", + "description": "Only include deployments with a concurrency limit greater than or equal to this value" + }, + "le_": { + "anyOf": [ + { + "type": "integer" + }, + { + "type": "null" + } + ], + "title": "Le ", + "description": "Only include deployments with a concurrency limit less than or equal to this value" + }, + "is_null_": { + "anyOf": [ + { + "type": "boolean" + }, + { + "type": "null" + } + ], + "title": "Is Null ", + "description": "If true, only include deployments without a concurrency limit" + } + }, + "additionalProperties": false, + "type": "object", + "title": "DeploymentFilterConcurrencyLimit", + "description": "Filter by `Deployment.concurrency_limit`." + }, "DeploymentFilterId": { "properties": { "any_": { @@ -16130,6 +16198,18 @@ "title": "Schedules", "description": "A list of schedules for the deployment." }, + "concurrency_limit": { + "anyOf": [ + { + "type": "integer" + }, + { + "type": "null" + } + ], + "title": "Concurrency Limit", + "description": "The maximum number of flow runs that can be active at once." + }, "job_variables": { "type": "object", "title": "Job Variables", @@ -16616,6 +16696,19 @@ "title": "Schedules", "description": "A list of schedules for the deployment." }, + "concurrency_limit": { + "anyOf": [ + { + "type": "integer", + "minimum": 0.0 + }, + { + "type": "null" + } + ], + "title": "Concurrency Limit", + "description": "The deployment's concurrency limit." + }, "parameters": { "anyOf": [ { diff --git a/src/prefect/client/schemas/actions.py b/src/prefect/client/schemas/actions.py index 5a9b5a0f115d..6869270c390e 100644 --- a/src/prefect/client/schemas/actions.py +++ b/src/prefect/client/schemas/actions.py @@ -157,6 +157,10 @@ def convert_to_strings(cls, values): default_factory=list, description="A list of schedules for the deployment.", ) + concurrency_limit: Optional[int] = Field( + default=None, + description="The concurrency limit for the deployment.", + ) enforce_parameter_schema: Optional[bool] = Field( default=None, description=( @@ -229,6 +233,10 @@ def remove_old_fields(cls, values): default=None, description="A list of schedules for the deployment.", ) + concurrency_limit: Optional[int] = Field( + default=None, + description="The concurrency limit for the deployment.", + ) tags: List[str] = Field(default_factory=list) work_queue_name: Optional[str] = Field(None) work_pool_name: Optional[str] = Field( diff --git a/src/prefect/client/schemas/filters.py b/src/prefect/client/schemas/filters.py index 1023e2058591..b6cbaac85d1e 100644 --- a/src/prefect/client/schemas/filters.py +++ b/src/prefect/client/schemas/filters.py @@ -505,6 +505,23 @@ class DeploymentFilterTags(PrefectBaseModel, OperatorMixin): ) +class DeploymentFilterConcurrencyLimit(PrefectBaseModel): + """Filter by `Deployment.concurrency_limit`.""" + + ge_: Optional[int] = Field( + default=None, + description="Only include deployments with a concurrency limit greater than or equal to this value", + ) + le_: Optional[int] = Field( + default=None, + description="Only include deployments with a concurrency limit less than or equal to this value", + ) + is_null_: Optional[bool] = Field( + default=None, + description="If true, only include deployments without a concurrency limit", + ) + + class DeploymentFilter(PrefectBaseModel, OperatorMixin): """Filter for deployments. Only deployments matching all criteria will be returned.""" @@ -520,6 +537,9 @@ class DeploymentFilter(PrefectBaseModel, OperatorMixin): work_queue_name: Optional[DeploymentFilterWorkQueueName] = Field( default=None, description="Filter criteria for `Deployment.work_queue_name`" ) + concurrency_limit: Optional[DeploymentFilterConcurrencyLimit] = Field( + default=None, description="Filter criteria for `Deployment.concurrency_limit`" + ) class LogFilterName(PrefectBaseModel): diff --git a/src/prefect/client/schemas/objects.py b/src/prefect/client/schemas/objects.py index 19adb2242ef4..10eeba343e5c 100644 --- a/src/prefect/client/schemas/objects.py +++ b/src/prefect/client/schemas/objects.py @@ -996,6 +996,9 @@ class Deployment(ObjectBaseModel): paused: bool = Field( default=False, description="Whether or not the deployment is paused." ) + concurrency_limit: Optional[int] = Field( + default=None, description="The concurrency limit for the deployment." + ) schedules: List[DeploymentSchedule] = Field( default_factory=list, description="A list of schedules for the deployment." ) diff --git a/src/prefect/client/schemas/responses.py b/src/prefect/client/schemas/responses.py index 202ac1cefa1f..a2c5d2215f43 100644 --- a/src/prefect/client/schemas/responses.py +++ b/src/prefect/client/schemas/responses.py @@ -313,6 +313,9 @@ class DeploymentResponse(ObjectBaseModel): flow_id: UUID = Field( default=..., description="The flow id associated with the deployment." ) + concurrency_limit: Optional[int] = Field( + default=None, description="The concurrency limit for the deployment." + ) paused: bool = Field( default=False, description="Whether or not the deployment is paused." ) diff --git a/src/prefect/client/schemas/sorting.py b/src/prefect/client/schemas/sorting.py index 2348ff49e24a..87d680d34e96 100644 --- a/src/prefect/client/schemas/sorting.py +++ b/src/prefect/client/schemas/sorting.py @@ -59,6 +59,8 @@ class DeploymentSort(AutoEnum): UPDATED_DESC = AutoEnum.auto() NAME_ASC = AutoEnum.auto() NAME_DESC = AutoEnum.auto() + CONCURRENCY_LIMIT_ASC = AutoEnum.auto() + CONCURRENCY_LIMIT_DESC = AutoEnum.auto() class ArtifactSort(AutoEnum): diff --git a/src/prefect/server/database/migrations/MIGRATION-NOTES.md b/src/prefect/server/database/migrations/MIGRATION-NOTES.md index 2059b40cb798..7aa8221d533d 100644 --- a/src/prefect/server/database/migrations/MIGRATION-NOTES.md +++ b/src/prefect/server/database/migrations/MIGRATION-NOTES.md @@ -8,6 +8,10 @@ Each time a database migration is written, an entry is included here with: This gives us a history of changes and will create merge conflicts if two migrations are made at once, flagging situations where a branch needs to be updated before merging. +# Add `concurrency_limit` to `Deployments` +SQLite: `f93e1439f022` +Postgres:`97429116795e` + # Add `events` and `event_resources` tables SQLite: `824e9edafa60` Postgres: `15768c2ec702` diff --git a/src/prefect/server/database/migrations/versions/postgresql/2024_08_14_150111_97429116795e_add_deployment_concurrency_limit.py b/src/prefect/server/database/migrations/versions/postgresql/2024_08_14_150111_97429116795e_add_deployment_concurrency_limit.py new file mode 100644 index 000000000000..86984afb8563 --- /dev/null +++ b/src/prefect/server/database/migrations/versions/postgresql/2024_08_14_150111_97429116795e_add_deployment_concurrency_limit.py @@ -0,0 +1,25 @@ +"""add_deployment_concurrency_limit + +Revision ID: 97429116795e +Revises: 7495a5013e7e +Create Date: 2024-08-14 15:01:11.152219 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "97429116795e" +down_revision = "7495a5013e7e" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column( + "deployment", sa.Column("concurrency_limit", sa.Integer(), nullable=True) + ) + + +def downgrade(): + op.drop_column("deployment", "concurrency_limit") diff --git a/src/prefect/server/database/migrations/versions/sqlite/2024_08_14_145052_f93e1439f022_add_deployment_concurrency_limit.py b/src/prefect/server/database/migrations/versions/sqlite/2024_08_14_145052_f93e1439f022_add_deployment_concurrency_limit.py new file mode 100644 index 000000000000..1a1aae909d94 --- /dev/null +++ b/src/prefect/server/database/migrations/versions/sqlite/2024_08_14_145052_f93e1439f022_add_deployment_concurrency_limit.py @@ -0,0 +1,25 @@ +"""add_deployment_concurrency_limit + +Revision ID: f93e1439f022 +Revises: 354f1ede7e9f +Create Date: 2024-08-14 14:50:52.420436 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "f93e1439f022" +down_revision = "354f1ede7e9f" +branch_labels = None +depends_on = None + + +def upgrade(): + with op.batch_alter_table("deployment", schema=None) as batch_op: + batch_op.add_column(sa.Column("concurrency_limit", sa.Integer(), nullable=True)) + + +def downgrade(): + with op.batch_alter_table("deployment", schema=None) as batch_op: + batch_op.drop_column("concurrency_limit") diff --git a/src/prefect/server/database/orm_models.py b/src/prefect/server/database/orm_models.py index 21f8462af700..5f32613eae5c 100644 --- a/src/prefect/server/database/orm_models.py +++ b/src/prefect/server/database/orm_models.py @@ -875,6 +875,9 @@ def job_variables(self): order_by=sa.desc(sa.text("updated")), ) + concurrency_limit: Mapped[Union[int, None]] = mapped_column( + sa.Integer, default=None, nullable=True + ) tags: Mapped[List[str]] = mapped_column( JSON, server_default="[]", default=list, nullable=False ) diff --git a/src/prefect/server/schemas/actions.py b/src/prefect/server/schemas/actions.py index 741f777b4168..9ef0e06af9e6 100644 --- a/src/prefect/server/schemas/actions.py +++ b/src/prefect/server/schemas/actions.py @@ -169,6 +169,9 @@ class DeploymentCreate(ActionBaseModel): default_factory=list, description="A list of schedules for the deployment.", ) + concurrency_limit: Optional[NonNegativeInteger] = Field( + default=None, description="The deployment's concurrency limit." + ) enforce_parameter_schema: bool = Field( default=True, description=( @@ -264,6 +267,9 @@ def remove_old_fields(cls, values): default_factory=list, description="A list of schedules for the deployment.", ) + concurrency_limit: Optional[NonNegativeInteger] = Field( + default=None, description="The deployment's concurrency limit." + ) parameters: Optional[Dict[str, Any]] = Field( default=None, description="Parameters for flow runs scheduled by the deployment.", diff --git a/src/prefect/server/schemas/core.py b/src/prefect/server/schemas/core.py index a8ea11b3c1c1..eb3466713357 100644 --- a/src/prefect/server/schemas/core.py +++ b/src/prefect/server/schemas/core.py @@ -546,6 +546,9 @@ class Deployment(ORMBaseModel): schedules: List[DeploymentSchedule] = Field( default_factory=list, description="A list of schedules for the deployment." ) + concurrency_limit: Optional[NonNegativeInteger] = Field( + default=None, description="The concurrency limit for the deployment." + ) job_variables: Dict[str, Any] = Field( default_factory=dict, description="Overrides to apply to flow run infrastructure at runtime.", diff --git a/src/prefect/server/schemas/filters.py b/src/prefect/server/schemas/filters.py index 95037dd3bd41..7dd5d3c87ea9 100644 --- a/src/prefect/server/schemas/filters.py +++ b/src/prefect/server/schemas/filters.py @@ -996,6 +996,38 @@ def _get_filter_list(self) -> List: return filters +class DeploymentFilterConcurrencyLimit(PrefectFilterBaseModel): + """Filter by `Deployment.concurrency_limit`.""" + + ge_: Optional[int] = Field( + default=None, + description="Only include deployments with a concurrency limit greater than or equal to this value", + ) + + le_: Optional[int] = Field( + default=None, + description="Only include deployments with a concurrency limit less than or equal to this value", + ) + is_null_: Optional[bool] = Field( + default=None, + description="If true, only include deployments without a concurrency limit", + ) + + def _get_filter_list(self) -> List: + filters = [] + if self.ge_ is not None: + filters.append(orm_models.Deployment.concurrency_limit >= self.ge_) + if self.le_ is not None: + filters.append(orm_models.Deployment.concurrency_limit <= self.le_) + if self.is_null_ is not None: + filters.append( + orm_models.Deployment.concurrency_limit.is_(None) + if self.is_null_ + else orm_models.Deployment.concurrency_limit.is_not(None) + ) + return filters + + class DeploymentFilterTags(PrefectOperatorFilterBaseModel): """Filter by `Deployment.tags`.""" @@ -1047,10 +1079,12 @@ class DeploymentFilter(PrefectOperatorFilterBaseModel): work_queue_name: Optional[DeploymentFilterWorkQueueName] = Field( default=None, description="Filter criteria for `Deployment.work_queue_name`" ) + concurrency_limit: Optional[DeploymentFilterConcurrencyLimit] = Field( + default=None, description="Filter criteria for `Deployment.concurrency`" + ) def _get_filter_list(self) -> List: filters = [] - if self.id is not None: filters.append(self.id.as_sql_filter()) if self.name is not None: @@ -1063,7 +1097,8 @@ def _get_filter_list(self) -> List: filters.append(self.tags.as_sql_filter()) if self.work_queue_name is not None: filters.append(self.work_queue_name.as_sql_filter()) - + if self.concurrency_limit is not None: + filters.append(self.concurrency_limit.as_sql_filter()) return filters diff --git a/src/prefect/server/schemas/responses.py b/src/prefect/server/schemas/responses.py index d29b3fefbd36..33af1b3fad30 100644 --- a/src/prefect/server/schemas/responses.py +++ b/src/prefect/server/schemas/responses.py @@ -360,6 +360,10 @@ class DeploymentResponse(ORMBaseModel): schedules: List[schemas.core.DeploymentSchedule] = Field( default_factory=list, description="A list of schedules for the deployment." ) + concurrency_limit: Optional[int] = Field( + default=None, + description="The maximum number of flow runs that can be active at once.", + ) job_variables: Dict[str, Any] = Field( default_factory=dict, description="Overrides to apply to the base infrastructure block at runtime.", diff --git a/tests/server/models/test_deployments.py b/tests/server/models/test_deployments.py index 2b23c2ae1c65..ecefb61aff16 100644 --- a/tests/server/models/test_deployments.py +++ b/tests/server/models/test_deployments.py @@ -30,6 +30,7 @@ async def test_create_deployment_succeeds(self, session, flow): assert deployment.flow_id == flow.id assert deployment.parameters == {"foo": "bar"} assert deployment.tags == ["foo", "bar"] + assert deployment.concurrency_limit is None async def test_creating_a_deployment_with_existing_work_queue_is_ok( self, session, flow, work_queue