Skip to content

Commit

Permalink
Add a deployment concurrency limit field (#14929)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeanluciano authored Aug 15, 2024
1 parent 58f163e commit e11b67b
Show file tree
Hide file tree
Showing 15 changed files with 237 additions and 2 deletions.
93 changes: 93 additions & 0 deletions docs/3.0rc/api-ref/rest-api/server/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -15457,6 +15457,19 @@
"title": "Schedules",
"description": "A list of schedules for the deployment."
},
"concurrency_limit": {
"anyOf": [
{
"type": "integer",
"minimum": 0.0
},
{
"type": "null"
}
],
"title": "Concurrency Limit",
"description": "The deployment's concurrency limit."
},
"enforce_parameter_schema": {
"type": "boolean",
"title": "Enforce Parameter Schema",
Expand Down Expand Up @@ -15693,13 +15706,68 @@
}
],
"description": "Filter criteria for `Deployment.work_queue_name`"
},
"concurrency_limit": {
"anyOf": [
{
"$ref": "#/components/schemas/DeploymentFilterConcurrencyLimit"
},
{
"type": "null"
}
],
"description": "Filter criteria for `Deployment.concurrency`"
}
},
"additionalProperties": false,
"type": "object",
"title": "DeploymentFilter",
"description": "Filter for deployments. Only deployments matching all criteria will be returned."
},
"DeploymentFilterConcurrencyLimit": {
"properties": {
"ge_": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"title": "Ge ",
"description": "Only include deployments with a concurrency limit greater than or equal to this value"
},
"le_": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"title": "Le ",
"description": "Only include deployments with a concurrency limit less than or equal to this value"
},
"is_null_": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"title": "Is Null ",
"description": "If true, only include deployments without a concurrency limit"
}
},
"additionalProperties": false,
"type": "object",
"title": "DeploymentFilterConcurrencyLimit",
"description": "Filter by `Deployment.concurrency_limit`."
},
"DeploymentFilterId": {
"properties": {
"any_": {
Expand Down Expand Up @@ -16130,6 +16198,18 @@
"title": "Schedules",
"description": "A list of schedules for the deployment."
},
"concurrency_limit": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"title": "Concurrency Limit",
"description": "The maximum number of flow runs that can be active at once."
},
"job_variables": {
"type": "object",
"title": "Job Variables",
Expand Down Expand Up @@ -16616,6 +16696,19 @@
"title": "Schedules",
"description": "A list of schedules for the deployment."
},
"concurrency_limit": {
"anyOf": [
{
"type": "integer",
"minimum": 0.0
},
{
"type": "null"
}
],
"title": "Concurrency Limit",
"description": "The deployment's concurrency limit."
},
"parameters": {
"anyOf": [
{
Expand Down
8 changes: 8 additions & 0 deletions src/prefect/client/schemas/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ def convert_to_strings(cls, values):
default_factory=list,
description="A list of schedules for the deployment.",
)
concurrency_limit: Optional[int] = Field(
default=None,
description="The concurrency limit for the deployment.",
)
enforce_parameter_schema: Optional[bool] = Field(
default=None,
description=(
Expand Down Expand Up @@ -229,6 +233,10 @@ def remove_old_fields(cls, values):
default=None,
description="A list of schedules for the deployment.",
)
concurrency_limit: Optional[int] = Field(
default=None,
description="The concurrency limit for the deployment.",
)
tags: List[str] = Field(default_factory=list)
work_queue_name: Optional[str] = Field(None)
work_pool_name: Optional[str] = Field(
Expand Down
20 changes: 20 additions & 0 deletions src/prefect/client/schemas/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,23 @@ class DeploymentFilterTags(PrefectBaseModel, OperatorMixin):
)


class DeploymentFilterConcurrencyLimit(PrefectBaseModel):
"""Filter by `Deployment.concurrency_limit`."""

ge_: Optional[int] = Field(
default=None,
description="Only include deployments with a concurrency limit greater than or equal to this value",
)
le_: Optional[int] = Field(
default=None,
description="Only include deployments with a concurrency limit less than or equal to this value",
)
is_null_: Optional[bool] = Field(
default=None,
description="If true, only include deployments without a concurrency limit",
)


class DeploymentFilter(PrefectBaseModel, OperatorMixin):
"""Filter for deployments. Only deployments matching all criteria will be returned."""

Expand All @@ -520,6 +537,9 @@ class DeploymentFilter(PrefectBaseModel, OperatorMixin):
work_queue_name: Optional[DeploymentFilterWorkQueueName] = Field(
default=None, description="Filter criteria for `Deployment.work_queue_name`"
)
concurrency_limit: Optional[DeploymentFilterConcurrencyLimit] = Field(
default=None, description="Filter criteria for `Deployment.concurrency_limit`"
)


class LogFilterName(PrefectBaseModel):
Expand Down
3 changes: 3 additions & 0 deletions src/prefect/client/schemas/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,9 @@ class Deployment(ObjectBaseModel):
paused: bool = Field(
default=False, description="Whether or not the deployment is paused."
)
concurrency_limit: Optional[int] = Field(
default=None, description="The concurrency limit for the deployment."
)
schedules: List[DeploymentSchedule] = Field(
default_factory=list, description="A list of schedules for the deployment."
)
Expand Down
3 changes: 3 additions & 0 deletions src/prefect/client/schemas/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,9 @@ class DeploymentResponse(ObjectBaseModel):
flow_id: UUID = Field(
default=..., description="The flow id associated with the deployment."
)
concurrency_limit: Optional[int] = Field(
default=None, description="The concurrency limit for the deployment."
)
paused: bool = Field(
default=False, description="Whether or not the deployment is paused."
)
Expand Down
2 changes: 2 additions & 0 deletions src/prefect/client/schemas/sorting.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class DeploymentSort(AutoEnum):
UPDATED_DESC = AutoEnum.auto()
NAME_ASC = AutoEnum.auto()
NAME_DESC = AutoEnum.auto()
CONCURRENCY_LIMIT_ASC = AutoEnum.auto()
CONCURRENCY_LIMIT_DESC = AutoEnum.auto()


class ArtifactSort(AutoEnum):
Expand Down
4 changes: 4 additions & 0 deletions src/prefect/server/database/migrations/MIGRATION-NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ Each time a database migration is written, an entry is included here with:

This gives us a history of changes and will create merge conflicts if two migrations are made at once, flagging situations where a branch needs to be updated before merging.

# Add `concurrency_limit` to `Deployments`
SQLite: `f93e1439f022`
Postgres:`97429116795e`

# Add `events` and `event_resources` tables
SQLite: `824e9edafa60`
Postgres: `15768c2ec702`
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""add_deployment_concurrency_limit
Revision ID: 97429116795e
Revises: 7495a5013e7e
Create Date: 2024-08-14 15:01:11.152219
"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "97429116795e"
down_revision = "7495a5013e7e"
branch_labels = None
depends_on = None


def upgrade():
op.add_column(
"deployment", sa.Column("concurrency_limit", sa.Integer(), nullable=True)
)


def downgrade():
op.drop_column("deployment", "concurrency_limit")
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""add_deployment_concurrency_limit
Revision ID: f93e1439f022
Revises: 354f1ede7e9f
Create Date: 2024-08-14 14:50:52.420436
"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "f93e1439f022"
down_revision = "354f1ede7e9f"
branch_labels = None
depends_on = None


def upgrade():
with op.batch_alter_table("deployment", schema=None) as batch_op:
batch_op.add_column(sa.Column("concurrency_limit", sa.Integer(), nullable=True))


def downgrade():
with op.batch_alter_table("deployment", schema=None) as batch_op:
batch_op.drop_column("concurrency_limit")
3 changes: 3 additions & 0 deletions src/prefect/server/database/orm_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,9 @@ def job_variables(self):
order_by=sa.desc(sa.text("updated")),
)

concurrency_limit: Mapped[Union[int, None]] = mapped_column(
sa.Integer, default=None, nullable=True
)
tags: Mapped[List[str]] = mapped_column(
JSON, server_default="[]", default=list, nullable=False
)
Expand Down
6 changes: 6 additions & 0 deletions src/prefect/server/schemas/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ class DeploymentCreate(ActionBaseModel):
default_factory=list,
description="A list of schedules for the deployment.",
)
concurrency_limit: Optional[NonNegativeInteger] = Field(
default=None, description="The deployment's concurrency limit."
)
enforce_parameter_schema: bool = Field(
default=True,
description=(
Expand Down Expand Up @@ -264,6 +267,9 @@ def remove_old_fields(cls, values):
default_factory=list,
description="A list of schedules for the deployment.",
)
concurrency_limit: Optional[NonNegativeInteger] = Field(
default=None, description="The deployment's concurrency limit."
)
parameters: Optional[Dict[str, Any]] = Field(
default=None,
description="Parameters for flow runs scheduled by the deployment.",
Expand Down
3 changes: 3 additions & 0 deletions src/prefect/server/schemas/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,9 @@ class Deployment(ORMBaseModel):
schedules: List[DeploymentSchedule] = Field(
default_factory=list, description="A list of schedules for the deployment."
)
concurrency_limit: Optional[NonNegativeInteger] = Field(
default=None, description="The concurrency limit for the deployment."
)
job_variables: Dict[str, Any] = Field(
default_factory=dict,
description="Overrides to apply to flow run infrastructure at runtime.",
Expand Down
39 changes: 37 additions & 2 deletions src/prefect/server/schemas/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,38 @@ def _get_filter_list(self) -> List:
return filters


class DeploymentFilterConcurrencyLimit(PrefectFilterBaseModel):
"""Filter by `Deployment.concurrency_limit`."""

ge_: Optional[int] = Field(
default=None,
description="Only include deployments with a concurrency limit greater than or equal to this value",
)

le_: Optional[int] = Field(
default=None,
description="Only include deployments with a concurrency limit less than or equal to this value",
)
is_null_: Optional[bool] = Field(
default=None,
description="If true, only include deployments without a concurrency limit",
)

def _get_filter_list(self) -> List:
filters = []
if self.ge_ is not None:
filters.append(orm_models.Deployment.concurrency_limit >= self.ge_)
if self.le_ is not None:
filters.append(orm_models.Deployment.concurrency_limit <= self.le_)
if self.is_null_ is not None:
filters.append(
orm_models.Deployment.concurrency_limit.is_(None)
if self.is_null_
else orm_models.Deployment.concurrency_limit.is_not(None)
)
return filters


class DeploymentFilterTags(PrefectOperatorFilterBaseModel):
"""Filter by `Deployment.tags`."""

Expand Down Expand Up @@ -1047,10 +1079,12 @@ class DeploymentFilter(PrefectOperatorFilterBaseModel):
work_queue_name: Optional[DeploymentFilterWorkQueueName] = Field(
default=None, description="Filter criteria for `Deployment.work_queue_name`"
)
concurrency_limit: Optional[DeploymentFilterConcurrencyLimit] = Field(
default=None, description="Filter criteria for `Deployment.concurrency`"
)

def _get_filter_list(self) -> List:
filters = []

if self.id is not None:
filters.append(self.id.as_sql_filter())
if self.name is not None:
Expand All @@ -1063,7 +1097,8 @@ def _get_filter_list(self) -> List:
filters.append(self.tags.as_sql_filter())
if self.work_queue_name is not None:
filters.append(self.work_queue_name.as_sql_filter())

if self.concurrency_limit is not None:
filters.append(self.concurrency_limit.as_sql_filter())
return filters


Expand Down
4 changes: 4 additions & 0 deletions src/prefect/server/schemas/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,10 @@ class DeploymentResponse(ORMBaseModel):
schedules: List[schemas.core.DeploymentSchedule] = Field(
default_factory=list, description="A list of schedules for the deployment."
)
concurrency_limit: Optional[int] = Field(
default=None,
description="The maximum number of flow runs that can be active at once.",
)
job_variables: Dict[str, Any] = Field(
default_factory=dict,
description="Overrides to apply to the base infrastructure block at runtime.",
Expand Down
1 change: 1 addition & 0 deletions tests/server/models/test_deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ async def test_create_deployment_succeeds(self, session, flow):
assert deployment.flow_id == flow.id
assert deployment.parameters == {"foo": "bar"}
assert deployment.tags == ["foo", "bar"]
assert deployment.concurrency_limit is None

async def test_creating_a_deployment_with_existing_work_queue_is_ok(
self, session, flow, work_queue
Expand Down

0 comments on commit e11b67b

Please sign in to comment.