Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Elastic update only modified #12

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
daemon_choices = ['start', 'start-foreground', 'stop', 'status']
queue_choices = ['add', 'clear', 'list', 'stats']
module_choices = ['download', 'backref', 'elastic', 'thumbnails', 'fulltext', 'georef', 'sitemap', 'misc', 'worker']
worker_region_choices = ['region-download', 'region-elastic', 'sync-region', 'sync-regions']
worker_region_choices = ['region-download', 'elasticsearch_regions', 'sync-region', 'sync-regions', 'generate_regions']
worker_body_choices = ['remove-body', 'sync-bodies', 'sync-body', 'remove-locations', 'reset-georef', 'reset-lastsync']
worker_misc_choices = ['migrate-ids', 'fix-oparl-11', 'sitemap-master']

Expand Down
10 changes: 9 additions & 1 deletion oparlsync/elasticsearch_import/ElasticsearchMeetingIndex.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
class ElasticsearchMeetingIndex:

def meeting_index(self):
self.datalog.info('Starting meeting indexing...')
if not self.es.indices.exists_alias(name='meeting-latest'):
now = datetime.utcnow()
index_name = 'meeting-' + now.strftime('%Y%m%d-%H%M')
Expand Down Expand Up @@ -52,7 +53,13 @@ def meeting_index(self):
regions.append(str(region.id))
region = region.parent

for meeting in Meeting.objects(body=self.body).no_cache():
last_index_timestamp = Option.get('last_index_meeting')

query_args = {'body': self.body}
if last_index_timestamp:
query_args['modified__gt'] = last_index_timestamp

for meeting in Meeting.objects(**query_args).no_cache():
if meeting.deleted:
self.es.delete(
index=index_name,
Expand All @@ -78,4 +85,5 @@ def meeting_index(self):
self.statistics['created'],
self.statistics['updated']
))
Option.set('last_index_meeting', datetime.utcnow(), 'datetime')

12 changes: 11 additions & 1 deletion oparlsync/elasticsearch_import/ElasticsearchOrganizationIndex.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
class ElasticsearchOrganizationIndex:

def organization_index(self):
self.datalog.info('Starting organization indexing...')

if not self.es.indices.exists_alias(name='organization-latest'):
now = datetime.utcnow()
index_name = 'organization-' + now.strftime('%Y%m%d-%H%M')
Expand Down Expand Up @@ -52,7 +54,14 @@ def organization_index(self):
regions.append(str(region.id))
region = region.parent

for organization in Organization.objects(body=self.body).no_cache():

last_index_timestamp = Option.get('last_index_organization')

query_args = {'body': self.body}
if last_index_timestamp:
query_args['modified__gt'] = last_index_timestamp

for organization in Organization.objects(**query_args).no_cache():
if organization.deleted:
self.es.delete(
index=index_name,
Expand All @@ -78,4 +87,5 @@ def organization_index(self):
self.statistics['created'],
self.statistics['updated']
))
Option.set('last_index_organization', datetime.utcnow(), 'datetime')

10 changes: 9 additions & 1 deletion oparlsync/elasticsearch_import/ElasticsearchPaperIndex.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
class ElasticsearchPaperIndex:

def paper_index(self):
self.datalog.info('Starting paper indexing...')
if not self.es.indices.exists_alias(name='paper-latest'):
now = datetime.utcnow()
index_name = 'paper-' + now.strftime('%Y%m%d-%H%M')
Expand All @@ -46,13 +47,19 @@ def paper_index(self):
else:
index_name = list(self.es.indices.get_alias('paper-latest'))[0]

last_index_timestamp = Option.get('last_index_paper')

regions = []
region = self.body.region
while (region):
regions.append(str(region.id))
region = region.parent

for paper in Paper.objects(body=self.body).no_cache():
query_args = {'body': self.body}
if last_index_timestamp:
query_args['modified__gt'] = last_index_timestamp

for paper in Paper.objects(**query_args).no_cache():
if paper.deleted:
self.es.delete(
index=index_name,
Expand All @@ -78,4 +85,5 @@ def paper_index(self):
self.statistics['created'],
self.statistics['updated']
))
Option.set('last_index_paper', datetime.utcnow(), 'datetime')

13 changes: 11 additions & 2 deletions oparlsync/elasticsearch_import/ElasticsearchPaperLocationIndex.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
class ElasticsearchPaperLocationIndex:

def paper_location_index(self):
self.datalog.info('Starting paper location indexing...')
if not self.es.indices.exists_alias(name='paper-location-latest'):
now = datetime.utcnow()
index_name = 'paper-location-' + now.strftime('%Y%m%d-%H%M')
Expand Down Expand Up @@ -57,7 +58,13 @@ def paper_location_index(self):
regions.append(str(region.id))
region = region.parent

for location in Location.objects(body=self.body).no_cache():
last_index_timestamp = Option.get('last_index_paper_location')

query_args = {'body': self.body}
if last_index_timestamp:
query_args['modified__gt'] = last_index_timestamp

for location in Location.objects(**query_args).no_cache():
if location.deleted:
self.es.delete(
index=index_name,
Expand Down Expand Up @@ -92,7 +99,8 @@ def paper_location_index(self):
location_dict['geotype'] = location_dict['geojson']['geometry']['type']
location_dict['geojson'] = json.dumps(location_dict['geojson'])

location_dict['legacy'] = bool(location.region.legacy)
if location.region:
location_dict['legacy'] = bool(location.region.legacy)
try:
new_doc = self.es.index(
index=index_name,
Expand All @@ -110,3 +118,4 @@ def paper_location_index(self):
self.statistics['created'],
self.statistics['updated']
))
Option.set('last_index_paper_location', datetime.utcnow(), 'datetime')
12 changes: 11 additions & 1 deletion oparlsync/elasticsearch_import/ElasticsearchPersonIndex.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
class ElasticsearchPersonIndex:

def person_index(self):
self.datalog.info('Starting person indexing...')

if not self.es.indices.exists_alias(name='person-latest'):
now = datetime.utcnow()
index_name = 'person-' + now.strftime('%Y%m%d-%H%M')
Expand Down Expand Up @@ -52,7 +54,13 @@ def person_index(self):
regions.append(str(region.id))
region = region.parent

for person in Person.objects(body=self.body).no_cache():
last_index_timestamp = Option.get('last_index_person')

query_args = {'body': self.body}
if last_index_timestamp:
query_args['modified__gt'] = last_index_timestamp

for person in Person.objects(**query_args).no_cache():
if person.deleted:
self.es.delete(
index=index_name,
Expand All @@ -78,4 +86,6 @@ def person_index(self):
self.statistics['created'],
self.statistics['updated']
))
Option.set('last_index_person', datetime.utcnow(), 'datetime')


9 changes: 5 additions & 4 deletions oparlsync/maintenance/Maintenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ class Maintenance(BaseTask, MaintenanceRemove, MaintenanceRegion, MaintenanceReg
'elasticsearch'
]

def __init__(self, body_id):
self.body_id = body_id
def __init__(self, **kwargs):
self.body_id = kwargs['region']
super().__init__()
self.valid_objects = [
Body,
Expand All @@ -48,6 +48,7 @@ def __init__(self, body_id):
File,
Location
]
self.run(self.body_id, kwargs['job'])

def run(self, body_id, *args):
if len(args) < 1:
Expand All @@ -58,7 +59,7 @@ def run(self, body_id, *args):
elif args[0] == 'generate_regions':
self.generate_regions()
elif args[0] == 'elasticsearch_regions':
self.elasticsearch_regions()
self.elasticsearch_regions(body_id)
elif args[0] == 'update_street_locality':
self.update_street_locality()
elif args[0] == 'sync_bodies':
Expand Down Expand Up @@ -86,7 +87,7 @@ def activate_body(self, body_id):
self.sync_body(body_id)
self.generate_regions()
self.sync_body(body_id)
self.elasticsearch_regions()
self.elasticsearch_regions(body_id)

def fix_oparl_11(self):
count_delete = 0
Expand Down
4 changes: 2 additions & 2 deletions oparlsync/maintenance/MaintenanceRegionElastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@


class MaintenanceRegionElastic:
def elasticsearch_regions(self):
def elasticsearch_regions(self, body_id):
if not self.es.indices.exists_alias(name='region-latest'):
now = datetime.utcnow()
index_name = 'region-' + now.strftime('%Y%m%d-%H%M')

es_import = ElasticsearchImport(self)
es_import = ElasticsearchImport(**{'body': body_id})

mapping = es_import.es_mapping_generator(Region, deref='deref_region', delete='delete_region')
mapping['properties']['body_count'] = {
Expand Down