Skip to content

Commit

Permalink
Merge pull request #1620 from emilyanndavis/bugfix/1599-prevent-reado…
Browse files Browse the repository at this point in the history
…nly-workspace-dir

Updates directory permissions validation to use an EAFP approach
  • Loading branch information
davemfish authored Sep 5, 2024
2 parents f18b211 + 210b505 commit 4b39db1
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 40 deletions.
1 change: 1 addition & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Unreleased Changes
------------------
* Workbench
* Several small updates to the model input form UI to improve usability and visual consistency (https://github.com/natcap/invest/issues/912)
* Fixed a bug that was allowing readonly workspace directories on Windows (https://github.com/natcap/invest/issues/1599)

3.14.2 (2024-05-29)
-------------------
Expand Down
68 changes: 33 additions & 35 deletions src/natcap/invest/validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,8 @@ def check_directory(dirpath, must_exist=True, permissions='rx', **kwargs):
must_exist=True (bool): If ``True``, the directory at ``dirpath``
must already exist on the filesystem.
permissions='rx' (string): A string that includes the lowercase
characters ``r``, ``w`` and/or ``x`` indicating required
permissions for this folder . See ``check_permissions`` for
details.
characters ``r``, ``w`` and/or ``x``, indicating read, write, and
execute permissions (respectively) required for this directory.
Returns:
A string error message if an error was found. ``None`` otherwise.
Expand All @@ -193,9 +192,33 @@ def check_directory(dirpath, must_exist=True, permissions='rx', **kwargs):
dirpath = parent
break

permissions_warning = check_permissions(dirpath, permissions, True)
if permissions_warning:
return permissions_warning
MESSAGE_KEY = 'NEED_PERMISSION_DIRECTORY'

if 'r' in permissions:
try:
os.scandir(dirpath).close()
except OSError:
return MESSAGES[MESSAGE_KEY].format(permission='read')

# Check for x access before checking for w,
# since w operations to a dir are dependent on x access
if 'x' in permissions:
try:
cwd = os.getcwd()
os.chdir(dirpath)
except OSError:
return MESSAGES[MESSAGE_KEY].format(permission='execute')
finally:
os.chdir(cwd)

if 'w' in permissions:
try:
temp_path = os.path.join(dirpath, 'temp__workspace_validation.txt')
with open(temp_path, 'w') as temp:
temp.close()
os.remove(temp_path)
except OSError:
return MESSAGES[MESSAGE_KEY].format(permission='write')


def check_file(filepath, permissions='r', **kwargs):
Expand All @@ -204,9 +227,8 @@ def check_file(filepath, permissions='r', **kwargs):
Args:
filepath (string): The filepath to validate.
permissions='r' (string): A string that includes the lowercase
characters ``r``, ``w`` and/or ``x`` indicating required
permissions for this file. See ``check_permissions`` for
details.
characters ``r``, ``w`` and/or ``x``, indicating read, write, and
execute permissions (respectively) required for this file.
Returns:
A string error message if an error was found. ``None`` otherwise.
Expand All @@ -215,36 +237,12 @@ def check_file(filepath, permissions='r', **kwargs):
if not os.path.exists(filepath):
return MESSAGES['FILE_NOT_FOUND']

permissions_warning = check_permissions(filepath, permissions)
if permissions_warning:
return permissions_warning


def check_permissions(path, permissions, is_directory=False):
"""Validate permissions on a filesystem object.
This function uses ``os.access`` to determine permissions access.
Args:
path (string): The path to examine for permissions.
permissions (string): a string including the characters ``r``, ``w``
and/or ``x`` (lowercase), indicating read, write, and execute
permissions (respectively) that the filesystem object at ``path``
must have.
is_directory (boolean): Indicates whether the path refers to a directory
(True) or a file (False). Defaults to False.
Returns:
A string error message if an error was found. ``None`` otherwise.
"""
for letter, mode, descriptor in (
('r', os.R_OK, 'read'),
('w', os.W_OK, 'write'),
('x', os.X_OK, 'execute')):
if letter in permissions and not os.access(path, mode):
message_key = 'NEED_PERMISSION_DIRECTORY' if is_directory else 'NEED_PERMISSION_FILE'
return MESSAGES[message_key].format(permission=descriptor)
if letter in permissions and not os.access(filepath, mode):
return MESSAGES['NEED_PERMISSION_FILE'].format(permission=descriptor)


def _check_projection(srs, projected, projection_units):
Expand Down
92 changes: 87 additions & 5 deletions tests/test_validation.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
"""Testing module for validation."""
import codecs
import collections
import functools
import os
import platform
import shutil
import stat
import string
import sys
import tempfile
import textwrap
import time
Expand Down Expand Up @@ -330,6 +329,91 @@ def test_workspace_not_exists(self):
new_dir, must_exist=False, permissions='rwx'))


@unittest.skipIf(
sys.platform.startswith('win'),
'requires support for os.chmod(), which is unreliable on Windows')
class DirectoryValidationMacOnly(unittest.TestCase):
"""Test Directory Permissions Validation."""

def test_invalid_permissions_r(self):
"""Validation: when a folder must have read/write/execute
permissions but is missing write and execute permissions."""
from natcap.invest import validation

with tempfile.TemporaryDirectory() as tempdir:
os.chmod(tempdir, stat.S_IREAD)
validation_warning = validation.check_directory(tempdir,
permissions='rwx')
self.assertEqual(
validation_warning,
validation.MESSAGES['NEED_PERMISSION_DIRECTORY'].format(permission='execute'))

def test_invalid_permissions_w(self):
"""Validation: when a folder must have read/write/execute
permissions but is missing read and execute permissions."""
from natcap.invest import validation

with tempfile.TemporaryDirectory() as tempdir:
os.chmod(tempdir, stat.S_IWRITE)
validation_warning = validation.check_directory(tempdir,
permissions='rwx')
self.assertEqual(
validation_warning,
validation.MESSAGES['NEED_PERMISSION_DIRECTORY'].format(permission='read'))

def test_invalid_permissions_x(self):
"""Validation: when a folder must have read/write/execute
permissions but is missing read and write permissions."""
from natcap.invest import validation

with tempfile.TemporaryDirectory() as tempdir:
os.chmod(tempdir, stat.S_IEXEC)
validation_warning = validation.check_directory(tempdir,
permissions='rwx')
self.assertEqual(
validation_warning,
validation.MESSAGES['NEED_PERMISSION_DIRECTORY'].format(permission='read'))

def test_invalid_permissions_rw(self):
"""Validation: when a folder must have read/write/execute
permissions but is missing execute permission."""
from natcap.invest import validation

with tempfile.TemporaryDirectory() as tempdir:
os.chmod(tempdir, stat.S_IREAD | stat.S_IWRITE)
validation_warning = validation.check_directory(tempdir,
permissions='rwx')
self.assertEqual(
validation_warning,
validation.MESSAGES['NEED_PERMISSION_DIRECTORY'].format(permission='execute'))

def test_invalid_permissions_rx(self):
"""Validation: when a folder must have read/write/execute
permissions but is missing write permission."""
from natcap.invest import validation

with tempfile.TemporaryDirectory() as tempdir:
os.chmod(tempdir, stat.S_IREAD | stat.S_IEXEC)
validation_warning = validation.check_directory(tempdir,
permissions='rwx')
self.assertEqual(
validation_warning,
validation.MESSAGES['NEED_PERMISSION_DIRECTORY'].format(permission='write'))

def test_invalid_permissions_wx(self):
"""Validation: when a folder must have read/write/execute
permissions but is missing read permission."""
from natcap.invest import validation

with tempfile.TemporaryDirectory() as tempdir:
os.chmod(tempdir, stat.S_IWRITE | stat.S_IEXEC)
validation_warning = validation.check_directory(tempdir,
permissions='rwx')
self.assertEqual(
validation_warning,
validation.MESSAGES['NEED_PERMISSION_DIRECTORY'].format(permission='read'))


class FileValidation(unittest.TestCase):
"""Test File Validator."""

Expand Down Expand Up @@ -539,7 +623,6 @@ def test_vector_projected_in_m(self):

def test_wrong_geom_type(self):
"""Validation: checks that the vector's geometry type is correct."""
from natcap.invest import spec_utils
from natcap.invest import validation
driver = gdal.GetDriverByName('GPKG')
filepath = os.path.join(self.workspace_dir, 'vector.gpkg')
Expand Down Expand Up @@ -1369,7 +1452,6 @@ def test_csv_raster_validation_missing_file(self):
})
self.assertIn('File not found', str(cm.exception))


def test_csv_raster_validation_not_projected(self):
"""validation: validate unprojected raster within csv column"""
from natcap.invest import validation
Expand Down

0 comments on commit 4b39db1

Please sign in to comment.