Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update BI #67

Merged
merged 23 commits into from
May 16, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 58 additions & 31 deletions certego_saas/ext/upload/elastic.py
Original file line number Diff line number Diff line change
@@ -1,58 +1,57 @@
import datetime
import logging
from typing import Any, Dict, Tuple
from typing import Tuple

from django.utils.timezone import now
from elasticsearch.helpers import bulk
from rest_framework.fields import Field

logger = logging.getLogger(__name__)

import elasticsearch


class __BIDocumentInterface:
index: str
creation_date: datetime
category: str
count: int
kwargs: Dict[str, str]

def to_json(self) -> Dict[str, Any]:
res = {
"timestamp": self.creation_date,
"bi_category": self.category,
"count": self.count,
**self.kwargs,
}
logger.debug(f"Json document: {res}")
return res
class __AbstractBISerializer:
application: Field
environment: Field
timestamp: Field

def to_bulk(self) -> Dict[str, Any]:
@staticmethod
def to_elastic_dict(data, index):
return {
"_source": data,
"_index": index + "-" + data["environment"] + "-" + now().strftime("%Y.%m"),
"_op_type": "index",
"_index": self.index,
"_source": self.to_json(),
}


class __BIDocumentInterface:
timestamp: datetime
application: str
environment: str

@classmethod
def upload(
cls,
client: elasticsearch.Elasticsearch,
serializer,
index: str = None,
timeout: int = 30,
max_number: int = None,
) -> Tuple:
qs = cls.objects
if index:
qs.filter(index=index)
docs = qs.order_by("+creation_date")
docs = qs.order_by("+timestamp")
if max_number:
if max_number > 10000:
return False, [
"max_number can't be higher than 10000 for performance reasons."
]
docs = docs[:max_number]
logger.info(f"Uploading {docs.count()} documents")
jsons = map(lambda x: x.to_bulk(), docs)
jsons = serializer(instance=docs, many=True).data
logger.info(f"Documents to upload: {jsons}")
success, errors = bulk(client, jsons, request_timeout=timeout)
logger.info("Finished Upload. Starting deletion documents")
docs.delete()
Expand All @@ -65,7 +64,7 @@ def clean(self):
}

def __repr__(self):
return f"|{self.index=}, {self.category=}, {self.count=}, {self.kwargs=}"
return f"|{self.timestamp=}, {self.application=}, {self.environment=}"


try:
Expand All @@ -75,25 +74,53 @@ def __repr__(self):
except ImportError:
from django.db.models import Index, JSONField, Model
from django.db.models import fields as django_fields
from rest_framework.fields import SerializerMethodField
from rest_framework.serializers import ModelSerializer

class BIDocument(__BIDocumentInterface, Model):
index = django_fields.CharField(max_length=100)
creation_date = django_fields.DateTimeField(auto_now_add=True)
category = django_fields.CharField(max_length=100)
count = django_fields.PositiveIntegerField()
timestamp = django_fields.DateTimeField(auto_now_add=True)
application = django_fields.CharField(max_length=100)
environment = django_fields.CharField(max_length=100)
kwargs = JSONField()

class Meta:
indexes = [Index(fields=["index"]), Index(fields=["creation_date"])]
indexes = [Index(fields=["index"]), Index(fields=["timestamp"])]

class BISerializer(__AbstractBISerializer, ModelSerializer):
index = SerializerMethodField(method_name="get_index")

class Meta:
fields = [
"application",
"environment",
"timestamp",
]

else:
from rest_framework_mongoengine.serializers import DocumentSerializer

class BIDocument(__BIDocumentInterface, Document):
index = mongo_fields.StringField(required=True)
creation_date = mongo_fields.DateTimeField(
timestamp = mongo_fields.DateTimeField(
required=True, default=datetime.datetime.now
)
category = mongo_fields.StringField(required=True)
count = mongo_fields.IntField(required=True, min_value=0)
application = mongo_fields.StringField(required=True)
environment = mongo_fields.StringField(required=True)
kwargs = mongo_fields.DictField(required=False)
meta = {"indexes": ["index", "creation_date"]}
meta = {"indexes": ["index", "timestamp"]}

class BISerializer(__AbstractBISerializer, DocumentSerializer):
class Meta:
model = BIDocument
fields = [
"application",
"environment",
"timestamp",
]

def to_representation(self, instance: BIDocument):
data = super().to_representation(instance)
for key, value in instance.kwargs.items():
data[key] = value
return self.to_elastic_dict(data, instance.index)
Loading