diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 26256759d495..44d3b75ef85f 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -1,5 +1,34 @@
# Prefect Release Notes
+## Release 2.19.7
+
+### Fixes
+- Fix bug where assignments inside functions are evaluated when running `prefect deploy` - https://github.com/PrefectHQ/prefect/pull/14403
+- Allow `uvicorn>0.29.0` - https://github.com/PrefectHQ/prefect/pull/14370
+
+### Documentation
+- Add Prefect 3 announcement bar to 2.x docs - https://github.com/PrefectHQ/prefect/pull/14248
+- Update Prefect intro in 2.19.x docs - https://github.com/PrefectHQ/prefect/pull/14376
+- Update docs edit pencil icon link to route to `2.x` branch in GitHub - https://github.com/PrefectHQ/prefect/pull/14378
+
+**All changes**: https://github.com/PrefectHQ/prefect/compare/2.19.6...2.19.7
+
+## Release 2.19.6
+
+### Enhancements
+- Enable deploying from local paths with `Flow.deploy` - https://github.com/PrefectHQ/prefect/pull/13981
+
+### Fixes
+- Fix `concurrency` timeout scoping - https://github.com/PrefectHQ/prefect/pull/14183
+- Fix deployment and work queue statuses - https://github.com/PrefectHQ/prefect/pull/14097
+* Fix resolution of block documents in `job_variables` in a `prefect.yaml` before saving server-side - https://github.com/PrefectHQ/prefect/pull/14156
+
+### Documentation
+- Fix typo in Cloud Run V2 worker navigation link - https://github.com/PrefectHQ/prefect/pull/14170
+
+
+**All changes**: https://github.com/PrefectHQ/prefect/compare/2.19.5...2.19.6
+
## Release 2.19.5
### Fixes
diff --git a/docs/guides/host.md b/docs/guides/host.md
index ced187627c32..97e3b69fcf27 100644
--- a/docs/guides/host.md
+++ b/docs/guides/host.md
@@ -21,6 +21,7 @@ Learn how to host your own Prefect server instance.
If you would like to host a Prefect server instance on Kubernetes, check out the prefect-server [Helm chart](https://github.com/PrefectHQ/prefect-helm/tree/main/charts/prefect-server).
After installing Prefect, you have:
+
- a Python SDK client that can communicate with [Prefect Cloud](https://app.prefect.cloud)
- an [API server](/api-ref/) instance backed by a database and a UI
diff --git a/docs/index.md b/docs/index.md
index 609ba1957018..93886442bad3 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -14,7 +14,7 @@ search:
# **Welcome to Prefect**
-Prefect is a workflow orchestration tool empowering developers to build, observe, and react to data pipelines.
+Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
It's the easiest way to transform any Python function into a unit of work that can be observed and orchestrated. Just bring your Python code, sprinkle in a few decorators, and go!
diff --git a/docs/overrides/main.html b/docs/overrides/main.html
index 6a5a195a1727..d9395c50bf4a 100644
--- a/docs/overrides/main.html
+++ b/docs/overrides/main.html
@@ -4,4 +4,13 @@
{% block analytics %}
{{ super() }}
+{% endblock %}
+
+{% block announce %}
+
+
{% endblock %}
\ No newline at end of file
diff --git a/docs/stylesheets/extra.css b/docs/stylesheets/extra.css
index 8c9a1ebe4d4f..9183375c2d26 100644
--- a/docs/stylesheets/extra.css
+++ b/docs/stylesheets/extra.css
@@ -472,4 +472,8 @@ to force column width */
a.cloud-button:hover {
background-color: rgb(2, 37, 172);
-}
\ No newline at end of file
+}
+
+[data-md-component="announce"] {
+ display: block !important;
+}
diff --git a/mkdocs.yml b/mkdocs.yml
index 08d72618a2a0..f369da6b8a09 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -1,7 +1,7 @@
site_name: Prefect Docs
site_url: https://docs.prefect.io/
repo_url: https://github.com/PrefectHQ/prefect
-edit_uri: edit/main/docs
+edit_uri: edit/2.x/docs
extra_css:
- stylesheets/theme.css
- stylesheets/admonitions.css
@@ -322,7 +322,7 @@ nav:
- Deployment Steps: integrations/prefect-gcp/deployments/steps.md
- Workers:
- Cloud Run: integrations/prefect-gcp/cloud_run_worker.md
- - Cloud Run V2: cintegrations/prefect-gcp/loud_run_worker_v2.md
+ - Cloud Run V2: integrations/prefect-gcp/cloud_run_worker_v2.md
- Vertex AI: integrations/prefect-gcp/vertex_worker.md
- GitHub:
- integrations/prefect-github/index.md
diff --git a/requirements-client.txt b/requirements-client.txt
index b6f14219ff57..3733b89dccc4 100644
--- a/requirements-client.txt
+++ b/requirements-client.txt
@@ -32,7 +32,7 @@ sniffio >=1.3.0, < 2.0.0
toml >= 0.10.0
typing_extensions >= 4.5.0, < 5.0.0
ujson >= 5.8.0, < 6.0.0
-uvicorn >= 0.14.0, < 0.29.0
+uvicorn >= 0.14.0, !=0.29.0
websockets >= 10.4, < 13.0
# additional dependencies of starlette, which we're currently vendoring
diff --git a/requirements.txt b/requirements.txt
index 491141eb1742..6b9d2b78b106 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,7 +7,7 @@ asyncpg >= 0.23
click >= 8.0, < 8.2
cryptography >= 36.0.1
dateparser >= 1.1.1, < 2.0.0
-docker >= 4.0, < 7.0
+docker >= 4.0
graphviz >= 0.20.1
griffe >= 0.20.0
jinja2 >= 3.0.0, < 4.0.0
diff --git a/src/integrations/prefect-aws/prefect_aws/s3.py b/src/integrations/prefect-aws/prefect_aws/s3.py
index 84410f148fcc..a5b8bef44290 100644
--- a/src/integrations/prefect-aws/prefect_aws/s3.py
+++ b/src/integrations/prefect-aws/prefect_aws/s3.py
@@ -18,9 +18,9 @@
from prefect.utilities.filesystem import filter_files
if PYDANTIC_VERSION.startswith("2."):
- from pydantic.v1 import Field
+ from pydantic.v1 import Field, root_validator
else:
- from pydantic import Field
+ from pydantic import Field, root_validator
from prefect_aws import AwsCredentials, MinIOCredentials
from prefect_aws.client_parameters import AwsClientParameters
@@ -426,6 +426,22 @@ class S3Bucket(WritableFileSystem, WritableDeploymentStorage, ObjectStorageBlock
),
)
+ @root_validator
+ def validate_credentials(cls, values):
+ creds = values["credentials"]
+ if isinstance(creds, AwsCredentials) and isinstance(
+ creds.aws_client_parameters, dict
+ ):
+ # There's an issue with pydantic and nested blocks in a Union
+ # that is causing `aws_client_parameters` to be a dict in some
+ # cases instead of an `AwsClientParameters` object. Here we
+ # convert it to the correct type.
+ creds.aws_client_parameters = AwsClientParameters(
+ **creds.aws_client_parameters
+ )
+
+ return values
+
# Property to maintain compatibility with storage block based deployments
@property
def basepath(self) -> str:
diff --git a/src/integrations/prefect-aws/tests/test_s3.py b/src/integrations/prefect-aws/tests/test_s3.py
index 7a14ce839492..f33a657e32d0 100644
--- a/src/integrations/prefect-aws/tests/test_s3.py
+++ b/src/integrations/prefect-aws/tests/test_s3.py
@@ -3,11 +3,14 @@
from pathlib import Path, PurePosixPath, PureWindowsPath
import boto3
+import botocore
+import botocore.exceptions
import pytest
from botocore.exceptions import ClientError, EndpointConnectionError
from moto import mock_s3
from prefect_aws import AwsCredentials, MinIOCredentials
from prefect_aws.client_parameters import AwsClientParameters
+from prefect_aws.credentials import _get_client_cached
from prefect_aws.s3 import (
S3Bucket,
s3_copy,
@@ -1118,3 +1121,23 @@ def test_move_subpaths(
to_bucket=to_bucket,
)
assert key == expected_path
+
+ def test_round_trip_default_credentials(self):
+ # Regression test for
+ # https://github.com/PrefectHQ/prefect/issues/14147
+ block = S3Bucket(
+ bucket_name="test",
+ bucket_path="test/weather",
+ )
+ block.save(name="default-creds", overwrite=True)
+
+ loaded = S3Bucket.load("default-creds")
+
+ # Ensure that the client cache is cleared and that we will in fact
+ # get the broken client instead of one created earlier in this suite.
+ _get_client_cached.cache_clear()
+
+ # Attempt to use the client, which will raise an error, but it should
+ # be a `NoCredentialsError` instead of an opaque 'unhashable dict' error.
+ with pytest.raises(botocore.exceptions.NoCredentialsError):
+ loaded.copy_object("my_folder/notes.txt", "my_folder/notes_copy.txt")
diff --git a/src/integrations/prefect-bitbucket/prefect_bitbucket/repository.py b/src/integrations/prefect-bitbucket/prefect_bitbucket/repository.py
index ea57ddfeed1b..03ed32007e0f 100644
--- a/src/integrations/prefect-bitbucket/prefect_bitbucket/repository.py
+++ b/src/integrations/prefect-bitbucket/prefect_bitbucket/repository.py
@@ -36,7 +36,7 @@
"""
import io
-from distutils.dir_util import copy_tree
+import shutil
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Optional, Tuple, Union
@@ -197,4 +197,6 @@ async def get_directory(
dst_dir=local_path, src_dir=tmp_dir, sub_directory=from_path
)
- copy_tree(src=content_source, dst=content_destination)
+ shutil.copytree(
+ src=content_source, dst=content_destination, dirs_exist_ok=True
+ )
diff --git a/src/integrations/prefect-github/prefect_github/repository.py b/src/integrations/prefect-github/prefect_github/repository.py
index dc1b8137293d..b63bce15d4dc 100644
--- a/src/integrations/prefect-github/prefect_github/repository.py
+++ b/src/integrations/prefect-github/prefect_github/repository.py
@@ -9,8 +9,8 @@
import io
import shlex
+import shutil
from datetime import datetime
-from distutils.dir_util import copy_tree
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any, Dict, Iterable, Optional, Tuple, Union
@@ -160,7 +160,9 @@ async def get_directory(
dst_dir=local_path, src_dir=tmp_path_str, sub_directory=from_path
)
- copy_tree(src=content_source, dst=content_destination)
+ shutil.copytree(
+ src=content_source, dst=content_destination, dirs_exist_ok=True
+ )
@task
diff --git a/src/integrations/prefect-gitlab/prefect_gitlab/repositories.py b/src/integrations/prefect-gitlab/prefect_gitlab/repositories.py
index 633d4075aef8..e1c0ddf50ac2 100644
--- a/src/integrations/prefect-gitlab/prefect_gitlab/repositories.py
+++ b/src/integrations/prefect-gitlab/prefect_gitlab/repositories.py
@@ -40,9 +40,10 @@
private_gitlab_block.save()
```
"""
+
import io
+import shutil
import urllib.parse
-from distutils.dir_util import copy_tree
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Optional, Tuple, Union
@@ -213,4 +214,6 @@ async def get_directory(
dst_dir=local_path, src_dir=tmp_dir, sub_directory=from_path
)
- copy_tree(src=content_source, dst=content_destination)
+ shutil.copytree(
+ src=content_source, dst=content_destination, dirs_exist_ok=True
+ )
diff --git a/src/prefect/cli/cloud/__init__.py b/src/prefect/cli/cloud/__init__.py
index 21f807fed6c4..94d4c584a70f 100644
--- a/src/prefect/cli/cloud/__init__.py
+++ b/src/prefect/cli/cloud/__init__.py
@@ -131,9 +131,6 @@ async def serve_login_api(cancel_scope, task_status):
cause = exc.__context__ # Hide the system exit
traceback.print_exception(type(cause), value=cause, tb=cause.__traceback__)
cancel_scope.cancel()
- except KeyboardInterrupt:
- # `uvicorn.serve` can raise `KeyboardInterrupt` when it's done serving.
- cancel_scope.cancel()
else:
# Exit if we are done serving the API
# Uvicorn overrides signal handlers so without this Ctrl-C is broken
@@ -274,9 +271,8 @@ async def login_with_browser() -> str:
app.console.print("Waiting for response...")
await result_event.wait()
- # Uvicorn installs signal handlers, this is the cleanest way to shutdown the
- # login API
- raise_signal(signal.SIGINT)
+ # Shut down the background uvicorn server
+ tg.cancel_scope.cancel()
result = login_api.extra.get("result")
if not result:
diff --git a/src/prefect/cli/deploy.py b/src/prefect/cli/deploy.py
index aaa9ff1f6816..35980a04a26a 100644
--- a/src/prefect/cli/deploy.py
+++ b/src/prefect/cli/deploy.py
@@ -461,12 +461,17 @@ async def _run_single_deploy(
push_steps = deploy_config.get("push", actions.get("push")) or []
pull_steps = deploy_config.get("pull", actions.get("pull")) or []
+ # Don't resolve references in job variables. That will happen when
+ # preparing for a flow run.
+ _job_variables = deploy_config["work_pool"].pop("job_variables", {})
deploy_config = await resolve_block_document_references(deploy_config)
deploy_config = await resolve_variables(deploy_config)
# check for env var placeholders early so users can pass work pool names, etc.
deploy_config = apply_values(deploy_config, os.environ, remove_notset=False)
+ deploy_config["work_pool"]["job_variables"] = _job_variables
+
if not deploy_config.get("entrypoint"):
if not is_interactive():
raise ValueError(
@@ -676,7 +681,6 @@ async def _run_single_deploy(
deploy_config_before_templating = deepcopy(deploy_config)
## apply templating from build and push steps to the final deployment spec
_parameter_schema = deploy_config.pop("parameter_openapi_schema")
-
_schedules = deploy_config.pop("schedules")
deploy_config = apply_values(deploy_config, step_outputs)
diff --git a/src/prefect/cli/server.py b/src/prefect/cli/server.py
index 16de9047afaa..825eeaf93bf4 100644
--- a/src/prefect/cli/server.py
+++ b/src/prefect/cli/server.py
@@ -110,6 +110,7 @@ async def start(
server_env["PREFECT_SERVER_ANALYTICS_ENABLED"] = str(analytics)
server_env["PREFECT_API_SERVICES_LATE_RUNS_ENABLED"] = str(late_runs)
server_env["PREFECT_API_SERVICES_UI"] = str(ui)
+ server_env["PREFECT_UI_ENABLED"] = str(ui)
server_env["PREFECT_LOGGING_SERVER_LEVEL"] = log_level
base_url = f"http://{host}:{port}"
diff --git a/src/prefect/concurrency/asyncio.py b/src/prefect/concurrency/asyncio.py
index b9e346c27e8a..95713d0f70be 100644
--- a/src/prefect/concurrency/asyncio.py
+++ b/src/prefect/concurrency/asyncio.py
@@ -13,7 +13,6 @@
from prefect import get_client
from prefect.client.schemas.responses import MinimalConcurrencyLimitResponse
-from prefect.utilities.timeout import timeout_async
from .events import (
_emit_concurrency_acquisition_events,
@@ -26,6 +25,10 @@ class ConcurrencySlotAcquisitionError(Exception):
"""Raised when an unhandlable occurs while acquiring concurrency slots."""
+class AcquireConcurrencySlotTimeoutError(TimeoutError):
+ """Raised when acquiring a concurrency slot times out."""
+
+
@asynccontextmanager
async def concurrency(
names: Union[str, List[str]],
@@ -58,8 +61,9 @@ async def main():
```
"""
names = names if isinstance(names, list) else [names]
- with timeout_async(seconds=timeout_seconds):
- limits = await _acquire_concurrency_slots(names, occupy)
+ limits = await _acquire_concurrency_slots(
+ names, occupy, timeout_seconds=timeout_seconds
+ )
acquisition_time = pendulum.now("UTC")
emitted_events = _emit_concurrency_acquisition_events(limits, occupy)
@@ -91,12 +95,18 @@ async def _acquire_concurrency_slots(
names: List[str],
slots: int,
mode: Union[Literal["concurrency"], Literal["rate_limit"]] = "concurrency",
+ timeout_seconds: Optional[float] = None,
) -> List[MinimalConcurrencyLimitResponse]:
service = ConcurrencySlotAcquisitionService.instance(frozenset(names))
- future = service.send((slots, mode))
+ future = service.send((slots, mode, timeout_seconds))
response_or_exception = await asyncio.wrap_future(future)
if isinstance(response_or_exception, Exception):
+ if isinstance(response_or_exception, TimeoutError):
+ raise AcquireConcurrencySlotTimeoutError(
+ f"Attempt to acquire concurrency slots timed out after {timeout_seconds} second(s)"
+ ) from response_or_exception
+
raise ConcurrencySlotAcquisitionError(
f"Unable to acquire concurrency slots on {names!r}"
) from response_or_exception
diff --git a/src/prefect/concurrency/services.py b/src/prefect/concurrency/services.py
index 8aede9ae6eaf..2ab19362e00f 100644
--- a/src/prefect/concurrency/services.py
+++ b/src/prefect/concurrency/services.py
@@ -3,6 +3,7 @@
from contextlib import asynccontextmanager
from typing import (
FrozenSet,
+ Optional,
Tuple,
)
@@ -13,6 +14,7 @@
from prefect._internal.concurrency import logger
from prefect._internal.concurrency.services import QueueService
from prefect.client.orchestration import PrefectClient
+from prefect.utilities.timeout import timeout_async
class ConcurrencySlotAcquisitionService(QueueService):
@@ -27,10 +29,12 @@ async def _lifespan(self):
self._client = client
yield
- async def _handle(self, item: Tuple[int, str, concurrent.futures.Future]):
- occupy, mode, future = item
+ async def _handle(
+ self, item: Tuple[int, str, Optional[float], concurrent.futures.Future]
+ ):
+ occupy, mode, timeout_seconds, future = item
try:
- response = await self.acquire_slots(occupy, mode)
+ response = await self.acquire_slots(occupy, mode, timeout_seconds)
except Exception as exc:
# If the request to the increment endpoint fails in a non-standard
# way, we need to set the future's result so that the caller can
@@ -40,25 +44,28 @@ async def _handle(self, item: Tuple[int, str, concurrent.futures.Future]):
else:
future.set_result(response)
- async def acquire_slots(self, slots: int, mode: str) -> httpx.Response:
- while True:
- try:
- response = await self._client.increment_concurrency_slots(
- names=self.concurrency_limit_names, slots=slots, mode=mode
- )
- except Exception as exc:
- if (
- isinstance(exc, httpx.HTTPStatusError)
- and exc.response.status_code == status.HTTP_423_LOCKED
- ):
- retry_after = float(exc.response.headers["Retry-After"])
- await asyncio.sleep(retry_after)
+ async def acquire_slots(
+ self, slots: int, mode: str, timeout_seconds: Optional[float] = None
+ ):
+ with timeout_async(timeout_seconds):
+ while True:
+ try:
+ response = await self._client.increment_concurrency_slots(
+ names=self.concurrency_limit_names, slots=slots, mode=mode
+ )
+ except Exception as exc:
+ if (
+ isinstance(exc, httpx.HTTPStatusError)
+ and exc.response.status_code == status.HTTP_423_LOCKED
+ ):
+ retry_after = float(exc.response.headers["Retry-After"])
+ await asyncio.sleep(retry_after)
+ else:
+ raise exc
else:
- raise exc
- else:
- return response
+ return response
- def send(self, item: Tuple[int, str]):
+ def send(self, item: Tuple[int, str, Optional[float]]) -> concurrent.futures.Future:
with self._lock:
if self._stopped:
raise RuntimeError("Cannot put items in a stopped service instance.")
@@ -66,7 +73,7 @@ def send(self, item: Tuple[int, str]):
logger.debug("Service %r enqueuing item %r", self, item)
future: concurrent.futures.Future = concurrent.futures.Future()
- occupy, mode = item
- self._queue.put_nowait((occupy, mode, future))
+ occupy, mode, timeout_seconds = item
+ self._queue.put_nowait((occupy, mode, timeout_seconds, future))
return future
diff --git a/src/prefect/concurrency/sync.py b/src/prefect/concurrency/sync.py
index 3551c28b2853..d572ca641551 100644
--- a/src/prefect/concurrency/sync.py
+++ b/src/prefect/concurrency/sync.py
@@ -12,7 +12,6 @@
from prefect._internal.concurrency.api import create_call, from_sync
from prefect._internal.concurrency.event_loop import get_running_loop
from prefect.client.schemas.responses import MinimalConcurrencyLimitResponse
-from prefect.utilities.timeout import timeout
from .asyncio import (
_acquire_concurrency_slots,
@@ -57,10 +56,9 @@ def main():
"""
names = names if isinstance(names, list) else [names]
- with timeout(seconds=timeout_seconds):
- limits: List[MinimalConcurrencyLimitResponse] = _call_async_function_from_sync(
- _acquire_concurrency_slots, names, occupy
- )
+ limits: List[MinimalConcurrencyLimitResponse] = _call_async_function_from_sync(
+ _acquire_concurrency_slots, names, occupy, timeout_seconds=timeout_seconds
+ )
acquisition_time = pendulum.now("UTC")
emitted_events = _emit_concurrency_acquisition_events(limits, occupy)
diff --git a/src/prefect/exceptions.py b/src/prefect/exceptions.py
index afe7f321f529..674ae1f5349c 100644
--- a/src/prefect/exceptions.py
+++ b/src/prefect/exceptions.py
@@ -178,7 +178,10 @@ def __init__(self, msg: str):
@classmethod
def from_validation_error(cls, exc: ValidationError) -> Self:
- bad_params = [f'{".".join(err["loc"])}: {err["msg"]}' for err in exc.errors()]
+ bad_params = [
+ f'{".".join(str(item) for item in err["loc"])}: {err["msg"]}'
+ for err in exc.errors()
+ ]
msg = "Flow run received invalid parameters:\n - " + "\n - ".join(bad_params)
return cls(msg)
diff --git a/src/prefect/flows.py b/src/prefect/flows.py
index 9a82556ea769..967cb23ea86b 100644
--- a/src/prefect/flows.py
+++ b/src/prefect/flows.py
@@ -83,8 +83,9 @@
from prefect.results import ResultSerializer, ResultStorage
from prefect.runner.storage import (
BlockStorageAdapter,
+ LocalStorage,
RunnerStorage,
- create_storage_from_url,
+ create_storage_from_source,
)
from prefect.settings import (
PREFECT_DEFAULT_WORK_POOL_NAME,
@@ -910,7 +911,7 @@ async def from_source(
```
"""
if isinstance(source, str):
- storage = create_storage_from_url(source)
+ storage = create_storage_from_source(source)
elif isinstance(source, RunnerStorage):
storage = source
elif hasattr(source, "get_directory"):
@@ -920,9 +921,11 @@ async def from_source(
f"Unsupported source type {type(source).__name__!r}. Please provide a"
" URL to remote storage or a storage object."
)
+
with tempfile.TemporaryDirectory() as tmpdir:
- storage.set_base_path(Path(tmpdir))
- await storage.pull_code()
+ if not isinstance(storage, LocalStorage):
+ storage.set_base_path(Path(tmpdir))
+ await storage.pull_code()
full_entrypoint = str(storage.destination / entrypoint)
flow: "Flow" = await from_async.wait_for_call_in_new_thread(
diff --git a/src/prefect/runner/storage.py b/src/prefect/runner/storage.py
index 5b4c72003c6d..bf56f0baddcc 100644
--- a/src/prefect/runner/storage.py
+++ b/src/prefect/runner/storage.py
@@ -564,8 +564,74 @@ def __eq__(self, __value) -> bool:
return False
-def create_storage_from_url(
- url: str, pull_interval: Optional[int] = 60
+class LocalStorage:
+ """
+ Sets the working directory in the local filesystem.
+
+ Parameters:
+ Path: Local file path to set the working directory for the flow
+
+ Examples:
+ Sets the working directory for the local path to the flow:
+
+ ```python
+ from prefect.runner.storage import Localstorage
+
+ storage = LocalStorage(
+ path="/path/to/local/flow_directory",
+ )
+ ```
+ """
+
+ def __init__(
+ self,
+ path: str,
+ pull_interval: Optional[int] = None,
+ ):
+ self._path = Path(path).resolve()
+ self._logger = get_logger("runner.storage.local-storage")
+ self._storage_base_path = Path.cwd()
+ self._pull_interval = pull_interval
+
+ @property
+ def destination(self) -> Path:
+ return self._path
+
+ def set_base_path(self, path: Path):
+ self._storage_base_path = path
+
+ @property
+ def pull_interval(self) -> Optional[int]:
+ return self._pull_interval
+
+ async def pull_code(self):
+ # Local storage assumes the code already exists on the local filesystem
+ # and does not need to be pulled from a remote location
+ pass
+
+ def to_pull_step(self) -> dict:
+ """
+ Returns a dictionary representation of the storage object that can be
+ used as a deployment pull step.
+ """
+ step = {
+ "prefect.deployments.steps.set_working_directory": {
+ "directory": str(self.destination)
+ }
+ }
+ return step
+
+ def __eq__(self, __value) -> bool:
+ if isinstance(__value, LocalStorage):
+ return self._path == __value._path
+ return False
+
+ def __repr__(self) -> str:
+ return f"LocalStorage(path={self._path!r})"
+
+
+def create_storage_from_source(
+ source: str, pull_interval: Optional[int] = 60
) -> RunnerStorage:
"""
Creates a storage object from a URL.
@@ -579,11 +645,18 @@ def create_storage_from_url(
Returns:
RunnerStorage: A runner storage compatible object
"""
- parsed_url = urlparse(url)
- if parsed_url.scheme == "git" or parsed_url.path.endswith(".git"):
- return GitRepository(url=url, pull_interval=pull_interval)
+ logger = get_logger("runner.storage")
+ parsed_source = urlparse(source)
+ if parsed_source.scheme == "git" or parsed_source.path.endswith(".git"):
+ return GitRepository(url=source, pull_interval=pull_interval)
+ elif parsed_source.scheme in ("file", "local"):
+ source_path = source.split("://", 1)[-1]
+ return LocalStorage(path=source_path, pull_interval=pull_interval)
+ elif parsed_source.scheme in fsspec.available_protocols():
+ return RemoteStorage(url=source, pull_interval=pull_interval)
else:
- return RemoteStorage(url=url, pull_interval=pull_interval)
+ logger.debug("No valid fsspec protocol found for URL, assuming local storage.")
+ return LocalStorage(path=source, pull_interval=pull_interval)
def _format_token_from_credentials(netloc: str, credentials: dict) -> str:
diff --git a/src/prefect/server/api/server.py b/src/prefect/server/api/server.py
index 133b7e88eec1..3694d2d87d0a 100644
--- a/src/prefect/server/api/server.py
+++ b/src/prefect/server/api/server.py
@@ -553,33 +553,43 @@ async def start_services():
service_instances = []
if prefect.settings.PREFECT_API_SERVICES_SCHEDULER_ENABLED.value():
- service_instances.append(services.scheduler.Scheduler())
- service_instances.append(services.scheduler.RecentDeploymentsScheduler())
+ service_instances.append(services.scheduler.Scheduler(handle_signals=False))
+ service_instances.append(
+ services.scheduler.RecentDeploymentsScheduler(handle_signals=False)
+ )
if prefect.settings.PREFECT_API_SERVICES_LATE_RUNS_ENABLED.value():
- service_instances.append(services.late_runs.MarkLateRuns())
+ service_instances.append(
+ services.late_runs.MarkLateRuns(handle_signals=False)
+ )
if prefect.settings.PREFECT_API_SERVICES_PAUSE_EXPIRATIONS_ENABLED.value():
- service_instances.append(services.pause_expirations.FailExpiredPauses())
+ service_instances.append(
+ services.pause_expirations.FailExpiredPauses(handle_signals=False)
+ )
if prefect.settings.PREFECT_API_SERVICES_CANCELLATION_CLEANUP_ENABLED.value():
service_instances.append(
- services.cancellation_cleanup.CancellationCleanup()
+ services.cancellation_cleanup.CancellationCleanup(handle_signals=False)
)
if prefect.settings.PREFECT_SERVER_ANALYTICS_ENABLED.value():
- service_instances.append(services.telemetry.Telemetry())
+ service_instances.append(services.telemetry.Telemetry(handle_signals=False))
if prefect.settings.PREFECT_API_SERVICES_FLOW_RUN_NOTIFICATIONS_ENABLED.value():
service_instances.append(
- services.flow_run_notifications.FlowRunNotifications()
+ services.flow_run_notifications.FlowRunNotifications(
+ handle_signals=False
+ )
)
if prefect.settings.PREFECT_API_SERVICES_FOREMAN_ENABLED.value():
- service_instances.append(services.foreman.Foreman())
+ service_instances.append(services.foreman.Foreman(handle_signals=False))
if prefect.settings.PREFECT_EXPERIMENTAL_ENABLE_TASK_SCHEDULING.value():
- service_instances.append(services.task_scheduling.TaskSchedulingTimeouts())
+ service_instances.append(
+ services.task_scheduling.TaskSchedulingTimeouts(handle_signals=False)
+ )
if (
prefect.settings.PREFECT_EXPERIMENTAL_EVENTS.value()
@@ -592,7 +602,7 @@ async def start_services():
and prefect.settings.PREFECT_API_SERVICES_TRIGGERS_ENABLED.value()
):
service_instances.append(ReactiveTriggers())
- service_instances.append(ProactiveTriggers())
+ service_instances.append(ProactiveTriggers(handle_signals=False))
service_instances.append(Actions())
if (
diff --git a/src/prefect/server/api/workers.py b/src/prefect/server/api/workers.py
index 7a6b233ed0cb..ab92dfb3e264 100644
--- a/src/prefect/server/api/workers.py
+++ b/src/prefect/server/api/workers.py
@@ -376,20 +376,19 @@ async def get_scheduled_flow_runs(
limit=limit,
)
- polled_work_queue_ids = [wq.id for wq in work_queues]
- ready_work_queue_ids = [
- wq.id for wq in work_queues if wq.status != WorkQueueStatus.READY
- ]
-
background_tasks.add_task(
mark_work_queues_ready,
- polled_work_queue_ids=polled_work_queue_ids,
- ready_work_queue_ids=ready_work_queue_ids,
+ polled_work_queue_ids=[
+ wq.id for wq in work_queues if wq.status != WorkQueueStatus.NOT_READY
+ ],
+ ready_work_queue_ids=[
+ wq.id for wq in work_queues if wq.status == WorkQueueStatus.NOT_READY
+ ],
)
background_tasks.add_task(
mark_deployments_ready,
- work_queue_ids=ready_work_queue_ids,
+ work_queue_ids=[wq.id for wq in work_queues],
)
return queue_response
diff --git a/src/prefect/utilities/importtools.py b/src/prefect/utilities/importtools.py
index 31db25fe4a58..5e364e37b49f 100644
--- a/src/prefect/utilities/importtools.py
+++ b/src/prefect/utilities/importtools.py
@@ -423,7 +423,7 @@ def safe_load_namespace(source_code: str):
logger.debug("Failed to import from %s: %s", node.module, e)
# Handle local definitions
- for node in ast.walk(parsed_code):
+ for node in parsed_code.body:
if isinstance(node, (ast.ClassDef, ast.FunctionDef, ast.Assign)):
try:
# Compile and execute each class and function definition and assignment
diff --git a/tests/cli/test_deploy.py b/tests/cli/test_deploy.py
index 666c3a6bdf2c..32390cdfb999 100644
--- a/tests/cli/test_deploy.py
+++ b/tests/cli/test_deploy.py
@@ -4792,6 +4792,84 @@ async def test_deploy_resolves_block_references_in_deployments_section(
assert prefect_config["deployments"][0]["work_pool"]["name"] == work_pool.name
+ @pytest.mark.usefixtures("project_dir")
+ async def test_deploy_does_not_resolve_block_references_in_job_variables_section(
+ self, prefect_client, work_pool
+ ):
+ """
+ Ensure block references are resolved in deployments section of prefect.yaml
+ """
+ await Secret(value="bloop").save(name="test-secret-block")
+
+ # add block reference to prefect.yaml
+ prefect_file = Path("prefect.yaml")
+ with prefect_file.open(mode="r") as f:
+ prefect_config = yaml.safe_load(f)
+
+ prefect_config["deployments"] = [
+ {
+ "name": "test-name",
+ "entrypoint": "flows/hello.py:my_flow",
+ "work_pool": {
+ "name": work_pool.name,
+ "job_variables": {
+ "env": {
+ "SECRET": "{{ prefect.blocks.secret.test-secret-block}}"
+ }
+ },
+ },
+ }
+ ]
+
+ with prefect_file.open(mode="w") as f:
+ yaml.safe_dump(prefect_config, f)
+
+ # ensure block reference was added
+ assert (
+ prefect_config["deployments"][0]["work_pool"]["job_variables"]["env"][
+ "SECRET"
+ ]
+ == "{{ prefect.blocks.secret.test-secret-block}}"
+ )
+
+ # run deploy
+ await run_sync_in_worker_thread(
+ invoke_and_assert,
+ command="deploy flows/hello.py:my_flow -n test-name",
+ expected_code=0,
+ user_input=(
+ # reject schedule
+ "n"
+ + readchar.key.ENTER
+ # accept saving configuration
+ + "y"
+ + readchar.key.ENTER
+ # accept overwrite config
+ + "y"
+ + readchar.key.ENTER
+ ),
+ expected_output_contains=[
+ "Deployment 'An important name/test-name' successfully created",
+ ],
+ )
+
+ deployment = await prefect_client.read_deployment_by_name(
+ "An important name/test-name"
+ )
+ assert deployment.name == "test-name"
+ assert deployment.work_pool_name == work_pool.name
+
+ # ensure block reference was not resolved
+ with prefect_file.open(mode="r") as f:
+ prefect_config = yaml.safe_load(f)
+
+ assert (
+ prefect_config["deployments"][0]["work_pool"]["job_variables"]["env"][
+ "SECRET"
+ ]
+ == "{{ prefect.blocks.secret.test-secret-block}}"
+ )
+
@pytest.mark.usefixtures("project_dir", "interactive_console")
async def test_deploy_resolves_variables_in_deployments_section(
self, prefect_client, work_pool
diff --git a/tests/concurrency/test_concurrency_asyncio.py b/tests/concurrency/test_concurrency_asyncio.py
index 2f747ee8a3e6..ce9633cc5cd3 100644
--- a/tests/concurrency/test_concurrency_asyncio.py
+++ b/tests/concurrency/test_concurrency_asyncio.py
@@ -1,7 +1,8 @@
-import asyncio
from unittest import mock
import pytest
+from httpx import HTTPStatusError, Request, Response
+from prefect._vendor.starlette import status
from prefect import flow, task
from prefect.concurrency.asyncio import (
@@ -35,7 +36,7 @@ async def resource_heavy():
) as release_spy:
await resource_heavy()
- acquire_spy.assert_called_once_with(["test"], 1)
+ acquire_spy.assert_called_once_with(["test"], 1, timeout_seconds=None)
# On release we calculate how many seconds the slots were occupied
# for, so here we really just want to make sure that the value
@@ -173,18 +174,25 @@ async def resource_heavy():
@pytest.fixture
-def mock_acquire_concurrency_slots(monkeypatch):
- async def blocks_forever(*args, **kwargs):
- while True:
- await asyncio.sleep(1)
+def mock_increment_concurrency_slots(monkeypatch):
+ async def mocked_increment_concurrency_slots(*args, **kwargs):
+ response = Response(
+ status_code=status.HTTP_423_LOCKED,
+ headers={"Retry-After": "0.01"},
+ )
+ raise HTTPStatusError(
+ message="Locked",
+ request=Request("GET", "http://test.com"),
+ response=response,
+ )
monkeypatch.setattr(
- "prefect.concurrency.asyncio._acquire_concurrency_slots",
- blocks_forever,
+ "prefect.client.orchestration.PrefectClient.increment_concurrency_slots",
+ mocked_increment_concurrency_slots,
)
-@pytest.mark.usefixtures("concurrency_limit", "mock_acquire_concurrency_slots")
+@pytest.mark.usefixtures("concurrency_limit", "mock_increment_concurrency_slots")
async def test_concurrency_respects_timeout():
with pytest.raises(TimeoutError, match=".*timed out after 0.01 second(s)*"):
async with concurrency("test", occupy=1, timeout_seconds=0.01):
diff --git a/tests/concurrency/test_concurrency_slot_acquisition_service.py b/tests/concurrency/test_concurrency_slot_acquisition_service.py
index 86eddecc5894..16cdd0fc688f 100644
--- a/tests/concurrency/test_concurrency_slot_acquisition_service.py
+++ b/tests/concurrency/test_concurrency_slot_acquisition_service.py
@@ -41,7 +41,7 @@ async def test_returns_successful_response(mocked_client):
expected_mode = "concurrency"
service = ConcurrencySlotAcquisitionService.instance(frozenset(expected_names))
- future = service.send((expected_slots, expected_mode))
+ future = service.send((expected_slots, expected_mode, None))
await service.drain()
returned_response = await asyncio.wrap_future(future)
assert returned_response == response
@@ -67,7 +67,7 @@ async def test_retries_failed_call_respects_retry_after_header(mocked_client):
service = ConcurrencySlotAcquisitionService.instance(frozenset(limit_names))
with mock.patch("prefect.concurrency.asyncio.asyncio.sleep") as sleep:
- future = service.send((1, "concurrency"))
+ future = service.send((1, "concurrency", None))
await service.drain()
returned_response = await asyncio.wrap_future(future)
@@ -91,7 +91,7 @@ async def test_failed_call_status_code_not_retryable_returns_exception(mocked_cl
limit_names = sorted(["api", "database"])
service = ConcurrencySlotAcquisitionService.instance(frozenset(limit_names))
- future = service.send((1, "concurrency"))
+ future = service.send((1, "concurrency", None))
await service.drain()
exception = await asyncio.wrap_future(future)
@@ -106,7 +106,7 @@ async def test_basic_exception_returns_exception(mocked_client):
limit_names = sorted(["api", "database"])
service = ConcurrencySlotAcquisitionService.instance(frozenset(limit_names))
- future = service.send((1, "concurrency"))
+ future = service.send((1, "concurrency", None))
await service.drain()
exception = await asyncio.wrap_future(future)
diff --git a/tests/concurrency/test_concurrency_sync.py b/tests/concurrency/test_concurrency_sync.py
index c76007a6f8c4..2d6c5e36a992 100644
--- a/tests/concurrency/test_concurrency_sync.py
+++ b/tests/concurrency/test_concurrency_sync.py
@@ -1,7 +1,8 @@
-import asyncio
from unittest import mock
import pytest
+from httpx import HTTPStatusError, Request, Response
+from prefect._vendor.starlette import status
from prefect import flow, task
from prefect.concurrency.asyncio import (
@@ -34,7 +35,7 @@ def resource_heavy():
) as release_spy:
resource_heavy()
- acquire_spy.assert_called_once_with(["test"], 1)
+ acquire_spy.assert_called_once_with(["test"], 1, timeout_seconds=None)
# On release we calculate how many seconds the slots were occupied
# for, so here we really just want to make sure that the value
@@ -168,18 +169,25 @@ def resource_heavy():
@pytest.fixture
-def mock_acquire_concurrency_slots(monkeypatch):
- async def blocks_forever(*args, **kwargs):
- while True:
- await asyncio.sleep(1)
+def mock_increment_concurrency_slots(monkeypatch):
+ async def mocked_increment_concurrency_slots(*args, **kwargs):
+ response = Response(
+ status_code=status.HTTP_423_LOCKED,
+ headers={"Retry-After": "0.01"},
+ )
+ raise HTTPStatusError(
+ message="Locked",
+ request=Request("GET", "http://test.com"),
+ response=response,
+ )
monkeypatch.setattr(
- "prefect.concurrency.sync._acquire_concurrency_slots",
- blocks_forever,
+ "prefect.client.orchestration.PrefectClient.increment_concurrency_slots",
+ mocked_increment_concurrency_slots,
)
-@pytest.mark.usefixtures("concurrency_limit", "mock_acquire_concurrency_slots")
+@pytest.mark.usefixtures("concurrency_limit", "mock_increment_concurrency_slots")
def test_concurrency_respects_timeout():
with pytest.raises(TimeoutError, match=".*timed out after 0.01 second(s)*."):
with concurrency("test", occupy=1, timeout_seconds=0.01):
diff --git a/tests/runner/test_storage.py b/tests/runner/test_storage.py
index 090b4c47cc8b..ec23c9b7ec7a 100644
--- a/tests/runner/test_storage.py
+++ b/tests/runner/test_storage.py
@@ -12,9 +12,10 @@
from prefect.runner.storage import (
BlockStorageAdapter,
GitRepository,
+ LocalStorage,
RemoteStorage,
RunnerStorage,
- create_storage_from_url,
+ create_storage_from_source,
)
from prefect.testing.utilities import AsyncMock, MagicMock
from prefect.utilities.filesystem import tmpchdir
@@ -26,7 +27,7 @@
import pytest
-class TestCreateStorageFromUrl:
+class TestCreateStorageFromSource:
@pytest.mark.parametrize(
"url, expected_type",
[
@@ -35,7 +36,7 @@ class TestCreateStorageFromUrl:
],
)
def test_create_git_storage(self, url, expected_type):
- storage = create_storage_from_url(url)
+ storage = create_storage_from_source(url)
assert isinstance(storage, eval(expected_type))
assert storage.pull_interval == 60 # default value
@@ -47,7 +48,7 @@ def test_create_git_storage(self, url, expected_type):
],
)
def test_create_git_storage_custom_pull_interval(self, url, pull_interval):
- storage = create_storage_from_url(url, pull_interval=pull_interval)
+ storage = create_storage_from_source(url, pull_interval=pull_interval)
assert isinstance(
storage, GitRepository
) # We already know it's GitRepository from above tests
@@ -61,11 +62,29 @@ def test_create_git_storage_custom_pull_interval(self, url, pull_interval):
],
)
def test_alternative_storage_url(self, url):
- storage = create_storage_from_url(url)
+ storage = create_storage_from_source(url)
assert isinstance(storage, RemoteStorage)
assert storage._url == url
assert storage.pull_interval == 60 # default value
+ @pytest.mark.parametrize(
+ "path",
+ [
+ "/path/to/local/flows",
+ "C:\\path\\to\\local\\flows",
+ "file:///path/to/local/flows",
+ "flows", # Relative Path
+ ],
+ )
+ def test_local_storage_path(self, path):
+ storage = create_storage_from_source(path)
+
+ path = path.split("://")[-1] # split from Scheme when present
+
+ assert isinstance(storage, LocalStorage)
+ assert storage._path == Path(path).resolve()
+ assert storage.pull_interval == 60 # default value
+
@pytest.fixture
def mock_run_process(monkeypatch):
@@ -635,6 +654,43 @@ def test_repr(self):
assert repr(rs) == "RemoteStorage(url='s3://bucket/path')"
+class TestLocalStorage:
+ def test_init(self):
+ ls = LocalStorage("/path/to/directory", pull_interval=60)
+ assert ls._path == Path("/path/to/directory")
+ assert ls.pull_interval == 60
+
+ def test_set_base_path(self):
+ locals = LocalStorage("/path/to/directory")
+ path = Path.cwd() / "new_base_path"
+ locals.set_base_path(path)
+ assert locals._storage_base_path == path
+
+ def test_destination(self):
+ locals = LocalStorage("/path/to/directory")
+ assert locals.destination == Path("/path/to/directory")
+
+ def test_to_pull_step(self):
+ locals = LocalStorage("/path/to/directory")
+ pull_step = locals.to_pull_step()
+ assert pull_step == {
+ "prefect.deployments.steps.set_working_directory": {
+ "directory": "/path/to/directory"
+ }
+ }
+
+ def test_eq(self):
+ local1 = LocalStorage(path="/path/to/local/flows")
+ local2 = LocalStorage(path="/path/to/local/flows")
+ local3 = LocalStorage(path="C:\\path\\to\\local\\flows")
+ assert local1 == local2
+ assert local1 != local3
+
+ def test_repr(self):
+ local = LocalStorage(path="/path/to/local/flows")
+ assert repr(local) == "LocalStorage(path=PosixPath('/path/to/local/flows'))"
+
+
class TestBlockStorageAdapter:
@pytest.fixture
async def test_block(self):
diff --git a/tests/server/api/test_workers.py b/tests/server/api/test_workers.py
index 373417d99a9e..4cd3330c220b 100644
--- a/tests/server/api/test_workers.py
+++ b/tests/server/api/test_workers.py
@@ -4,7 +4,7 @@
import pendulum
from prefect._internal.pydantic import HAS_PYDANTIC_V2
-from prefect.server.schemas.statuses import WorkQueueStatus
+from prefect.server.schemas.statuses import DeploymentStatus, WorkQueueStatus
if HAS_PYDANTIC_V2:
import pydantic.v1 as pydantic
@@ -1835,16 +1835,22 @@ async def test_updates_last_polled_on_a_multiple_work_queues(
assert work_queue.last_polled is None
async def test_updates_last_polled_on_a_full_work_pool(
- self, client, work_queues, work_pools
+ self, client, session, work_queues, work_pools
):
+ work_pool = work_pools["wp_a"]
+ work_queues["wq_aa"].status = WorkQueueStatus.NOT_READY
+ work_queues["wq_ab"].status = WorkQueueStatus.PAUSED
+ work_queues["wq_ac"].status = WorkQueueStatus.READY
+ await session.commit()
+
now = pendulum.now("UTC")
poll_response = await client.post(
- f"/work_pools/{work_pools['wp_a'].name}/get_scheduled_flow_runs",
+ f"/work_pools/{work_pool.name}/get_scheduled_flow_runs",
)
assert poll_response.status_code == status.HTTP_200_OK
work_queues_response = await client.post(
- f"/work_pools/{work_pools['wp_a'].name}/queues/filter"
+ f"/work_pools/{work_pool.name}/queues/filter"
)
assert work_queues_response.status_code == status.HTTP_200_OK
@@ -1853,9 +1859,70 @@ async def test_updates_last_polled_on_a_full_work_pool(
)
for work_queue in work_queues:
- assert work_queue.last_polled is not None
+ assert (
+ work_queue.last_polled is not None
+ ), "Work queue should have updated last_polled"
assert work_queue.last_polled > now
+ async def test_updates_statuses_on_a_full_work_pool(
+ self, client, session, work_queues, work_pools, flow
+ ):
+ async def create_deployment_for_work_queue(work_queue_id):
+ return await models.deployments.create_deployment(
+ session=session,
+ deployment=schemas.core.Deployment(
+ name="My Deployment",
+ tags=["test"],
+ flow_id=flow.id,
+ work_queue_id=work_queue_id,
+ ),
+ )
+
+ work_pool = work_pools["wp_a"]
+
+ wq_not_ready = work_queues["wq_aa"]
+ wq_not_ready.status = WorkQueueStatus.NOT_READY
+
+ wq_paused = work_queues["wq_ab"]
+ wq_paused.status = WorkQueueStatus.PAUSED
+
+ wq_ready = work_queues["wq_ac"]
+ wq_ready.status = WorkQueueStatus.READY
+
+ deployments = [
+ await create_deployment_for_work_queue(wq.id)
+ for wq in (wq_not_ready, wq_paused, wq_ready)
+ ]
+
+ await session.commit()
+
+ poll_response = await client.post(
+ f"/work_pools/{work_pool.name}/get_scheduled_flow_runs",
+ )
+ assert poll_response.status_code == status.HTTP_200_OK
+
+ work_queues_response = await client.post(
+ f"/work_pools/{work_pool.name}/queues/filter"
+ )
+ assert work_queues_response.status_code == status.HTTP_200_OK
+
+ work_queues = pydantic.parse_obj_as(
+ List[WorkQueue], work_queues_response.json()
+ )
+
+ for work_queue in work_queues:
+ if work_queue.id == wq_not_ready.id:
+ assert work_queue.status == WorkQueueStatus.READY
+ elif work_queue.id == wq_paused.id:
+ # paused work queues should stay paused
+ assert work_queue.status == WorkQueueStatus.PAUSED
+ elif work_queue.id == wq_ready.id:
+ assert work_queue.status == WorkQueueStatus.READY
+
+ for deployment in deployments:
+ await session.refresh(deployment)
+ assert deployment.status == DeploymentStatus.READY
+
async def test_ensure_deployments_associated_with_work_pool_have_deployment_status_of_ready(
self, client, work_pools, deployment
):
diff --git a/tests/test_exceptions.py b/tests/test_exceptions.py
index c3a470990e5d..c438a0829399 100644
--- a/tests/test_exceptions.py
+++ b/tests/test_exceptions.py
@@ -1,3 +1,5 @@
+from typing import Any, List, Type
+
import cloudpickle
import pytest
@@ -6,24 +8,32 @@
ParameterTypeError,
SignatureMismatchError,
)
-from prefect.pydantic import BaseModel, ValidationError, field_validator
+from prefect.pydantic import (
+ HAS_PYDANTIC_V2,
+ USE_PYDANTIC_V2,
+ BaseModel,
+ ValidationError,
+)
-class ValidationTestModel(BaseModel):
- num: int
- string: str
+def parse_as(T: Type[object], obj: Any) -> Any:
+ if HAS_PYDANTIC_V2 and USE_PYDANTIC_V2:
+ from pydantic import TypeAdapter
- @field_validator("num")
- def must_be_int(cls, n):
- if not isinstance(n, int):
- raise TypeError("must be int")
- return n
+ return TypeAdapter(T).validate_python(obj)
+ elif HAS_PYDANTIC_V2:
+ from pydantic.v1 import parse_obj_as
- @field_validator("string")
- def must_be_str(cls, n):
- if not isinstance(n, str):
- raise TypeError("must be str")
- return n
+ return parse_obj_as(T, obj)
+ else:
+ from pydantic import parse_obj_as
+
+ return parse_obj_as(T, obj)
+
+
+class Foo(BaseModel):
+ num: int
+ string: str
class TestParameterTypeError:
@@ -31,18 +41,37 @@ def test_construction_from_single_validation_error(self):
with pytest.raises(
ValidationError, match=r"validation error.*\s+num\s+.*integer"
):
- ValidationTestModel(**{"num": "not an int", "string": "a string"})
+ Foo(**{"num": "not an int", "string": "a string"})
def test_construction_from_two_validation_errors(self):
with pytest.raises(
ValidationError,
match=r"2 validation errors.*\s+num\s+.*integer.*\s+string\s+.*str",
):
- ValidationTestModel(**{"num": "not an int", "string": [1, 2]})
+ Foo(**{"num": "not an int", "string": [1, 2]})
+
+ def test_construction_with_list_of_model_type_inputs(self):
+ """regression test for https://github.com/PrefectHQ/prefect/issues/14406"""
+
+ errored = False
+
+ class HelloParams(BaseModel):
+ name: str
+
+ try:
+ parse_as(List[HelloParams], [{"name": "rodrigo"}, {}])
+ except ValidationError as exc:
+ errored = True
+ assert len(exc.errors()) == 1
+ parameter_type_error = ParameterTypeError.from_validation_error(exc)
+ assert "1.name" in str(parameter_type_error)
+ assert "field required" in str(parameter_type_error).lower()
+
+ assert errored
def test_pickle_roundtrip_single_error(self):
try:
- ValidationTestModel(**{"num": "not an int", "string": "a string"})
+ Foo(**{"num": "not an int", "string": "a string"})
except Exception as exc:
pte = ParameterTypeError.from_validation_error(exc)
pickled = cloudpickle.dumps(pte)
@@ -52,7 +81,7 @@ def test_pickle_roundtrip_single_error(self):
def test_pickle_roundtrip_two_errors(self):
try:
- ValidationTestModel(**{"num": "not an int", "string": [1, 2]})
+ Foo(**{"num": "not an int", "string": [1, 2]})
except Exception as exc:
pte = ParameterTypeError.from_validation_error(exc)
pickled = cloudpickle.dumps(pte)
diff --git a/tests/test_flows.py b/tests/test_flows.py
index 28986cb29add..e2df2c9ce598 100644
--- a/tests/test_flows.py
+++ b/tests/test_flows.py
@@ -3683,11 +3683,11 @@ def test_loaded_flow_can_be_updated_with_options(self):
assert deployment.storage == storage
async def test_load_flow_from_source_with_url(self, monkeypatch):
- def mock_create_storage_from_url(url):
+ def mock_create_storage_from_source(url):
return MockStorage()
monkeypatch.setattr(
- "prefect.flows.create_storage_from_url", mock_create_storage_from_url
+ "prefect.flows.create_storage_from_source", mock_create_storage_from_source
) # adjust the import path as per your module's name and location
loaded_flow = await Flow.from_source(
diff --git a/tests/utilities/test_importtools.py b/tests/utilities/test_importtools.py
index e397fc9a22e4..36d981ab8fa2 100644
--- a/tests/utilities/test_importtools.py
+++ b/tests/utilities/test_importtools.py
@@ -338,3 +338,26 @@ def test_safe_load_namespace_ignores_code_in_if_name_equals_main_block():
assert "x" in namespace
assert "y" in namespace
assert "z" not in namespace
+
+
+def test_safe_load_namespace_does_not_execute_function_body():
+ """
+ Regression test for https://github.com/PrefectHQ/prefect/issues/14402
+ """
+ source_code = dedent(
+ """
+ you_done_goofed = False
+
+ def my_fn():
+ nonlocal you_done_goofed
+ you_done_goofed = True
+
+ def my_other_fn():
+ foo = my_fn()
+ """
+ )
+
+ # should not raise any errors
+ namespace = safe_load_namespace(source_code)
+
+ assert not namespace["you_done_goofed"]