Skip to content

Commit

Permalink
Make all calls to pendulum.now() explicitly in "UTC" (#10320)
Browse files Browse the repository at this point in the history
  • Loading branch information
psofiterol authored Jul 27, 2023
1 parent b42c09f commit e530ca3
Show file tree
Hide file tree
Showing 38 changed files with 130 additions and 126 deletions.
4 changes: 2 additions & 2 deletions src/prefect/_internal/compatibility/deprecated.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ def generate_deprecation_message(
):
if not start_date and not end_date:
raise ValueError(
"A start date is required if an end date is not provided. "
f"Suggested start date is {pendulum.now().format(DEPRECATED_DATEFMT)!r}"
"A start date is required if an end date is not provided. Suggested start"
f" date is {pendulum.now('UTC').format(DEPRECATED_DATEFMT)!r}"
)

if not end_date:
Expand Down
2 changes: 1 addition & 1 deletion src/prefect/client/orchestration.py
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,7 @@ async def get_runs_in_work_queue(
List[FlowRun]: a list of FlowRun objects read from the queue
"""
if scheduled_before is None:
scheduled_before = pendulum.now()
scheduled_before = pendulum.now("UTC")

try:
response = await self._client.post(
Expand Down
2 changes: 1 addition & 1 deletion src/prefect/events/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def emit_event(
}

if occurred is None:
occurred = pendulum.now()
occurred = pendulum.now("UTC")
event_kwargs["occurred"] = occurred

if related is not None:
Expand Down
2 changes: 1 addition & 1 deletion src/prefect/server/api/concurrency_limits.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async def create_concurrency_limit(
session=session, concurrency_limit=concurrency_limit_model
)

if model.created >= pendulum.now():
if model.created >= pendulum.now("UTC"):
response.status_code = status.HTTP_201_CREATED

return model
Expand Down
2 changes: 1 addition & 1 deletion src/prefect/server/api/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ async def create_deployment(
),
)

now = pendulum.now()
now = pendulum.now("UTC")
model = await models.deployments.create_deployment(
session=session, deployment=deployment
)
Expand Down
4 changes: 2 additions & 2 deletions src/prefect/server/api/flow_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ async def resume_flow_run(
"""
Resume a paused flow run.
"""
now = pendulum.now()
now = pendulum.now("UTC")

async with db.session_context(begin_transaction=True) as session:
flow_run = await models.flow_runs.read_flow_run(session, flow_run_id)
Expand Down Expand Up @@ -394,7 +394,7 @@ async def set_flow_run_state(
# pass the request version to the orchestration engine to support compatibility code
orchestration_parameters.update({"api-version": api_version})

now = pendulum.now()
now = pendulum.now("UTC")

# create the state
async with db.session_context(
Expand Down
2 changes: 1 addition & 1 deletion src/prefect/server/api/saved_searches.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async def create_saved_search(
# hydrate the input model into a full model
saved_search = schemas.core.SavedSearch(**saved_search.dict())

now = pendulum.now()
now = pendulum.now("UTC")

async with db.session_context(begin_transaction=True) as session:
model = await models.saved_searches.create_saved_search(
Expand Down
2 changes: 1 addition & 1 deletion src/prefect/server/api/task_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ async def set_task_run_state(
) -> OrchestrationResult:
"""Set a task run state, invoking any orchestration rules."""

now = pendulum.now()
now = pendulum.now("UTC")

# create the state
async with db.session_context(
Expand Down
2 changes: 1 addition & 1 deletion src/prefect/server/database/orm_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ def estimated_run_time(self):
state is exited. To give up-to-date estimates, we estimate incremental
run time for any runs currently in a RUNNING state."""
if self.state_type and self.state_type == schemas.states.StateType.RUNNING:
return self.total_run_time + (pendulum.now() - self.state_timestamp)
return self.total_run_time + (pendulum.now("UTC") - self.state_timestamp)
else:
return self.total_run_time

Expand Down
2 changes: 1 addition & 1 deletion src/prefect/server/orchestration/core_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ async def before_transition(

# At this moment, we round delay to the nearest second as the API schema
# specifies an integer return value.
delay = scheduled_time - pendulum.now()
delay = scheduled_time - pendulum.now("UTC")
delay_seconds = delay.in_seconds()
delay_seconds += round(delay.microseconds / 1e6)
if delay_seconds > 0:
Expand Down
2 changes: 1 addition & 1 deletion src/prefect/server/services/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ def _get_select_deployments_to_schedule_query(self, db: PrefectDBInterface):
# second to run). Scheduling is idempotent so picking up schedules
# multiple times is not a concern.
db.Deployment.updated
>= pendulum.now().subtract(seconds=self.loop_seconds + 1),
>= pendulum.now("UTC").subtract(seconds=self.loop_seconds + 1),
)
.order_by(db.Deployment.id)
.limit(self.deployment_batch_size)
Expand Down
2 changes: 1 addition & 1 deletion src/prefect/server/services/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async def _fetch_or_set_telemetry_session(self, db: PrefectDBInterface):
if telemetry_session is None:
self.logger.debug("No telemetry session found, setting")
session_id = str(uuid4())
session_start_timestamp = pendulum.now().to_iso8601_string()
session_start_timestamp = pendulum.now("UTC").to_iso8601_string()

telemetry_session = Configuration(
key="TELEMETRY_SESSION",
Expand Down
12 changes: 6 additions & 6 deletions tests/agent/test_agent_run_submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def create_run_with_deployment(state):
deployment.work_queue_name
)
work_queue_runs = await prefect_client.get_runs_in_work_queue(
work_queue.id, scheduled_before=pendulum.now().add(seconds=10)
work_queue.id, scheduled_before=pendulum.now("UTC").add(seconds=10)
)
work_queue_flow_run_ids = {run.id for run in work_queue_runs}

Expand Down Expand Up @@ -142,7 +142,7 @@ def create_run_with_deployment(state):
deployment.work_queue_name
)
work_queue_runs = await prefect_client.get_runs_in_work_queue(
work_queue.id, scheduled_before=pendulum.now().add(seconds=10)
work_queue.id, scheduled_before=pendulum.now("UTC").add(seconds=10)
)
work_queue_runs.sort(key=lambda run: run.next_scheduled_start_time)
work_queue_flow_run_ids = [run.id for run in work_queue_runs]
Expand Down Expand Up @@ -921,7 +921,7 @@ async def test_agent_displays_message_on_work_queue_pause(
await prefect_client.update_work_queue(work_queue.id, is_paused=True)

# clear agent cache
agent._work_queue_cache_expiration = pendulum.now()
agent._work_queue_cache_expiration = pendulum.now("UTC")

# Should emit the paused message
await agent.get_and_submit_flow_runs()
Expand Down Expand Up @@ -971,7 +971,7 @@ def create_run_with_deployment(state):
responses = await prefect_client.get_scheduled_flow_runs_for_work_pool(
work_pool_name=work_pool.name,
work_queue_names=[work_queue_1.name],
scheduled_before=pendulum.now().add(seconds=10),
scheduled_before=pendulum.now("UTC").add(seconds=10),
)
work_queue_flow_run_ids = {response.flow_run.id for response in responses}

Expand Down Expand Up @@ -1036,7 +1036,7 @@ def create_run_with_deployment(state):
responses = await prefect_client.get_scheduled_flow_runs_for_work_pool(
work_pool_name=work_pool.name,
work_queue_names=[work_queue.name],
scheduled_before=pendulum.now().add(seconds=10),
scheduled_before=pendulum.now("UTC").add(seconds=10),
)
work_queue_flow_run_ids = {response.flow_run.id for response in responses}

Expand Down Expand Up @@ -1100,7 +1100,7 @@ def create_run_with_deployment(state):
responses = await prefect_client.get_scheduled_flow_runs_for_work_pool(
work_pool_name=work_pool.name,
work_queue_names=[work_queue.name],
scheduled_before=pendulum.now().add(seconds=10),
scheduled_before=pendulum.now("UTC").add(seconds=10),
)
work_queue_flow_run_ids = {response.flow_run.id for response in responses}

Expand Down
2 changes: 1 addition & 1 deletion tests/cli/deployment/test_deployment_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async def deployment_name(deployment, prefect_client):

@pytest.fixture
def frozen_now(monkeypatch):
now = pendulum.now()
now = pendulum.now("UTC")
monkeypatch.setattr("pendulum.now", lambda *_: now)
yield now

Expand Down
2 changes: 1 addition & 1 deletion tests/events/test_event_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def test_client_events_generate_an_id_by_default():

def test_client_events_generate_occurred_by_default(start_of_test: DateTime):
event = Event(event="hello", resource={"prefect.resource.id": "hello"})
assert start_of_test <= event.occurred <= pendulum.now()
assert start_of_test <= event.occurred <= pendulum.now("UTC")


def test_client_events_may_have_empty_related_resources():
Expand Down
2 changes: 1 addition & 1 deletion tests/events/test_events_emit_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def test_does_not_set_follows_not_tight_timing(
):
destroyed_event = emit_event(
event="planet.destroyed",
occurred=pendulum.now() - timedelta(minutes=10),
occurred=pendulum.now("UTC") - timedelta(minutes=10),
resource={"prefect.resource.id": "milky-way.sol.earth"},
)

Expand Down
22 changes: 11 additions & 11 deletions tests/server/api/test_deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ async def test_upserting_deployment_with_inactive_schedule_deletes_existing_auto
flow_id=deployment.flow_id,
deployment_id=deployment.id,
state=schemas.states.Scheduled(
scheduled_time=pendulum.now().add(days=1)
scheduled_time=pendulum.now("UTC").add(days=1)
),
),
)
Expand Down Expand Up @@ -286,7 +286,7 @@ async def test_upserting_deployment_with_new_schedule_deletes_existing_auto_sche
flow_id=deployment.flow_id,
deployment_id=deployment.id,
state=schemas.states.Scheduled(
scheduled_time=pendulum.now().add(seconds=2)
scheduled_time=pendulum.now("UTC").add(seconds=2)
),
),
)
Expand All @@ -313,7 +313,7 @@ async def test_upserting_deployment_with_new_schedule_deletes_existing_auto_sche
# check that the maximum run is from the secondly schedule
query = sa.select(sa.func.max(db.FlowRun.expected_start_time))
result = await session.execute(query)
assert result.scalar() < pendulum.now().add(seconds=100)
assert result.scalar() < pendulum.now("UTC").add(seconds=100)

async def test_create_deployment_throws_useful_error_on_missing_blocks(
self,
Expand Down Expand Up @@ -1091,7 +1091,7 @@ async def test_set_schedule_inactive_deletes_auto_scheduled_runs(
flow_id=deployment.flow_id,
deployment_id=deployment.id,
state=schemas.states.Scheduled(
scheduled_time=pendulum.now().add(days=1)
scheduled_time=pendulum.now("UTC").add(days=1)
),
),
)
Expand All @@ -1113,8 +1113,8 @@ async def test_schedule_deployment(self, client, session, deployment):
runs = await models.flow_runs.read_flow_runs(session)
expected_dates = await deployment.schedule.get_dates(
n=PREFECT_API_SERVICES_SCHEDULER_MIN_RUNS.value(),
start=pendulum.now(),
end=pendulum.now()
start=pendulum.now("UTC"),
end=pendulum.now("UTC")
+ PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME.value(),
)
actual_dates = {r.state.state_details.scheduled_time for r in runs}
Expand All @@ -1131,8 +1131,8 @@ async def test_schedule_deployment_provide_runs(self, client, session, deploymen
runs = await models.flow_runs.read_flow_runs(session)
expected_dates = await deployment.schedule.get_dates(
n=5,
start=pendulum.now(),
end=pendulum.now()
start=pendulum.now("UTC"),
end=pendulum.now("UTC")
+ PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME.value(),
)
actual_dates = {r.state.state_details.scheduled_time for r in runs}
Expand All @@ -1144,14 +1144,14 @@ async def test_schedule_deployment_start_time(self, client, session, deployment)

await client.post(
f"/deployments/{deployment.id}/schedule",
json=dict(start_time=str(pendulum.now().add(days=120))),
json=dict(start_time=str(pendulum.now("UTC").add(days=120))),
)

runs = await models.flow_runs.read_flow_runs(session)
expected_dates = await deployment.schedule.get_dates(
n=PREFECT_API_SERVICES_SCHEDULER_MIN_RUNS.value(),
start=pendulum.now().add(days=120),
end=pendulum.now().add(days=120)
start=pendulum.now("UTC").add(days=120),
end=pendulum.now("UTC").add(days=120)
+ PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME.value(),
)
actual_dates = {r.state.state_details.scheduled_time for r in runs}
Expand Down
20 changes: 11 additions & 9 deletions tests/server/api/test_flow_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async def test_create_flow_run_with_state_sets_timestamp_on_server(
"/flow_runs/",
json=actions.FlowRunCreate(
flow_id=flow.id,
state=states.Completed(timestamp=pendulum.now().add(months=1)),
state=states.Completed(timestamp=pendulum.now("UTC").add(months=1)),
).dict(json_compatible=True),
)
assert response.status_code == status.HTTP_201_CREATED
Expand All @@ -67,7 +67,7 @@ async def test_create_flow_run_with_state_sets_timestamp_on_server(
session=session, flow_run_id=response.json()["id"]
)
# the timestamp was overwritten
assert flow_run.state.timestamp < pendulum.now()
assert flow_run.state.timestamp < pendulum.now("UTC")

async def test_create_flow_run_without_state_yields_default_pending(
self, flow, client, session
Expand Down Expand Up @@ -534,7 +534,7 @@ async def test_read_flow_runs_returns_empty_list(self, client):
assert response.json() == []

async def test_read_flow_runs_applies_sort(self, session, flow, client):
now = pendulum.now()
now = pendulum.now("UTC")
flow_run_1 = await models.flow_runs.create_flow_run(
session=session,
flow_run=schemas.core.FlowRun(
Expand Down Expand Up @@ -867,13 +867,15 @@ async def test_set_flow_run_state_ignores_client_provided_timestamp(
state=dict(
type="RUNNING",
name="Test State",
timestamp=str(pendulum.now().add(months=1)),
timestamp=str(pendulum.now("UTC").add(months=1)),
)
),
)
assert response.status_code == status.HTTP_201_CREATED
state = schemas.states.State.parse_obj(response.json()["state"])
assert state.timestamp < pendulum.now(), "The timestamp should be overwritten"
assert state.timestamp < pendulum.now(
"UTC"
), "The timestamp should be overwritten"

async def test_set_flow_run_state_force_skips_orchestration(
self, flow_run, client, session
Expand All @@ -885,7 +887,7 @@ async def test_set_flow_run_state_force_skips_orchestration(
type=StateType.SCHEDULED,
name="Scheduled",
state_details=dict(
scheduled_time=str(pendulum.now().add(months=1))
scheduled_time=str(pendulum.now("UTC").add(months=1))
),
)
),
Expand Down Expand Up @@ -1056,8 +1058,8 @@ async def test_history_interval_must_be_one_second_or_larger(self, client):
response = await client.post(
"/flow_runs/history",
json=dict(
history_start=str(pendulum.now()),
history_end=str(pendulum.now().add(days=1)),
history_start=str(pendulum.now("UTC")),
history_end=str(pendulum.now("UTC").add(days=1)),
history_interval_seconds=0.9,
),
)
Expand All @@ -1074,7 +1076,7 @@ def url(self) -> str:
async def late_flow_runs(self, session, flow):
flow_runs = []
for i in range(5):
one_minute_ago = pendulum.now().subtract(minutes=1)
one_minute_ago = pendulum.now("UTC").subtract(minutes=1)
flow_run = await models.flow_runs.create_flow_run(
session=session,
flow_run=schemas.core.FlowRun(
Expand Down
Loading

0 comments on commit e530ca3

Please sign in to comment.