Skip to content

Commit

Permalink
Add Reporter class for Newrelic abstraction and job utility (#21)
Browse files Browse the repository at this point in the history
* Decouple Worker class from Newrelic using new Reporter class
* Add new Reporter class to abstract Newrelic reporting
* Inject reporter into job from worker
* Test multiple python versions using GH actions
  • Loading branch information
hammady authored Dec 20, 2023
1 parent a4cc795 commit 495893f
Show file tree
Hide file tree
Showing 9 changed files with 358 additions and 133 deletions.
12 changes: 10 additions & 2 deletions .github/workflows/run-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,22 @@ on:

jobs:
test:
strategy:
matrix:
python-version:
- "3.8"
- "3.9"
- "3.10"
- "3.11"
- "3.12"
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Set up Python 3.11
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: '3.11'
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: pip install -r requirements-test.txt
- name: Run tests
Expand Down
13 changes: 9 additions & 4 deletions pyworker/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ class Job(object, metaclass=Meta):
"""docstring for Job"""
def __init__(self, class_name, database, logger,
job_id, queue, run_at, attempts=0, max_attempts=1,
attributes=None, abstract=False, extra_fields=None):
attributes=None, abstract=False, extra_fields=None,
reporter=None):
super(Job, self).__init__()
self.class_name = class_name
self.database = database
Expand All @@ -33,12 +34,14 @@ def __init__(self, class_name, database, logger,
self.attributes = attributes
self.abstract = abstract
self.extra_fields = extra_fields
self.reporter = reporter

def __str__(self):
return "%s: %s" % (self.__class__.__name__, str(self.__dict__))

@classmethod
def from_row(cls, job_row, max_attempts, database, logger, extra_fields=None):
def from_row(cls, job_row, max_attempts, database, logger,
extra_fields=None, reporter=None):
'''job_row is a tuple of (id, attempts, run_at, queue, handler, *extra_fields)'''
def extract_class_name(line):
regex = re.compile('object: !ruby/object:(.+)')
Expand Down Expand Up @@ -80,7 +83,8 @@ def extract_extra_fields(extra_fields, extra_field_values):
max_attempts=max_attempts,
job_id=job_id, attempts=attempts,
run_at=run_at, queue=queue, database=database,
abstract=True, extra_fields=extra_fields_dict)
abstract=True, extra_fields=extra_fields_dict,
reporter=reporter)

attributes = extract_attributes(handler[2:])
logger.debug("Found attributes: %s" % str(attributes))
Expand All @@ -94,7 +98,8 @@ def extract_extra_fields(extra_fields, extra_field_values):
run_at=run_at, queue=queue, database=database,
max_attempts=max_attempts,
attributes=payload['object']['attributes'],
abstract=False, extra_fields=extra_fields_dict)
abstract=False, extra_fields=extra_fields_dict,
reporter=reporter)

def before(self):
self.logger.debug("Running Job.before hook")
Expand Down
59 changes: 59 additions & 0 deletions pyworker/reporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import json
from contextlib import contextmanager
import newrelic.agent


class Reporter(object):
    """Abstraction over the NewRelic agent used to report job telemetry.

    The reporter owns agent initialization, wraps units of work in NewRelic
    background tasks, and attaches custom attributes (prefixed and
    camelCased) to the current transaction.
    """

    def __init__(self, attribute_prefix='', logger=None):
        """Initialize the NewRelic agent and register the application.

        Args:
            attribute_prefix: string prepended to every reported attribute
                name.
            logger: optional logger; when falsy, nothing is logged.
        """
        self._prefix = attribute_prefix
        self._logger = logger
        if self._logger:
            self._logger.info('Reporter: initializing NewRelic')
        newrelic.agent.initialize()
        self._newrelic_app = newrelic.agent.register_application()

    def report(self, **attributes):
        """Format the given keyword attributes and send them to NewRelic."""
        # format attributes
        attributes = self._format_attributes(attributes)
        # report to NewRelic
        self._report_newrelic(attributes)

    @contextmanager
    def recorder(self, name):
        """Context manager recording the enclosed work as a background task.

        Args:
            name: name of the NewRelic background task (group 'DelayedJob').

        Yields:
            The value produced by entering the NewRelic BackgroundTask.
        """
        # A @contextmanager function must be a generator: returning the
        # BackgroundTask would make `with reporter.recorder(...)` fail on
        # enter. Entering the task here and yielding keeps the caller's
        # body (and any exception it raises) inside the task.
        with newrelic.agent.BackgroundTask(
                application=self._newrelic_app,
                name=name,
                group='DelayedJob') as task:
            yield task

    def shutdown(self):
        """Shut down the NewRelic agent."""
        newrelic.agent.shutdown_agent()

    def record_exception(self, exception):
        """Forward the given exception to NewRelic."""
        # NOTE(review): newer agent versions document notice_error() as the
        # successor of record_exception() - confirm against the pinned
        # newrelic version before switching.
        newrelic.agent.record_exception(exception)

    def _format_attributes(self, attributes):
        """Return a new dict with prefixed camelCase keys and safe values.

        Entries whose key or value is None are dropped.
        """
        # prefix then convert all attribute keys to camelCase
        # ensure values types are supported or json dump them
        return {
            self._prefix + self._to_camel_case(key): self._convert_value(value)
            for key, value in attributes.items()
            if key is not None and value is not None
        }

    @staticmethod
    def _to_camel_case(string):
        """Convert e.g. 'job_id', 'job-id' or 'job id' to 'jobId'.

        The first character is kept as-is; an empty string is returned
        unchanged instead of raising IndexError.
        """
        if not string:
            return string
        # title() capitalizes each word; the separators are then stripped
        # in a single pass.
        tail = string.title()[1:].translate(str.maketrans('', '', '-_ '))
        return string[0] + tail

    @staticmethod
    def _convert_value(value):
        """Return value unchanged if it is a str/int/float/bool, else JSON.

        Values that json cannot serialize natively fall back to str() so
        that reporting telemetry never raises for an unusual value.
        """
        # Exact type check kept on purpose: instances of subclasses are
        # still JSON-encoded, as before.
        if type(value) not in (str, int, float, bool):
            return json.dumps(value, default=str)
        return value

    def _report_newrelic(self, attributes):
        """Attach the formatted attributes to the current transaction."""
        if self._logger:
            self._logger.debug('Reporter: reporting to NewRelic: %s' % attributes)
        # convert attributes dict to list of tuples
        attributes = list(attributes.items())
        newrelic.agent.add_custom_attributes(attributes)
61 changes: 27 additions & 34 deletions pyworker/worker.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import newrelic.agent

import os, signal, traceback
import time
import json
from contextlib import contextmanager
from pyworker.db import DBConnector
from pyworker.job import Job
from pyworker.logger import Logger
from pyworker.util import get_current_time, get_time_delta
from pyworker.reporter import Reporter

class TimeoutException(Exception): pass
class TerminatedException(Exception): pass

class Worker(object):
def __init__(self, dbstring, logger=None, extra_delayed_job_fields=None):
def __init__(self, dbstring, logger=None,
extra_delayed_job_fields=None,
reported_attributes_prefix=''):
super(Worker, self).__init__()
self.logger = Logger(logger)
self.logger.info('Starting pyworker...')
Expand All @@ -27,16 +27,15 @@ def __init__(self, dbstring, logger=None, extra_delayed_job_fields=None):
self.name = 'host:%s pid:%d' % (hostname, pid)
self.extra_delayed_job_fields = extra_delayed_job_fields

# Configure NewRelic if ENV variables set
self.newrelic_app = None
# Configure application reporter if ENV variables set
self.reporter = None
NEW_RELIC_LICENSE_KEY = os.environ.get("NEW_RELIC_LICENSE_KEY")
NEW_RELIC_APP_NAME = os.environ.get("NEW_RELIC_APP_NAME")

# Register Application in NewRelic if configured
# Register application reporter if configured
if NEW_RELIC_LICENSE_KEY and NEW_RELIC_APP_NAME:
self.logger.info('Initializing NewRelic Agent for: %s' % NEW_RELIC_APP_NAME)
newrelic.agent.initialize()
self.newrelic_app = newrelic.agent.register_application()
self.reporter = Reporter(
attribute_prefix=reported_attributes_prefix, logger=self.logger)

@contextmanager
def _time_limit(self, seconds):
Expand Down Expand Up @@ -72,30 +71,24 @@ def _latency(job_run_at):
# and when the job actually started running `now`
return (now - job_run_at).total_seconds()

if self.newrelic_app:
if self.reporter:
latency = _latency(job.run_at)

with newrelic.agent.BackgroundTask(
application=self.newrelic_app,
name=job.job_name,
group='DelayedJob') as task:
with self.reporter.recorder(job.job_name) as task:

# Record custom attributes for the job transaction
newrelic.agent.add_custom_attribute('job_id', job.job_id)
newrelic.agent.add_custom_attribute('job_name', job.job_name)
newrelic.agent.add_custom_attribute('job_queue', job.queue)
newrelic.agent.add_custom_attribute('job_latency', latency)
newrelic.agent.add_custom_attribute('job_attempts', job.attempts)
self.reporter.report(
job_id=job.job_id,
job_name=job.job_name,
job_queue=job.queue,
job_latency=latency,
job_attempts=job.attempts
)

# Record extra fields if configured
self.logger.debug('job extra fields: %s' % job.extra_fields)
if job.extra_fields is not None:
for key, value in job.extra_fields.items():
# NewRelic only supports string, int, float, bool
if value is not None:
if type(value) not in [str, int, float, bool]:
value = json.dumps(value)
newrelic.agent.add_custom_attribute(key, value)
self.reporter.report(**job.extra_fields)

yield task
else:
Expand All @@ -119,9 +112,9 @@ def run(self):

self.database.disconnect()

# If configured shutdown NewRelic Agent to upload data on shutdown
if self.newrelic_app:
newrelic.agent.shutdown_agent()
# If configured shutdown reporter to upload data on shutdown
if self.reporter:
self.reporter.shutdown()

def get_job(self):
def get_job_row():
Expand Down Expand Up @@ -152,7 +145,8 @@ def get_job_row():
if job_row:
return Job.from_row(job_row, max_attempts=self.max_attempts,
database=self.database, logger=self.logger,
extra_fields=self.extra_delayed_job_fields)
extra_fields=self.extra_delayed_job_fields,
reporter=self.reporter)
else:
return None

Expand Down Expand Up @@ -186,11 +180,10 @@ def handle_job(self, job):
raise exception
finally:
# report error status
if self.newrelic_app:
newrelic.agent.add_custom_attribute('error', error)
newrelic.agent.add_custom_attribute('job_failure', failed)
if self.reporter:
self.reporter.report(error=error, job_failure=failed)
if caught_exception:
newrelic.agent.record_exception(caught_exception)
self.reporter.record_exception(caught_exception)
time_diff = time.time() - start_time
self.logger.info('Job %d finished in %d seconds' % \
(job.job_id, time_diff))
33 changes: 0 additions & 33 deletions requirements.txt

This file was deleted.

8 changes: 4 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from setuptools import setup

requirements = [
'psycopg2',
'python-dateutil',
'PyYAML',
'newrelic'
'psycopg2-binary>=2',
'python-dateutil>=2',
'PyYAML>=3',
'newrelic>=7'
]

setup(
Expand Down
24 changes: 24 additions & 0 deletions tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ def load_unregistered_job(self):
def load_unregistered_job_with_extra_fields(self):
    """Load the unregistered-handler fixture as a job with extra fields."""
    fixture_name = 'handler_unregistered.yaml'
    return self.load_job_with_extra_fields(fixture_name)

def load_unregistered_job_with_reporter(self, reporter):
    """Load the unregistered job and attach the given reporter to it."""
    loaded = self.load_unregistered_job()
    setattr(loaded, 'reporter', reporter)
    return loaded

def load_registered_job(self):
job = self.load_job('handler_registered.yaml')
job.error = MagicMock()
Expand All @@ -76,6 +81,11 @@ def load_registered_job(self):
def load_registered_job_with_extra_fields(self):
    """Load the registered-handler fixture as a job with extra fields."""
    fixture_name = 'handler_registered.yaml'
    return self.load_job_with_extra_fields(fixture_name)

def load_registered_job_with_reporter(self, reporter):
    """Load the registered job and attach the given reporter to it."""
    loaded = self.load_registered_job()
    setattr(loaded, 'reporter', reporter)
    return loaded

def load_registered_job_with_attempts_exceeded(self):
job = self.load_registered_job()
job.attempts = self.mock_max_attempts - 1
Expand All @@ -99,12 +109,19 @@ def test_from_row_when_unregistered_class_returns_job_instance_without_attribute
self.assertEqual(job.max_attempts, self.mock_max_attempts)
self.assertIsNone(job.extra_fields)
self.assertIsNone(job.attributes)
self.assertIsNone(job.reporter)

def test_from_row_when_unregistered_class_returns_job_instance_with_extra_fields(self):
    # An unregistered job loaded with extra fields must expose exactly
    # the extra fields configured on the test case.
    loaded = self.load_unregistered_job_with_extra_fields()
    actual_fields = loaded.extra_fields

    self.assertDictEqual(actual_fields, self.mock_extra_fields)

def test_from_row_when_unregistered_class_returns_abstract_job_instance_with_reporter(self):
    # The reporter attached to an unregistered job must be the one given.
    expected_reporter = MagicMock()
    loaded = self.load_unregistered_job_with_reporter(expected_reporter)

    self.assertEqual(loaded.reporter, expected_reporter)

def test_from_row_when_registered_class_returns_concrete_job_instance(self):
job = self.load_registered_job()

Expand All @@ -128,12 +145,19 @@ def test_from_row_when_registered_class_returns_job_instance_with_attributes(sel
'total_articles': 1000,
'is_blind': True
})
self.assertIsNone(job.reporter)

def test_from_row_when_registered_class_returns_job_instance_with_extra_fields(self):
    # A registered job loaded with extra fields must expose exactly
    # the extra fields configured on the test case.
    loaded = self.load_registered_job_with_extra_fields()
    actual_fields = loaded.extra_fields

    self.assertDictEqual(actual_fields, self.mock_extra_fields)

def test_from_row_when_registered_class_returns_concrete_job_instance_with_reporter(self):
    # The reporter attached to a registered job must be the one given.
    expected_reporter = MagicMock()
    loaded = self.load_registered_job_with_reporter(expected_reporter)

    self.assertEqual(loaded.reporter, expected_reporter)

#********** .set_error_unlock tests **********#

def assert_job_updated_field(self, job, field, value):
Expand Down
Loading

0 comments on commit 495893f

Please sign in to comment.