diff --git a/certego_saas/ext/upload/elastic.py b/certego_saas/ext/upload/elastic.py index 1e3a634..fdbf14f 100644 --- a/certego_saas/ext/upload/elastic.py +++ b/certego_saas/ext/upload/elastic.py @@ -1,42 +1,40 @@ import datetime import logging -from typing import Any, Dict, Tuple +from typing import Tuple +from django.utils.timezone import now from elasticsearch.helpers import bulk +from rest_framework.fields import Field logger = logging.getLogger(__name__) import elasticsearch -class __BIDocumentInterface: - index: str - creation_date: datetime - category: str - count: int - kwargs: Dict[str, str] - - def to_json(self) -> Dict[str, Any]: - res = { - "timestamp": self.creation_date, - "bi_category": self.category, - "count": self.count, - **self.kwargs, - } - logger.debug(f"Json document: {res}") - return res +class __AbstractBISerializer: + application: Field + environment: Field + timestamp: Field - def to_bulk(self) -> Dict[str, Any]: + @staticmethod + def to_elastic_dict(data, index): return { + "_source": data, + "_index": index + "-" + data["environment"] + "-" + now().strftime("%Y.%m"), "_op_type": "index", - "_index": self.index, - "_source": self.to_json(), } + +class __BIDocumentInterface: + timestamp: datetime + application: str + environment: str + @classmethod def upload( cls, client: elasticsearch.Elasticsearch, + serializer, index: str = None, timeout: int = 30, max_number: int = None, @@ -44,7 +42,7 @@ def upload( qs = cls.objects if index: qs.filter(index=index) - docs = qs.order_by("+creation_date") + docs = qs.order_by("+timestamp") if max_number: if max_number > 10000: return False, [ @@ -52,7 +50,8 @@ def upload( ] docs = docs[:max_number] logger.info(f"Uploading {docs.count()} documents") - jsons = map(lambda x: x.to_bulk(), docs) + jsons = serializer(instance=docs, many=True).data + logger.info(f"Documents to upload: {jsons}") success, errors = bulk(client, jsons, request_timeout=timeout) logger.info("Finished Upload. Starting deletion documents") docs.delete() @@ -65,7 +64,7 @@ def clean(self): } def __repr__(self): - return f"|{self.index=}, {self.category=}, {self.count=}, {self.kwargs=}" + return f"|{self.timestamp=}, {self.application=}, {self.environment=}" try: @@ -75,25 +74,53 @@ def __repr__(self): except ImportError: from django.db.models import Index, JSONField, Model from django.db.models import fields as django_fields + from rest_framework.fields import SerializerMethodField + from rest_framework.serializers import ModelSerializer class BIDocument(__BIDocumentInterface, Model): index = django_fields.CharField(max_length=100) - creation_date = django_fields.DateTimeField(auto_now_add=True) - category = django_fields.CharField(max_length=100) - count = django_fields.PositiveIntegerField() + timestamp = django_fields.DateTimeField(auto_now_add=True) + application = django_fields.CharField(max_length=100) + environment = django_fields.CharField(max_length=100) kwargs = JSONField() class Meta: - indexes = [Index(fields=["index"]), Index(fields=["creation_date"])] + indexes = [Index(fields=["index"]), Index(fields=["timestamp"])] + + class BISerializer(__AbstractBISerializer, ModelSerializer): + index = SerializerMethodField(method_name="get_index") + + class Meta: + fields = [ + "application", + "environment", + "timestamp", + ] else: + from rest_framework_mongoengine.serializers import DocumentSerializer class BIDocument(__BIDocumentInterface, Document): index = mongo_fields.StringField(required=True) - creation_date = mongo_fields.DateTimeField( + timestamp = mongo_fields.DateTimeField( required=True, default=datetime.datetime.now ) - category = mongo_fields.StringField(required=True) - count = mongo_fields.IntField(required=True, min_value=0) + application = mongo_fields.StringField(required=True) + environment = mongo_fields.StringField(required=True) kwargs = mongo_fields.DictField(required=False) - meta = {"indexes": ["index", "creation_date"]} + meta = {"indexes": ["index", "timestamp"]} + + class BISerializer(__AbstractBISerializer, DocumentSerializer): + class Meta: + model = BIDocument + fields = [ + "application", + "environment", + "timestamp", + ] + + def to_representation(self, instance: BIDocument): + data = super().to_representation(instance) + for key, value in instance.kwargs.items(): + data[key] = value + return self.to_elastic_dict(data, instance.index)