Python2.7 custom reporting (#24)
* Instrument using custom attributes + add Worker tests (#18)
* Add tests to the Job class (#19)
* Add support for reporting extra fields if configured (#20)
* Add Reporter class for Newrelic abstraction and job utility (#21)
* Fix integration bugs + add report_raw and flatten dicts (#23)

* Patch GH action to work on python 2.7 only
* Downgrade pytest, PyYAML, psycopg2-binary to work with python 2.7
* Fix tests and job to be python 2.7 compatible
hammady authored Dec 21, 2023
1 parent bb9bfdb commit 00f25eb
Showing 17 changed files with 1,077 additions and 137 deletions.
20 changes: 20 additions & 0 deletions .github/workflows/run-tests.yaml
@@ -0,0 +1,20 @@
name: Run tests

on:
  push:
    branches: [ python2.7 ]
  pull_request:
    branches: [ python2.7 ]

jobs:
  test:
    runs-on: ubuntu-20.04
    container:
      image: python:2.7.18-buster
    steps:
      - name: Checkout
        uses: actions/checkout@v2
      - name: Install dependencies
        run: pip install -r requirements-test.txt
      - name: Run tests
        run: pytest
4 changes: 3 additions & 1 deletion .gitignore
@@ -2,4 +2,6 @@
*.pyc
/dist
/build
*.egg-info
*.egg-info
.venv

16 changes: 9 additions & 7 deletions Dockerfile
@@ -1,13 +1,15 @@
FROM python:2.7-jessie
FROM python:2.7.18-buster

LABEL maintainer="Hossam Hammady <[email protected]>"
LABEL maintainer="Hossam Hammady <[email protected]>"

WORKDIR /home

COPY / /home/
# install deps first
RUN pip install --upgrade pip
COPY requirements-test.txt /home/
RUN pip install -r requirements-test.txt

RUN pip install --upgrade pip && \
pip install twine && \
python setup.py sdist bdist_wheel
# copy rest of files
COPY / /home/

CMD ["twine", "upload", "dist/*"]
CMD ["pytest"]
6 changes: 5 additions & 1 deletion README.md
@@ -114,7 +114,6 @@ You can also provide a logger class (from `logging` module) to have full control
- No access to your Ruby classes, you should implement all your logic from scratch in Python
- Reads only raw attributes of jobs from the database (job table columns), no relations
- Assumes you only need to call the `run` method in your job with no arguments
- No unit tests

## Contribute

@@ -126,6 +125,11 @@ Install the code for development:

Do your changes, then send a pull request.

## Test

pip install -r requirements-test.txt
pytest

## Publish

### Using Python
63 changes: 47 additions & 16 deletions pyworker/job.py
@@ -19,26 +19,32 @@ class Job(object):
    """docstring for Job"""
    __metaclass__ = Meta
    def __init__(self, class_name, database, logger,
            job_id, queue, run_at, attempts=0, max_attempts=1, attributes=None):
            job_id, queue, run_at, attempts=0, max_attempts=1,
            attributes=None, abstract=False, extra_fields=None,
            reporter=None):
        super(Job, self).__init__()
        self.class_name = class_name
        self.database = database
        self.logger = logger
        self.job_id = job_id
        self.job_name = '%s#run' % class_name
        self.attempts = attempts
        self.run_at = run_at
        self.queue = queue
        self.max_attempts = max_attempts
        self.attributes = attributes
        self.abstract = abstract
        self.extra_fields = extra_fields
        self.reporter = reporter

    def __str__(self):
        return "%s: %s" % (self.__class__.__name__, str(self.__dict__))

    @classmethod
    def from_row(cls, job_row, max_attempts, database, logger):
        '''job_row is a tuple of (id, attempts, run_at, queue, handler)'''
    def from_row(cls, job_row, max_attempts, database, logger,
            extra_fields=None, reporter=None):
        '''job_row is a tuple of (id, attempts, run_at, queue, handler, *extra_fields)'''
        def extract_class_name(line):
            # TODO cache regex
            regex = re.compile('object: !ruby/object:(.+)')
            match = regex.match(line)
            if match:
@@ -59,7 +65,18 @@ def extract_attributes(lines):
                    attributes.append(line)
            return attributes

        job_id, attempts, run_at, queue, handler = job_row
        def extract_extra_fields(extra_fields, extra_field_values):
            if extra_fields is None or extra_field_values is None:
                return None

            return dict(zip(extra_fields, extra_field_values))

        # job_id, attempts, run_at, queue, handler, *extra_field_values = job_row
        # The extended iterable unpacking (using *) does not work under python 2.7
        job_id, attempts, run_at, queue, handler = job_row[:5]
        extra_field_values = job_row[5:]

        extra_fields_dict = extract_extra_fields(extra_fields, extra_field_values)
        handler = handler.splitlines()

        class_name = extract_class_name(handler[1])
@@ -70,7 +87,9 @@ def extract_attributes(lines):
            return Job(class_name=class_name, logger=logger,
                max_attempts=max_attempts,
                job_id=job_id, attempts=attempts,
                run_at=run_at, queue=queue, database=database)
                run_at=run_at, queue=queue, database=database,
                abstract=True, extra_fields=extra_fields_dict,
                reporter=reporter)

        attributes = extract_attributes(handler[2:])
        logger.debug("Found attributes: %s" % str(attributes))
@@ -83,7 +102,9 @@ def extract_attributes(lines):
            job_id=job_id, attempts=attempts,
            run_at=run_at, queue=queue, database=database,
            max_attempts=max_attempts,
            attributes=payload['object']['attributes'])
            attributes=payload['object']['attributes'],
            abstract=False, extra_fields=extra_fields_dict,
            reporter=reporter)

    def before(self):
        self.logger.debug("Running Job.before hook")
@@ -101,21 +122,26 @@ def success(self):
        self.logger.debug("Running Job.success hook")

    def set_error_unlock(self, error):
        failed = False
        self.logger.error('Job %d raised error: %s' % (self.job_id, error))
        # run error hook
        self.error(error)
        self.attempts += 1
        now = get_current_time()
        setters = [
            'locked_at = null',
            'locked_by = null',
            'attempts = %d' % self.attempts,
            'locked_at = %s',
            'locked_by = %s',
            'attempts = %s',
            'last_error = %s'
        ]
        values = [
            None,
            None,
            self.attempts,
            error
        ]
        if self.attempts >= self.max_attempts:
            failed = True
            # set failed_at = now
            setters.append('failed_at = %s')
            values.append(now)
@@ -125,15 +151,20 @@ def set_error_unlock(self, error):
            setters.append('run_at = %s')
            delta = (self.attempts**4) + 5
            values.append(str(now + get_time_delta(seconds=delta)))
        query = 'UPDATE delayed_jobs SET %s WHERE id = %d' % \
            (', '.join(setters), self.job_id)
        self.logger.debug('set error query: %s' % query)
        self.logger.debug('set error values: %s' % str(values))
        self.database.cursor().execute(query, tuple(values))
        self.database.commit()

        self._update_job(setters, values)
        return failed

    def remove(self):
        self.logger.debug('Job %d finished successfully' % self.job_id)
        query = 'DELETE FROM delayed_jobs WHERE id = %d' % self.job_id
        self.database.cursor().execute(query)
        self.database.commit()

    def _update_job(self, setters, values):
        query = 'UPDATE delayed_jobs SET %s WHERE id = %d' % \
            (', '.join(setters), self.job_id)
        self.logger.debug('update query: %s' % query)
        self.logger.debug('update values: %s' % str(values))
        self.database.cursor().execute(query, tuple(values))
        self.database.commit()
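
For orientation, a minimal sketch (not part of the commit) of how the extended from_row signature above is meant to be driven when extra columns are selected alongside the standard delayed_jobs fields. The field names, row values, and the surrounding job_row, database, logger, and reporter objects are illustrative assumptions:

# Hypothetical worker-side configuration: two extra columns are selected
# after the standard (id, attempts, run_at, queue, handler).
extra_fields = ['tenant', 'priority']

# job_row as fetched from delayed_jobs, e.g.
#   (42, 0, run_at, 'default', handler_yaml, 'acme', 7)
job = Job.from_row(job_row, max_attempts=3, database=database, logger=logger,
                   extra_fields=extra_fields, reporter=reporter)

# extract_extra_fields zips the configured names with the trailing values:
# job.extra_fields == {'tenant': 'acme', 'priority': 7}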
76 changes: 76 additions & 0 deletions pyworker/reporter.py
@@ -0,0 +1,76 @@
import json
from contextlib import contextmanager
import newrelic.agent


class Reporter(object):

    def __init__(self, attribute_prefix='', logger=None):
        self._prefix = attribute_prefix
        self._logger = logger
        if self._logger:
            self._logger.info('Reporter: initializing NewRelic')
        newrelic.agent.initialize()
        self._newrelic_app = newrelic.agent.register_application()

    def report(self, **attributes):
        # flatten attributes
        attributes = self._flatten_attributes(attributes)
        # format attributes
        attributes = self._format_attributes(attributes)
        self.report_raw(**attributes)

    def report_raw(self, **attributes):
        # report to NewRelic
        self._report_newrelic(attributes)

    @contextmanager
    def recorder(self, name):
        with newrelic.agent.BackgroundTask(
                application=self._newrelic_app,
                name=name,
                group='DelayedJob') as task:
            yield task

    def shutdown(self):
        newrelic.agent.shutdown_agent()

    def record_exception(self, exc_info):
        newrelic.agent.notice_error(error=exc_info)

    def _flatten_attributes(self, attributes):
        # flatten nested dict attributes
        flattened_attributes = {}
        for key, value in attributes.items():
            if type(value) == dict:
                for nested_key, nested_value in value.items():
                    flattened_attributes[nested_key] = nested_value
            else:
                flattened_attributes[key] = value
        return flattened_attributes

    def _format_attributes(self, attributes):
        # prefix then convert all attribute keys to camelCase
        # ensure values types are supported or json dump them
        return {
            self._prefix + self._to_camel_case(key): self._convert_value(value)
            for key, value in attributes.items()
            if key is not None and value is not None
        }

    @staticmethod
    def _to_camel_case(string):
        return string[0]+string.title()[1:].replace("-","").replace("_","").replace(" ","")

    @staticmethod
    def _convert_value(value):
        if type(value) not in [str, int, float, bool]:
            return json.dumps(value)
        return value

    def _report_newrelic(self, attributes):
        if self._logger:
            self._logger.debug('Reporter: reporting to NewRelic: %s' % attributes)
        # convert attributes dict to list of tuples
        attributes = list(attributes.items())
        newrelic.agent.add_custom_attributes(attributes)
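
For orientation, a minimal usage sketch (not part of the commit) of the Reporter class above, for example from the worker's run loop; the attribute prefix, task name, and reported attributes are illustrative, and a configured NewRelic agent (newrelic.ini or environment) is assumed:

import logging
from pyworker.reporter import Reporter

logger = logging.getLogger('pyworker')
reporter = Reporter(attribute_prefix='dj.', logger=logger)  # prefix is illustrative

# Wrap a job run in a NewRelic background task and attach custom attributes.
# Nested dicts are flattened, keys are prefixed and camelCased
# (tenant_id -> dj.tenantId), and unsupported value types are JSON-dumped.
with reporter.recorder('MyJob#run'):
    reporter.report(queue='default', extra_fields={'tenant_id': 42})

reporter.shutdown()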
