Skip to content

Commit

Permalink
Merge pull request #65 from uclahs-cds/nwiltsie-format-code
Browse files Browse the repository at this point in the history
Standardize code format with ruff
  • Loading branch information
nwiltsie authored Jun 3, 2024
2 parents 1375f5f + 4932f7c commit c9cbf2e
Show file tree
Hide file tree
Showing 18 changed files with 442 additions and 392 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Make `nftest run` exit with the number of failed tests
- Use `shell=False` for subprocess
- Capture Nextflow logs via syslog rather than console
- Format all code with ruff

### Fixed
- Make `nftest` with no arguments print usage and exit
Expand Down
66 changes: 36 additions & 30 deletions nftest/NFTestAssert.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
""" NF Test assert """
"""NF Test assert"""

import datetime
import errno
import os
import subprocess
from typing import Callable
from logging import getLogger, DEBUG

from nftest.common import calculate_checksum, resolve_single_path, \
popen_with_logger
from nftest.common import calculate_checksum, resolve_single_path, popen_with_logger
from nftest.NFTestENV import NFTestENV


class NFTestAssert():
""" Defines how nextflow test results are asserted. """
def __init__(self, actual:str, expect:str, method:str='md5', script:str=None):
""" Constructor """
class NFTestAssert:
"""Defines how nextflow test results are asserted."""

def __init__(
self, actual: str, expect: str, method: str = "md5", script: str = None
):
"""Constructor"""
self._env = NFTestENV()
self._logger = getLogger('NFTest')
self._logger = getLogger("NFTest")
self.actual = actual
self.expect = expect
self.method = method
Expand All @@ -25,68 +28,71 @@ def __init__(self, actual:str, expect:str, method:str='md5', script:str=None):
self.startup_time = datetime.datetime.now(tz=datetime.timezone.utc)

def identify_assertion_files(self) -> None:
""" Resolve actual and expected paths """
"""Resolve actual and expected paths"""
self.actual = resolve_single_path(self.actual)
self.expect = resolve_single_path(self.expect)

def assert_exists(self) -> None:
"Assert that the expected and actual files exist."
if not self.actual.exists():
self._logger.error('Actual file not found: %s', self.actual)
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT),
self.actual)
self._logger.error("Actual file not found: %s", self.actual)
raise FileNotFoundError(
errno.ENOENT, os.strerror(errno.ENOENT), self.actual
)

if not self.expect.exists():
self._logger.error('Expect file not found: %s', self.expect)
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT),
self.expect)
self._logger.error("Expect file not found: %s", self.expect)
raise FileNotFoundError(
errno.ENOENT, os.strerror(errno.ENOENT), self.expect
)

def assert_updated(self) -> None:
"Assert that the actual file was updated during this test run."
file_mod_time = datetime.datetime.fromtimestamp(
self.actual.stat().st_mtime,
tz=datetime.timezone.utc
self.actual.stat().st_mtime, tz=datetime.timezone.utc
)

self._logger.debug("Test creation time: %s", self.startup_time)
self._logger.debug("Actual mod time: %s", file_mod_time)
assert file_mod_time > self.startup_time, \
f"{str(self.actual)} was not modified by this pipeline"
assert (
file_mod_time > self.startup_time
), f"{str(self.actual)} was not modified by this pipeline"

def assert_expected(self) -> None:
"Assert the results match with the expected values."
assert_method = self.get_assert_method()
try:
assert assert_method(self.actual, self.expect)
self._logger.debug('Assertion passed')
self._logger.debug("Assertion passed")
except AssertionError as error:
self._logger.error('Assertion failed')
self._logger.error('Actual: %s', self.actual)
self._logger.error('Expect: %s', self.expect)
self._logger.error("Assertion failed")
self._logger.error("Actual: %s", self.actual)
self._logger.error("Expect: %s", self.expect)
raise error

def get_assert_method(self) -> Callable:
""" Get the assert method """
"""Get the assert method"""
# pylint: disable=E0102
if self.script is not None:

def func(actual, expect):
cmd = [self.script, actual, expect]
self._logger.debug(subprocess.list2cmdline(cmd))

process = popen_with_logger(
cmd,
logger=self._logger,
stdout_level=DEBUG
cmd, logger=self._logger, stdout_level=DEBUG
)
return process.returncode == 0

return func
if self.method == 'md5':
if self.method == "md5":

def func(actual, expect):
self._logger.debug("md5 %s %s", actual, expect)
actual_value = calculate_checksum(actual)
expect_value = calculate_checksum(expect)
return actual_value == expect_value

return func
self._logger.error('assert method %s unknown.', self.method)
raise ValueError(f'assert method {self.method} unknown.')
self._logger.error("assert method %s unknown.", self.method)
raise ValueError(f"assert method {self.method} unknown.")
87 changes: 50 additions & 37 deletions nftest/NFTestCase.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
""" NF Test case """
"""NF Test case"""

from __future__ import annotations

import logging
Expand All @@ -23,23 +24,36 @@
from nftest.NFTestAssert import NFTestAssert


class NFTestCase():
""" Defines the NF test case """
class NFTestCase:
"""Defines the NF test case"""

# pylint: disable=R0902
# pylint: disable=R0913
def __init__(self, name:str=None, message:str=None, nf_script:str=None,
nf_configs:List[str]=None, profiles:List[str]=None, params_file:str=None,
reference_params:List[Tuple[str,str]]=None,
output_directory_param_name:str='output_dir',
asserts:List[NFTestAssert]=None, temp_dir:str=None,
remove_temp:bool=None, clean_logs:bool=None,
skip:bool=False, verbose:bool=False):
""" Constructor """
def __init__(
self,
name: str = None,
message: str = None,
nf_script: str = None,
nf_configs: List[str] = None,
profiles: List[str] = None,
params_file: str = None,
reference_params: List[Tuple[str, str]] = None,
output_directory_param_name: str = "output_dir",
asserts: List[NFTestAssert] = None,
temp_dir: str = None,
remove_temp: bool = None,
clean_logs: bool = None,
skip: bool = False,
verbose: bool = False,
):
"""Constructor"""
self._env = NFTestENV()
self._logger = logging.getLogger('NFTest')
self._logger = logging.getLogger("NFTest")
self._nflogger = logging.getLogger("console")
self.name = name
self.name_for_output = re.sub(r'[^a-zA-Z0-9_\-.]', '', self.name.replace(' ', '-'))
self.name_for_output = re.sub(
r"[^a-zA-Z0-9_\-.]", "", self.name.replace(" ", "-")
)
self.message = message
self.nf_script = nf_script
self.nf_configs = nf_configs or []
Expand All @@ -54,20 +68,22 @@ def __init__(self, name:str=None, message:str=None, nf_script:str=None,
self.skip = skip
self.verbose = verbose

def resolve_actual(self, asserts:List[NFTestAssert]=None):
""" Resolve the file path for actual file """
def resolve_actual(self, asserts: List[NFTestAssert] = None):
"""Resolve the file path for actual file"""
if not asserts:
return []

for assertion in asserts:
assertion.actual = str(
Path(self._env.NFT_OUTPUT)/self.name_for_output/assertion.actual)
Path(self._env.NFT_OUTPUT) / self.name_for_output / assertion.actual
)

return asserts

# pylint: disable=E0213
def test_wrapper(func: Callable):
""" Wrap tests with additional logging and cleaning. """
"""Wrap tests with additional logging and cleaning."""

def wrapper(self):
# pylint: disable=E1102
self.print_prolog()
Expand All @@ -83,14 +99,14 @@ def wrapper(self):

@test_wrapper
def test(self) -> bool:
""" Run test cases. """
"""Run test cases."""
if self.skip:
self._logger.info(' [ skipped ]')
self._logger.info(" [ skipped ]")
return True

nextflow_succeeded = self.submit()
if not nextflow_succeeded:
self._logger.error(' [ failed ]')
self._logger.error(" [ failed ]")
return False

for ass in self.asserts:
Expand All @@ -101,13 +117,13 @@ def test(self) -> bool:
ass.assert_expected()
except Exception as error:
self._logger.error(error.args)
self._logger.error(' [ failed ]')
self._logger.error(" [ failed ]")
raise error
self._logger.info(' [ succeed ]')
self._logger.info(" [ succeed ]")
return True

def submit(self) -> sp.CompletedProcess:
""" Submit a nextflow run """
"""Submit a nextflow run"""
# Use ExitStack to handle the multiple nested context managers
with ExitStack() as stack:
# Create a server with a random port to accept syslogs from
Expand All @@ -123,7 +139,7 @@ def submit(self) -> sp.CompletedProcess:
threading.Thread(
name="SyslogThread",
target=syslog_server.serve_forever,
kwargs={"poll_interval": 1}
kwargs={"poll_interval": 1},
).start()

syslog_address = ":".join(
Expand All @@ -133,9 +149,10 @@ def submit(self) -> sp.CompletedProcess:
nextflow_command = [
"nextflow",
"-quiet",
"-syslog", syslog_address,
"-syslog",
syslog_address,
"run",
self.nf_script
self.nf_script,
]

if self.profiles:
Expand All @@ -152,30 +169,26 @@ def submit(self) -> sp.CompletedProcess:

nextflow_command.extend([
f"--{self.output_directory_param_name}",
Path(self._env.NFT_OUTPUT, self.name_for_output)
Path(self._env.NFT_OUTPUT, self.name_for_output),
])

envmod = {
"NXF_WORK": self.temp_dir
}
envmod = {"NXF_WORK": self.temp_dir}

# Log the shell equivalent of this command
self._logger.info(
"%s %s",
" ".join([f"{k}={shlex.quote(v)}" for k, v in envmod.items()]),
sp.list2cmdline(nextflow_command)
sp.list2cmdline(nextflow_command),
)

process = popen_with_logger(
nextflow_command,
env={**os.environ, **envmod},
logger=self._nflogger
nextflow_command, env={**os.environ, **envmod}, logger=self._nflogger
)

return process

def combine_global(self, _global: NFTestGlobal) -> None:
""" Combine test case configs with the global configs. """
"""Combine test case configs with the global configs."""
if _global.nf_config:
self.nf_configs.insert(0, _global.nf_config)

Expand All @@ -192,6 +205,6 @@ def combine_global(self, _global: NFTestGlobal) -> None:
self.clean_logs = _global.clean_logs

def print_prolog(self):
""" Print prolog message """
prolog = f'{self.name}: {self.message}'
"""Print prolog message"""
prolog = f"{self.name}: {self.message}"
self._logger.info(prolog)
39 changes: 23 additions & 16 deletions nftest/NFTestENV.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,50 @@
""" Environment variables """
"""Environment variables"""

import os
import datetime
from dataclasses import dataclass, field
from dotenv import load_dotenv
from nftest.Singleton import Singleton


# pylint: disable=C0103
@dataclass
class NFTestENV(metaclass=Singleton):
""" Class for initializng and holding environment variables.
"""
"""Class for initializng and holding environment variables."""

NFT_OUTPUT: str = field(init=False)
NFT_TEMP: str = field(init=False)
NFT_INIT: str = field(init=False)
NFT_LOG_LEVEL: str = field(init=False)
NFT_LOG: str = field(init=False)

def __post_init__(self):
""" Post-init set env variables """
"""Post-init set env variables"""
NFTestENV.load_env()

self.NFT_OUTPUT = os.getenv('NFT_OUTPUT', default='./')
self.NFT_TEMP = os.getenv('NFT_TEMP', default='./')
self.NFT_INIT = os.getenv('NFT_INIT', default=str(os.getcwd()))
self.NFT_LOG_LEVEL = os.getenv('NFT_LOG_LEVEL', default='INFO')
self.NFT_LOG = os.getenv('NFT_LOG', default=os.path.join(self.NFT_OUTPUT, \
f'log-nftest-{datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")}.log'))
self.NFT_OUTPUT = os.getenv("NFT_OUTPUT", default="./")
self.NFT_TEMP = os.getenv("NFT_TEMP", default="./")
self.NFT_INIT = os.getenv("NFT_INIT", default=str(os.getcwd()))
self.NFT_LOG_LEVEL = os.getenv("NFT_LOG_LEVEL", default="INFO")
self.NFT_LOG = os.getenv(
"NFT_LOG",
default=os.path.join(
self.NFT_OUTPUT,
f'log-nftest-{datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")}.log',
),
)

@staticmethod
def load_env():
""" Load and set env variables """
"""Load and set env variables"""
dirs_to_check = []
dirs_to_check.append(os.getcwd())
dirs_to_check.append(os.path.expanduser('~'))
dirs_to_check.append(os.path.expanduser("~"))
for adir in dirs_to_check:
if not load_dotenv(os.path.join(adir, '.env')):
print(f'LOG: .env not found in {adir}.', flush=True)
if not load_dotenv(os.path.join(adir, ".env")):
print(f"LOG: .env not found in {adir}.", flush=True)
else:
print(f'LOG: Loaded .env from {adir}', flush=True)
print(f"LOG: Loaded .env from {adir}", flush=True)
return

print('WARN: unable to find .env. Default values will be used.', flush=True)
print("WARN: unable to find .env. Default values will be used.", flush=True)
Loading

0 comments on commit c9cbf2e

Please sign in to comment.