Skip to content

Commit

Permalink
Merge branch '2.x' into fix-14390
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz authored Jul 8, 2024
2 parents 0776184 + f86bb85 commit 7e5c30a
Show file tree
Hide file tree
Showing 34 changed files with 596 additions and 134 deletions.
29 changes: 29 additions & 0 deletions RELEASE-NOTES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,34 @@
# Prefect Release Notes

## Release 2.19.7

### Fixes
- Fix bug where assignments inside functions are evaluated when running `prefect deploy` - https://github.com/PrefectHQ/prefect/pull/14403
- Allow `uvicorn>0.29.0` - https://github.com/PrefectHQ/prefect/pull/14370

### Documentation
- Add Prefect 3 announcement bar to 2.x docs - https://github.com/PrefectHQ/prefect/pull/14248
- Update Prefect intro in 2.19.x docs - https://github.com/PrefectHQ/prefect/pull/14376
- Update docs edit pencil icon link to route to `2.x` branch in GitHub - https://github.com/PrefectHQ/prefect/pull/14378

**All changes**: https://github.com/PrefectHQ/prefect/compare/2.19.6...2.19.7

## Release 2.19.6

### Enhancements
- Enable deploying from local paths with `Flow.deploy` - https://github.com/PrefectHQ/prefect/pull/13981

### Fixes
- Fix `concurrency` timeout scoping - https://github.com/PrefectHQ/prefect/pull/14183
- Fix deployment and work queue statuses - https://github.com/PrefectHQ/prefect/pull/14097
* Fix resolution of block documents in `job_variables` in a `prefect.yaml` before saving server-side - https://github.com/PrefectHQ/prefect/pull/14156

### Documentation
- Fix typo in Cloud Run V2 worker navigation link - https://github.com/PrefectHQ/prefect/pull/14170


**All changes**: https://github.com/PrefectHQ/prefect/compare/2.19.5...2.19.6

## Release 2.19.5

### Fixes
Expand Down
1 change: 1 addition & 0 deletions docs/guides/host.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Learn how to host your own Prefect server instance.
If you would like to host a Prefect server instance on Kubernetes, check out the prefect-server [Helm chart](https://github.com/PrefectHQ/prefect-helm/tree/main/charts/prefect-server).

After installing Prefect, you have:

- a Python SDK client that can communicate with [Prefect Cloud](https://app.prefect.cloud)
- an [API server](/api-ref/) instance backed by a database and a UI

Expand Down
2 changes: 1 addition & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ search:

# **Welcome to Prefect**

Prefect is a workflow orchestration tool empowering developers to build, observe, and react to data pipelines.
Prefect is a workflow orchestration framework for building resilient data pipelines in Python.

It's the easiest way to transform any Python function into a unit of work that can be observed and orchestrated. Just bring your Python code, sprinkle in a few decorators, and go!

Expand Down
9 changes: 9 additions & 0 deletions docs/overrides/main.html
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,13 @@

{% block analytics %}
{{ super() }}
{% endblock %}

{% block announce %}
<div style="text-align:center">
<strong>
Prefect 3 is live! See the docs at <a href="https://docs-3.prefect.io" target="_blank"> docs-3.prefect.io </a>
</strong>
</div>

{% endblock %}
6 changes: 5 additions & 1 deletion docs/stylesheets/extra.css
Original file line number Diff line number Diff line change
Expand Up @@ -472,4 +472,8 @@ to force column width */

a.cloud-button:hover {
background-color: rgb(2, 37, 172);
}
}

[data-md-component="announce"] {
display: block !important;
}
4 changes: 2 additions & 2 deletions mkdocs.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
site_name: Prefect Docs
site_url: https://docs.prefect.io/
repo_url: https://github.com/PrefectHQ/prefect
edit_uri: edit/main/docs
edit_uri: edit/2.x/docs
extra_css:
- stylesheets/theme.css
- stylesheets/admonitions.css
Expand Down Expand Up @@ -322,7 +322,7 @@ nav:
- Deployment Steps: integrations/prefect-gcp/deployments/steps.md
- Workers:
- Cloud Run: integrations/prefect-gcp/cloud_run_worker.md
- Cloud Run V2: cintegrations/prefect-gcp/loud_run_worker_v2.md
- Cloud Run V2: integrations/prefect-gcp/cloud_run_worker_v2.md
- Vertex AI: integrations/prefect-gcp/vertex_worker.md
- GitHub:
- integrations/prefect-github/index.md
Expand Down
2 changes: 1 addition & 1 deletion requirements-client.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ sniffio >=1.3.0, < 2.0.0
toml >= 0.10.0
typing_extensions >= 4.5.0, < 5.0.0
ujson >= 5.8.0, < 6.0.0
uvicorn >= 0.14.0, < 0.29.0
uvicorn >= 0.14.0, !=0.29.0
websockets >= 10.4, < 13.0

# additional dependencies of starlette, which we're currently vendoring
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ asyncpg >= 0.23
click >= 8.0, < 8.2
cryptography >= 36.0.1
dateparser >= 1.1.1, < 2.0.0
docker >= 4.0, < 7.0
docker >= 4.0
graphviz >= 0.20.1
griffe >= 0.20.0
jinja2 >= 3.0.0, < 4.0.0
Expand Down
20 changes: 18 additions & 2 deletions src/integrations/prefect-aws/prefect_aws/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
from prefect.utilities.filesystem import filter_files

if PYDANTIC_VERSION.startswith("2."):
from pydantic.v1 import Field
from pydantic.v1 import Field, root_validator
else:
from pydantic import Field
from pydantic import Field, root_validator

from prefect_aws import AwsCredentials, MinIOCredentials
from prefect_aws.client_parameters import AwsClientParameters
Expand Down Expand Up @@ -426,6 +426,22 @@ class S3Bucket(WritableFileSystem, WritableDeploymentStorage, ObjectStorageBlock
),
)

@root_validator
def validate_credentials(cls, values):
creds = values["credentials"]
if isinstance(creds, AwsCredentials) and isinstance(
creds.aws_client_parameters, dict
):
# There's an issue with pydantic and nested blocks in a Union
# that is causing `aws_client_parameters` to be a dict in some
# cases instead of an `AwsClientParameters` object. Here we
# convert it to the correct type.
creds.aws_client_parameters = AwsClientParameters(
**creds.aws_client_parameters
)

return values

# Property to maintain compatibility with storage block based deployments
@property
def basepath(self) -> str:
Expand Down
23 changes: 23 additions & 0 deletions src/integrations/prefect-aws/tests/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
from pathlib import Path, PurePosixPath, PureWindowsPath

import boto3
import botocore
import botocore.exceptions
import pytest
from botocore.exceptions import ClientError, EndpointConnectionError
from moto import mock_s3
from prefect_aws import AwsCredentials, MinIOCredentials
from prefect_aws.client_parameters import AwsClientParameters
from prefect_aws.credentials import _get_client_cached
from prefect_aws.s3 import (
S3Bucket,
s3_copy,
Expand Down Expand Up @@ -1118,3 +1121,23 @@ def test_move_subpaths(
to_bucket=to_bucket,
)
assert key == expected_path

def test_round_trip_default_credentials(self):
# Regression test for
# https://github.com/PrefectHQ/prefect/issues/14147
block = S3Bucket(
bucket_name="test",
bucket_path="test/weather",
)
block.save(name="default-creds", overwrite=True)

loaded = S3Bucket.load("default-creds")

# Ensure that the client cache is cleared and that we will in fact
# get the broken client instead of one created earlier in this suite.
_get_client_cached.cache_clear()

# Attempt to use the client, which will raise an error, but it should
# be a `NoCredentialsError` instead of an opaque 'unhashable dict' error.
with pytest.raises(botocore.exceptions.NoCredentialsError):
loaded.copy_object("my_folder/notes.txt", "my_folder/notes_copy.txt")
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"""

import io
from distutils.dir_util import copy_tree
import shutil
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Optional, Tuple, Union
Expand Down Expand Up @@ -197,4 +197,6 @@ async def get_directory(
dst_dir=local_path, src_dir=tmp_dir, sub_directory=from_path
)

copy_tree(src=content_source, dst=content_destination)
shutil.copytree(
src=content_source, dst=content_destination, dirs_exist_ok=True
)
6 changes: 4 additions & 2 deletions src/integrations/prefect-github/prefect_github/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

import io
import shlex
import shutil
from datetime import datetime
from distutils.dir_util import copy_tree
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any, Dict, Iterable, Optional, Tuple, Union
Expand Down Expand Up @@ -160,7 +160,9 @@ async def get_directory(
dst_dir=local_path, src_dir=tmp_path_str, sub_directory=from_path
)

copy_tree(src=content_source, dst=content_destination)
shutil.copytree(
src=content_source, dst=content_destination, dirs_exist_ok=True
)


@task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@
private_gitlab_block.save()
```
"""

import io
import shutil
import urllib.parse
from distutils.dir_util import copy_tree
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Optional, Tuple, Union
Expand Down Expand Up @@ -213,4 +214,6 @@ async def get_directory(
dst_dir=local_path, src_dir=tmp_dir, sub_directory=from_path
)

copy_tree(src=content_source, dst=content_destination)
shutil.copytree(
src=content_source, dst=content_destination, dirs_exist_ok=True
)
8 changes: 2 additions & 6 deletions src/prefect/cli/cloud/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,6 @@ async def serve_login_api(cancel_scope, task_status):
cause = exc.__context__ # Hide the system exit
traceback.print_exception(type(cause), value=cause, tb=cause.__traceback__)
cancel_scope.cancel()
except KeyboardInterrupt:
# `uvicorn.serve` can raise `KeyboardInterrupt` when it's done serving.
cancel_scope.cancel()
else:
# Exit if we are done serving the API
# Uvicorn overrides signal handlers so without this Ctrl-C is broken
Expand Down Expand Up @@ -274,9 +271,8 @@ async def login_with_browser() -> str:
app.console.print("Waiting for response...")
await result_event.wait()

# Uvicorn installs signal handlers, this is the cleanest way to shutdown the
# login API
raise_signal(signal.SIGINT)
# Shut down the background uvicorn server
tg.cancel_scope.cancel()

result = login_api.extra.get("result")
if not result:
Expand Down
6 changes: 5 additions & 1 deletion src/prefect/cli/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,12 +461,17 @@ async def _run_single_deploy(
push_steps = deploy_config.get("push", actions.get("push")) or []
pull_steps = deploy_config.get("pull", actions.get("pull")) or []

# Don't resolve references in job variables. That will happen when
# preparing for a flow run.
_job_variables = deploy_config["work_pool"].pop("job_variables", {})
deploy_config = await resolve_block_document_references(deploy_config)
deploy_config = await resolve_variables(deploy_config)

# check for env var placeholders early so users can pass work pool names, etc.
deploy_config = apply_values(deploy_config, os.environ, remove_notset=False)

deploy_config["work_pool"]["job_variables"] = _job_variables

if not deploy_config.get("entrypoint"):
if not is_interactive():
raise ValueError(
Expand Down Expand Up @@ -676,7 +681,6 @@ async def _run_single_deploy(
deploy_config_before_templating = deepcopy(deploy_config)
## apply templating from build and push steps to the final deployment spec
_parameter_schema = deploy_config.pop("parameter_openapi_schema")

_schedules = deploy_config.pop("schedules")

deploy_config = apply_values(deploy_config, step_outputs)
Expand Down
1 change: 1 addition & 0 deletions src/prefect/cli/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ async def start(
server_env["PREFECT_SERVER_ANALYTICS_ENABLED"] = str(analytics)
server_env["PREFECT_API_SERVICES_LATE_RUNS_ENABLED"] = str(late_runs)
server_env["PREFECT_API_SERVICES_UI"] = str(ui)
server_env["PREFECT_UI_ENABLED"] = str(ui)
server_env["PREFECT_LOGGING_SERVER_LEVEL"] = log_level

base_url = f"http://{host}:{port}"
Expand Down
18 changes: 14 additions & 4 deletions src/prefect/concurrency/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

from prefect import get_client
from prefect.client.schemas.responses import MinimalConcurrencyLimitResponse
from prefect.utilities.timeout import timeout_async

from .events import (
_emit_concurrency_acquisition_events,
Expand All @@ -26,6 +25,10 @@ class ConcurrencySlotAcquisitionError(Exception):
"""Raised when an unhandlable occurs while acquiring concurrency slots."""


class AcquireConcurrencySlotTimeoutError(TimeoutError):
"""Raised when acquiring a concurrency slot times out."""


@asynccontextmanager
async def concurrency(
names: Union[str, List[str]],
Expand Down Expand Up @@ -58,8 +61,9 @@ async def main():
```
"""
names = names if isinstance(names, list) else [names]
with timeout_async(seconds=timeout_seconds):
limits = await _acquire_concurrency_slots(names, occupy)
limits = await _acquire_concurrency_slots(
names, occupy, timeout_seconds=timeout_seconds
)
acquisition_time = pendulum.now("UTC")
emitted_events = _emit_concurrency_acquisition_events(limits, occupy)

Expand Down Expand Up @@ -91,12 +95,18 @@ async def _acquire_concurrency_slots(
names: List[str],
slots: int,
mode: Union[Literal["concurrency"], Literal["rate_limit"]] = "concurrency",
timeout_seconds: Optional[float] = None,
) -> List[MinimalConcurrencyLimitResponse]:
service = ConcurrencySlotAcquisitionService.instance(frozenset(names))
future = service.send((slots, mode))
future = service.send((slots, mode, timeout_seconds))
response_or_exception = await asyncio.wrap_future(future)

if isinstance(response_or_exception, Exception):
if isinstance(response_or_exception, TimeoutError):
raise AcquireConcurrencySlotTimeoutError(
f"Attempt to acquire concurrency slots timed out after {timeout_seconds} second(s)"
) from response_or_exception

raise ConcurrencySlotAcquisitionError(
f"Unable to acquire concurrency slots on {names!r}"
) from response_or_exception
Expand Down
Loading

0 comments on commit 7e5c30a

Please sign in to comment.