Refactored waiting for cache entries to become active using threading.Events

Removed STATUS_POLL_PERIOD app configuration variable
Added ENTRY_WAIT_TIMEOUT app configuration variable
Refactored run_tests.py to allow concurrent execution of requests
Added description of run_tests.py to README.md
Changed ttl in example requests
Made mypy ignore build directory
martin committed May 28, 2021
1 parent 92bd804 commit f9e1c96
Showing 9 changed files with 215 additions and 84 deletions.
121 changes: 106 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
@@ -31,7 +31,7 @@ The application has following configurable parameters which can be either provid
| KENTIK_API_RETRIES | no | Number of retries on transient failures | 3 |
| KENTIK_API_TIMEOUT | no | Timeout for requests to Kentik API | 60 seconds |
| DEFAULT_TTL| no | Default cache entry lifetime | 300 seconds |
| STATUS_POLL_PERIOD | no | Interval for polling for completion of cache entries | 3 seconds |
| ENTRY_WAIT_TIMEOUT | no | Timeout for cache entry to become active | retries * timeout + 5 seconds |
| CACHE_PATH | yes | Directory for storing cached content | |
| CACHE_MAINTENANCE_PERIOD | no | Interval for periodic cache pruning | 60 seconds |

@@ -61,9 +61,13 @@ On GET request to the _/image/<id>_ end-point, the Image Cache:
if found
if expired
return 404 to the client
while entry is pending
wait for status_poll_period
return 200 and image data to the client
if pending
return 200 and image data to the client
wait for entry to become active
if active
return 200 and image data to the client
else
return 500 error to the client
else
return 404 error to the client
```
@@ -97,17 +101,19 @@ The below procedure assumes:
- User running the procedure has sufficient permissions, including communication with the docker server

#### Clone repo to local disk:
`git clone https://github.com/kentik/kentik_image_cache.git`
```shell
git clone https://github.com/kentik/kentik_image_cache.git
```

#### Create docker image
```
```shell
cd kentik_image_cache
docker build -t kentik_image_cache .
```
#### Create cache directory (to be mounted in the container)
The directory should be located on a filesystem with enough disk space and must be readable and writable
by the user under whose identity Docker containers are executed.
```
```shell
mkdir -p /opt/kentik_image_cache
chown root:docker /opt/kentik_image_cache
```
@@ -118,15 +124,15 @@ The below procedure passes Kentik authentication to the container via environme
- _<kentik_api_token>_ has to be replaced with API token of that user.

**Access to Kentik API must be allowed from the external IP address of the Docker container**.
```
```shell
docker run -d --name kentik_image_cache \
--env KT_AUTH_EMAIL=<kentik_user_mail> \
--env KT_AUTH_TOKEN=<kentik_api_token> \
-v /opt/kentik_image_cache:/cache -p 80:80 kentik_image_cache
```

Credentials and other configuration information can also be provided via an environment file:
```
```shell
echo "KT_AUTH_EMAIL=<kentik_user_mail>" > .env
echo "KT_AUTH_TOKEN=<kentik_api_token>" >> .env
docker run -d --name kentik_image_cache --env-file .env \
@@ -139,38 +145,123 @@ The below procedure assumes:
- Python 3.8 or newer installed as `python3`

#### Clone repo to local disk:
`git clone https://github.com/kentik/kentik_image_cache.git`
```shell
git clone https://github.com/kentik/kentik_image_cache.git
```

#### Create virtual environment
```
```shell
cd kentik_image_cache
python3 -m venv venv
```

#### Install dependencies
```
```shell
venv/bin/pip3 install -r requirements.txt
```

#### Create cache directory
_Note_: the cache directory **must not** be in the repo tree in order for the `uvicorn --reload` feature to work correctly.
```
```shell
mkdir /tmp/cache
```

#### Create environment file with Kentik credentials
```
```shell
echo "KT_AUTH_EMAIL=<kentik_user_mail>" > .env
echo "KT_AUTH_TOKEN=<kentik_api_token>" >> .env
echo "CACHE_PATH=/tmp/cache" >> .env
```

#### Start the server with debug messages enabled and in self-reload mode
```
```shell
DEBUG=1 venv/bin/uvicorn app.main:app --reload
```

#### Test access
- API spec and tester: http://127.0.0.1:8000/docs
- Cache content info: http://127.0.0.1:8000/info

## Testing

A simple integration test script, `tests/run_tests.py`, can be used to test a fully configured instance
of the image cache (the instance must have network access and valid credentials for the Kentik API).
By default, the test script uses requests stored in `tests/data`.

Example of running all test requests concurrently against URL `http://127.0.0.1:8000`:

```shell
tests/run_tests.py --concurrent --url http://127.0.0.1:8000
2021-05-27 23:00:53 Using URL: http://127.0.0.1:8000
2021-05-27 23:00:53 tid: 0 loading request from: tests/data/bad_query.json
2021-05-27 23:00:53 tid: 1 loading request from: tests/data/example_query_1.json
2021-05-27 23:00:53 tid: 2 loading request from: tests/data/example_query_2.json
2021-05-27 23:00:53 tid: 3 loading request from: tests/data/example_query_3.json
2021-05-27 23:00:53 tid: 4 loading request from: tests/data/example_query_4.json
2021-05-27 23:00:53 5 requests loaded
2021-05-27 23:00:53 Running tests concurrently
2021-05-27 23:00:53 tid: 0 posting request
2021-05-27 23:00:53 tid: 1 posting request
2021-05-27 23:00:53 tid: 2 posting request
2021-05-27 23:00:53 tid: 3 posting request
2021-05-27 23:00:53 tid: 4 posting request
2021-05-27 23:00:53 tid: 0: got id: 53693a077a6e1eec6407df161ae964d34170a7743e67488b0ece175005ac69aa_1622181773.873009
2021-05-27 23:00:53 tid: 2: got id: d8e8c3038d31e454971f6a067a4190d9a34f9d1a8415ff3260939a0ca53e6436_1622181953.864447
2021-05-27 23:00:53 tid: 3: got id: 4f2278d1a4d4d5a202a9972027d05e9ac13032b00282b5c2b67a79b60002cb7c_1622181773.848177
2021-05-27 23:00:53 tid: 3 requesting: http://127.0.0.1:8000/image/4f2278d1a4d4d5a202a9972027d05e9ac13032b00282b5c2b67a79b60002cb7c_1622181773.848177
2021-05-27 23:00:53 tid: 0 requesting: http://127.0.0.1:8000/image/53693a077a6e1eec6407df161ae964d34170a7743e67488b0ece175005ac69aa_1622181773.873009
2021-05-27 23:00:53 tid: 1: got id: fbdaa71df680024289416c97b392d55c2ce218e8a69054ff084f032bbfb3f867_1622182253.88189
2021-05-27 23:00:53 tid: 1 requesting: http://127.0.0.1:8000/image/fbdaa71df680024289416c97b392d55c2ce218e8a69054ff084f032bbfb3f867_1622182253.88189
2021-05-27 23:00:53 tid: 2 requesting: http://127.0.0.1:8000/image/d8e8c3038d31e454971f6a067a4190d9a34f9d1a8415ff3260939a0ca53e6436_1622181953.864447
2021-05-27 23:00:53 tid: 4: got id: 5d8ed88d722d32ca3b42883671a27a7cbd76140bf8b154dc471936b291118d1c_1622181713.891764
2021-05-27 23:00:53 tid: 4 requesting: http://127.0.0.1:8000/image/5d8ed88d722d32ca3b42883671a27a7cbd76140bf8b154dc471936b291118d1c_1622181713.891764
2021-05-27 23:00:54 tid: 0 got: status: 400 type: application/json length: 194
2021-05-27 23:01:01 tid: 1 got: status: 200 type: image/png length: 61117
2021-05-27 23:01:01 tid: 2 got: status: 200 type: image/png length: 71561
2021-05-27 23:01:02 tid: 3 got: status: 200 type: image/png length: 147938
2021-05-27 23:01:04 tid: 4 got: status: 200 type: application/pdf length: 73343
```

Example of running all tests matching `example_*.json` sequentially against `http://127.0.0.1:8000`:

```shell
tests/run_tests.py --glob 'example_*.json' --url http://127.0.0.1:8000
2021-05-27 22:59:21 Using URL: http://127.0.0.1:8000
2021-05-27 22:59:21 tid: 0 loading request from: tests/data/example_query_1.json
2021-05-27 22:59:21 tid: 1 loading request from: tests/data/example_query_2.json
2021-05-27 22:59:21 tid: 2 loading request from: tests/data/example_query_3.json
2021-05-27 22:59:21 tid: 3 loading request from: tests/data/example_query_4.json
2021-05-27 22:59:21 4 requests loaded
2021-05-27 22:59:21 Running tests
2021-05-27 22:59:21 tid: 0 posting request
2021-05-27 22:59:22 tid: 0: got id: fbdaa71df680024289416c97b392d55c2ce218e8a69054ff084f032bbfb3f867_1622182162.004539
2021-05-27 22:59:22 tid: 0 requesting: http://127.0.0.1:8000/image/fbdaa71df680024289416c97b392d55c2ce218e8a69054ff084f032bbfb3f867_1622182162.004539
2021-05-27 22:59:28 tid: 0 got: status: 200 type: image/png length: 60706
2021-05-27 22:59:28 tid: 1 posting request
2021-05-27 22:59:28 tid: 1: got id: d8e8c3038d31e454971f6a067a4190d9a34f9d1a8415ff3260939a0ca53e6436_1622181868.80616
2021-05-27 22:59:28 tid: 1 requesting: http://127.0.0.1:8000/image/d8e8c3038d31e454971f6a067a4190d9a34f9d1a8415ff3260939a0ca53e6436_1622181868.80616
2021-05-27 22:59:37 tid: 1 got: status: 200 type: image/png length: 72078
2021-05-27 22:59:37 tid: 2 posting request
2021-05-27 22:59:37 tid: 2: got id: 4f2278d1a4d4d5a202a9972027d05e9ac13032b00282b5c2b67a79b60002cb7c_1622181697.10503
2021-05-27 22:59:37 tid: 2 requesting: http://127.0.0.1:8000/image/4f2278d1a4d4d5a202a9972027d05e9ac13032b00282b5c2b67a79b60002cb7c_1622181697.10503
2021-05-27 22:59:46 tid: 2 got: status: 200 type: image/png length: 146699
2021-05-27 22:59:46 tid: 3 posting request
2021-05-27 22:59:46 tid: 3: got id: 5d8ed88d722d32ca3b42883671a27a7cbd76140bf8b154dc471936b291118d1c_1622181646.687514
2021-05-27 22:59:46 tid: 3 requesting: http://127.0.0.1:8000/image/5d8ed88d722d32ca3b42883671a27a7cbd76140bf8b154dc471936b291118d1c_1622181646.687514
2021-05-27 22:59:56 tid: 3 got: status: 200 type: application/pdf length: 72974
```

Full usage help:
```
tests/run_tests.py --help
Usage: run_tests.py [OPTIONS]
Options:
--url TEXT URL to test against [default: http://127.0.0.1]
--dir TEXT Directory to load requests from [default: tests/data]
--glob TEXT Globbing pattern for request files [default: *.json]
--concurrent Run requests concurrently [default: False]
--help Show this message and exit.
```

_Note_: Full unit test suite is in development.
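
The difference between the sequential and the `--concurrent` mode can be illustrated with a minimal sketch. It is not the code of `tests/run_tests.py`: `fake_request` and `run_all` are hypothetical names, and the HTTP round trip is replaced by a short sleep so that the example runs without a server.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple


def fake_request(tid: int) -> Tuple[int, int]:
    """Stand-in for one POST + GET round trip (hypothetical; no network involved)."""
    time.sleep(0.1)  # simulate waiting for the image to be rendered
    return tid, 200


def run_all(count: int, concurrent: bool) -> List[Tuple[int, int]]:
    """Run `count` fake requests either one after another or in parallel threads."""
    if not concurrent:
        return [fake_request(tid) for tid in range(count)]
    # ThreadPoolExecutor rejects max_workers=0, so use at least one worker
    with ThreadPoolExecutor(max_workers=max(count, 1)) as pool:
        # map() returns results in the order of the inputs
        return list(pool.map(fake_request, range(count)))


if __name__ == "__main__":
    for mode in (False, True):
        start = time.monotonic()
        results = run_all(5, concurrent=mode)
        elapsed = time.monotonic() - start
        print(f"concurrent={mode}: {len(results)} results in {elapsed:.2f}s")
```

The same effect is visible in the logs above: the sequential run of 4 requests spans 22:59:21 to 22:59:56, while the concurrent run of 5 requests spans 23:00:53 to 23:01:04.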
Empty file added app/__init__.py
Empty file.
74 changes: 37 additions & 37 deletions app/main.py
@@ -34,7 +34,7 @@ class Settings(BaseSettings):
kentik_api_url: str = "https://api.kentik.com/api/v5"
kentik_api_retries: int = 3
kentik_api_timeout: int = 60 # seconds
status_poll_period: int = 3 # seconds
entry_wait_timeout: int = kentik_api_retries * kentik_api_timeout + 5 # seconds
default_ttl: int = 300 # seconds
cache_path: str = "cache"
cache_maintenance_period: int = 60 # seconds
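
One detail of the new default may be worth spelling out. In Python, the expression assigned to `entry_wait_timeout` is evaluated once, while the class body is executed, using the default values of the two fields above it. The sketch below uses a plain class rather than pydantic; `DemoSettings` is a hypothetical stand-in, and it assumes that `BaseSettings` receives the already computed number as the field default.

```python
class DemoSettings:
    """Plain-class stand-in for the Settings fields shown above (hypothetical)."""

    kentik_api_retries: int = 3
    kentik_api_timeout: int = 60  # seconds
    # Evaluated once, while the class body runs: 3 * 60 + 5
    entry_wait_timeout: int = kentik_api_retries * kentik_api_timeout + 5  # seconds


settings = DemoSettings()
settings.kentik_api_retries = 5  # simulate an override of the retries value

print(DemoSettings.entry_wait_timeout)  # 185
print(settings.entry_wait_timeout)      # still 185, not 5 * 60 + 5
```

If that reading is right, a deployment that raises `KENTIK_API_RETRIES` or `KENTIK_API_TIMEOUT` may also want to set `ENTRY_WAIT_TIMEOUT` explicitly.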
@@ -85,7 +85,7 @@ class CacheInfo(BaseModel):


# Create object cache
cache = ObjectCache(Path(settings.cache_path).resolve())
cache = ObjectCache(Path(settings.cache_path).resolve(), entry_wait_timeout=settings.entry_wait_timeout)
# Create Kentik API client
retry_strategy = deepcopy(RetryableSession.DEFAULT_RETRY_STRATEGY)
retry_strategy.total = settings.kentik_api_retries
@@ -245,49 +245,49 @@ def get_image(image_id: str):
"""

log.info("GET image %s", image_id)
ts = expiration(image_id)
if ts is None:
log.info("GET %s: invalid ID", image_id)
if is_expired(image_id):
log.info("GET %s: entry is expired", image_id)
return JSONResponse(
content={"loc": image_id, "msg": "Invalid image ID", "type": "error"},
status_code=422,
content={"loc": image_id, "msg": "Image not found", "type": "error"},
status_code=404,
)
if ts < datetime.now(timezone.utc):
log.info("GET %s: expired ts: %s", image_id, ts.isoformat())
cache.wait_for(image_id)
entry = cache.get_entry(image_id)
if entry is None:
return JSONResponse(
content={"loc": image_id, "msg": "Image not found", "type": "error"},
status_code=404,
)
while True:
entry = cache.get_entry(image_id)
if entry is None:
if entry.status == EntryStatus.ACTIVE:
if entry.type == CacheEntryType.IMAGE:
img = pickle.loads(entry.data)
return Response(content=img.image_data, media_type=img_type_to_media(img.image_type))
if entry.type == CacheEntryType.ERROR_MSG:
d = json.loads(entry.data.decode())
return JSONResponse(
content={"loc": image_id, "msg": "Image not found", "type": "error"},
status_code=404,
)
if entry.status == EntryStatus.ACTIVE:
if entry.type == CacheEntryType.IMAGE:
img = pickle.loads(entry.data)
return Response(content=img.image_data, media_type=img_type_to_media(img.image_type))
if entry.type == CacheEntryType.ERROR_MSG:
d = json.loads(entry.data.decode())
return JSONResponse(
content={"loc": image_id, "msg": d["msgs"], "type": "Kentik API error"},
status_code=d["status_code"],
)
if entry.status == EntryStatus.PENDING:
log.debug("GET %s: still pending", image_id)
sleep(settings.status_poll_period)
else:
log.error("GET %s: unknown entry status: %s", image_id, entry.status.value)
return JSONResponse(
content={
"loc": image_id,
"msg": f"Internal error (unknown entry status: {entry.status.value})",
"type": "error",
},
status_code=500,
content={"loc": image_id, "msg": d["msgs"], "type": "Kentik API error"},
status_code=d["status_code"],
)
if entry.status == EntryStatus.PENDING:
log.error("GET %s: got pending entry", image_id)
return JSONResponse(
content={
"loc": image_id,
"msg": f"Internal error (entry status: {entry.status.value})",
"type": "error",
},
status_code=500,
)
else:
log.error("GET %s: unknown entry status: %s", image_id, entry.status.value)
return JSONResponse(
content={
"loc": image_id,
"msg": f"Internal error (unknown entry status: {entry.status.value})",
"type": "error",
},
status_code=500,
)


def entry_info(entry: CacheEntry) -> CacheEntryInfo:
Expand Down
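
Because removed and added lines are interleaved in the `get_image` hunk, the resulting control flow can be hard to follow. It can be summarized as a small decision function. This is a sketch, not the handler itself: `Status` and `response_code` are hypothetical names, the blocking `cache.wait_for` call is assumed to have already returned, and active entries that hold a Kentik API error (which are answered with the stored status code) are left out.

```python
from enum import Enum
from typing import Optional


class Status(Enum):
    """Hypothetical stand-in for the EntryStatus values used by the handler."""

    PENDING = "pending"
    ACTIVE = "active"


def response_code(expired: bool, status: Optional[Status]) -> int:
    """Return the HTTP status for an image request after the wait has finished.

    `status` is None when the cache has no entry for the requested ID.
    """
    if expired:
        return 404  # expired IDs are reported as "Image not found"
    if status is None:
        return 404  # nothing cached under this ID
    if status is Status.ACTIVE:
        return 200  # image data is served
    # Still pending after the wait (timeout), or an unknown status
    return 500


if __name__ == "__main__":
    print(response_code(False, Status.ACTIVE))   # 200
    print(response_code(False, Status.PENDING))  # 500
    print(response_code(True, Status.ACTIVE))    # 404
    print(response_code(False, None))            # 404
```

The main behavioral change is the pending case: the handler no longer polls in a loop, so an entry that is still pending after the wait is treated as an internal error.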
1 change: 1 addition & 0 deletions mypy.ini
@@ -1,2 +1,3 @@
[mypy]
ignore_missing_imports = True
exclude = build/
26 changes: 24 additions & 2 deletions object_cache/local_file_cache.py
@@ -1,6 +1,7 @@
import logging
from pathlib import Path
from typing import Any, Callable, Optional
from threading import Event
from typing import Any, Callable, Dict, Optional

from .cache_entry import CacheEntry, CacheEntryType
from .types import ActivationStatus, CreationStatus, EntryStatus
@@ -23,21 +24,36 @@ class ObjectCache:
The cache does not parse or use content of cached data.
"""

def __init__(self, base_dir: Path) -> None:
def __init__(self, base_dir: Path, entry_wait_timeout: int) -> None:
if not base_dir.is_dir():
raise RuntimeError(f"Invalid cache base directory {base_dir}: not a directory")
d = base_dir.resolve()
self._active_dir = base_dir.joinpath("active")
self._pending_dir = base_dir.joinpath("pending")
self._active_dir.mkdir(exist_ok=True)
self._pending_dir.mkdir(exist_ok=True)
self._events: Dict[str, Event] = dict()
self._entry_wait_timeout = entry_wait_timeout
log.debug("New ObjectCache: base_dir: %s", d)
log.debug(
"active: %d entries, pending: %d entries",
self.active_count,
self.pending_count,
)

def wait_for(self, entry_id: str) -> None:
"""
Wait for entry to become active
Execution of the calling thread blocks until the entry becomes active or until timeout
If the event object does not exist, the method returns immediately
"""
try:
log.debug("Waiting for id: %s", entry_id)
self._events[entry_id].wait(self._entry_wait_timeout)
log.debug("Entry id: %s is active", entry_id)
except KeyError:
log.error("No event for id: %s", entry_id)

def get_entry(self, entry_id: str) -> Optional[CacheEntry]:
"""
Locate file matching entry_id if found return CacheEntry object, otherwise None.
@@ -80,6 +96,7 @@ def create_entry(self, entry_id: str, entry_type: CacheEntryType, data: Any) ->
)
return CreationStatus.EXISTING
else:
self._events[entry_id] = Event()
CacheEntry(EntryStatus.PENDING, path=self._pending_dir.joinpath(entry_id)).write(entry_type, data)
return CreationStatus.CREATED

@@ -99,6 +116,7 @@ def activate_entry(self, entry_id: str, entry_type: CacheEntryType, data: Any) -
else:
entry.write(entry_type, data)
entry.rename(self._active_dir.joinpath(entry_id))
self._events[entry_id].set()
return ActivationStatus.SUCCESS

def prune(self, is_expired: Callable[[str], bool]):
@@ -123,6 +141,10 @@ def prune(self, is_expired: Callable[[str], bool]):
to_remove.append(e)
for e in to_remove:
log.info("removing %s", e)
try:
del self._events[e.name]
except KeyError as exc:
log.debug("Event was missing for %s", str(exc))
e.unlink()
log.debug(
"cache pruning complete: active: %d, pending: %d",
Expand Down
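
The event handling added to `ObjectCache` can be tried in isolation with a minimal, in-memory sketch. `EventRegistry` is a hypothetical class, not part of the repository, and it deviates from the committed `wait_for` in one respect: it returns the boolean result of `threading.Event.wait`, so the caller can tell activation from a timeout, whereas the committed method returns `None` and leaves that distinction to the subsequent `get_entry` call.

```python
import threading
from typing import Dict


class EventRegistry:
    """Hypothetical, in-memory stand-in for the event handling added to ObjectCache."""

    def __init__(self, wait_timeout: float) -> None:
        self._events: Dict[str, threading.Event] = {}
        self._wait_timeout = wait_timeout

    def create(self, entry_id: str) -> None:
        """Register an event when a pending entry is created."""
        self._events[entry_id] = threading.Event()

    def activate(self, entry_id: str) -> None:
        """Wake every thread waiting for this entry."""
        self._events[entry_id].set()

    def wait_for(self, entry_id: str) -> bool:
        """Block until the entry is activated or the timeout expires.

        Returns True if the entry was activated, False on timeout or unknown ID.
        """
        event = self._events.get(entry_id)
        if event is None:
            return False
        # Event.wait() returns the internal flag: False means the timeout elapsed
        return event.wait(self._wait_timeout)


if __name__ == "__main__":
    registry = EventRegistry(wait_timeout=2.0)
    registry.create("img-1")
    # Activate the entry from another thread after a short delay
    threading.Timer(0.1, registry.activate, args=("img-1",)).start()
    print(registry.wait_for("img-1"))    # True: activated before the timeout
    print(registry.wait_for("missing"))  # False: no event registered
```

Compared with the removed polling loop, a waiting thread is released as soon as the entry is activated instead of after the next poll interval.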
2 changes: 1 addition & 1 deletion tests/data/example_query_1.json
@@ -117,5 +117,5 @@
],
"imageType": "png"
},
"ttl": 3600
"ttl": 600
}
3 changes: 1 addition & 2 deletions tests/data/example_query_2.json
@@ -143,6 +143,5 @@
}
],
"imageType": "png"
},
"ttl": 3600
}
}