From e530ca3285764accaef9360ba17e84610cb7e0cf Mon Sep 17 00:00:00 2001 From: psofiterol Date: Thu, 27 Jul 2023 21:56:09 +0300 Subject: [PATCH] Make all calls to pendulum.now() explicitly in "UTC" (#10320) --- .../_internal/compatibility/deprecated.py | 4 ++-- src/prefect/client/orchestration.py | 2 +- src/prefect/events/utilities.py | 2 +- src/prefect/server/api/concurrency_limits.py | 2 +- src/prefect/server/api/deployments.py | 2 +- src/prefect/server/api/flow_runs.py | 4 ++-- src/prefect/server/api/saved_searches.py | 2 +- src/prefect/server/api/task_runs.py | 2 +- src/prefect/server/database/orm_models.py | 2 +- .../server/orchestration/core_policy.py | 2 +- src/prefect/server/services/scheduler.py | 2 +- src/prefect/server/services/telemetry.py | 2 +- tests/agent/test_agent_run_submission.py | 12 +++++----- tests/cli/deployment/test_deployment_run.py | 2 +- tests/events/test_event_schemas.py | 2 +- tests/events/test_events_emit_event.py | 2 +- tests/server/api/test_deployments.py | 22 +++++++++---------- tests/server/api/test_flow_runs.py | 20 +++++++++-------- tests/server/api/test_task_runs.py | 16 ++++++++------ tests/server/api/test_work_queues.py | 4 ++-- tests/server/api/test_workers.py | 18 +++++++-------- tests/server/database/test_queries.py | 4 ++-- tests/server/models/test_filters.py | 2 +- tests/server/models/test_flow_run_states.py | 6 ++--- tests/server/models/test_flow_runs.py | 8 +++---- tests/server/models/test_orm.py | 20 ++++++++--------- tests/server/models/test_task_run_states.py | 6 ++--- tests/server/models/test_task_runs.py | 4 ++-- tests/server/models/test_work_queues.py | 2 +- tests/server/models/test_workers.py | 6 ++--- .../server/orchestration/test_core_policy.py | 18 +++++++-------- .../orchestration/test_global_policy.py | 18 +++++++-------- tests/server/orchestration/test_rules.py | 2 +- tests/server/schemas/test_filters.py | 2 +- tests/server/services/test_late_runs.py | 8 +++---- tests/server/services/test_scheduler.py | 10 ++++----- tests/server/utilities/test_schemas.py | 10 ++++----- tests/test_deployments.py | 4 ++-- 38 files changed, 130 insertions(+), 126 deletions(-) diff --git a/src/prefect/_internal/compatibility/deprecated.py b/src/prefect/_internal/compatibility/deprecated.py index e26962ca6d73..bad4a429e651 100644 --- a/src/prefect/_internal/compatibility/deprecated.py +++ b/src/prefect/_internal/compatibility/deprecated.py @@ -55,8 +55,8 @@ def generate_deprecation_message( ): if not start_date and not end_date: raise ValueError( - "A start date is required if an end date is not provided. " - f"Suggested start date is {pendulum.now().format(DEPRECATED_DATEFMT)!r}" + "A start date is required if an end date is not provided. Suggested start" + f" date is {pendulum.now('UTC').format(DEPRECATED_DATEFMT)!r}" ) if not end_date: diff --git a/src/prefect/client/orchestration.py b/src/prefect/client/orchestration.py index aed309ee54c8..6d3ef3451110 100644 --- a/src/prefect/client/orchestration.py +++ b/src/prefect/client/orchestration.py @@ -979,7 +979,7 @@ async def get_runs_in_work_queue( List[FlowRun]: a list of FlowRun objects read from the queue """ if scheduled_before is None: - scheduled_before = pendulum.now() + scheduled_before = pendulum.now("UTC") try: response = await self._client.post( diff --git a/src/prefect/events/utilities.py b/src/prefect/events/utilities.py index fee21f21ee5a..d00db4456cdc 100644 --- a/src/prefect/events/utilities.py +++ b/src/prefect/events/utilities.py @@ -54,7 +54,7 @@ def emit_event( } if occurred is None: - occurred = pendulum.now() + occurred = pendulum.now("UTC") event_kwargs["occurred"] = occurred if related is not None: diff --git a/src/prefect/server/api/concurrency_limits.py b/src/prefect/server/api/concurrency_limits.py index 27ca4f9c0d58..bf134b83bff8 100644 --- a/src/prefect/server/api/concurrency_limits.py +++ b/src/prefect/server/api/concurrency_limits.py @@ -31,7 +31,7 @@ async def create_concurrency_limit( session=session, concurrency_limit=concurrency_limit_model ) - if model.created >= pendulum.now(): + if model.created >= pendulum.now("UTC"): response.status_code = status.HTTP_201_CREATED return model diff --git a/src/prefect/server/api/deployments.py b/src/prefect/server/api/deployments.py index 1d67adb10487..edca6586fa20 100644 --- a/src/prefect/server/api/deployments.py +++ b/src/prefect/server/api/deployments.py @@ -130,7 +130,7 @@ async def create_deployment( ), ) - now = pendulum.now() + now = pendulum.now("UTC") model = await models.deployments.create_deployment( session=session, deployment=deployment ) diff --git a/src/prefect/server/api/flow_runs.py b/src/prefect/server/api/flow_runs.py index 42d389fc5e67..150ef3604789 100644 --- a/src/prefect/server/api/flow_runs.py +++ b/src/prefect/server/api/flow_runs.py @@ -263,7 +263,7 @@ async def resume_flow_run( """ Resume a paused flow run. """ - now = pendulum.now() + now = pendulum.now("UTC") async with db.session_context(begin_transaction=True) as session: flow_run = await models.flow_runs.read_flow_run(session, flow_run_id) @@ -394,7 +394,7 @@ async def set_flow_run_state( # pass the request version to the orchestration engine to support compatibility code orchestration_parameters.update({"api-version": api_version}) - now = pendulum.now() + now = pendulum.now("UTC") # create the state async with db.session_context( diff --git a/src/prefect/server/api/saved_searches.py b/src/prefect/server/api/saved_searches.py index 397e67335225..22dd5357b0b3 100644 --- a/src/prefect/server/api/saved_searches.py +++ b/src/prefect/server/api/saved_searches.py @@ -33,7 +33,7 @@ async def create_saved_search( # hydrate the input model into a full model saved_search = schemas.core.SavedSearch(**saved_search.dict()) - now = pendulum.now() + now = pendulum.now("UTC") async with db.session_context(begin_transaction=True) as session: model = await models.saved_searches.create_saved_search( diff --git a/src/prefect/server/api/task_runs.py b/src/prefect/server/api/task_runs.py index f332d9736fcf..a54436050f69 100644 --- a/src/prefect/server/api/task_runs.py +++ b/src/prefect/server/api/task_runs.py @@ -220,7 +220,7 @@ async def set_task_run_state( ) -> OrchestrationResult: """Set a task run state, invoking any orchestration rules.""" - now = pendulum.now() + now = pendulum.now("UTC") # create the state async with db.session_context( diff --git a/src/prefect/server/database/orm_models.py b/src/prefect/server/database/orm_models.py index fe9818ded423..60b76206b7e1 100644 --- a/src/prefect/server/database/orm_models.py +++ b/src/prefect/server/database/orm_models.py @@ -414,7 +414,7 @@ def estimated_run_time(self): state is exited. To give up-to-date estimates, we estimate incremental run time for any runs currently in a RUNNING state.""" if self.state_type and self.state_type == schemas.states.StateType.RUNNING: - return self.total_run_time + (pendulum.now() - self.state_timestamp) + return self.total_run_time + (pendulum.now("UTC") - self.state_timestamp) else: return self.total_run_time diff --git a/src/prefect/server/orchestration/core_policy.py b/src/prefect/server/orchestration/core_policy.py index d2fd1b665c80..139570f5cc4e 100644 --- a/src/prefect/server/orchestration/core_policy.py +++ b/src/prefect/server/orchestration/core_policy.py @@ -454,7 +454,7 @@ async def before_transition( # At this moment, we round delay to the nearest second as the API schema # specifies an integer return value. - delay = scheduled_time - pendulum.now() + delay = scheduled_time - pendulum.now("UTC") delay_seconds = delay.in_seconds() delay_seconds += round(delay.microseconds / 1e6) if delay_seconds > 0: diff --git a/src/prefect/server/services/scheduler.py b/src/prefect/server/services/scheduler.py index 83140862e1be..e1b94b747b84 100644 --- a/src/prefect/server/services/scheduler.py +++ b/src/prefect/server/services/scheduler.py @@ -310,7 +310,7 @@ def _get_select_deployments_to_schedule_query(self, db: PrefectDBInterface): # second to run). Scheduling is idempotent so picking up schedules # multiple times is not a concern. db.Deployment.updated - >= pendulum.now().subtract(seconds=self.loop_seconds + 1), + >= pendulum.now("UTC").subtract(seconds=self.loop_seconds + 1), ) .order_by(db.Deployment.id) .limit(self.deployment_batch_size) diff --git a/src/prefect/server/services/telemetry.py b/src/prefect/server/services/telemetry.py index 48177007a926..329eb2f9afd1 100644 --- a/src/prefect/server/services/telemetry.py +++ b/src/prefect/server/services/telemetry.py @@ -50,7 +50,7 @@ async def _fetch_or_set_telemetry_session(self, db: PrefectDBInterface): if telemetry_session is None: self.logger.debug("No telemetry session found, setting") session_id = str(uuid4()) - session_start_timestamp = pendulum.now().to_iso8601_string() + session_start_timestamp = pendulum.now("UTC").to_iso8601_string() telemetry_session = Configuration( key="TELEMETRY_SESSION", diff --git a/tests/agent/test_agent_run_submission.py b/tests/agent/test_agent_run_submission.py index 5a0d3fd548d7..50f1335c3751 100644 --- a/tests/agent/test_agent_run_submission.py +++ b/tests/agent/test_agent_run_submission.py @@ -89,7 +89,7 @@ def create_run_with_deployment(state): deployment.work_queue_name ) work_queue_runs = await prefect_client.get_runs_in_work_queue( - work_queue.id, scheduled_before=pendulum.now().add(seconds=10) + work_queue.id, scheduled_before=pendulum.now("UTC").add(seconds=10) ) work_queue_flow_run_ids = {run.id for run in work_queue_runs} @@ -142,7 +142,7 @@ def create_run_with_deployment(state): deployment.work_queue_name ) work_queue_runs = await prefect_client.get_runs_in_work_queue( - work_queue.id, scheduled_before=pendulum.now().add(seconds=10) + work_queue.id, scheduled_before=pendulum.now("UTC").add(seconds=10) ) work_queue_runs.sort(key=lambda run: run.next_scheduled_start_time) work_queue_flow_run_ids = [run.id for run in work_queue_runs] @@ -921,7 +921,7 @@ async def test_agent_displays_message_on_work_queue_pause( await prefect_client.update_work_queue(work_queue.id, is_paused=True) # clear agent cache - agent._work_queue_cache_expiration = pendulum.now() + agent._work_queue_cache_expiration = pendulum.now("UTC") # Should emit the paused message await agent.get_and_submit_flow_runs() @@ -971,7 +971,7 @@ def create_run_with_deployment(state): responses = await prefect_client.get_scheduled_flow_runs_for_work_pool( work_pool_name=work_pool.name, work_queue_names=[work_queue_1.name], - scheduled_before=pendulum.now().add(seconds=10), + scheduled_before=pendulum.now("UTC").add(seconds=10), ) work_queue_flow_run_ids = {response.flow_run.id for response in responses} @@ -1036,7 +1036,7 @@ def create_run_with_deployment(state): responses = await prefect_client.get_scheduled_flow_runs_for_work_pool( work_pool_name=work_pool.name, work_queue_names=[work_queue.name], - scheduled_before=pendulum.now().add(seconds=10), + scheduled_before=pendulum.now("UTC").add(seconds=10), ) work_queue_flow_run_ids = {response.flow_run.id for response in responses} @@ -1100,7 +1100,7 @@ def create_run_with_deployment(state): responses = await prefect_client.get_scheduled_flow_runs_for_work_pool( work_pool_name=work_pool.name, work_queue_names=[work_queue.name], - scheduled_before=pendulum.now().add(seconds=10), + scheduled_before=pendulum.now("UTC").add(seconds=10), ) work_queue_flow_run_ids = {response.flow_run.id for response in responses} diff --git a/tests/cli/deployment/test_deployment_run.py b/tests/cli/deployment/test_deployment_run.py index a08abf360067..ae414c4a8c29 100644 --- a/tests/cli/deployment/test_deployment_run.py +++ b/tests/cli/deployment/test_deployment_run.py @@ -16,7 +16,7 @@ async def deployment_name(deployment, prefect_client): @pytest.fixture def frozen_now(monkeypatch): - now = pendulum.now() + now = pendulum.now("UTC") monkeypatch.setattr("pendulum.now", lambda *_: now) yield now diff --git a/tests/events/test_event_schemas.py b/tests/events/test_event_schemas.py index 5c90d364fc8f..3f2d8cd6a11e 100644 --- a/tests/events/test_event_schemas.py +++ b/tests/events/test_event_schemas.py @@ -30,7 +30,7 @@ def test_client_events_generate_an_id_by_default(): def test_client_events_generate_occurred_by_default(start_of_test: DateTime): event = Event(event="hello", resource={"prefect.resource.id": "hello"}) - assert start_of_test <= event.occurred <= pendulum.now() + assert start_of_test <= event.occurred <= pendulum.now("UTC") def test_client_events_may_have_empty_related_resources(): diff --git a/tests/events/test_events_emit_event.py b/tests/events/test_events_emit_event.py index 04a260dc55e6..f29f3ac4deb6 100644 --- a/tests/events/test_events_emit_event.py +++ b/tests/events/test_events_emit_event.py @@ -92,7 +92,7 @@ def test_does_not_set_follows_not_tight_timing( ): destroyed_event = emit_event( event="planet.destroyed", - occurred=pendulum.now() - timedelta(minutes=10), + occurred=pendulum.now("UTC") - timedelta(minutes=10), resource={"prefect.resource.id": "milky-way.sol.earth"}, ) diff --git a/tests/server/api/test_deployments.py b/tests/server/api/test_deployments.py index 85231600b141..91637afbb3cf 100644 --- a/tests/server/api/test_deployments.py +++ b/tests/server/api/test_deployments.py @@ -245,7 +245,7 @@ async def test_upserting_deployment_with_inactive_schedule_deletes_existing_auto flow_id=deployment.flow_id, deployment_id=deployment.id, state=schemas.states.Scheduled( - scheduled_time=pendulum.now().add(days=1) + scheduled_time=pendulum.now("UTC").add(days=1) ), ), ) @@ -286,7 +286,7 @@ async def test_upserting_deployment_with_new_schedule_deletes_existing_auto_sche flow_id=deployment.flow_id, deployment_id=deployment.id, state=schemas.states.Scheduled( - scheduled_time=pendulum.now().add(seconds=2) + scheduled_time=pendulum.now("UTC").add(seconds=2) ), ), ) @@ -313,7 +313,7 @@ async def test_upserting_deployment_with_new_schedule_deletes_existing_auto_sche # check that the maximum run is from the secondly schedule query = sa.select(sa.func.max(db.FlowRun.expected_start_time)) result = await session.execute(query) - assert result.scalar() < pendulum.now().add(seconds=100) + assert result.scalar() < pendulum.now("UTC").add(seconds=100) async def test_create_deployment_throws_useful_error_on_missing_blocks( self, @@ -1091,7 +1091,7 @@ async def test_set_schedule_inactive_deletes_auto_scheduled_runs( flow_id=deployment.flow_id, deployment_id=deployment.id, state=schemas.states.Scheduled( - scheduled_time=pendulum.now().add(days=1) + scheduled_time=pendulum.now("UTC").add(days=1) ), ), ) @@ -1113,8 +1113,8 @@ async def test_schedule_deployment(self, client, session, deployment): runs = await models.flow_runs.read_flow_runs(session) expected_dates = await deployment.schedule.get_dates( n=PREFECT_API_SERVICES_SCHEDULER_MIN_RUNS.value(), - start=pendulum.now(), - end=pendulum.now() + start=pendulum.now("UTC"), + end=pendulum.now("UTC") + PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME.value(), ) actual_dates = {r.state.state_details.scheduled_time for r in runs} @@ -1131,8 +1131,8 @@ async def test_schedule_deployment_provide_runs(self, client, session, deploymen runs = await models.flow_runs.read_flow_runs(session) expected_dates = await deployment.schedule.get_dates( n=5, - start=pendulum.now(), - end=pendulum.now() + start=pendulum.now("UTC"), + end=pendulum.now("UTC") + PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME.value(), ) actual_dates = {r.state.state_details.scheduled_time for r in runs} @@ -1144,14 +1144,14 @@ async def test_schedule_deployment_start_time(self, client, session, deployment) await client.post( f"/deployments/{deployment.id}/schedule", - json=dict(start_time=str(pendulum.now().add(days=120))), + json=dict(start_time=str(pendulum.now("UTC").add(days=120))), ) runs = await models.flow_runs.read_flow_runs(session) expected_dates = await deployment.schedule.get_dates( n=PREFECT_API_SERVICES_SCHEDULER_MIN_RUNS.value(), - start=pendulum.now().add(days=120), - end=pendulum.now().add(days=120) + start=pendulum.now("UTC").add(days=120), + end=pendulum.now("UTC").add(days=120) + PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME.value(), ) actual_dates = {r.state.state_details.scheduled_time for r in runs} diff --git a/tests/server/api/test_flow_runs.py b/tests/server/api/test_flow_runs.py index 791849dc4087..10a96a0a1895 100644 --- a/tests/server/api/test_flow_runs.py +++ b/tests/server/api/test_flow_runs.py @@ -58,7 +58,7 @@ async def test_create_flow_run_with_state_sets_timestamp_on_server( "/flow_runs/", json=actions.FlowRunCreate( flow_id=flow.id, - state=states.Completed(timestamp=pendulum.now().add(months=1)), + state=states.Completed(timestamp=pendulum.now("UTC").add(months=1)), ).dict(json_compatible=True), ) assert response.status_code == status.HTTP_201_CREATED @@ -67,7 +67,7 @@ async def test_create_flow_run_with_state_sets_timestamp_on_server( session=session, flow_run_id=response.json()["id"] ) # the timestamp was overwritten - assert flow_run.state.timestamp < pendulum.now() + assert flow_run.state.timestamp < pendulum.now("UTC") async def test_create_flow_run_without_state_yields_default_pending( self, flow, client, session @@ -534,7 +534,7 @@ async def test_read_flow_runs_returns_empty_list(self, client): assert response.json() == [] async def test_read_flow_runs_applies_sort(self, session, flow, client): - now = pendulum.now() + now = pendulum.now("UTC") flow_run_1 = await models.flow_runs.create_flow_run( session=session, flow_run=schemas.core.FlowRun( @@ -867,13 +867,15 @@ async def test_set_flow_run_state_ignores_client_provided_timestamp( state=dict( type="RUNNING", name="Test State", - timestamp=str(pendulum.now().add(months=1)), + timestamp=str(pendulum.now("UTC").add(months=1)), ) ), ) assert response.status_code == status.HTTP_201_CREATED state = schemas.states.State.parse_obj(response.json()["state"]) - assert state.timestamp < pendulum.now(), "The timestamp should be overwritten" + assert state.timestamp < pendulum.now( + "UTC" + ), "The timestamp should be overwritten" async def test_set_flow_run_state_force_skips_orchestration( self, flow_run, client, session @@ -885,7 +887,7 @@ async def test_set_flow_run_state_force_skips_orchestration( type=StateType.SCHEDULED, name="Scheduled", state_details=dict( - scheduled_time=str(pendulum.now().add(months=1)) + scheduled_time=str(pendulum.now("UTC").add(months=1)) ), ) ), @@ -1056,8 +1058,8 @@ async def test_history_interval_must_be_one_second_or_larger(self, client): response = await client.post( "/flow_runs/history", json=dict( - history_start=str(pendulum.now()), - history_end=str(pendulum.now().add(days=1)), + history_start=str(pendulum.now("UTC")), + history_end=str(pendulum.now("UTC").add(days=1)), history_interval_seconds=0.9, ), ) @@ -1074,7 +1076,7 @@ def url(self) -> str: async def late_flow_runs(self, session, flow): flow_runs = [] for i in range(5): - one_minute_ago = pendulum.now().subtract(minutes=1) + one_minute_ago = pendulum.now("UTC").subtract(minutes=1) flow_run = await models.flow_runs.create_flow_run( session=session, flow_run=schemas.core.FlowRun( diff --git a/tests/server/api/test_task_runs.py b/tests/server/api/test_task_runs.py index c90751c8c779..5e2b007ad412 100644 --- a/tests/server/api/test_task_runs.py +++ b/tests/server/api/test_task_runs.py @@ -79,7 +79,7 @@ async def test_create_task_run_with_state_ignores_client_provided_timestamp( task_key="a", state=schemas.actions.StateCreate( type=schemas.states.StateType.COMPLETED, - timestamp=pendulum.now().add(months=1), + timestamp=pendulum.now("UTC").add(months=1), ), dynamic_key="0", ).dict(json_compatible=True), @@ -90,7 +90,7 @@ async def test_create_task_run_with_state_ignores_client_provided_timestamp( session=session, task_run_id=response.json()["id"] ) # the timestamp was overwritten - assert task_run.state.timestamp < pendulum.now() + assert task_run.state.timestamp < pendulum.now("UTC") async def test_raises_on_retry_delay_validation(self, flow_run, client, session): task_run_data = { @@ -221,7 +221,7 @@ async def test_read_task_runs_applies_flow_filter(self, flow, task_run, client): assert response.json() == [] async def test_read_task_runs_applies_sort(self, flow_run, session, client): - now = pendulum.now() + now = pendulum.now("UTC") task_run_1 = await models.task_runs.create_task_run( session=session, task_run=schemas.core.TaskRun( @@ -386,13 +386,15 @@ async def test_set_task_run_ignores_client_provided_timestamp( state=dict( type="RUNNING", name="Test State", - timestamp=str(pendulum.now().add(months=1)), + timestamp=str(pendulum.now("UTC").add(months=1)), ) ), ) assert response.status_code == status.HTTP_201_CREATED state = schemas.states.State.parse_obj(response.json()["state"]) - assert state.timestamp < pendulum.now(), "The timestamp should be overwritten" + assert state.timestamp < pendulum.now( + "UTC" + ), "The timestamp should be overwritten" async def test_failed_becomes_awaiting_retry(self, task_run, client, session): # set max retries to 1 @@ -470,8 +472,8 @@ async def test_history_interval_must_be_one_second_or_larger(self, client): response = await client.post( "/task_runs/history", json=dict( - history_start=str(pendulum.now()), - history_end=str(pendulum.now().add(days=1)), + history_start=str(pendulum.now("UTC")), + history_end=str(pendulum.now("UTC").add(days=1)), history_interval_seconds=0.9, ), ) diff --git a/tests/server/api/test_work_queues.py b/tests/server/api/test_work_queues.py index 7cf80eb88308..e33032300ffd 100644 --- a/tests/server/api/test_work_queues.py +++ b/tests/server/api/test_work_queues.py @@ -353,7 +353,7 @@ async def test_get_runs_in_queue_scheduled_before( ): response1 = await client.post( f"/work_queues/{work_queue.id}/get_runs", - json=dict(scheduled_before=pendulum.now().isoformat()), + json=dict(scheduled_before=pendulum.now("UTC").isoformat()), ) runs_wq1 = pydantic.parse_obj_as( List[schemas.responses.FlowRunResponse], response1.json() @@ -550,7 +550,7 @@ async def work_queue_with_late_runs(self, session, flow, work_pool): flow_run=schemas.core.FlowRun( flow_id=flow.id, state=schemas.states.Late( - scheduled_time=pendulum.now().subtract(minutes=60) + scheduled_time=pendulum.now("UTC").subtract(minutes=60) ), work_queue_id=work_queue.id, ), diff --git a/tests/server/api/test_workers.py b/tests/server/api/test_workers.py index 7209b79c5b67..7e4348daa67b 100644 --- a/tests/server/api/test_workers.py +++ b/tests/server/api/test_workers.py @@ -597,7 +597,7 @@ async def test_heartbeat_worker(self, client, work_pool): assert workers_response.status_code == status.HTTP_200_OK assert len(workers_response.json()) == 0 - dt = pendulum.now() + dt = pendulum.now("UTC") response = await client.post( f"/work_pools/{work_pool.name}/workers/heartbeat", json=dict(name="test-worker"), @@ -748,7 +748,7 @@ async def setup(self, session, flow): flow_run=schemas.core.FlowRun( flow_id=flow.id, state=prefect.states.Scheduled( - scheduled_time=pendulum.now().add(hours=i) + scheduled_time=pendulum.now("UTC").add(hours=i) ), work_queue_id=wq.id, ), @@ -841,7 +841,7 @@ async def test_get_all_runs_wq_ba_wrong_pool(self, client, work_pools, work_queu async def test_get_all_runs_scheduled_before(self, client, work_pools, work_queues): response = await client.post( f"/work_pools/{work_pools['wp_a'].name}/get_scheduled_flow_runs", - json=dict(scheduled_before=str(pendulum.now())), + json=dict(scheduled_before=str(pendulum.now("UTC"))), ) data = pydantic.parse_obj_as( @@ -852,7 +852,7 @@ async def test_get_all_runs_scheduled_before(self, client, work_pools, work_queu async def test_get_all_runs_scheduled_after(self, client, work_pools): response = await client.post( f"/work_pools/{work_pools['wp_a'].name}/get_scheduled_flow_runs", - json=dict(scheduled_after=str(pendulum.now())), + json=dict(scheduled_after=str(pendulum.now("UTC"))), ) data = pydantic.parse_obj_as( @@ -864,8 +864,8 @@ async def test_get_all_runs_scheduled_before_and_after(self, client, work_pools) response = await client.post( f"/work_pools/{work_pools['wp_a'].name}/get_scheduled_flow_runs", json=dict( - scheduled_before=str(pendulum.now().subtract(hours=1)), - scheduled_after=str(pendulum.now()), + scheduled_before=str(pendulum.now("UTC").subtract(hours=1)), + scheduled_after=str(pendulum.now("UTC")), ), ) @@ -877,7 +877,7 @@ async def test_get_all_runs_scheduled_before_and_after(self, client, work_pools) async def test_updates_last_polled_on_a_single_work_queue( self, client, work_queues, work_pools ): - now = pendulum.now() + now = pendulum.now("UTC") poll_response = await client.post( f"/work_pools/{work_pools['wp_a'].name}/get_scheduled_flow_runs", json=dict(work_queue_names=[work_queues["wq_aa"].name]), @@ -909,7 +909,7 @@ async def test_updates_last_polled_on_a_single_work_queue( async def test_updates_last_polled_on_a_multiple_work_queues( self, client, work_queues, work_pools ): - now = pendulum.now() + now = pendulum.now("UTC") poll_response = await client.post( f"/work_pools/{work_pools['wp_a'].name}/get_scheduled_flow_runs", json=dict( @@ -940,7 +940,7 @@ async def test_updates_last_polled_on_a_multiple_work_queues( async def test_updates_last_polled_on_a_full_work_pool( self, client, work_queues, work_pools ): - now = pendulum.now() + now = pendulum.now("UTC") poll_response = await client.post( f"/work_pools/{work_pools['wp_a'].name}/get_scheduled_flow_runs", ) diff --git a/tests/server/database/test_queries.py b/tests/server/database/test_queries.py index abf2aeff16ec..870d3a1af0fd 100644 --- a/tests/server/database/test_queries.py +++ b/tests/server/database/test_queries.py @@ -167,7 +167,7 @@ async def test_get_runs_in_queue_scheduled_before( self, session, db, fr_1, fr_2, fr_3 ): query = db.queries.get_scheduled_flow_runs_from_work_queues( - db=db, scheduled_before=pendulum.now().subtract(seconds=90) + db=db, scheduled_before=pendulum.now("UTC").subtract(seconds=90) ) result = await session.execute(query) runs = result.all() @@ -326,7 +326,7 @@ async def setup(self, session, flow): flow_run=schemas.core.FlowRun( flow_id=flow.id, state=prefect.states.Scheduled( - scheduled_time=pendulum.now().add(hours=i) + scheduled_time=pendulum.now("UTC").add(hours=i) ), work_queue_id=wq.id, ), diff --git a/tests/server/models/test_filters.py b/tests/server/models/test_filters.py index 2b15d660f482..e6360b840cb0 100644 --- a/tests/server/models/test_filters.py +++ b/tests/server/models/test_filters.py @@ -193,7 +193,7 @@ def create_task_run(task_run): flow_run=core.FlowRun( flow_id=f_3.id, tags=["db", "red"], - state=states.Scheduled(scheduled_time=pendulum.now()), + state=states.Scheduled(scheduled_time=pendulum.now("UTC")), ) ) diff --git a/tests/server/models/test_flow_run_states.py b/tests/server/models/test_flow_run_states.py index 7847ab863d4a..d1f37fdcfe0e 100644 --- a/tests/server/models/test_flow_run_states.py +++ b/tests/server/models/test_flow_run_states.py @@ -198,7 +198,7 @@ async def test_database_is_not_updated_when_no_transition_takes_place( frs = await models.flow_runs.set_flow_run_state( session=session, flow_run_id=flow_run.id, - state=Scheduled(scheduled_time=pendulum.now().add(months=1)), + state=Scheduled(scheduled_time=pendulum.now("UTC").add(months=1)), flow_policy=await provide_flow_policy(), ) @@ -225,7 +225,7 @@ def priority(): frs = await models.flow_runs.set_flow_run_state( session=session, flow_run_id=flow_run.id, - state=Scheduled(scheduled_time=pendulum.now().add(months=1)), + state=Scheduled(scheduled_time=pendulum.now("UTC").add(months=1)), flow_policy=await provide_flow_policy(), ) @@ -261,7 +261,7 @@ def priority(): frs = await models.flow_runs.set_flow_run_state( session=session, flow_run_id=flow_run.id, - state=Scheduled(scheduled_time=pendulum.now().add(months=1)), + state=Scheduled(scheduled_time=pendulum.now("UTC").add(months=1)), flow_policy=await provide_flow_policy(), orchestration_parameters=await provide_flow_orchestration_parameters(), ) diff --git a/tests/server/models/test_flow_runs.py b/tests/server/models/test_flow_runs.py index 8f73fa08caa9..d937ecbbc6a8 100644 --- a/tests/server/models/test_flow_runs.py +++ b/tests/server/models/test_flow_runs.py @@ -549,7 +549,7 @@ async def test_read_flow_runs_filters_by_flow_versions_any(self, flow, session): assert len(result) == 0 async def test_read_flow_runs_filters_by_start_time(self, flow, session): - now = pendulum.now() + now = pendulum.now("UTC") flow_run_1 = await models.flow_runs.create_flow_run( session=session, flow_run=schemas.core.FlowRun( @@ -648,7 +648,7 @@ async def test_read_flow_runs_filters_by_start_time(self, flow, session): async def test_read_flow_runs_filters_by_next_scheduled_start_time( self, flow, session ): - now = pendulum.now() + now = pendulum.now("UTC") flow_run_1 = await models.flow_runs.create_flow_run( session=session, flow_run=schemas.core.FlowRun( @@ -717,7 +717,7 @@ async def test_read_flow_runs_filters_by_next_scheduled_start_time( assert {res.id for res in result} == {flow_run_3.id} async def test_read_flow_runs_filters_by_expected_start_time(self, flow, session): - now = pendulum.now() + now = pendulum.now("UTC") flow_run_1 = await models.flow_runs.create_flow_run( session=session, flow_run=schemas.core.FlowRun( @@ -1069,7 +1069,7 @@ async def test_read_flow_runs_filters_by_work_queue_id(self, session, flow): assert {res.id for res in result} == {flow_run_2.id} async def test_read_flow_runs_applies_sort(self, flow, session): - now = pendulum.now() + now = pendulum.now("UTC") await models.flow_runs.create_flow_run( session=session, flow_run=schemas.core.FlowRun( diff --git a/tests/server/models/test_orm.py b/tests/server/models/test_orm.py index 493547bb2657..ea4712ebed1c 100644 --- a/tests/server/models/test_orm.py +++ b/tests/server/models/test_orm.py @@ -387,7 +387,7 @@ class TestTotalRunTimeEstimate: async def test_flow_run_estimated_run_time_matches_total_run_time( self, session, flow, db ): - dt = pendulum.now().subtract(minutes=1) + dt = pendulum.now("UTC").subtract(minutes=1) fr = await models.flow_runs.create_flow_run( session=session, flow_run=schemas.core.FlowRun( @@ -420,7 +420,7 @@ async def test_flow_run_estimated_run_time_matches_total_run_time( async def test_flow_run_estimated_run_time_includes_current_run( self, session, flow, db ): - dt = pendulum.now().subtract(minutes=1) + dt = pendulum.now("UTC").subtract(minutes=1) fr = await models.flow_runs.create_flow_run( session=session, flow_run=schemas.core.FlowRun( @@ -456,7 +456,7 @@ async def test_flow_run_estimated_run_time_includes_current_run( async def test_task_run_estimated_run_time_matches_total_run_time( self, session, flow_run, db ): - dt = pendulum.now().subtract(minutes=1) + dt = pendulum.now("UTC").subtract(minutes=1) tr = await models.task_runs.create_task_run( session=session, task_run=schemas.core.TaskRun( @@ -492,7 +492,7 @@ async def test_task_run_estimated_run_time_matches_total_run_time( async def test_task_run_estimated_run_time_includes_current_run( self, session, flow_run, db ): - dt = pendulum.now().subtract(minutes=1) + dt = pendulum.now("UTC").subtract(minutes=1) tr = await models.task_runs.create_task_run( session=session, task_run=schemas.core.TaskRun( @@ -534,7 +534,7 @@ async def test_estimated_run_time_in_correlated_subquery(self, session, flow, db The estimated_run_time includes a .correlate() statement that ensures it can be used as a correlated subquery within other selects or joins. """ - dt = pendulum.now().subtract(minutes=1) + dt = pendulum.now("UTC").subtract(minutes=1) fr = await models.flow_runs.create_flow_run( session=session, flow_run=schemas.core.FlowRun( @@ -585,7 +585,7 @@ async def test_estimated_run_time_in_correlated_subquery(self, session, flow, db class TestExpectedStartTimeDelta: async def test_flow_run_lateness_when_scheduled(self, session, flow, db): - dt = pendulum.now().subtract(minutes=1) + dt = pendulum.now("UTC").subtract(minutes=1) fr = await models.flow_runs.create_flow_run( session=session, flow_run=schemas.core.FlowRun( @@ -610,7 +610,7 @@ async def test_flow_run_lateness_when_scheduled(self, session, flow, db): ) async def test_flow_run_lateness_when_pending(self, session, flow, db): - dt = pendulum.now().subtract(minutes=1) + dt = pendulum.now("UTC").subtract(minutes=1) fr = await models.flow_runs.create_flow_run( session=session, flow_run=schemas.core.FlowRun( @@ -642,7 +642,7 @@ async def test_flow_run_lateness_when_pending(self, session, flow, db): ) async def test_flow_run_lateness_when_running(self, session, flow, db): - dt = pendulum.now().subtract(minutes=1) + dt = pendulum.now("UTC").subtract(minutes=1) fr = await models.flow_runs.create_flow_run( session=session, flow_run=schemas.core.FlowRun( @@ -666,7 +666,7 @@ async def test_flow_run_lateness_when_running(self, session, flow, db): assert result.scalar() == pendulum.duration(seconds=5) async def test_flow_run_lateness_when_terminal(self, session, flow, db): - dt = pendulum.now().subtract(minutes=1) + dt = pendulum.now("UTC").subtract(minutes=1) fr = await models.flow_runs.create_flow_run( session=session, flow_run=schemas.core.FlowRun( @@ -690,7 +690,7 @@ async def test_flow_run_lateness_when_terminal(self, session, flow, db): assert result.scalar() == pendulum.duration(seconds=0) async def test_flow_run_lateness_is_zero_when_early(self, session, flow, db): - dt = pendulum.now().subtract(minutes=1) + dt = pendulum.now("UTC").subtract(minutes=1) fr = await models.flow_runs.create_flow_run( session=session, flow_run=schemas.core.FlowRun( diff --git a/tests/server/models/test_task_run_states.py b/tests/server/models/test_task_run_states.py index 03b9b8bf222f..f14bbb445d6b 100644 --- a/tests/server/models/test_task_run_states.py +++ b/tests/server/models/test_task_run_states.py @@ -145,7 +145,7 @@ async def test_database_is_not_updated_when_no_transition_takes_place( trs = await models.task_runs.set_task_run_state( session=session, task_run_id=task_run.id, - state=Scheduled(scheduled_time=pendulum.now().add(months=1)), + state=Scheduled(scheduled_time=pendulum.now("UTC").add(months=1)), task_policy=await provide_task_policy(), ) @@ -172,7 +172,7 @@ def priority(): trs = await models.task_runs.set_task_run_state( session=session, task_run_id=task_run.id, - state=Scheduled(scheduled_time=pendulum.now().add(months=1)), + state=Scheduled(scheduled_time=pendulum.now("UTC").add(months=1)), task_policy=await provide_task_policy(), ) @@ -208,7 +208,7 @@ def priority(): trs = await models.task_runs.set_task_run_state( session=session, task_run_id=task_run.id, - state=Scheduled(scheduled_time=pendulum.now().add(months=1)), + state=Scheduled(scheduled_time=pendulum.now("UTC").add(months=1)), task_policy=await provide_task_policy(), orchestration_parameters=await provide_task_orchestration_parameters(), ) diff --git a/tests/server/models/test_task_runs.py b/tests/server/models/test_task_runs.py index e2d351f28def..c5b8a15e76f3 100644 --- a/tests/server/models/test_task_runs.py +++ b/tests/server/models/test_task_runs.py @@ -355,7 +355,7 @@ async def test_read_task_runs_filters_by_task_run_states_any( async def test_read_task_runs_filters_by_task_run_start_time( self, flow_run, session ): - now = pendulum.now() + now = pendulum.now("UTC") task_run_1 = await models.task_runs.create_task_run( session=session, task_run=schemas.core.TaskRun( @@ -628,7 +628,7 @@ async def test_read_task_runs_applies_offset(self, flow_run, session): assert {result_1[0].id, result_2[0].id} == {task_run_1.id, task_run_2.id} async def test_read_task_runs_applies_sort(self, flow_run, session): - now = pendulum.now() + now = pendulum.now("UTC") await models.task_runs.create_task_run( session=session, task_run=schemas.core.TaskRun( diff --git a/tests/server/models/test_work_queues.py b/tests/server/models/test_work_queues.py index 1ff300a80cf7..078fedcbbbe7 100644 --- a/tests/server/models/test_work_queues.py +++ b/tests/server/models/test_work_queues.py @@ -332,7 +332,7 @@ async def test_get_runs_in_queue_scheduled_before( runs_wq1 = await models.work_queues.get_runs_in_work_queue( session=session, work_queue_id=work_queue.id, - scheduled_before=pendulum.now(), + scheduled_before=pendulum.now("UTC"), ) assert len(runs_wq1) == 1 diff --git a/tests/server/models/test_workers.py b/tests/server/models/test_workers.py index 73849a679840..d12258b7fef9 100644 --- a/tests/server/models/test_workers.py +++ b/tests/server/models/test_workers.py @@ -721,7 +721,7 @@ async def setup(self, session, flow): flow_run=schemas.core.FlowRun( flow_id=flow.id, state=prefect.states.Scheduled( - scheduled_time=pendulum.now().add(hours=i) + scheduled_time=pendulum.now("UTC").add(hours=i) ), work_queue_id=wq.id, ), @@ -773,13 +773,13 @@ async def test_get_all_runs_limit(self, session): async def test_get_all_runs_scheduled_before(self, session): runs = await models.workers.get_scheduled_flow_runs( - session=session, scheduled_before=pendulum.now() + session=session, scheduled_before=pendulum.now("UTC") ) assert len(runs) == 18 async def test_get_all_runs_scheduled_after(self, session): runs = await models.workers.get_scheduled_flow_runs( - session=session, scheduled_after=pendulum.now() + session=session, scheduled_after=pendulum.now("UTC") ) assert len(runs) == 27 diff --git a/tests/server/orchestration/test_core_policy.py b/tests/server/orchestration/test_core_policy.py index ff805d44814a..7628f2c88212 100644 --- a/tests/server/orchestration/test_core_policy.py +++ b/tests/server/orchestration/test_core_policy.py @@ -92,7 +92,7 @@ async def test_running_after_scheduled_start_time_is_not_delayed( session, run_type, *intended_transition, - initial_details={"scheduled_time": pendulum.now().subtract(minutes=5)}, + initial_details={"scheduled_time": pendulum.now("UTC").subtract(minutes=5)}, ) async with WaitForScheduledTime(ctx, *intended_transition) as ctx: @@ -116,7 +116,7 @@ async def test_running_before_scheduled_start_time_is_delayed( session, run_type, *intended_transition, - initial_details={"scheduled_time": pendulum.now().add(minutes=5)}, + initial_details={"scheduled_time": pendulum.now("UTC").add(minutes=5)}, ) async with WaitForScheduledTime(ctx, *intended_transition) as ctx: @@ -148,7 +148,7 @@ async def test_scheduling_rule_does_not_fire_against_other_state_types( session, run_type, *intended_transition, - initial_details={"scheduled_time": pendulum.now().add(minutes=5)}, + initial_details={"scheduled_time": pendulum.now("UTC").add(minutes=5)}, ) scheduling_rule = WaitForScheduledTime(ctx, *intended_transition) @@ -168,7 +168,7 @@ async def test_scheduled_time_copied_from_scheduled_to_pending( initial_state_type = states.StateType.SCHEDULED proposed_state_type = states.StateType.PENDING intended_transition = (initial_state_type, proposed_state_type) - scheduled_time = pendulum.now().subtract(minutes=5) + scheduled_time = pendulum.now("UTC").subtract(minutes=5) ctx = await initialize_orchestration( session, @@ -206,7 +206,7 @@ async def test_scheduled_time_not_copied_for_other_transitions( session, run_type, *intended_transition, - initial_details={"scheduled_time": pendulum.now().add(minutes=5)}, + initial_details={"scheduled_time": pendulum.now("UTC").add(minutes=5)}, ) scheduling_rule = CopyScheduledTime(ctx, *intended_transition) @@ -220,8 +220,8 @@ class TestCachingBackendLogic: @pytest.mark.parametrize( ["expiration", "expected_status", "expected_name"], [ - (pendulum.now().subtract(days=1), SetStateStatus.ACCEPT, "Running"), - (pendulum.now().add(days=1), SetStateStatus.REJECT, "Cached"), + (pendulum.now("UTC").subtract(days=1), SetStateStatus.ACCEPT, "Running"), + (pendulum.now("UTC").add(days=1), SetStateStatus.REJECT, "Cached"), (None, SetStateStatus.REJECT, "Cached"), ], ids=["past", "future", "null"], @@ -284,7 +284,7 @@ async def test_cache_insertion_requires_validated_state( initial_state_type = states.StateType.RUNNING proposed_state_type = states.StateType.COMPLETED intended_transition = (initial_state_type, proposed_state_type) - expiration = pendulum.now().subtract(days=1) + expiration = pendulum.now("UTC").subtract(days=1) ctx1 = await initialize_orchestration( session, @@ -367,7 +367,7 @@ async def test_retries( initialize_orchestration, monkeypatch, ): - now = pendulum.now() + now = pendulum.now("UTC") monkeypatch.setattr("pendulum.now", lambda *args: now) failed_task_runs = [ diff --git a/tests/server/orchestration/test_global_policy.py b/tests/server/orchestration/test_global_policy.py index 30083027abd9..d7349f37fbd6 100644 --- a/tests/server/orchestration/test_global_policy.py +++ b/tests/server/orchestration/test_global_policy.py @@ -94,7 +94,7 @@ async def test_rule_sets_scheduled_time( initial_state_type = None proposed_state_type = states.StateType.SCHEDULED intended_transition = (initial_state_type, proposed_state_type) - scheduled_time = pendulum.now().add(seconds=42) + scheduled_time = pendulum.now("UTC").add(seconds=42) ctx = await initialize_orchestration( session, run_type, @@ -120,7 +120,7 @@ async def test_rule_removes_scheduled_time_when_exiting_scheduled_state( initial_state_type = states.StateType.SCHEDULED proposed_state_type = states.StateType.PENDING intended_transition = (initial_state_type, proposed_state_type) - scheduled_time = pendulum.now().add(seconds=42) + scheduled_time = pendulum.now("UTC").add(seconds=42) ctx = await initialize_orchestration( session, run_type, @@ -173,7 +173,7 @@ async def test_rule_sets_expected_start_time_from_scheduled( run_type, initialize_orchestration, ): - dt = pendulum.now().add(days=10) + dt = pendulum.now("UTC").add(days=10) initial_state_type = None proposed_state_type = states.StateType.SCHEDULED @@ -288,7 +288,7 @@ async def test_rule_updates_run_time_after_running( *intended_transition, ) - now = pendulum.now() + now = pendulum.now("UTC") run = ctx.run run.start_time = now.subtract(seconds=42) ctx.initial_state.timestamp = now.subtract(seconds=42) @@ -316,7 +316,7 @@ async def test_rule_doesnt_update_run_time_when_not_running( *intended_transition, ) - now = pendulum.now() + now = pendulum.now("UTC") run = ctx.run run.start_time = now.subtract(seconds=42) ctx.initial_state.timestamp = now.subtract(seconds=42) @@ -343,7 +343,7 @@ async def test_rule_sets_end_time_when_when_run_ends( ) run = ctx.run - run.start_time = pendulum.now().subtract(seconds=42) + run.start_time = pendulum.now("UTC").subtract(seconds=42) assert run.end_time is None async with SetEndTime(ctx, *intended_transition) as ctx: @@ -364,8 +364,8 @@ async def test_rule_unsets_end_time_when_forced_out_of_terminal_state( ) run = ctx.run - run.start_time = pendulum.now().subtract(seconds=42) - run.end_time = pendulum.now() + run.start_time = pendulum.now("UTC").subtract(seconds=42) + run.end_time = pendulum.now("UTC") assert run.end_time is not None async with SetEndTime(ctx, *intended_transition) as ctx: @@ -388,7 +388,7 @@ async def test_rule_does_not_modify_end_time_when_transitioning_from_final_to_fi *intended_transition, ) - dt = pendulum.now() + dt = pendulum.now("UTC") run = ctx.run run.start_time = dt.subtract(seconds=42) diff --git a/tests/server/orchestration/test_rules.py b/tests/server/orchestration/test_rules.py index 976c5a57c81e..fb35306e2156 100644 --- a/tests/server/orchestration/test_rules.py +++ b/tests/server/orchestration/test_rules.py @@ -46,7 +46,7 @@ async def commit_task_run_state( state_type == states.StateType.SCHEDULED and "scheduled_time" not in state_details ): - state_details.update({"scheduled_time": pendulum.now()}) + state_details.update({"scheduled_time": pendulum.now("UTC")}) new_state = schemas.states.State( type=state_type, diff --git a/tests/server/schemas/test_filters.py b/tests/server/schemas/test_filters.py index ad62bf83ff9c..3210b49c2602 100644 --- a/tests/server/schemas/test_filters.py +++ b/tests/server/schemas/test_filters.py @@ -6,7 +6,7 @@ from prefect.server import schemas from prefect.server.schemas.filters import LogFilter -NOW = pendulum.now() +NOW = pendulum.now("UTC") async def test_filters_without_params_do_not_error(): diff --git a/tests/server/services/test_late_runs.py b/tests/server/services/test_late_runs.py index 29468275f6c0..eb5a42452f4b 100644 --- a/tests/server/services/test_late_runs.py +++ b/tests/server/services/test_late_runs.py @@ -17,7 +17,7 @@ async def late_run(session, flow): flow_run=schemas.core.FlowRun( flow_id=flow.id, state=schemas.states.Scheduled( - scheduled_time=pendulum.now().subtract(minutes=1) + scheduled_time=pendulum.now("UTC").subtract(minutes=1) ), ), ) @@ -31,7 +31,7 @@ async def late_run_2(session, flow): flow_run=schemas.core.FlowRun( flow_id=flow.id, state=schemas.states.Scheduled( - scheduled_time=pendulum.now().subtract(minutes=1) + scheduled_time=pendulum.now("UTC").subtract(minutes=1) ), ), ) @@ -45,7 +45,7 @@ async def future_run(session, flow): flow_run=schemas.core.FlowRun( flow_id=flow.id, state=schemas.states.Scheduled( - scheduled_time=pendulum.now().add(minutes=1) + scheduled_time=pendulum.now("UTC").add(minutes=1) ), ), ) @@ -58,7 +58,7 @@ async def now_run(session, flow): session=session, flow_run=schemas.core.FlowRun( flow_id=flow.id, - state=schemas.states.Scheduled(scheduled_time=pendulum.now()), + state=schemas.states.Scheduled(scheduled_time=pendulum.now("UTC")), ), ) diff --git a/tests/server/services/test_scheduler.py b/tests/server/services/test_scheduler.py index 237e55cb958f..fb3f4e3052b4 100644 --- a/tests/server/services/test_scheduler.py +++ b/tests/server/services/test_scheduler.py @@ -63,7 +63,7 @@ async def test_create_schedule_respects_max_future_time(flow, session): assert len(runs) == 3 expected_dates = await deployment.schedule.get_dates( - service.max_runs, end=pendulum.now() + service.max_scheduled_time + service.max_runs, end=pendulum.now("UTC") + service.max_scheduled_time ) assert set(expected_dates) == {r.state.state_details.scheduled_time for r in runs} @@ -116,8 +116,8 @@ async def test_create_schedules_from_multiple_deployments(flow, session): for deployment in [d1, d2, d3]: dep_runs = await deployment.schedule.get_dates( service.min_runs, - start=pendulum.now(), - end=pendulum.now() + service.max_scheduled_time, + start=pendulum.now("UTC"), + end=pendulum.now("UTC") + service.max_scheduled_time, ) expected_dates.update(dep_runs) assert set(expected_dates) == {r.state.state_details.scheduled_time for r in runs} @@ -272,7 +272,7 @@ async def test_schedules_runs_for_recently_updated_deployments( await session.execute( sa.update(db.Deployment) .where(db.Deployment.id == deployment.id) - .values(created=pendulum.now().subtract(hours=1)) + .values(created=pendulum.now("UTC").subtract(hours=1)) ) await session.commit() @@ -298,7 +298,7 @@ async def test_schedules_no_runs_for_deployments_updated_a_while_ago( await session.execute( sa.update(db.Deployment) .where(db.Deployment.id == deployment.id) - .values(updated=pendulum.now().subtract(minutes=1)) + .values(updated=pendulum.now("UTC").subtract(minutes=1)) ) await session.commit() diff --git a/tests/server/utilities/test_schemas.py b/tests/server/utilities/test_schemas.py index ce87d5d4b51d..2fe829cd1ad9 100644 --- a/tests/server/utilities/test_schemas.py +++ b/tests/server/utilities/test_schemas.py @@ -234,12 +234,12 @@ class Parent(PrefectBaseModel): return Parent(x=uuid4(), y=Child(z=uuid4())) def test_json_compatible_and_nested_errors(self): - model = self.Model(x=uuid4(), y=pendulum.now()) + model = self.Model(x=uuid4(), y=pendulum.now("UTC")) with pytest.raises(ValueError, match="(only be applied to the entire object)"): model.dict(json_compatible=True, shallow=True) def test_json_compatible(self): - model = self.Model(x=uuid4(), y=pendulum.now()) + model = self.Model(x=uuid4(), y=pendulum.now("UTC")) d1 = model.dict() d2 = model.dict(json_compatible=True) @@ -288,9 +288,9 @@ def test_ormbasemodel_equality(self): class X(ORMBaseModel): x: int - x1 = X(id=uuid4(), created=pendulum.now(), x=1) - x2 = X(id=uuid4(), created=pendulum.now().add(hours=1), x=1) - x3 = X(id=uuid4(), created=pendulum.now().subtract(hours=1), x=2) + x1 = X(id=uuid4(), created=pendulum.now("UTC"), x=1) + x2 = X(id=uuid4(), created=pendulum.now("UTC").add(hours=1), x=1) + x3 = X(id=uuid4(), created=pendulum.now("UTC").subtract(hours=1), x=2) assert x1 == x2 assert x1.created != x2.created assert x1 != x3 diff --git a/tests/test_deployments.py b/tests/test_deployments.py index 4320d202604c..2ed1931ea239 100644 --- a/tests/test_deployments.py +++ b/tests/test_deployments.py @@ -959,7 +959,7 @@ def test_schedules_immediately_by_default( ): d, deployment_id = test_deployment - scheduled_time = pendulum.now() + scheduled_time = pendulum.now("UTC") flow_run = run_deployment( f"{d.flow_name}/{d.name}", timeout=0, @@ -973,7 +973,7 @@ def test_accepts_custom_scheduled_time( ): d, deployment_id = test_deployment - scheduled_time = pendulum.now() + pendulum.Duration(minutes=5) + scheduled_time = pendulum.now("UTC") + pendulum.Duration(minutes=5) flow_run = run_deployment( f"{d.flow_name}/{d.name}", scheduled_time=scheduled_time,