Skip to content

Commit

Permalink
Merge pull request #3869 from open-formulieren/issue/3858-upload-permission-errors
Browse files Browse the repository at this point in the history

Fix concurrency issue with bulk-uploads
  • Loading branch information
sergei-maertens authored Feb 12, 2024
2 parents dfecb2d + 8c3850d commit 776ff19
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 14 deletions.
6 changes: 4 additions & 2 deletions src/openforms/conf/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@
"IGNORE_EXCEPTIONS": True,
},
},
# TODO: rename to 'redis-locks' and get rid of portalocker in favour of plain
# redis locks?
"portalocker": {
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": f"redis://{config('CACHE_PORTALOCKER', 'localhost:6379/0')}",
Expand Down Expand Up @@ -380,7 +382,7 @@
"disable_existing_loggers": False,
"formatters": {
"verbose": {
"format": "%(asctime)s %(levelname)s %(name)s %(module)s %(process)d %(thread)d %(message)s"
"format": "%(asctime)s %(levelname)s %(name)s %(module)s P%(process)d/T%(thread)d | %(message)s"
},
"timestamped": {"format": "%(asctime)s %(levelname)s %(name)s %(message)s"},
"simple": {"format": "%(levelname)s %(message)s"},
Expand All @@ -405,7 +407,7 @@
"console": {
"level": "DEBUG",
"class": "logging.StreamHandler",
"formatter": "timestamped",
"formatter": config("LOG_FORMAT_CONSOLE", default="timestamped"),
},
"django": {
"level": "DEBUG",
Expand Down
54 changes: 53 additions & 1 deletion src/openforms/formio/tests/test_api_fileupload.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from unittest.mock import patch

from django.conf import settings
from django.core.files.uploadedfile import SimpleUploadedFile
from django.test import override_settings, tag
from django.utils.translation import gettext as _
Expand All @@ -11,7 +13,7 @@
from privates.test import temp_private_root
from rest_framework import status
from rest_framework.reverse import reverse
from rest_framework.test import APITestCase
from rest_framework.test import APITestCase, APITransactionTestCase

from openforms.config.models import GlobalConfiguration
from openforms.submissions.attachments import temporary_upload_from_url
Expand Down Expand Up @@ -344,3 +346,53 @@ def test_cannot_connect_to_clamdav(self, m_config):
tmpdir_contents = os.listdir(tmpdir)

self.assertEqual(0, len(tmpdir_contents))


@override_settings(
    # Deliberately set to cache backend to not fall in the trap of using DB row-level
    # locking. This also reflects how we deploy in prod.
    SESSION_ENGINE="django.contrib.sessions.backends.cache",
    SESSION_CACHE_ALIAS="session",
    CACHES={
        **settings.CACHES,
        "session": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"},
    },
)
class ConcurrentUploadTests(SubmissionsMixin, APITestCase):
    """
    Regression tests for parallel temporary file uploads sharing one session.

    Uses a transaction test case so the uploads can run in real threads against
    committed data rather than inside a single wrapped transaction.
    """

    @tag("gh-3858")
    def test_concurrent_file_uploads(self):
        # a submission with a multi-file upload component, owned by this session
        submission = SubmissionFactory.from_components(
            [
                {
                    "type": "file",
                    "key": "file",
                    "label": "Some upload",
                    "multiple": True,
                }
            ]
        )
        self._add_submission_to_session(submission)
        endpoint = reverse("api:formio:temporary-file-upload")

        def do_upload() -> str:
            # each call uploads a fresh in-memory file and returns the temporary
            # upload's detail URL from the API response
            file = SimpleUploadedFile(
                "my-file.txt", b"my content", content_type="text/plain"
            )
            response = self.client.post(endpoint, {"file": file}, format="multipart")
            assert response.status_code == status.HTTP_200_OK
            resp_data = response.json()
            return resp_data["url"]

        # do both uploads in parallel in their own thread
        with ThreadPoolExecutor(max_workers=2) as executor:
            futures = [executor.submit(do_upload) for _ in range(0, 2)]
            urls = [future.result() for future in as_completed(futures)]

        # strip the common URL prefix so only the upload UUIDs remain
        uuids = {
            url.removeprefix("http://testserver/api/v2/submissions/files/")
            for url in urls
        }

        # without locking, one parallel write would overwrite the other and only a
        # single UUID would survive in the session (the gh-3858 bug)
        session_uuids = set(self.client.session[UPLOADS_SESSION_KEY])
        self.assertEqual(session_uuids, uuids)
1 change: 0 additions & 1 deletion src/openforms/submissions/api/viewsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ def perform_create(self, serializer):

# store the submission ID in the session, so that only the session owner can
# mutate/view the submission
# note: possible race condition with concurrent requests
add_submmission_to_session(serializer.instance, self.request.session)

logevent.submission_start(serializer.instance)
Expand Down
108 changes: 98 additions & 10 deletions src/openforms/submissions/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import logging
from contextlib import contextmanager
from typing import Any

from django.conf import settings
from django.contrib.sessions.backends.base import SessionBase
from django.core.cache import caches
from django.http import HttpRequest
from django.utils import translation

Expand Down Expand Up @@ -37,23 +39,109 @@

logger = logging.getLogger(__name__)

# with the interval of 0.1s, this gives us 2.0 / 0.1 = 20 concurrent requests,
# which is far above the typical browser concurrency mode (~6-8 requests).
SESSION_LOCK_TIMEOUT_SECONDS = 2.0


@contextmanager
def _session_lock(session: SessionBase, key: str):
    """
    Helper to manage session data mutations for the specified key.

    Concurrent session updates see stale data from when the request initially
    got processed, so any added items from parallel requests is not taken into
    account. This context manager refreshes the session data just-in-time and uses
    a Redis distributed lock to synchronize access.

    .. note:: this is pretty deep in Django internals, there doesn't appear to be a
       public API for things like these :(
    """
    # only existing session have an existing key. If this is a new session, it hasn't
    # been persisted to the backend yet, so there is also no possible race condition.
    is_new = session.session_key is None
    if is_new:
        yield
        return

    # See TODO in settings about renaming this cache
    redis_cache = caches["portalocker"]

    # make the lock tied to the session itself, so that we don't affect other people's
    # sessions.
    cache_key = f"django:session-update:{session.session_key}"

    # this is... tricky. To ensure we aren't still operating on stale data, we refresh
    # the session data after acquiring a lock so that we're the only one that will be
    # writing to it.
    #
    # For the locking interface, see redis-py :meth:`redis.client.Redis.lock`

    logger.debug("Acquiring session lock for session %s", session.session_key)
    with redis_cache.lock(
        cache_key,
        # max lifetime for the lock itself, must always be provided in case something
        # crashes and we fail to call release
        timeout=SESSION_LOCK_TIMEOUT_SECONDS,
        # wait rather than failing immediately, we are trying to handle parallel
        # requests here. Can't explicitly specify this, see
        # https://github.com/jazzband/django-redis/issues/596. redis-py default is True.
        # blocking=True,
        # how long we can try to acquire the lock
        blocking_timeout=SESSION_LOCK_TIMEOUT_SECONDS,
    ):
        logger.debug("Got session lock for session %s", session.session_key)
        # nasty bit... the session itself can already be modified with *other*
        # information that isn't relevant. So, we load the data from the storage again
        # and only look at the provided key. If that one is different, we update our
        # local data. We can not just reset to the result of session.load(), as that
        # would discard modifications that should be persisted.
        persisted_data = session.load()
        if (data_slice := persisted_data.get(key)) != (current := session.get(key)):
            logger.debug(
                "Data from storage is different than what we currently have. "
                "Session %s, key '%s' - in storage: %s, our view: %s",
                session.session_key,
                key,
                data_slice,
                current,
            )
            session[key] = data_slice
            logger.debug(
                "Updated key '%s' from storage for session %s", key, session.session_key
            )

        # execute the calling code and exit, clearing the lock.
        yield

        logger.debug(
            "New session data for session %s is: %s",
            session.session_key,
            # NOTE(review): private Django attribute — no public accessor for the raw
            # session dict exists; used for debug logging only
            session._session,
        )

        # ensure we save in-between to persist the modifications, before the request
        # may even be finished
        session.save()
        logger.debug("Saved session data for session %s", session.session_key)


def append_to_session_list(session: SessionBase, session_key: str, value: Any) -> None:
    """
    Append ``value`` to the list stored under ``session_key``, if not present yet.

    The read-modify-write happens inside :func:`_session_lock`, which refreshes the
    session data from storage and synchronizes concurrent requests on the same
    session, so parallel appends are not lost.
    """
    # The pasted diff left the old, unlocked mutation in place before the locked
    # version; only the locked read-modify-write may remain, otherwise the value is
    # written once outside the lock (racy) and once inside it.
    with _session_lock(session, session_key):
        # re-read inside the lock so we operate on fresh, non-stale data
        active = session.get(session_key, [])
        if value not in active:
            active.append(value)
            session[session_key] = active


def remove_from_session_list(
    session: SessionBase, session_key: str, value: Any
) -> None:
    """
    Remove ``value`` from the list stored under ``session_key``, if present.

    Mirrors :func:`append_to_session_list`: the read-modify-write happens inside
    :func:`_session_lock` so concurrent requests on the same session cannot lose
    each other's updates.
    """
    # The pasted diff left the old, unlocked mutation in place before the locked
    # version; only the locked read-modify-write may remain, otherwise the removal
    # also runs once outside the lock (racy duplicate).
    with _session_lock(session, session_key):
        # re-read inside the lock so we operate on fresh, non-stale data
        active = session.get(session_key, [])
        if value in active:
            active.remove(value)
            session[session_key] = active


def add_submmission_to_session(submission: Submission, session: SessionBase) -> None:
Expand Down

0 comments on commit 776ff19

Please sign in to comment.