Skip to content

Commit

Permalink
allow overwrite in create_work_pool (#14967)
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz authored Aug 15, 2024
1 parent ef7cb00 commit 4df27f9
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 16 deletions.
38 changes: 25 additions & 13 deletions src/prefect/cli/work_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,14 @@ async def create(
" for the given work pool type."
),
),
overwrite: bool = typer.Option(
False,
"--overwrite",
help=("Whether or not to overwrite an existing work pool with the same name."),
),
):
"""
Create a new work pool.
Create a new work pool or update an existing one.
\b
Examples:
Expand All @@ -114,22 +119,25 @@ async def create(
Create a Docker work pool with a custom base job template:
\b
$ prefect work-pool create "my-pool" --type docker --base-job-template ./base-job-template.json
\b
Update an existing work pool:
\b
$ prefect work-pool create "existing-pool" --base-job-template ./base-job-template.json --overwrite
"""
if not name.lower().strip("'\" "):
exit_with_error("Work pool name cannot be empty.")
async with get_client() as client:
try:
await client.read_work_pool(work_pool_name=name)
existing_pool = await client.read_work_pool(work_pool_name=name)
if not overwrite:
exit_with_error(
f"Work pool named {name!r} already exists. Use --overwrite to update it."
)
except ObjectNotFound:
pass
else:
exit_with_error(
f"Work pool named {name!r} already exists. Please try creating your"
" work pool again with a different name."
)
existing_pool = None

if type is None:
if type is None and existing_pool is None:
async with get_collections_metadata_client() as collections_client:
if not is_interactive():
exit_with_error(
Expand Down Expand Up @@ -158,6 +166,8 @@ async def create(
table_kwargs={"show_lines": True},
)
type = worker["type"]
elif existing_pool:
type = existing_pool.type

available_work_pool_types = await get_available_work_pool_types()
if type not in available_work_pool_types:
Expand Down Expand Up @@ -200,8 +210,11 @@ async def create(
base_job_template=template_contents,
is_paused=paused,
)
work_pool = await client.create_work_pool(work_pool=wp)
app.console.print(f"Created work pool {work_pool.name!r}!\n", style="green")
work_pool = await client.create_work_pool(work_pool=wp, overwrite=overwrite)
action = "Updated" if overwrite and existing_pool else "Created"
app.console.print(
f"{action} work pool {work_pool.name!r}!\n", style="green"
)
if (
not work_pool.is_paused
and not work_pool.is_managed_pool
Expand Down Expand Up @@ -232,8 +245,7 @@ async def create(
exit_with_success("")
except ObjectAlreadyExists:
exit_with_error(
f"Work pool named {name!r} already exists. Please try creating your"
" work pool again with a different name."
f"Work pool named {name!r} already exists. Please use --overwrite to update it."
)


Expand Down
20 changes: 19 additions & 1 deletion src/prefect/client/orchestration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2612,6 +2612,7 @@ async def read_work_pools(
async def create_work_pool(
self,
work_pool: WorkPoolCreate,
overwrite: bool = False,
) -> WorkPool:
"""
Creates a work pool with the provided configuration.
Expand All @@ -2629,7 +2630,24 @@ async def create_work_pool(
)
except httpx.HTTPStatusError as e:
if e.response.status_code == status.HTTP_409_CONFLICT:
raise prefect.exceptions.ObjectAlreadyExists(http_exc=e) from e
if overwrite:
existing_work_pool = await self.read_work_pool(
work_pool_name=work_pool.name
)
if existing_work_pool.type != work_pool.type:
warnings.warn(
"Overwriting work pool type is not supported. Ignoring provided type.",
category=UserWarning,
)
await self.update_work_pool(
work_pool_name=work_pool.name,
work_pool=WorkPoolUpdate.model_validate(
work_pool.model_dump(exclude={"name", "type"})
),
)
response = await self._client.get(f"/work_pools/{work_pool.name}")
else:
raise prefect.exceptions.ObjectAlreadyExists(http_exc=e) from e
else:
raise

Expand Down
41 changes: 39 additions & 2 deletions tests/cli/test_work_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,7 @@ async def test_create_work_pool_name_conflict(
f"work-pool create {pool_name} -t process",
expected_code=1,
expected_output_contains=[
f"Work pool named {pool_name!r} already exists. Please try creating"
" your work pool again with a different name."
f"Work pool named {pool_name!r} already exists. Use --overwrite to update it."
],
)

Expand Down Expand Up @@ -389,6 +388,44 @@ async def provision(self, *args, **kwargs):
],
)

@pytest.mark.usefixtures("mock_collection_registry")
async def test_create_work_pool_with_overwrite(self, prefect_client):
pool_name = "overwrite-pool"

# Create initial work pool
await run_sync_in_worker_thread(
invoke_and_assert,
f"work-pool create {pool_name} --type process",
expected_code=0,
expected_output_contains=[f"Created work pool {pool_name!r}"],
)

initial_pool = await prefect_client.read_work_pool(pool_name)
assert initial_pool.name == pool_name
assert not initial_pool.is_paused

# Attempt to overwrite the work pool (updating is_paused)
await run_sync_in_worker_thread(
invoke_and_assert,
f"work-pool create {pool_name} --paused --overwrite",
expected_code=0,
expected_output_contains=[f"Updated work pool {pool_name!r}"],
)

updated_pool = await prefect_client.read_work_pool(pool_name)
assert updated_pool.name == pool_name
assert updated_pool.id == initial_pool.id
assert updated_pool.is_paused

await run_sync_in_worker_thread(
invoke_and_assert,
f"work-pool create {pool_name} --paused",
expected_code=1,
expected_output_contains=[
f"Work pool named {pool_name!r} already exists. Use --overwrite to update it."
],
)


class TestInspect:
async def test_inspect(self, prefect_client, work_pool):
Expand Down
33 changes: 33 additions & 0 deletions tests/client/test_prefect_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1856,6 +1856,39 @@ async def test_read_work_pools(self, prefect_client):
work_pool_2.id,
}

async def test_create_work_pool_overwriting_existing_work_pool(
self, prefect_client, work_pool
):
await prefect_client.create_work_pool(
work_pool=WorkPoolCreate(
name=work_pool.name,
type=work_pool.type,
description="new description",
),
overwrite=True,
)

updated_work_pool = await prefect_client.read_work_pool(work_pool.name)
assert updated_work_pool.description == "new description"

async def test_create_work_pool_with_attempt_to_overwrite_type(
self, prefect_client, work_pool
):
with pytest.warns(
UserWarning, match="Overwriting work pool type is not supported"
):
await prefect_client.create_work_pool(
work_pool=WorkPoolCreate(
name=work_pool.name,
type="kubernetes",
description=work_pool.description,
),
overwrite=True,
)

updated_work_pool = await prefect_client.read_work_pool(work_pool.name)
assert updated_work_pool.type == work_pool.type

async def test_update_work_pool(self, prefect_client):
work_pool = await prefect_client.create_work_pool(
work_pool=WorkPoolCreate(name="test-pool-1")
Expand Down

0 comments on commit 4df27f9

Please sign in to comment.