From a55d1cd0857f1565800d949507917367d36ded51 Mon Sep 17 00:00:00 2001 From: Klaus Deja Date: Thu, 12 Mar 2020 19:57:35 +0100 Subject: [PATCH] Elastic update only modified --- manage.py | 2 +- .../ElasticsearchMeetingIndex.py | 10 +++++++++- .../ElasticsearchOrganizationIndex.py | 12 +++++++++++- .../elasticsearch_import/ElasticsearchPaperIndex.py | 10 +++++++++- .../ElasticsearchPaperLocationIndex.py | 13 +++++++++++-- .../ElasticsearchPersonIndex.py | 12 +++++++++++- oparlsync/maintenance/Maintenance.py | 9 +++++---- oparlsync/maintenance/MaintenanceRegionElastic.py | 4 ++-- 8 files changed, 59 insertions(+), 13 deletions(-) diff --git a/manage.py b/manage.py index c1d5f49..0569adc 100644 --- a/manage.py +++ b/manage.py @@ -17,7 +17,7 @@ daemon_choices = ['start', 'start-foreground', 'stop', 'status'] queue_choices = ['add', 'clear', 'list', 'stats'] module_choices = ['download', 'backref', 'elastic', 'thumbnails', 'fulltext', 'georef', 'sitemap', 'misc', 'worker'] -worker_region_choices = ['region-download', 'region-elastic', 'sync-region', 'sync-regions'] +worker_region_choices = ['region-download', 'elasticsearch_regions', 'sync-region', 'sync-regions', 'generate_regions'] worker_body_choices = ['remove-body', 'sync-bodies', 'sync-body', 'remove-locations', 'reset-georef', 'reset-lastsync'] worker_misc_choices = ['migrate-ids', 'fix-oparl-11', 'sitemap-master'] diff --git a/oparlsync/elasticsearch_import/ElasticsearchMeetingIndex.py b/oparlsync/elasticsearch_import/ElasticsearchMeetingIndex.py index 07ebe6f..981d61c 100644 --- a/oparlsync/elasticsearch_import/ElasticsearchMeetingIndex.py +++ b/oparlsync/elasticsearch_import/ElasticsearchMeetingIndex.py @@ -21,6 +21,7 @@ class ElasticsearchMeetingIndex: def meeting_index(self): + self.datalog.info('Starting meeting indexing...') if not self.es.indices.exists_alias(name='meeting-latest'): now = datetime.utcnow() index_name = 'meeting-' + now.strftime('%Y%m%d-%H%M') @@ -52,7 +53,13 @@ def meeting_index(self): regions.append(str(region.id)) region = region.parent - for meeting in Meeting.objects(body=self.body).no_cache(): + last_index_timestamp = Option.get('last_index_meeting') + + query_args = {'body': self.body} + if last_index_timestamp: + query_args['modified__gt'] = last_index_timestamp + + for meeting in Meeting.objects(**query_args).no_cache(): if meeting.deleted: self.es.delete( index=index_name, @@ -78,4 +85,5 @@ def meeting_index(self): self.statistics['created'], self.statistics['updated'] )) + Option.set('last_index_meeting', datetime.utcnow(), 'datetime') diff --git a/oparlsync/elasticsearch_import/ElasticsearchOrganizationIndex.py b/oparlsync/elasticsearch_import/ElasticsearchOrganizationIndex.py index 05f2a9a..0ae692b 100644 --- a/oparlsync/elasticsearch_import/ElasticsearchOrganizationIndex.py +++ b/oparlsync/elasticsearch_import/ElasticsearchOrganizationIndex.py @@ -21,6 +21,8 @@ class ElasticsearchOrganizationIndex: def organization_index(self): + self.datalog.info('Starting organization indexing...') + if not self.es.indices.exists_alias(name='organization-latest'): now = datetime.utcnow() index_name = 'organization-' + now.strftime('%Y%m%d-%H%M') @@ -52,7 +54,14 @@ def organization_index(self): regions.append(str(region.id)) region = region.parent - for organization in Organization.objects(body=self.body).no_cache(): + + last_index_timestamp = Option.get('last_index_organization') + + query_args = {'body': self.body} + if last_index_timestamp: + query_args['modified__gt'] = last_index_timestamp + + for organization in Organization.objects(**query_args).no_cache(): if organization.deleted: self.es.delete( index=index_name, @@ -78,4 +87,5 @@ def organization_index(self): self.statistics['created'], self.statistics['updated'] )) + Option.set('last_index_organization', datetime.utcnow(), 'datetime') diff --git a/oparlsync/elasticsearch_import/ElasticsearchPaperIndex.py b/oparlsync/elasticsearch_import/ElasticsearchPaperIndex.py index f5c5535..1ce130e 100644 --- a/oparlsync/elasticsearch_import/ElasticsearchPaperIndex.py +++ b/oparlsync/elasticsearch_import/ElasticsearchPaperIndex.py @@ -21,6 +21,7 @@ class ElasticsearchPaperIndex: def paper_index(self): + self.datalog.info('Starting paper indexing...') if not self.es.indices.exists_alias(name='paper-latest'): now = datetime.utcnow() index_name = 'paper-' + now.strftime('%Y%m%d-%H%M') @@ -46,13 +47,19 @@ def paper_index(self): else: index_name = list(self.es.indices.get_alias('paper-latest'))[0] + last_index_timestamp = Option.get('last_index_paper') + regions = [] region = self.body.region while (region): regions.append(str(region.id)) region = region.parent - for paper in Paper.objects(body=self.body).no_cache(): + query_args = {'body': self.body} + if last_index_timestamp: + query_args['modified__gt'] = last_index_timestamp + + for paper in Paper.objects(**query_args).no_cache(): if paper.deleted: self.es.delete( index=index_name, @@ -78,4 +85,5 @@ def paper_index(self): self.statistics['created'], self.statistics['updated'] )) + Option.set('last_index_paper', datetime.utcnow(), 'datetime') diff --git a/oparlsync/elasticsearch_import/ElasticsearchPaperLocationIndex.py b/oparlsync/elasticsearch_import/ElasticsearchPaperLocationIndex.py index e08e9e4..490068a 100644 --- a/oparlsync/elasticsearch_import/ElasticsearchPaperLocationIndex.py +++ b/oparlsync/elasticsearch_import/ElasticsearchPaperLocationIndex.py @@ -22,6 +22,7 @@ class ElasticsearchPaperLocationIndex: def paper_location_index(self): + self.datalog.info('Starting paper location indexing...') if not self.es.indices.exists_alias(name='paper-location-latest'): now = datetime.utcnow() index_name = 'paper-location-' + now.strftime('%Y%m%d-%H%M') @@ -57,7 +58,13 @@ def paper_location_index(self): regions.append(str(region.id)) region = region.parent - for location in Location.objects(body=self.body).no_cache(): + last_index_timestamp = Option.get('last_index_paper_location') + + query_args = {'body': self.body} + if last_index_timestamp: + query_args['modified__gt'] = last_index_timestamp + + for location in Location.objects(**query_args).no_cache(): if location.deleted: self.es.delete( index=index_name, @@ -92,7 +99,8 @@ def paper_location_index(self): location_dict['geotype'] = location_dict['geojson']['geometry']['type'] location_dict['geojson'] = json.dumps(location_dict['geojson']) - location_dict['legacy'] = bool(location.region.legacy) + if location.region: + location_dict['legacy'] = bool(location.region.legacy) try: new_doc = self.es.index( index=index_name, @@ -110,3 +118,4 @@ def paper_location_index(self): self.statistics['created'], self.statistics['updated'] )) + Option.set('last_index_paper_location', datetime.utcnow(), 'datetime') diff --git a/oparlsync/elasticsearch_import/ElasticsearchPersonIndex.py b/oparlsync/elasticsearch_import/ElasticsearchPersonIndex.py index 83f64c2..55aa63c 100644 --- a/oparlsync/elasticsearch_import/ElasticsearchPersonIndex.py +++ b/oparlsync/elasticsearch_import/ElasticsearchPersonIndex.py @@ -21,6 +21,8 @@ class ElasticsearchPersonIndex: def person_index(self): + self.datalog.info('Starting person indexing...') + if not self.es.indices.exists_alias(name='person-latest'): now = datetime.utcnow() index_name = 'person-' + now.strftime('%Y%m%d-%H%M') @@ -52,7 +54,13 @@ def person_index(self): regions.append(str(region.id)) region = region.parent - for person in Person.objects(body=self.body).no_cache(): + last_index_timestamp = Option.get('last_index_person') + + query_args = {'body': self.body} + if last_index_timestamp: + query_args['modified__gt'] = last_index_timestamp + + for person in Person.objects(**query_args).no_cache(): if person.deleted: self.es.delete( index=index_name, @@ -78,4 +86,6 @@ def person_index(self): self.statistics['created'], self.statistics['updated'] )) + Option.set('last_index_person', datetime.utcnow(), 'datetime') + diff --git a/oparlsync/maintenance/Maintenance.py b/oparlsync/maintenance/Maintenance.py index 7effdb8..e061235 100644 --- a/oparlsync/maintenance/Maintenance.py +++ b/oparlsync/maintenance/Maintenance.py @@ -32,8 +32,8 @@ class Maintenance(BaseTask, MaintenanceRemove, MaintenanceRegion, MaintenanceReg 'elasticsearch' ] - def __init__(self, body_id): - self.body_id = body_id + def __init__(self, **kwargs): + self.body_id = kwargs['region'] super().__init__() self.valid_objects = [ Body, @@ -48,6 +48,7 @@ def __init__(self, body_id): File, Location ] + self.run(self.body_id, kwargs['job']) def run(self, body_id, *args): if len(args) < 1: @@ -58,7 +59,7 @@ def run(self, body_id, *args): elif args[0] == 'generate_regions': self.generate_regions() elif args[0] == 'elasticsearch_regions': - self.elasticsearch_regions() + self.elasticsearch_regions(body_id) elif args[0] == 'update_street_locality': self.update_street_locality() elif args[0] == 'sync_bodies': @@ -86,7 +87,7 @@ def activate_body(self, body_id): self.sync_body(body_id) self.generate_regions() self.sync_body(body_id) - self.elasticsearch_regions() + self.elasticsearch_regions(body_id) def fix_oparl_11(self): count_delete = 0 diff --git a/oparlsync/maintenance/MaintenanceRegionElastic.py b/oparlsync/maintenance/MaintenanceRegionElastic.py index acb06ee..eaf9df5 100644 --- a/oparlsync/maintenance/MaintenanceRegionElastic.py +++ b/oparlsync/maintenance/MaintenanceRegionElastic.py @@ -17,12 +17,12 @@ class MaintenanceRegionElastic: - def elasticsearch_regions(self): + def elasticsearch_regions(self, body_id): if not self.es.indices.exists_alias(name='region-latest'): now = datetime.utcnow() index_name = 'region-' + now.strftime('%Y%m%d-%H%M') - es_import = ElasticsearchImport(self) + es_import = ElasticsearchImport(**{'body': body_id}) mapping = es_import.es_mapping_generator(Region, deref='deref_region', delete='delete_region') mapping['properties']['body_count'] = {