Skip to content

Commit

Permalink
WIP: mixed dummy + looptime tests for daemons — split and extract
Browse files Browse the repository at this point in the history
Signed-off-by: Sergey Vasilyev <[email protected]>
  • Loading branch information
nolar committed Jul 30, 2023
1 parent 58eaa69 commit 687228d
Show file tree
Hide file tree
Showing 10 changed files with 234 additions and 314 deletions.
64 changes: 24 additions & 40 deletions tests/handling/daemons/conftest.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import asyncio
import collections
import contextlib
import time
from typing import Optional

import freezegun
import pytest
from mock import MagicMock, patch
from mock import MagicMock

import kopf
from kopf._cogs.aiokits.aiotoggles import ToggleSet
Expand All @@ -20,23 +20,32 @@ class DaemonDummy:
def __init__(self):
super().__init__()
self.mock = MagicMock()
self.kwargs = {}
self._flag_statuses = collections.defaultdict(lambda: False)
self.steps = {
'called': asyncio.Event(),
'finish': asyncio.Event(),
'error': asyncio.Event(),
}
self.called = asyncio.Condition()
self.failed = asyncio.Event()
self.finished = asyncio.Event()

async def wait_for_daemon_done(self):
stopped = self.kwargs['stopped']
async def wait_for_daemon_done(self) -> None:
stopped = self.mock.call_args[1]['stopped']
await stopped.wait()
while not stopped.reason & stopped.reason.DONE:
while stopped.reason is None or not stopped.reason & stopped.reason.DONE:
await asyncio.sleep(0) # give control back to asyncio event loop


@pytest.fixture()
def dummy():
return DaemonDummy()
async def dummy(simulate_cycle):
dummy = DaemonDummy()
yield dummy

# Cancel the background tasks, if any.
event_object = {'metadata': {'deletionTimestamp': '...'}}
await simulate_cycle(event_object)
await dummy.wait_for_daemon_done()


@pytest.fixture()
Expand All @@ -52,7 +61,11 @@ def _merge_dicts(src, dst):
else:
dst[key] = val

async def _simulate_cycle(event_object: RawBody):
async def _simulate_cycle(
event_object: RawBody,
*,
stream_pressure: Optional[asyncio.Event] = None,
) -> None:
mocker.resetall()

await process_resource_event(
Expand All @@ -65,6 +78,7 @@ async def _simulate_cycle(event_object: RawBody):
indexers=OperatorIndexers(),
raw_event={'type': 'irrelevant', 'object': event_object},
event_queue=asyncio.Queue(),
stream_pressure=stream_pressure,
)

# Do the same as k8s does: merge the patches into the object.
Expand Down Expand Up @@ -96,33 +110,3 @@ async def background_daemon_killer(settings, memories, operator_paused):
with contextlib.suppress(asyncio.CancelledError):
task.cancel()
await task


@pytest.fixture()
async def frozen_time():
    """
    A helper to simulate time movements to step over long sleeps/timeouts.

    Yields the freezegun control object so that tests can advance the
    frozen clock manually (e.g. via ``frozen.tick(...)``).
    """
    with freezegun.freeze_time("2020-01-01 00:00:00") as frozen:
        # Use freezegun-supported time instead of system clocks -- for testing purposes only.
        # NB: Patch strictly after the time is frozen -- to use fake_time(), not real time().
        # NB: StdLib's event loops use time.monotonic(), but uvloop uses its own C-level time,
        # so patch the loop object directly instead of its implied underlying implementation.
        with patch.object(asyncio.get_running_loop(), 'time', time.time):
            yield frozen


# The time-driven tests mock the sleeps, and shift the time as much as it was requested to sleep.
# This makes the sleep realistic for the app code, though executed instantly for the tests.
@pytest.fixture()
def manual_time(k8s_mocked, frozen_time):
    """
    Replace the mocked sleeps with instant ticks of the frozen clock.

    The substitute advances the frozen time by exactly the requested delay,
    so the app code perceives a realistic sleep while the test runs instantly.
    Yields the same freezegun control object as ``frozen_time``.
    """
    async def _instant_sleep(duration, *_, **__):
        # None means "do not sleep at all"; a float is a single delay;
        # otherwise it is treated as an iterable of delays, of which
        # the smallest one is applied.
        if duration is None:
            pass
        elif isinstance(duration, float):
            frozen_time.tick(duration)
        else:
            frozen_time.tick(min(duration))

    k8s_mocked.sleep.side_effect = _instant_sleep
    yield frozen_time
116 changes: 49 additions & 67 deletions tests/handling/daemons/test_daemon_errors.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,26 @@
import asyncio
import logging

import kopf
from kopf._core.actions.execution import ErrorsMode, PermanentError, TemporaryError


async def test_daemon_stopped_on_permanent_error(
settings, resource, dummy, manual_time, caplog, assert_logs, k8s_mocked, simulate_cycle):
settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime):
caplog.set_level(logging.DEBUG)

@kopf.daemon(*resource, id='fn', backoff=0.01)
@kopf.daemon(*resource, id='fn', backoff=1.23)
async def fn(**kwargs):
dummy.mock()
dummy.kwargs = kwargs
dummy.steps['called'].set()
dummy.mock(**kwargs)
raise PermanentError("boo!")

finalizer = settings.persistence.finalizer
event_object = {'metadata': {'finalizers': [finalizer]}}
await simulate_cycle(event_object)
await asyncio.sleep(123) # give it enough opportunities to misbehave (e.g. restart)

await dummy.steps['called'].wait()
await dummy.wait_for_daemon_done()

assert looptime == 123
assert dummy.mock.call_count == 1
assert k8s_mocked.patch.call_count == 0
assert k8s_mocked.sleep.call_count == 0

assert_logs([
"Daemon 'fn' failed permanently: boo!",
Expand All @@ -35,25 +31,21 @@ async def fn(**kwargs):


async def test_daemon_stopped_on_arbitrary_errors_with_mode_permanent(
settings, resource, dummy, manual_time, caplog, assert_logs, k8s_mocked, simulate_cycle):
settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime):
caplog.set_level(logging.DEBUG)

@kopf.daemon(*resource, id='fn', errors=ErrorsMode.PERMANENT, backoff=0.01)
@kopf.daemon(*resource, id='fn', errors=ErrorsMode.PERMANENT, backoff=1.23)
async def fn(**kwargs):
dummy.mock()
dummy.kwargs = kwargs
dummy.steps['called'].set()
dummy.mock(**kwargs)
raise Exception("boo!")

finalizer = settings.persistence.finalizer
event_object = {'metadata': {'finalizers': [finalizer]}}
await simulate_cycle(event_object)
await asyncio.sleep(123) # give it enough opportunities to misbehave (e.g. restart)

await dummy.steps['called'].wait()
await dummy.wait_for_daemon_done()

assert looptime == 123
assert dummy.mock.call_count == 1
assert k8s_mocked.sleep.call_count == 0

assert_logs([
"Daemon 'fn' failed with an exception and will stop now: boo!",
Expand All @@ -64,31 +56,25 @@ async def fn(**kwargs):


async def test_daemon_retried_on_temporary_error(
registry, settings, resource, dummy, manual_time,
caplog, assert_logs, k8s_mocked, simulate_cycle):
registry, settings, resource, dummy,
caplog, assert_logs, k8s_mocked, simulate_cycle, looptime):
caplog.set_level(logging.DEBUG)
finished = asyncio.Event()

@kopf.daemon(*resource, id='fn', backoff=1.0)
@kopf.daemon(*resource, id='fn', backoff=1.23)
async def fn(retry, **kwargs):
dummy.mock()
dummy.kwargs = kwargs
dummy.steps['called'].set()
dummy.mock(**kwargs)
if not retry:
raise TemporaryError("boo!", delay=1.0)
raise TemporaryError("boo!", delay=3.45)
else:
dummy.steps['finish'].set()
finished.set()

finalizer = settings.persistence.finalizer
event_object = {'metadata': {'finalizers': [finalizer]}}
await simulate_cycle(event_object)
await finished.wait()

await dummy.steps['called'].wait()
await dummy.steps['finish'].wait()
await dummy.wait_for_daemon_done()

assert k8s_mocked.sleep.call_count == 1 # one for each retry
assert k8s_mocked.sleep.call_args_list[0][0][0] == 1.0 # [call#][args/kwargs][arg#]

assert looptime == 3.45
assert_logs([
"Daemon 'fn' failed temporarily: boo!",
"Daemon 'fn' succeeded.",
Expand All @@ -97,70 +83,66 @@ async def fn(retry, **kwargs):


async def test_daemon_retried_on_arbitrary_error_with_mode_temporary(
settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, manual_time):
settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime):
caplog.set_level(logging.DEBUG)
finished = asyncio.Event()

@kopf.daemon(*resource, id='fn', errors=ErrorsMode.TEMPORARY, backoff=1.0)
@kopf.daemon(*resource, id='fn', errors=ErrorsMode.TEMPORARY, backoff=1.23)
async def fn(retry, **kwargs):
dummy.mock()
dummy.kwargs = kwargs
dummy.steps['called'].set()
dummy.mock(**kwargs)
if not retry:
raise Exception("boo!")
else:
dummy.steps['finish'].set()
finished.set()

finalizer = settings.persistence.finalizer
event_object = {'metadata': {'finalizers': [finalizer]}}
await simulate_cycle(event_object)
await finished.wait()

await dummy.steps['called'].wait()
await dummy.steps['finish'].wait()
await dummy.wait_for_daemon_done()

assert k8s_mocked.sleep.call_count == 1 # one for each retry
assert k8s_mocked.sleep.call_args_list[0][0][0] == 1.0 # [call#][args/kwargs][arg#]

assert looptime == 1.23
assert_logs([
"Daemon 'fn' failed with an exception and will try again in 1.0 seconds: boo!",
"Daemon 'fn' failed with an exception and will try again in 1.23 seconds: boo!",
"Daemon 'fn' succeeded.",
"Daemon 'fn' has exited on its own",
])


async def test_daemon_retried_until_retries_limit(
resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, manual_time):
resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime):
caplog.set_level(logging.DEBUG)
trigger = asyncio.Condition()

@kopf.daemon(*resource, id='fn', retries=3)
async def fn(**kwargs):
dummy.kwargs = kwargs
dummy.steps['called'].set()
raise TemporaryError("boo!", delay=1.0)
dummy.mock(**kwargs)
async with trigger:
trigger.notify_all()
raise TemporaryError("boo!", delay=1.23)

await simulate_cycle({})
await dummy.steps['called'].wait()
await dummy.wait_for_daemon_done()
async with trigger:
await trigger.wait_for(lambda: any("but will" in m for m in caplog.messages))

assert k8s_mocked.sleep.call_count == 2 # one between each retry (3 attempts - 2 sleeps)
assert k8s_mocked.sleep.call_args_list[0][0][0] == 1.0 # [call#][args/kwargs][arg#]
assert k8s_mocked.sleep.call_args_list[1][0][0] == 1.0 # [call#][args/kwargs][arg#]
assert looptime == 2.46
assert dummy.mock.call_count == 3


async def test_daemon_retried_until_timeout(
resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, manual_time):
resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle, looptime):
caplog.set_level(logging.DEBUG)
trigger = asyncio.Condition()

@kopf.daemon(*resource, id='fn', timeout=3.0)
@kopf.daemon(*resource, id='fn', timeout=4)
async def fn(**kwargs):
dummy.kwargs = kwargs
dummy.steps['called'].set()
raise TemporaryError("boo!", delay=1.0)
dummy.mock(**kwargs)
async with trigger:
trigger.notify_all()
raise TemporaryError("boo!", delay=1.23)

await simulate_cycle({})
await dummy.steps['called'].wait()
await dummy.wait_for_daemon_done()
async with trigger:
await trigger.wait_for(lambda: any("but will" in m for m in caplog.messages))

assert k8s_mocked.sleep.call_count == 2 # one between each retry (3 attempts - 2 sleeps)
assert k8s_mocked.sleep.call_args_list[0][0][0] == 1.0 # [call#][args/kwargs][arg#]
assert k8s_mocked.sleep.call_args_list[1][0][0] == 1.0 # [call#][args/kwargs][arg#]
assert looptime == 3.69
assert dummy.mock.call_count == 4
11 changes: 7 additions & 4 deletions tests/handling/daemons/test_daemon_filtration.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import logging

import pytest
Expand All @@ -11,22 +12,23 @@
async def test_daemon_filtration_satisfied(
settings, resource, dummy, caplog, assert_logs, k8s_mocked, simulate_cycle):
caplog.set_level(logging.DEBUG)
executed = asyncio.Event()

@kopf.daemon(*resource, id='fn',
labels={'a': 'value', 'b': kopf.PRESENT, 'c': kopf.ABSENT},
annotations={'x': 'value', 'y': kopf.PRESENT, 'z': kopf.ABSENT})
async def fn(**kwargs):
dummy.kwargs = kwargs
dummy.steps['called'].set()
dummy.mock(**kwargs)
executed.set()

finalizer = settings.persistence.finalizer
event_body = {'metadata': {'labels': {'a': 'value', 'b': '...'},
'annotations': {'x': 'value', 'y': '...'},
'finalizers': [finalizer]}}
await simulate_cycle(event_body)
await executed.wait()

await dummy.steps['called'].wait()
await dummy.wait_for_daemon_done()
assert dummy.mock.call_count == 1


@pytest.mark.parametrize('labels, annotations', [
Expand Down Expand Up @@ -56,6 +58,7 @@ async def fn(**kwargs):
'annotations': annotations,
'finalizers': [finalizer]}}
await simulate_cycle(event_body)
await asyncio.sleep(123) # give it enough time to do something when nothing is expected

assert spawn_daemons.called
assert spawn_daemons.call_args_list[0][1]['handlers'] == []
11 changes: 6 additions & 5 deletions tests/handling/daemons/test_daemon_rematching.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import logging

import kopf
Expand All @@ -7,19 +8,19 @@
async def test_running_daemon_is_stopped_when_mismatches(
resource, dummy, looptime, mocker, caplog, assert_logs, k8s_mocked, simulate_cycle):
caplog.set_level(logging.DEBUG)
executed = asyncio.Event()

@kopf.daemon(*resource, id='fn', when=lambda **_: is_matching)
async def fn(**kwargs):
dummy.mock()
dummy.kwargs = kwargs
dummy.steps['called'].set()
dummy.mock(**kwargs)
executed.set()
await kwargs['stopped'].wait()

# Ensure it is spawned while it is matching. (The same as the spawning tests.)
mocker.resetall()
is_matching = True
await simulate_cycle({})
await dummy.steps['called'].wait()
await executed.wait()
assert dummy.mock.call_count == 1

# Ensure it is stopped once it stops matching. (The same as the termination tests.)
Expand All @@ -29,5 +30,5 @@ async def fn(**kwargs):
await dummy.wait_for_daemon_done()

assert looptime == 0
stopped = dummy.kwargs['stopped']
stopped = dummy.mock.call_args[1]['stopped']
assert DaemonStoppingReason.FILTERS_MISMATCH in stopped.reason
Loading

0 comments on commit 687228d

Please sign in to comment.