diff --git a/.github/workflows/run-tests.yaml b/.github/workflows/run-tests.yaml new file mode 100644 index 0000000..de58056 --- /dev/null +++ b/.github/workflows/run-tests.yaml @@ -0,0 +1,22 @@ +name: Run tests + +on: + push: + branches: [ master ] + pull_request: + branches: [ master ] + +jobs: + test: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v2 + - name: Set up Python 3.11 + uses: actions/setup-python@v2 + with: + python-version: '3.11' + - name: Install dependencies + run: pip install -r requirements-test.txt + - name: Run tests + run: pytest diff --git a/.gitignore b/.gitignore index 584f258..2d6b31b 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,6 @@ *.pyc /dist /build -*.egg-info \ No newline at end of file +*.egg-info +.venv + diff --git a/README.md b/README.md index 38908a8..fe8bf13 100644 --- a/README.md +++ b/README.md @@ -115,7 +115,6 @@ Youc an also provide a logger class (from `logging` module) to have full control - No access to your Ruby classes, you should implement all your logic from scratch in Python - Reads only raw attributes of jobs from the database (job table columns), no relations - Assumes you only need to call the `run` method in your job with no arguments -- No unit tests ## Contribute @@ -127,6 +126,11 @@ Install the code for development: Do your changes, then send a pull request. 
+## Test + + pip install -r requirements-test.txt + pytest + ## Publish ### Using Python diff --git a/pyworker/job.py b/pyworker/job.py index 7d975f6..07b4491 100644 --- a/pyworker/job.py +++ b/pyworker/job.py @@ -18,17 +18,20 @@ def __new__(meta, name, bases, class_dict): class Job(object, metaclass=Meta): """docstring for Job""" def __init__(self, class_name, database, logger, - job_id, queue, run_at, attempts=0, max_attempts=1, attributes=None): + job_id, queue, run_at, attempts=0, max_attempts=1, + attributes=None, abstract=False): super(Job, self).__init__() self.class_name = class_name self.database = database self.logger = logger self.job_id = job_id + self.job_name = '%s#run' % class_name self.attempts = attempts self.run_at = run_at self.queue = queue self.max_attempts = max_attempts self.attributes = attributes + self.abstract = abstract def __str__(self): return "%s: %s" % (self.__class__.__name__, str(self.__dict__)) @@ -69,7 +72,8 @@ def extract_attributes(lines): return Job(class_name=class_name, logger=logger, max_attempts=max_attempts, job_id=job_id, attempts=attempts, - run_at=run_at, queue=queue, database=database) + run_at=run_at, queue=queue, database=database, + abstract=True) attributes = extract_attributes(handler[2:]) logger.debug("Found attributes: %s" % str(attributes)) @@ -100,6 +104,7 @@ def success(self): self.logger.debug("Running Job.success hook") def set_error_unlock(self, error): + failed = False self.logger.error('Job %d raised error: %s' % (self.job_id, error)) # run error hook self.error(error) @@ -115,6 +120,7 @@ def set_error_unlock(self, error): error ] if self.attempts >= self.max_attempts: + failed = True # set failed_at = now setters.append('failed_at = %s') values.append(now) @@ -130,6 +136,7 @@ def set_error_unlock(self, error): self.logger.debug('set error values: %s' % str(values)) self.database.cursor().execute(query, tuple(values)) self.database.commit() + return failed def remove(self): self.logger.debug('Job %d 
finished successfully' % self.job_id) diff --git a/pyworker/worker.py b/pyworker/worker.py index a0ae791..873954c 100644 --- a/pyworker/worker.py +++ b/pyworker/worker.py @@ -75,18 +75,16 @@ def _latency(job_run_at): with newrelic.agent.BackgroundTask( application=self.newrelic_app, - name='%s#run' % job.class_name, + name=job.job_name, group='DelayedJob') as task: - # Record a custom metrics - # 1) Custom/DelayedJobQueueLatency/ => latency - # 2) Custom/DelayedJobMethodLatency/ => latency - # 3) Custom/DelayedJobMethodAttempts/ => attempts - newrelic.agent.record_custom_metrics([ - ('Custom/DelayedJobQueueLatency/%s' % job.queue, latency), - ('Custom/DelayedJobMethodLatency/%s' % job.class_name, latency), - ('Custom/DelayedJobMethodAttempts/%s' % job.class_name, job.attempts) - ], application=self.newrelic_app) + # Record custom attributes for the job transaction + newrelic.agent.add_custom_attribute('job_id', job.job_id) + newrelic.agent.add_custom_attribute('job_name', job.job_name) + newrelic.agent.add_custom_attribute('job_queue', job.queue) + newrelic.agent.add_custom_attribute('job_latency', latency) + newrelic.agent.add_custom_attribute('job_attempts', job.attempts) + # TODO report job.enqueue_attributes if available yield task else: @@ -100,35 +98,11 @@ def run(self): self.logger.debug('Picking up jobs...') job = self.get_job() self._current_job = job # used in signal handlers - start_time = time.time() try: - if type(job) == Job: - raise ValueError(('Unsupported Job: %s, please import it ' \ - + 'before you can handle it') % job.class_name) - elif job is not None: - with self._instrument(job): - self.logger.info('Running Job %d' % job.job_id) - with self._time_limit(self.max_run_time): - job.before() - job.run() - job.after() - job.success() - job.remove() - except Exception as exception: if job is not None: - error_str = traceback.format_exc() - job.set_error_unlock(error_str) - if type(exception) == TerminatedException: - break - finally: - if job is 
not None: - time_diff = time.time() - start_time - self.logger.info('Job %d finished in %d seconds' % \ - (job.job_id, time_diff)) - - # Sleep for a while between each job and break if received SIGTERM - try: - time.sleep(self.sleep_delay) + self.handle_job(job) + else: # sleep for a while before checking again for new jobs + time.sleep(self.sleep_delay) except TerminatedException: break @@ -165,3 +139,43 @@ def get_job_row(): database=self.database, logger=self.logger) else: return None + + def handle_job(self, job): + if job is None: + return + with self._instrument(job): + start_time = time.time() + error = failed = False + caught_exception = None + try: + if job.abstract: + raise ValueError(('Unsupported Job: %s, please import it ' \ + + 'before you can handle it') % job.class_name) + else: + self.logger.info('Running Job %d' % job.job_id) + with self._time_limit(self.max_run_time): + job.before() + job.run() + job.after() + job.success() + job.remove() + except Exception as exception: + error = True + caught_exception = exception + # handle error + error_str = traceback.format_exc() + failed = job.set_error_unlock(error_str) + # if that was a termination error, bubble up to caller + if isinstance(exception, TerminatedException): + raise + finally: + # report error status + if self.newrelic_app: + newrelic.agent.add_custom_attribute('error', error) + newrelic.agent.add_custom_attribute('job_failure', failed) + if caught_exception is not None: + # pass an explicit (type, value, traceback) triple: sys.exc_info() is already empty here for handled errors + newrelic.agent.record_exception(type(caught_exception), caught_exception, caught_exception.__traceback__) + time_diff = time.time() - start_time + self.logger.info('Job %d finished in %d seconds' % \ + (job.job_id, time_diff)) diff --git a/requirements-test.txt b/requirements-test.txt new file mode 100644 index 0000000..0da5544 --- /dev/null +++ b/requirements-test.txt @@ -0,0 +1,6 @@ +pytest==7.4.3 +pytest-cov==4.1.0 +newrelic==9.3.0 +PyYAML==6.0.1 +python-dateutil==2.6.0 +psycopg2-binary==2.9.9 diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index
0000000..e69de29 diff --git a/tests/test_worker.py b/tests/test_worker.py new file mode 100644 index 0000000..eb11faf --- /dev/null +++ b/tests/test_worker.py @@ -0,0 +1,192 @@ +import datetime +from unittest import TestCase +from unittest.mock import patch, MagicMock, Mock +from pyworker.worker import Worker, TerminatedException, get_current_time + +class TestWorker(TestCase): + @patch('pyworker.worker.DBConnector') + def setUp(self, mock_db): + self.worker = Worker('dummy') + self.mock_db = mock_db + mock_db.connect = MagicMock() + mock_db.disconnect = MagicMock() + mocked_run_at = datetime.datetime(2023, 10, 7, 0, 0, 1) + self.mocked_now = datetime.datetime(2023, 10, 7, 0, 0, 10) + self.mocked_latency = 9 # seconds: mocked_now - mocked_run_at + self.mock_job = MagicMock( + abstract=False, + job_id=1, + job_name='test_job', + queue='default', + attempts=0, + run_at=mocked_run_at) + + def tearDown(self): + pass + + #********** __init__ tests **********# + + @patch('pyworker.worker.os.uname', return_value=['pytest', 'localhost']) + @patch('pyworker.worker.os.getpid', return_value=1234) + @patch('pyworker.worker.DBConnector') + def test_worker_init(self, mock_db, *_): + worker = Worker('dummy') + + self.assertEqual(worker.database, mock_db.return_value) + self.assertEqual(worker.sleep_delay, 10) + self.assertEqual(worker.max_attempts, 3) + self.assertEqual(worker.max_run_time, 3600) + self.assertEqual(worker.queue_names, 'default') + self.assertEqual(worker.name, 'host:localhost pid:1234') + + #********** .run tests **********# + + @patch('pyworker.worker.Worker.get_job', return_value=None) + # make sleep raise an exception to stop the loop + @patch('pyworker.worker.time.sleep', side_effect=TerminatedException('SIGTERM')) + def test_worker_run_connects_to_and_disconnects_from_database(self, *_): + self.worker.run() + + self.worker.database.connect.assert_called_once_with() + self.worker.database.disconnect.assert_called_once_with() + + 
@patch('pyworker.worker.Worker.get_job', return_value=None) + @patch('pyworker.worker.time.sleep', side_effect=TerminatedException('SIGTERM')) + @patch('pyworker.worker.newrelic.agent', return_value=MagicMock()) + def test_worker_run_shuts_down_newrelic_agent(self, newrelic_agent, *_): + self.worker.newrelic_app = MagicMock() + + self.worker.run() + + newrelic_agent.shutdown_agent.assert_called_once_with() + + @patch('pyworker.worker.time.sleep', side_effect=TerminatedException('SIGTERM')) + @patch('pyworker.worker.Worker.get_job', return_value=None) + def test_worker_run_when_no_jobs_found_sleeps(self, mock_get_job, mock_time_sleep): + self.worker.run() + + mock_get_job.assert_called_once_with() + mock_time_sleep.assert_called_once_with(self.worker.sleep_delay) + + @patch('pyworker.worker.Worker.handle_job', side_effect=TerminatedException('SIGTERM')) + @patch('pyworker.worker.Worker.get_job', return_value=MagicMock()) + def test_worker_run_when_job_found_handles_job(self, mock_get_job, mock_handle_job): + self.worker.run() + + mock_get_job.assert_called_once_with() + mock_handle_job.assert_called_once_with(mock_get_job.return_value) + + #********** .handle_job tests **********# + + def assert_instrument_context_reports_custom_attributes(self, job, newrelic_agent): + newrelic_agent.BackgroundTask.assert_called_once() + newrelic_agent.add_custom_attribute.assert_any_call('job_id', job.job_id) + newrelic_agent.add_custom_attribute.assert_any_call('job_name', job.job_name) + newrelic_agent.add_custom_attribute.assert_any_call('job_queue', job.queue) + newrelic_agent.add_custom_attribute.assert_any_call('job_latency', self.mocked_latency) + newrelic_agent.add_custom_attribute.assert_any_call('job_attempts', job.attempts) + + def test_worker_handle_job_when_job_is_none_does_nothing(self): + self.worker.handle_job(None) # no error raised + + @patch('pyworker.worker.newrelic.agent', return_value=MagicMock()) + def 
test_worker_handle_job_when_job_is_unsupported_type_sets_error(self, newrelic_agent): + job = self.mock_job + job.abstract = True + + self.worker.handle_job(job) + + job.set_error_unlock.assert_called_once() + assert 'Unsupported Job' in job.set_error_unlock.call_args[0][0] + + @patch('pyworker.worker.get_current_time') + @patch('pyworker.worker.newrelic.agent', return_value=MagicMock()) + def test_worker_handle_job_when_job_is_unsupported_type_reports_error_to_newrelic( + self, newrelic_agent, get_current_time): + get_current_time.return_value = self.mocked_now + job = self.mock_job + job.abstract = True + self.worker.newrelic_app = MagicMock() + + self.worker.handle_job(job) + + self.assert_instrument_context_reports_custom_attributes(job, newrelic_agent) + newrelic_agent.record_exception.assert_called_once() + newrelic_agent.add_custom_attribute.assert_any_call('error', True) + + def test_worker_handle_job_calls_all_hooks_then_removes_from_queue(self): + self.worker.handle_job(self.mock_job) + + self.mock_job.before.assert_called_once() + self.mock_job.run.assert_called_once() + self.mock_job.after.assert_called_once() + self.mock_job.success.assert_called_once() + + self.mock_job.remove.assert_called_once() + + @patch('pyworker.worker.get_current_time') + @patch('pyworker.worker.newrelic.agent', return_value=MagicMock()) + def test_worker_handle_job_when_no_errors_reports_success_to_newrelic( + self, newrelic_agent, get_current_time): + get_current_time.return_value = self.mocked_now + self.worker.newrelic_app = MagicMock() + + self.worker.handle_job(self.mock_job) + + self.assert_instrument_context_reports_custom_attributes(self.mock_job, newrelic_agent) + newrelic_agent.record_exception.assert_not_called() + newrelic_agent.add_custom_attribute.assert_any_call('error', False) + newrelic_agent.add_custom_attribute.assert_any_call('job_failure', False) + + def test_worker_handle_job_when_error_sets_error_and_unlocks_job(self): + job = self.mock_job + 
job.run.side_effect = Exception('test error') + + self.worker.handle_job(job) + + job.set_error_unlock.assert_called_once() + assert 'test error' in job.set_error_unlock.call_args[0][0] + job.remove.assert_not_called() + + @patch('pyworker.worker.get_current_time') + @patch('pyworker.worker.newrelic.agent', return_value=MagicMock()) + def test_worker_handle_job_when_error_report_to_newrelic(self, + newrelic_agent, get_current_time): + get_current_time.return_value = self.mocked_now + job = self.mock_job + job.set_error_unlock.return_value = False + job.run.side_effect = Exception('test error') + self.worker.newrelic_app = MagicMock() + + self.worker.handle_job(job) + + self.assert_instrument_context_reports_custom_attributes(job, newrelic_agent) + newrelic_agent.record_exception.assert_called_once() + newrelic_agent.add_custom_attribute.assert_any_call('error', True) + newrelic_agent.add_custom_attribute.assert_any_call('job_failure', False) + job.remove.assert_not_called() + + @patch('pyworker.worker.get_current_time') + @patch('pyworker.worker.newrelic.agent', return_value=MagicMock()) + def test_worker_handle_job_when_permanent_error_reports_failure_to_newrelic( + self, newrelic_agent, get_current_time): + get_current_time.return_value = self.mocked_now + job = self.mock_job + job.set_error_unlock.return_value = True + job.run.side_effect = Exception('test error') + self.worker.newrelic_app = MagicMock() + + self.worker.handle_job(job) + + self.assert_instrument_context_reports_custom_attributes(job, newrelic_agent) + newrelic_agent.record_exception.assert_called_once() + newrelic_agent.add_custom_attribute.assert_any_call('error', True) + newrelic_agent.add_custom_attribute.assert_any_call('job_failure', True) + job.remove.assert_not_called() + + def test_worker_handle_job_when_error_is_termination_error_bubbles_up(self): + job = self.mock_job + job.run.side_effect = TerminatedException('SIGTERM') + + with self.assertRaises(TerminatedException): + 
self.worker.handle_job(job)