Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cs3: fixed lock and error handling on writefile #139

Merged
merged 7 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 30 additions & 13 deletions src/core/cs3iface.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,12 @@ def setxattr(endpoint, filepath, userid, key, value, lockmd):
_, lockid = lockmd
req = cs3sp.SetArbitraryMetadataRequest(ref=ref, arbitrary_metadata=md, lock_id=lockid)
res = ctx['cs3gw'].SetArbitraryMetadata(request=req, metadata=[('x-access-token', userid)])
if res.status.code in [cs3code.CODE_FAILED_PRECONDITION, cs3code.CODE_ABORTED]:
# CS3 storages may refuse to set an xattr in case of lock mismatch: this is an overprotection,
# as the lock should concern the file's content, not its metadata, however we need to handle that
log.info('msg="Failed precondition on setxattr" filepath="%s" key="%s" trace="%s" reason="%s"' %
(filepath, key, res.status.trace, res.status.message.replace('"', "'")))
raise IOError(common.EXCL_ERROR)
if res.status.code != cs3code.CODE_OK:
log.error('msg="Failed to setxattr" filepath="%s" key="%s" trace="%s" code="%s" reason="%s"' %
(filepath, key, res.status.trace, res.status.code, res.status.message.replace('"', "'")))
Expand Down Expand Up @@ -224,6 +230,10 @@ def rmxattr(endpoint, filepath, userid, key, lockmd):
_, lockid = lockmd
req = cs3sp.UnsetArbitraryMetadataRequest(ref=ref, arbitrary_metadata_keys=[key], lock_id=lockid)
res = ctx['cs3gw'].UnsetArbitraryMetadata(request=req, metadata=[('x-access-token', userid)])
if res.status.code in [cs3code.CODE_FAILED_PRECONDITION, cs3code.CODE_ABORTED]:
log.info('msg="Failed precondition on rmxattr" filepath="%s" key="%s" trace="%s" reason="%s"' %
(filepath, key, res.status.trace, res.status.message.replace('"', "'")))
raise IOError(common.EXCL_ERROR)
if res.status.code != cs3code.CODE_OK:
log.error('msg="Failed to rmxattr" filepath="%s" trace="%s" key="%s" reason="%s"' %
(filepath, key, res.status.trace, res.status.message.replace('"', "'")))
Expand Down Expand Up @@ -409,8 +419,8 @@ def readfile(endpoint, filepath, userid, lockid):
try:
protocol = [p for p in res.protocols if p.protocol in ["simple", "spaces"]][0]
headers = {
'x-access-token': userid,
'x-reva-transfer': protocol.token # needed if the downloads pass through the data gateway in reva
'X-Access-Token': userid,
'X-Reva-Transfer': protocol.token
}
fileget = requests.get(url=protocol.download_endpoint, headers=headers, verify=ctx['ssl_verify'], timeout=10, stream=True)
except requests.exceptions.RequestException as e:
Expand All @@ -432,28 +442,30 @@ def writefile(endpoint, filepath, userid, content, size, lockmd, islock=False):
and any pre-existing file is deleted (or moved to the previous version if supported).
The islock flag is currently not supported. The backend should at least support
writing the file with O_CREAT|O_EXCL flags to prevent races.'''
tstart = time.time()
if islock:
log.warning('msg="Lock (no-overwrite) flag not supported, going for standard upload"')
tstart = time.time()

# prepare endpoint
if lockmd:
_, lockid = lockmd # TODO we are not validating the holder on write, only the lock_id
appname, lockid = lockmd
else:
lockid = None
appname = lockid = ''

# prepare endpoint
if size == -1:
if isinstance(content, str):
content = bytes(content, 'UTF-8')
size = len(content)
reference = _getcs3reference(endpoint, filepath)
req = cs3sp.InitiateFileUploadRequest(ref=reference, lock_id=lockid, opaque=types.Opaque(
map={"Upload-Length": types.OpaqueEntry(decoder="plain", value=str.encode(str(size)))}))
map={'Upload-Length': types.OpaqueEntry(decoder='plain', value=str.encode(str(size)))}))
res = ctx['cs3gw'].InitiateFileUpload(request=req, metadata=[('x-access-token', userid)])
if res.status.code == cs3code.CODE_FAILED_PRECONDITION:
log.info('msg="Lock mismatch uploading file" filepath="%s" reason="%s"' %
(filepath, res.status.message.replace('"', "'")))
raise IOError(common.EXCL_ERROR)
if res.status.code != cs3code.CODE_OK:
log.error('msg="Failed to initiateFileUpload on write" filepath="%s" trace="%s" code="%s" reason="%s"' %
(filepath, res.status.trace, res.status.code, res.status.message.replace('"', "'")))
if '_lock_' in res.status.message: # TODO find the error code returned by Reva once this is implemented
raise IOError(common.EXCL_ERROR)
raise IOError(res.status.message)
tend = time.time()
log.debug('msg="writefile: InitiateFileUploadRes returned" trace="%s" protocols="%s"' %
Expand All @@ -463,16 +475,21 @@ def writefile(endpoint, filepath, userid, content, size, lockmd, islock=False):
try:
protocol = [p for p in res.protocols if p.protocol in ["simple", "spaces"]][0]
headers = {
'x-access-token': userid,
'X-Access-Token': userid,
'Upload-Length': str(size),
'x-reva-transfer': protocol.token # needed if the uploads pass through the data gateway in reva
'X-Reva-Transfer': protocol.token,
'X-Lock-Id': lockid,
'X-Lock-Holder': appname,
}
putres = requests.put(url=protocol.upload_endpoint, data=content, headers=headers, verify=ctx['ssl_verify'], timeout=10)
except requests.exceptions.RequestException as e:
log.error(f'msg="Exception when uploading file to Reva" reason="{e}"')
raise IOError(e) from e
if putres.status_code == http.client.CONFLICT:
log.info(f'msg="Got conflict on PUT, file is locked" reason="{putres.reason}" filepath="{filepath}"')
raise IOError(common.EXCL_ERROR)
if putres.status_code == http.client.UNAUTHORIZED:
log.warning(f'msg="Access denied uploading file to Reva" reason="{putres.reason}"')
log.warning(f'msg="Access denied uploading file to Reva" reason="{putres.reason}" filepath="{filepath}"')
raise IOError(common.ACCESS_ERROR)
if putres.status_code != http.client.OK:
if size == 0: # 0-byte file uploads may have been finalized after InitiateFileUploadRequest, let's assume it's OK
Expand Down
7 changes: 0 additions & 7 deletions src/core/localiface.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,6 @@ def _validatelock(filepath, currlock, lockmd, op, log):

def setxattr(endpoint, filepath, userid, key, value, lockmd):
'''Set the extended attribute <key> to <value> on behalf of the given userid'''
if key != common.LOCKKEY:
currlock = getlock(endpoint, filepath, userid)
if currlock:
# enforce lock only if previously set
_validatelock(filepath, currlock, lockmd, 'setxattr', log)
try:
os.setxattr(_getfilepath(filepath), 'user.' + key, str(value).encode())
except OSError as e:
Expand All @@ -163,8 +158,6 @@ def getxattr(_endpoint, filepath, _userid, key):

def rmxattr(endpoint, filepath, userid, key, lockmd):
'''Remove the extended attribute <key> on behalf of the given userid'''
if key != common.LOCKKEY:
_validatelock(filepath, getlock(endpoint, filepath, userid), lockmd, 'rmxattr', log)
try:
os.removexattr(_getfilepath(filepath), 'user.' + key)
except OSError as e:
Expand Down
9 changes: 2 additions & 7 deletions src/core/xrootiface.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,17 +298,12 @@ def statx(endpoint, fileref, userid, versioninv=1):
}


def setxattr(endpoint, filepath, userid, key, value, lockmd):
def setxattr(endpoint, filepath, _userid, key, value, lockmd):
'''Set the extended attribute <key> to <value> via a special open.
The userid is overridden to make sure it also works on shared files.'''
appname = 'wopi'
lockid = None
if lockmd:
appname, lockid = lockmd
if key not in (EOSLOCKKEY, common.LOCKKEY):
currlock = getlock(endpoint, filepath, userid)
if currlock and currlock['lock_id'] != lockid:
raise IOError(common.EXCL_ERROR)
appname, _ = lockmd
if 'user' not in key and 'sys' not in key:
# if nothing is given, assume it's a user attr
key = 'user.' + key
Expand Down
49 changes: 34 additions & 15 deletions test/test_storageiface.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,17 @@ class TestStorage(unittest.TestCase):
'''Simple tests for the storage layers of the WOPI server. See README for how to run the tests for each storage provider'''
initialized = False
storagetype = None
log = None

@classmethod
def globalinit(cls):
'''One-off initialization of the test environment: create mock logging and import the library'''
loghandler = logging.FileHandler('/tmp/wopiserver-test.log')
loghandler.setFormatter(logging.Formatter(fmt='%(asctime)s %(name)s[%(process)d] %(levelname)-8s %(message)s',
datefmt='%Y-%m-%dT%H:%M:%S'))
log = logging.getLogger('wopiserver.test')
log.addHandler(loghandler)
log.setLevel(logging.DEBUG)
cls.log = logging.getLogger('wopiserver.test')
cls.log.addHandler(loghandler)
cls.log.setLevel(logging.DEBUG)
config = configparser.ConfigParser()
try:
with open('test/wopiserver-test.conf') as fdconf:
Expand All @@ -58,7 +59,7 @@ def globalinit(cls):
raise ImportError(f'Unsupported/Unknown storage type {storagetype}')
try:
cls.storage = __import__('core.' + storagetype, globals(), locals(), [storagetype])
cls.storage.init(config, log)
cls.storage.init(config, cls.log)
cls.homepath = ''
cls.username = ''
if 'cs3' in storagetype:
Expand All @@ -75,14 +76,15 @@ def globalinit(cls):

def __init__(self, *args, **kwargs):
'''Initialization of a test'''
super(TestStorage, self).__init__(*args, **kwargs)
super().__init__(*args, **kwargs)
if not TestStorage.initialized:
TestStorage.globalinit()
self.userid = TestStorage.userid
self.endpoint = TestStorage.endpoint
self.storage = TestStorage.storage
self.homepath = TestStorage.homepath
self.username = TestStorage.username
self.log = TestStorage.log

def test_stat(self):
'''Call stat() and assert the path matches'''
Expand Down Expand Up @@ -177,6 +179,9 @@ def test_write_remove_specialchars(self):

def test_write_islock(self):
'''Test double write with the islock flag'''
if self.storagetype == 'cs3':
self.log.warn('Skipping test_write_islock for storagetype cs3')
return
try:
self.storage.removefile(self.endpoint, self.homepath + '/testoverwrite', self.userid)
except IOError:
Expand All @@ -191,6 +196,9 @@ def test_write_islock(self):

def test_write_race(self):
'''Test multithreaded double write with the islock flag. Might fail as it relies on tight timing'''
if self.storagetype == 'cs3':
self.log.warn('Skipping test_write_race for storagetype cs3')
return
try:
self.storage.removefile(self.endpoint, self.homepath + '/testwriterace', self.userid)
except IOError:
Expand Down Expand Up @@ -223,7 +231,7 @@ def test_lock(self):
self.assertIsInstance(l['expiration'], dict)
self.assertIsInstance(l['expiration']['seconds'], int)
with self.assertRaises(IOError) as context:
self.storage.setlock(self.endpoint, self.homepath + '/testlock', self.userid, 'test app', 'lockmismatch')
self.storage.setlock(self.endpoint, self.homepath + '/testlock', self.userid, 'mismatched app', 'mismatchlock')
self.assertIn(EXCL_ERROR, str(context.exception))
self.storage.unlock(self.endpoint, self.homepath + '/testlock', self.userid, 'test app', 'testlock')
self.storage.removefile(self.endpoint, self.homepath + '/testlock', self.userid)
Expand Down Expand Up @@ -271,7 +279,7 @@ def test_lock_race(self):
t.start()
with self.assertRaises(IOError) as context:
time.sleep(0.001)
self.storage.setlock(self.endpoint, self.homepath + '/testlockrace', self.userid, 'test app', 'testlock2')
self.storage.setlock(self.endpoint, self.homepath + '/testlockrace', self.userid, 'test app 2', 'testlock2')
self.assertIn(EXCL_ERROR, str(context.exception))
self.storage.removefile(self.endpoint, self.homepath + '/testlockrace', self.userid)

Expand All @@ -291,14 +299,25 @@ def test_lock_operations(self):
# this is why we test that both mismatch. Could be improved, though we specifically care about the lock paylaod.
self.storage.writefile(self.endpoint, self.homepath + '/testlockop', self.userid, databuf, -1,
('mismatch app', 'mismatchlock'))
with self.assertRaises(IOError):
# same as above
# with xattrs, it's fine to set them without lock context (the lock should apply to files' contents only)
# BUT the CS3 API does have the possibility to fail a setxattr in case of lock mismatch, so we allow that
# (cf. https://buf.build/cs3org-buf/cs3apis/docs/main:cs3.storage.provider.v1beta1#cs3.storage.provider.v1beta1.ProviderAPI.SetArbitraryMetadata) # noqa: E501
try:
self.storage.setxattr(self.endpoint, self.homepath + '/testlockop', self.userid, 'testkey', 123,
('mismatch app', 'mismatchlock'))
with self.assertRaises(IOError):
except IOError as e:
if str(e) == EXCL_ERROR:
pass
try:
self.storage.setxattr(self.endpoint, self.homepath + '/testlockop', self.userid, 'testkey', 123, None)
with self.assertRaises(IOError):
except IOError as e:
if str(e) == EXCL_ERROR:
pass
try:
self.storage.rmxattr(self.endpoint, self.homepath + '/testlockop', self.userid, 'testkey', None)
except IOError as e:
if str(e) == EXCL_ERROR:
pass
self.storage.refreshlock(self.endpoint, self.homepath + '/testlockop', self.userid, 'test app', 'testlock')
with self.assertRaises(IOError):
self.storage.refreshlock(self.endpoint, self.homepath + '/testlockop', self.userid, 'mismatched app', 'mismatchlock')
Expand All @@ -320,21 +339,21 @@ def test_expired_locks(self):
statInfo = self.storage.stat(self.endpoint, self.homepath + '/testelock', self.userid)
self.assertIsInstance(statInfo, dict)
self.storage.setlock(self.endpoint, self.homepath + '/testelock', self.userid, 'test app', 'testlock')
time.sleep(2.1)
time.sleep(3.1)
l = self.storage.getlock(self.endpoint, self.homepath + '/testelock', self.userid) # noqa: E741
self.assertEqual(l, None)
self.storage.setlock(self.endpoint, self.homepath + '/testelock', self.userid, 'test app', 'testlock2')
time.sleep(2.1)
time.sleep(3.1)
self.storage.setlock(self.endpoint, self.homepath + '/testelock', self.userid, 'test app', 'testlock3')
l = self.storage.getlock(self.endpoint, self.homepath + '/testelock', self.userid) # noqa: E741
self.assertIsInstance(l, dict)
self.assertEqual(l['lock_id'], 'testlock3')
time.sleep(2.1)
time.sleep(3.1)
with self.assertRaises(IOError) as context:
self.storage.refreshlock(self.endpoint, self.homepath + '/testelock', self.userid, 'test app', 'testlock4')
self.assertEqual(EXCL_ERROR, str(context.exception))
self.storage.setlock(self.endpoint, self.homepath + '/testelock', self.userid, 'test app', 'testlock5')
time.sleep(2.1)
time.sleep(3.1)
with self.assertRaises(IOError) as context:
self.storage.unlock(self.endpoint, self.homepath + '/testelock', self.userid, 'test app', 'testlock5')
self.assertEqual(EXCL_ERROR, str(context.exception))
Expand Down
2 changes: 1 addition & 1 deletion test/wopiserver-test.conf
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[general]
storagetype = local
port = 8880
wopilockexpiration = 2
wopilockexpiration = 3

[security]
usehttps = no
Expand Down
Loading