From 0686ca87c64e3ac41c610fb503ff31b1e4fcabf6 Mon Sep 17 00:00:00 2001 From: Ben Outram Date: Thu, 7 Dec 2023 14:32:06 +0000 Subject: [PATCH] =?UTF-8?q?EES-4727:=20Tidying=20up=20the=20data=5Fingesti?= =?UTF-8?q?on=20code=20by=20adding=20types=20and=20spli=E2=80=A6=20(#29)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * EES-4727: Tidying up the data_ingestion code by adding types and splitting up methods for parsing content. * EES-4727 Rename data_upsertion to upsert_data * EES-4727 Rename get_collection to ensure_collection_exists and inspect client error for 404 * EES-4727 Add content_utils.get_content_block_text and rename utils.py to text_utils.py * EES-4727 Handle empty headlines and key statistics text --- chatbot-prototype.code-workspace | 6 +- data_ingestion/routers/maintenance.py | 20 ++-- data_ingestion/routers/methodologies.py | 8 +- data_ingestion/routers/publications.py | 8 +- .../services/methodology_service.py | 70 ++++++------ .../services/publication_service.py | 24 ++--- data_ingestion/services/release_service.py | 100 +++++++++--------- .../services/tablebuilder_service.py | 22 ++-- data_ingestion/services/vector_db_client.py | 41 ++++--- data_ingestion/utils.py | 17 --- data_ingestion/utils/__init__.py | 0 data_ingestion/utils/content_utils.py | 13 +++ data_ingestion/utils/text_utils.py | 12 +++ data_ingestion_tests/utils/__init__.py | 0 .../text_utils_test.py} | 2 +- 15 files changed, 176 insertions(+), 167 deletions(-) delete mode 100644 data_ingestion/utils.py create mode 100644 data_ingestion/utils/__init__.py create mode 100644 data_ingestion/utils/content_utils.py create mode 100644 data_ingestion/utils/text_utils.py create mode 100644 data_ingestion_tests/utils/__init__.py rename data_ingestion_tests/{utils_test.py => utils/text_utils_test.py} (70%) diff --git a/chatbot-prototype.code-workspace b/chatbot-prototype.code-workspace index 8147958..57e9be5 100644 --- a/chatbot-prototype.code-workspace +++ b/chatbot-prototype.code-workspace @@ -52,7 +52,11 @@ "Response Automater Tests", "Data Ingestion API", "Data Ingestion API Tests" - ] + ], + "python.analysis.autoImportCompletions": true, + "python.analysis.inlayHints.functionReturnTypes": true, + "python.analysis.inlayHints.variableTypes": true, + "python.analysis.inlayHints.callArgumentNames": "partial" }, "extensions": { "recommendations": [ diff --git a/data_ingestion/routers/maintenance.py b/data_ingestion/routers/maintenance.py index c2c63b8..41b69e4 100644 --- a/data_ingestion/routers/maintenance.py +++ b/data_ingestion/routers/maintenance.py @@ -7,31 +7,29 @@ ) from ..services.publication_service import fetch_publication_slugs from ..services.release_service import extract_releases -from ..services.vector_db_client import data_upsertion, recreate_collection +from ..services.vector_db_client import recreate_collection, upsert_data router = APIRouter(prefix="/api/maintenance") -@router.post("/publications/build") -async def build_publications(): - slugs = fetch_publication_slugs() +@router.post(path="/publications/build") +async def build_publications() -> JSONResponse: try: - data_upsertion(slugs, extract_releases) + upsert_data(records=extract_releases(slugs=fetch_publication_slugs())) except Exception as e: return JSONResponse(status_code=500, content={"Content": e}) return JSONResponse(status_code=200, content={"Content": "Successful"}) -@router.post("/methodologies/build") -async def build_methodologies(): - slugs = fetch_methodology_slugs() 
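# A minimal sketch (not lines from the diff itself) of the calling convention this patch
# moves to. Previously the vector DB client drove extraction through a callback,
# data_upsertion(slugs, extract_releases); after the rename, extraction runs first and
# upsert_data only receives ready-made records. The wiring below is illustrative and
# assumes each record is a dict with "link" and "text" keys, as used throughout the diff.
from data_ingestion.services.publication_service import fetch_publication_slugs
from data_ingestion.services.release_service import extract_releases
from data_ingestion.services.vector_db_client import upsert_data

records = extract_releases(slugs=fetch_publication_slugs())  # [{"link": ..., "text": ...}, ...]
upsert_data(records=records)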
+@router.post(path="/methodologies/build") +async def build_methodologies() -> JSONResponse: try: - data_upsertion(slugs, extract_methodologies) + upsert_data(records=extract_methodologies(slugs=fetch_methodology_slugs())) except Exception as e: return JSONResponse(status_code=500, content={"Content": e}) return JSONResponse(status_code=200, content={"Content": "Successful"}) -@router.delete("/clear", status_code=status.HTTP_204_NO_CONTENT) -async def clear(): +@router.delete(path="/clear", status_code=status.HTTP_204_NO_CONTENT) +async def clear() -> None: recreate_collection() diff --git a/data_ingestion/routers/methodologies.py b/data_ingestion/routers/methodologies.py index e1eac34..bcf6b51 100644 --- a/data_ingestion/routers/methodologies.py +++ b/data_ingestion/routers/methodologies.py @@ -2,16 +2,16 @@ from fastapi.responses import JSONResponse from ..services.methodology_service import delete_methodology, extract_methodologies -from ..services.vector_db_client import data_upsertion +from ..services.vector_db_client import upsert_data router = APIRouter(prefix="/api/methodologies") -@router.post("/{slug}/update") -def update(slug: str): +@router.post(path="/{slug}/update") +def update(slug: str) -> JSONResponse: try: delete_methodology(slug=slug) - data_upsertion([slug], extract_methodologies) + upsert_data(records=extract_methodologies(slugs=[slug])) except Exception as e: return JSONResponse(status_code=500, content={"Content": e}) return JSONResponse(status_code=200, content={"Content": "Succesful"}) diff --git a/data_ingestion/routers/publications.py b/data_ingestion/routers/publications.py index 5d1bdd8..b5420e3 100644 --- a/data_ingestion/routers/publications.py +++ b/data_ingestion/routers/publications.py @@ -3,16 +3,16 @@ from ..services.publication_service import delete_publication from ..services.release_service import extract_releases -from ..services.vector_db_client import data_upsertion +from ..services.vector_db_client import upsert_data router = APIRouter(prefix="/api/publications") -@router.post("/{slug}/update") -async def update(slug: str): +@router.post(path="/{slug}/update") +async def update(slug: str) -> JSONResponse: try: delete_publication(slug=slug) - data_upsertion([slug], extract_releases) + upsert_data(records=extract_releases(slugs=[slug])) except Exception as e: return JSONResponse(status_code=500, content={"Content": e}) return JSONResponse(status_code=200, content={"Content": "Successful"}) diff --git a/data_ingestion/services/methodology_service.py b/data_ingestion/services/methodology_service.py index ec94a66..52801a2 100644 --- a/data_ingestion/services/methodology_service.py +++ b/data_ingestion/services/methodology_service.py @@ -1,59 +1,49 @@ -import json import logging import requests -from bs4 import BeautifulSoup from ..config import settings +from ..utils.content_utils import get_content_block_text from .vector_db_client import delete_url logger = logging.getLogger(__name__) -def delete_methodology(slug: str): +def delete_methodology(slug: str) -> None: delete_url(url=f"{settings.ees_url_api_content}/methodology{slug}") -def extract_methodologies(slugs): - texts = [] - for slug in slugs: - methodology_info = {} - content = fetch_methodology(slug) - methodology_info["text"] = content["data"] - methodology_info["link"] = content["link"] - texts.append(methodology_info) - return texts +def extract_methodologies(slugs: list[str]) -> list[dict[str, str]]: + return list(map(fetch_methodology, slugs)) -def fetch_methodology(slug: str): - 
methodology_content = {} - methodology_content["link"] = f"{settings.ees_url_public_ui}/methodology/{slug}" - res = requests.get(f"{settings.ees_url_api_content}/methodologies/{slug}") - text = json.loads(res.text) +def fetch_methodology(slug: str) -> dict[str, str]: try: - methodology_content["data"] = "Headlines Section: " - methodology_content["data"] += BeautifulSoup( - text["headlinesSection"]["content"][0]["body"], "html.parser" - ).get_text() - except KeyError as e: - logger.error(f" Error: Key '{e.args[0]}' not found whilst reading content for methodology with slug: '{slug}'") - - methodology_content["data"] += "Content Section" - for i in range(len(text["content"])): - for j in range(len(text["content"][i]["content"])): - try: - methodology_content["data"] += BeautifulSoup( - text["content"][i]["content"][j]["body"], "html.parser" - ).get_text() - except KeyError: - logger.debug(f"Key does not exist for {slug} at {i}") - return methodology_content - - -def fetch_methodology_slugs(): - data = requests.get(f"{settings.ees_url_api_content}/methodology-themes").json() - slugs = [] - for item in data: + response = requests.get(url=f"{settings.ees_url_api_content}/methodologies/{slug}") + response.raise_for_status() + response_json = response.json() + methodology_version_id = response_json["id"] + + logger.debug(f"Processing content for methodology version: {methodology_version_id}") + + return { + "link": f"{settings.ees_url_public_ui}/methodology/{slug}", + "text": get_content_block_text(res=response_json), + } + except requests.exceptions.HTTPError as err: + if err.response.status_code == 404: + logger.error(f"Methodology version for slug {slug} was not found") + return {} + else: + raise + + +def fetch_methodology_slugs() -> list[str]: + response = requests.get(url=f"{settings.ees_url_api_content}/methodology-themes") + response.raise_for_status() + response_json = response.json() + slugs: list[str] = [] + for item in response_json: for topic in item["topics"]: for publication in topic["publications"]: for methodology in publication["methodologies"]: diff --git a/data_ingestion/services/publication_service.py b/data_ingestion/services/publication_service.py index dc1d959..df66e5c 100644 --- a/data_ingestion/services/publication_service.py +++ b/data_ingestion/services/publication_service.py @@ -1,4 +1,3 @@ -import json import logging import requests @@ -9,20 +8,15 @@ logger = logging.getLogger(__name__) -def delete_publication(slug: str): +def delete_publication(slug: str) -> None: delete_url(url=f"{settings.ees_url_public_ui}/find-statistics/{slug}") -def fetch_publication_slugs(): - try: - response = requests.get( - f"{settings.ees_url_api_content}/publications?page=1&pageSize=9999&sort=published&order=asc" - ) - response.raise_for_status() - publications = json.loads(response.text)["results"] - slugs = [publications[i]["slug"] for i in range(len(publications))] - return slugs - except requests.HTTPError as http_err: - logger.error(f"HTTP error occurred: {http_err}") - except Exception as err: - logger.error(f"Other error occurred: {err}") +def fetch_publication_slugs() -> list[str]: + response = requests.get( + url=f"{settings.ees_url_api_content}/publications?page=1&pageSize=9999&sort=published&order=asc" + ) + response.raise_for_status() + response_json = response.json() + publications = response_json["results"] + return list(map(lambda publication: publication["slug"], publications)) diff --git a/data_ingestion/services/release_service.py 
b/data_ingestion/services/release_service.py index c2725bb..4034c41 100644 --- a/data_ingestion/services/release_service.py +++ b/data_ingestion/services/release_service.py @@ -1,60 +1,60 @@ import logging -from typing import Dict, List import requests from bs4 import BeautifulSoup from ..config import settings -from .tablebuilder_service import fetch_key_stat +from ..utils.content_utils import get_content_block_text +from .tablebuilder_service import fetch_data_block logger = logging.getLogger(__name__) -def extract_releases(slugs: str) -> List[Dict]: - texts = [] - for slug in slugs: - slug_info = {} - res = requests.get(f"{settings.ees_url_api_content}/publications/{slug}/releases/latest") - key_stats = {} - response_json = res.json() - release_id = response_json["publication"]["releases"][0]["id"] - try: - key_statistics = response_json["keyStatistics"] - if key_statistics != []: - data_strings = [] - for i, statistic in enumerate(key_statistics): - data_strings.append(fetch_key_stat(statistic, release_id, i)) - key_stats["data"] = "Key Statistics section: ".join(data_strings) - except KeyError: - logger.warn(f"{slug} doesnt contain key stats") - try: - slug_info["text"] = key_stats["data"] - content = fetch_release(slug, response_json) - slug_info["text"] += content["data"] - slug_info["link"] = content["link"] - except Exception: - logger.warn(f"{slug} doesnt contain key stats") - content = fetch_release(slug, response_json) - slug_info["text"] = content["data"] - slug_info["link"] = content["link"] - texts.append(slug_info) - return texts - - -def fetch_release(slug: str, res: dict) -> dict: - slug_content = {} - slug_content["link"] = f"{settings.ees_url_public_ui}/find-statistics/{slug}" - try: - slug_content["data"] = "Headlines Section: " - slug_content["data"] += BeautifulSoup(res["headlinesSection"]["content"][0]["body"], "html.parser").get_text() - except Exception as e: - logger.info(f" Error: {e}. 
For {slug} the headlines section doesnt exist") - - slug_content["data"] += "Content Section" - for i in range(len(res["content"])): - for j in range(len(res["content"][i]["content"])): - try: - slug_content["data"] += BeautifulSoup(res["content"][i]["content"][j]["body"], "html.parser").get_text() - except KeyError: - logger.debug(f"Key does not exist for {slug} at {i}") - return slug_content +def extract_releases(slugs: list[str]) -> list[dict[str, str]]: + return list(map(fetch_release, slugs)) + + +def fetch_release(slug: str) -> dict[str, str]: + response = requests.get(url=f"{settings.ees_url_api_content}/publications/{slug}/releases/latest") + response.raise_for_status() + response_json = response.json() + release_id = response_json["id"] + + logger.debug(f"Processing content for release id: {release_id}") + + headlines_text = get_headlines_text(res=response_json) or "" + key_stats_text = get_key_statistics_text(release_id=release_id, res=response_json) or "" + content_block_text = get_content_block_text(res=response_json) + + return { + "link": f"{settings.ees_url_public_ui}/find-statistics/{slug}", + "text": f"{headlines_text}{key_stats_text}{content_block_text}", + } + + +def get_headlines_text(res: dict) -> str | None: + headlines_section = res["headlinesSection"]["content"] + if headlines_section: + headlines_content_block = headlines_section[0] + headlines = BeautifulSoup(markup=headlines_content_block["body"], features="html.parser").get_text() + return f"Headline: {headlines}" + + +def get_key_statistics_text(release_id: str, res: dict) -> str | None: + key_statistics = res["keyStatistics"] + if key_statistics: + key_statistics_content = list( + map( + lambda item: get_key_statistic_text(release_id=release_id, index_and_key_statistic=item), + enumerate(key_statistics), + ) + ) + return "Key statistic ".join(key_statistics_content) + + +def get_key_statistic_text(release_id: str, index_and_key_statistic: tuple[int, dict[str, str]]) -> str: + index, key_statistic = index_and_key_statistic + data_block_id = key_statistic["dataBlockId"] + return fetch_data_block( + release_id=release_id, data_block_id=data_block_id, key_statistic=key_statistic, index=index + ) diff --git a/data_ingestion/services/tablebuilder_service.py b/data_ingestion/services/tablebuilder_service.py index 47bcfbc..fb79bcc 100644 --- a/data_ingestion/services/tablebuilder_service.py +++ b/data_ingestion/services/tablebuilder_service.py @@ -7,25 +7,27 @@ logger = logging.getLogger(__name__) -def fetch_key_stat(statistic: dict, release_id: str, i: int) -> str: +def fetch_data_block(release_id: str, data_block_id: str, key_statistic: dict[str, str], index: int) -> str: try: - data_block_id = statistic["dataBlockId"] - res = requests.get(f"{settings.ees_url_api_data}/tablebuilder/release/{release_id}/data-block/{data_block_id}") - response_json = res.json() + response = requests.get( + url=f"{settings.ees_url_api_data}/tablebuilder/release/{release_id}/data-block/{data_block_id}" + ) + response.raise_for_status() + response_json = response.json() label = response_json["subjectMeta"]["indicators"][0]["label"] measure = list(response_json["results"][0]["measures"].values())[0] try: unit = response_json["subjectMeta"]["indicators"][0]["unit"] - measure = f"{measure} {unit}" + measure = f"{measure}{unit}" except KeyError: logger.error("No unit found") except Exception: - label = statistic["title"] - measure = statistic["statistic"] + label = key_statistic["title"] + measure = key_statistic["statistic"] try: - trend = 
statistic["trend"] - data_string = f"{i + 1}: {label}-{measure} {trend}." + trend = key_statistic["trend"] + data_string: str = f"{index + 1}: {label}-{measure} {trend}." except Exception: - data_string = f"{i +1}: {label}-{measure}." + data_string = f"{index + 1}: {label}-{measure}." return data_string diff --git a/data_ingestion/services/vector_db_client.py b/data_ingestion/services/vector_db_client.py index 769980e..96b5871 100644 --- a/data_ingestion/services/vector_db_client.py +++ b/data_ingestion/services/vector_db_client.py @@ -1,4 +1,5 @@ import logging +from itertools import chain import openai import qdrant_client.models as models @@ -6,17 +7,18 @@ from qdrant_client.http.exceptions import UnexpectedResponse from ..config import settings -from ..utils import chunk_text +from ..utils.text_utils import chunk_text logger = logging.getLogger(__name__) -client = QdrantClient(settings.qdrant_host, port=settings.qdrant_port) +client = QdrantClient(location=settings.qdrant_host, port=settings.qdrant_port) -def data_upsertion(slugs, func, batch_size=100) -> None: - get_collection() - text = func(slugs) - chunks = chunk_text(text) +def upsert_data(records: list[dict[str, str]], batch_size: int = 100) -> None: + ensure_collection_exists() + + chunks = list(chain.from_iterable(map(create_url_text_map, records))) + for i in range(0, len(chunks), batch_size): end_index = min(i + batch_size, len(chunks)) batch_meta = chunks[i:end_index] @@ -27,6 +29,7 @@ def data_upsertion(slugs, func, batch_size=100) -> None: logger.error(f"An error occured within embedding model: {e}") formatted_embeddings = [embeds["data"][j]["embedding"] for j in range(len(embeds["data"]))] + client.upsert( collection_name=settings.qdrant_collection, points=models.Batch( @@ -36,27 +39,37 @@ def data_upsertion(slugs, func, batch_size=100) -> None: ), ) - logger.info("Batch upserted") + logger.debug("Batch upserted") + + logger.info("Data upsertion completed") + - logger.info("Text upserted") +def create_url_text_map(record: dict[str, str]) -> list[dict[str, str]]: + if record is None: + return [] + chunks = chunk_text(text=record["text"]) + return list(map(lambda text: {"url": record["link"], "text": text}, chunks)) -def recreate_collection(): +def recreate_collection() -> bool: return client.recreate_collection( collection_name=settings.qdrant_collection, vectors_config=models.VectorParams(distance=models.Distance.COSINE, size=1536), ) -def get_collection(): +def ensure_collection_exists() -> None: try: client.get_collection(collection_name=settings.qdrant_collection) - except UnexpectedResponse: - logger.debug("EES database doesn't exist - need to create database") - recreate_collection() + except UnexpectedResponse as e: + if e.status_code == 404: + logger.debug("Collection doesn't exist - recreating collection") + recreate_collection() + else: + raise -def delete_url(url: str): +def delete_url(url: str) -> None: client.delete( collection_name=settings.qdrant_collection, points_selector=models.FilterSelector( diff --git a/data_ingestion/utils.py b/data_ingestion/utils.py deleted file mode 100644 index 5594810..0000000 --- a/data_ingestion/utils.py +++ /dev/null @@ -1,17 +0,0 @@ -from langchain.text_splitter import RecursiveCharacterTextSplitter - - -def chunk_text(text): - text_splitter = RecursiveCharacterTextSplitter( - chunk_size=700, chunk_overlap=100, separators=["\n\n", "\n", " ", ""] - ) - chunks = [] - for record in text: - text_temp = text_splitter.split_text(record["text"]) - chunks.extend([{"url": 
record["link"], "text": text_temp[i]} for i in range(len(text_temp))]) - - return chunks - - -def temp_method_for_proof_of_concept_tests(some_number): - return 2 * some_number diff --git a/data_ingestion/utils/__init__.py b/data_ingestion/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/data_ingestion/utils/content_utils.py b/data_ingestion/utils/content_utils.py new file mode 100644 index 0000000..27d196c --- /dev/null +++ b/data_ingestion/utils/content_utils.py @@ -0,0 +1,13 @@ +from bs4 import BeautifulSoup + + +def get_content_block_text(res: dict) -> str: + content_sections = res["content"] + result = "Content: " + for section_index in range(len(content_sections)): + content_blocks = content_sections[section_index]["content"] + for block_index in range(len(content_blocks)): + content_block = content_blocks[block_index] + if content_block["type"] == "HtmlBlock": + result += BeautifulSoup(markup=content_block["body"], features="html.parser").get_text() + return result diff --git a/data_ingestion/utils/text_utils.py b/data_ingestion/utils/text_utils.py new file mode 100644 index 0000000..948771c --- /dev/null +++ b/data_ingestion/utils/text_utils.py @@ -0,0 +1,12 @@ +from langchain.text_splitter import RecursiveCharacterTextSplitter + + +def chunk_text(text: str, chunk_size: int = 700, chunk_overlap: int = 100) -> list[str]: + text_splitter = RecursiveCharacterTextSplitter( + chunk_size=chunk_size, chunk_overlap=chunk_overlap, separators=["\n\n", "\n", " ", ""] + ) + return text_splitter.split_text(text) + + +def temp_method_for_proof_of_concept_tests(some_number): + return 2 * some_number diff --git a/data_ingestion_tests/utils/__init__.py b/data_ingestion_tests/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/data_ingestion_tests/utils_test.py b/data_ingestion_tests/utils/text_utils_test.py similarity index 70% rename from data_ingestion_tests/utils_test.py rename to data_ingestion_tests/utils/text_utils_test.py index 5f545bd..4bb2c2e 100644 --- a/data_ingestion_tests/utils_test.py +++ b/data_ingestion_tests/utils/text_utils_test.py @@ -1,4 +1,4 @@ -from data_ingestion.utils import temp_method_for_proof_of_concept_tests +from data_ingestion.utils.text_utils import temp_method_for_proof_of_concept_tests def test_that_tests_run():