From 59441b97a5873c82abc0e36b1c9310968b73e033 Mon Sep 17 00:00:00 2001 From: Matias Vallejos Date: Wed, 8 May 2024 15:49:05 -0300 Subject: [PATCH] Refactoring code making lambda more readable and reusable --- serverless/common/utils/encoders.py | 3 +- serverless/common/utils/error_handler.py | 22 ++++ serverless/common/utils/lambda_utils.py | 5 +- serverless/common/utils/response_utils.py | 15 +++ serverless/serverless.yml | 4 + .../services/files/get_upload_url_s3.py | 74 ++++++------ serverless/services/gpt/ask.py | 106 ++++++------------ serverless/services/health/health_check.py | 19 +--- .../services/memory/add_session_message.py | 84 +++++--------- serverless/services/memory/clear_messages.py | 62 ++++------ serverless/services/memory/delete_session.py | 59 +++++----- serverless/services/memory/get_session.py | 100 ++++++++--------- .../services/memory/get_sessions_user_id.py | 73 +++++------- .../memory/update_session_metadata.py | 67 +++++------ 14 files changed, 306 insertions(+), 387 deletions(-) create mode 100644 serverless/common/utils/error_handler.py create mode 100644 serverless/common/utils/response_utils.py diff --git a/serverless/common/utils/encoders.py b/serverless/common/utils/encoders.py index 3414845..132e9a1 100644 --- a/serverless/common/utils/encoders.py +++ b/serverless/common/utils/encoders.py @@ -1,6 +1,7 @@ import json from decimal import Decimal + # Custom JSON Encoder for handling Decimal types from DynamoDB class DecimalEncoder(json.JSONEncoder): def default(self, obj): @@ -9,4 +10,4 @@ def default(self, obj): return float(obj) else: return int(obj) - return super(DecimalEncoder, self).default(obj) \ No newline at end of file + return super(DecimalEncoder, self).default(obj) diff --git a/serverless/common/utils/error_handler.py b/serverless/common/utils/error_handler.py new file mode 100644 index 0000000..e23e12a --- /dev/null +++ b/serverless/common/utils/error_handler.py @@ -0,0 +1,22 @@ +# error_handlers.py +import json + + +def error_response(message, status_code=400): + return { + "statusCode": status_code, + "headers": { + "Access-Control-Allow-Origin": "*", + "Access-Control-Allow-Headers": "*", + "Content-Type": "application/json", + }, + "body": json.dumps({"message": message}), + } + + +def not_found_error(): + return error_response("The specified resource was not found.", 404) + + +def internal_server_error(): + return error_response("Internal server error.", 500) diff --git a/serverless/common/utils/lambda_utils.py b/serverless/common/utils/lambda_utils.py index d90c568..7a56364 100644 --- a/serverless/common/utils/lambda_utils.py +++ b/serverless/common/utils/lambda_utils.py @@ -1,5 +1,6 @@ import json + def load_body_from_event(event): # Attempt to load JSON from the request body body = event.get("body", "{}") # Default to empty JSON string if body is None @@ -8,9 +9,9 @@ def load_body_from_event(event): body = json.loads(body) except json.JSONDecodeError: body = {} - print("Failed to parse JSON from request body") return body + def load_path_parameter_from_event(event, param_name): # Extract a path parameter from the event - return event.get("pathParameters", {}).get(param_name) \ No newline at end of file + return event.get("pathParameters", {}).get(param_name) diff --git a/serverless/common/utils/response_utils.py b/serverless/common/utils/response_utils.py new file mode 100644 index 0000000..749fd9c --- /dev/null +++ b/serverless/common/utils/response_utils.py @@ -0,0 +1,15 @@ +# response_utils.py +import json +from common.utils.encoders import DecimalEncoder + + +def success_response(data, status_code=200): + return { + "statusCode": status_code, + "headers": { + "Access-Control-Allow-Origin": "*", + "Access-Control-Allow-Headers": "*", + "Content-Type": "application/json", + }, + "body": json.dumps(data, cls=DecimalEncoder), + } diff --git a/serverless/serverless.yml b/serverless/serverless.yml index a1efb0f..be0b562 100644 --- a/serverless/serverless.yml +++ b/serverless/serverless.yml @@ -49,6 +49,7 @@ functions: patterns: - "!**" - services/health/** + - common/** upload_url_s3: handler: services/files/get_upload_url_s3.lambda_handler description: Get expires URL to upload a file to AWS bucket. @@ -64,6 +65,7 @@ functions: patterns: - "!**" - services/files/** + - common/** gpt_ask: handler: services/gpt/ask.lambda_handler description: Given a set of messages ask to chat gpt and respond with a message. @@ -115,6 +117,7 @@ functions: patterns: - "!**" - services/memory/** + - common/** update_session_metadata: handler: services/memory/update_session_metadata.lambda_handler description: Update session metadata like 'title' or 'user_id'. @@ -174,6 +177,7 @@ functions: patterns: - "!**" - services/memory/** + - common/** resources: Resources: diff --git a/serverless/services/files/get_upload_url_s3.py b/serverless/services/files/get_upload_url_s3.py index 588d932..84b5022 100644 --- a/serverless/services/files/get_upload_url_s3.py +++ b/serverless/services/files/get_upload_url_s3.py @@ -1,49 +1,50 @@ import os -import json import datetime import boto3 +from common.utils.error_handler import error_response, internal_server_error +from common.utils.response_utils import success_response + +s3_client = boto3.client("s3") +s3_bucket_name = os.getenv("S3_BUCKET_NAME") -def lambda_handler(event, context): - if not event.get("queryStringParameters"): - return { - "statusCode": 400, - "body": json.dumps({"message": "Query parameters are missing."}), - } - # Extract parameters from the query string - query_params = event["queryStringParameters"] +def parse_and_validate(event): + query_params = event.get("queryStringParameters", {}) s3_key = query_params.get("key") s3_expiresin = query_params.get("expiresIn") - # Check if required parameters are provided - if not s3_key or not s3_expiresin: - return { - "statusCode": 400, - "body": json.dumps( - { - "message": "You must include 'key' and 'expiresIn' in your query request." - } - ), - } + if not s3_key or s3_expiresin is None: + raise ValueError( + "Both 'key' and 'expiresIn' parameters are required in the query." + ) try: - # Ensure expiresIn is an integer s3_expiresin = int(s3_expiresin) except ValueError: - return { - "statusCode": 400, - "body": json.dumps({"message": "expiresIn must be a valid integer."}), - } + raise ValueError("'expiresIn' must be a valid integer.") + + if ( + s3_expiresin <= 0 or s3_expiresin > 86400 + ): # Example: limits expiresIn to 24 hours + raise ValueError("'expiresIn' must be between 1 and 86400 seconds.") - # Get the S3 bucket name from environment variables - s3_bucket_name = os.getenv("S3_BUCKET_NAME") + return s3_key, s3_expiresin - # Generate the presigned URL - s3_client = boto3.client("s3") - url = s3_client.generate_presigned_post( - Bucket=s3_bucket_name, Key=s3_key, ExpiresIn=s3_expiresin - ) + +def lambda_handler(event, context): + try: + s3_key, s3_expiresin = parse_and_validate(event) + except ValueError as e: + return error_response(str(e)) + + try: + # Generate the presigned URL + url = s3_client.generate_presigned_post( + Bucket=s3_bucket_name, Key=s3_key, ExpiresIn=s3_expiresin + ) + except Exception as e: + return internal_server_error() body = { "url": url, @@ -52,13 +53,4 @@ def lambda_handler(event, context): "created_at": datetime.datetime.now().isoformat(), } - response = { - "statusCode": 200, - "headers": { - "Access-Control-Allow-Origin": "*", - "Access-Control-Allow-Headers": "*", - "Content-Type": "application/json", - }, - "body": json.dumps(body), - } - return response + return success_response(body) diff --git a/serverless/services/gpt/ask.py b/serverless/services/gpt/ask.py index 0510b80..fec43bb 100644 --- a/serverless/services/gpt/ask.py +++ b/serverless/services/gpt/ask.py @@ -1,52 +1,40 @@ import os -import json import openai -from typing import List +from typing import List, Dict, Any from .src.models import OpenAIModel + from common.utils.lambda_utils import load_body_from_event +from common.utils.error_handler import error_response, internal_server_error +from common.utils.response_utils import success_response -def lambda_handler(event, context): - if not event.get("body"): - return { - "statusCode": 400, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps( - {"message": "Body parameters are missing. You must include 'messages'."} - ), - } +API_KEY = os.getenv("OPENAI_API_KEY") +MODEL_ENGINE = os.getenv("OPENAI_GPTMODEL") +TEMPERATURE = int(os.getenv("OPENAI_TEMPERATURE", 0)) +MAX_TOKENS = int(os.getenv("OPENAI_TOKENS", 0)) - body = load_body_from_event(event) - messages = body.get("messages") - if len(messages) == 0: - return { - "statusCode": 400, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps({"message": "Body 'messages' must not be empty."}), - } +def parse_and_validate(event: Dict[str, Any]) -> List[Dict[str, str]]: + messages = load_body_from_event(event).get("messages") - if isinstance(messages, List) is False: - return { - "statusCode": 400, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps({"message": "Body 'messages' must be a list."}), - } + if not messages or len(messages) == 0: + raise ValueError("Body 'messages' must not be empty.") + + if not isinstance(messages, list): + raise ValueError("Body 'messages' must be a list.") for msg in messages: if not msg.get("content") or not msg.get("role"): - return { - "statusCode": 400, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps( - { - "message": "Messages should not be empty. It should have 'role' and 'content' keys." - } - ), - } + raise ValueError( + "Messages should not be empty. It should have 'role' and 'content' keys." + ) + return messages + +def lambda_handler(event, context): try: + messages = parse_and_validate(event) model = OpenAIModel( api_key=os.getenv("OPENAI_API_KEY"), model_engine=os.getenv("OPENAI_GPTMODEL"), @@ -56,43 +44,23 @@ def lambda_handler(event, context): res = model.chat_completion(messages) if res.get("status") != "success": - return { - "statusCode": 400, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps({"role": res["role"], "content": res["content"]}), - } + error_content = res["content"] + return error_response(error_content) else: messages.append({"role": res["role"], "content": res["content"]}) - return { - "statusCode": 200, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps( - { - "context": model.__str__(), - "last_message": { - "role": res["role"], - "content": res["content"], - }, - "memory": messages, - "memory_count": len(messages), - } - ), + body = { + "context": str(model), + "last_message": { + "role": res["role"], + "content": res["content"], + }, + "memory": messages, + "memory_count": len(messages), } - except ValueError as ve: - return { - "statusCode": 400, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps({"role": ve["role"], "content": ve["content"]}), - } + return success_response(body) + except ValueError as e: + return error_response(str(e)) except openai.OpenAIError as e: - return { - "statusCode": 400, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps({"role": ve["role"], "content": ve["content"]}), - } + return error_response(str(e)) except Exception as e: - return { - "statusCode": 500, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps({"message": "Internal Server Error."}), - } + return internal_server_error() diff --git a/serverless/services/health/health_check.py b/serverless/services/health/health_check.py index cfaa751..77601cc 100644 --- a/serverless/services/health/health_check.py +++ b/serverless/services/health/health_check.py @@ -1,26 +1,11 @@ -import json +from common.utils.response_utils import success_response def lambda_handler(event, context): - """Health check function to test if the Lambda function is running. - - Args: - event (obj): Event data passed to the function. - context (obj): Runtime information provided by AWS Lambda. - - Returns: - json: A JSON object containing the health check message. - """ body = { "message": "Health check successful", "region": context.invoked_function_arn.split(":")[3], "function_name": context.function_name, } - return { - "statusCode": 200, - "headers": { - "Content-Type": "application/json", - }, - "body": json.dumps(body), - } + return success_response(body) \ No newline at end of file diff --git a/serverless/services/memory/add_session_message.py b/serverless/services/memory/add_session_message.py index 2c9e206..8067fc1 100644 --- a/serverless/services/memory/add_session_message.py +++ b/serverless/services/memory/add_session_message.py @@ -1,62 +1,49 @@ import os -import json import boto3 import datetime from boto3.dynamodb.conditions import Key, Attr -from common.utils.lambda_utils import load_body_from_event, load_path_parameter_from_event +from common.utils.lambda_utils import ( + load_body_from_event, + load_path_parameter_from_event, +) +from common.utils.error_handler import error_response, internal_server_error +from common.utils.response_utils import success_response +table_name = os.getenv("MEMORY_TABLE_NAME") +memory_table = boto3.resource("dynamodb").Table(table_name) -def lambda_handler(event, context): - # Extract path parameters + +def parse_and_validate(event): session_id = load_path_parameter_from_event(event, "session_id") if not session_id: - return { - "statusCode": 400, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps({"message": "session_id is required"}), - } - + raise ValueError("session_id is required") session_id = f"SESSION#{session_id}" - body = load_body_from_event(event) - # Check if the message fields are present in the request body + body = load_body_from_event(event) message = body.get("message") user_id = body.get("user_id") if not message or not user_id: - return { - "statusCode": 400, - "body": json.dumps({"message": "'message' and 'user_id' is required."}), - } + raise ValueError("'message' and 'user_id' is required.") - # Check the format of the message field - if "role" not in message or "content" not in message: - return { - "statusCode": 400, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps( - {"message": "Both role and content are required in the message"} - ), - } + if not message.get("role") or not message.get("content"): + raise ValueError("Both 'role' and 'content' are required in the message") + return session_id, message, user_id - # Prepare dynamodb resource and table - dynamodb = boto3.resource("dynamodb") - memory_table = dynamodb.Table(os.getenv("MEMORY_TABLE_NAME")) - # Generate a unique message ID, e.g., based on the current timestamp - message_id = f"MESSAGE_{datetime.datetime.now().strftime('%Y%m%d%H%M%S%f')}" +def lambda_handler(event, context): + try: + session_id, message, user_id = parse_and_validate(event) + message_id = f"MESSAGE_{datetime.datetime.now().strftime('%Y%m%d%H%M%S%f')}" - # Prepare the item to be inserted into the table - item = { - "pk": session_id, # Use the session_id as the partition key "pk - "sk": message_id, - "message": {"role": message["role"], "content": message["content"]}, - "created_at": datetime.datetime.now().isoformat(), - } + item = { + "pk": session_id, + "sk": message_id, + "message": {"role": message["role"], "content": message["content"]}, + "created_at": datetime.datetime.now().isoformat(), + } - try: - # Check if the session exists if not create a new session metadata_response = memory_table.query( KeyConditionExpression=Key("pk").eq(session_id) & Key("sk").eq("METADATA"), FilterExpression=Attr("is_deleted").eq(False) & Attr("is_deleted").exists(), @@ -72,20 +59,9 @@ def lambda_handler(event, context): } memory_table.put_item(Item=metadata_item) - # Insert the item into DynamoDB memory_table.put_item(Item=item) + return success_response(item) + except ValueError as e: + return error_response(str(e)) except Exception as e: - return { - "statusCode": 500, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps( - {"message": "Error querying session metadata", "error": str(e)} - ), - } - - # Return success response - return { - "statusCode": 200, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps({"message": "Message added successfully!", "item": item}), - } + return internal_server_error() diff --git a/serverless/services/memory/clear_messages.py b/serverless/services/memory/clear_messages.py index 17ddc82..8eae4b1 100644 --- a/serverless/services/memory/clear_messages.py +++ b/serverless/services/memory/clear_messages.py @@ -3,53 +3,39 @@ import boto3 from boto3.dynamodb.conditions import Key +from common.utils.lambda_utils import load_path_parameter_from_event +from common.utils.error_handler import error_response, internal_server_error +from common.utils.response_utils import success_response + +table_name = os.getenv("MEMORY_TABLE_NAME") +memory_table = boto3.resource("dynamodb").Table(table_name) -def lambda_handler(event, context): - # Extract path parameters - session_id = event.get("pathParameters", {}).get("session_id") - if not session_id: - return { - "statusCode": 400, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps({"message": "session_id is required"}), - } +def parse_and_validate(event): + session_id = load_path_parameter_from_event(event, "session_id") + if not session_id: + raise ValueError("session_id is required") session_id = f"SESSION#{session_id}" - dynamodb = boto3.resource("dynamodb") - table = dynamodb.Table(os.getenv("MEMORY_TABLE_NAME")) + return session_id + - # Step 1: Query to find all messages for the session +def lambda_handler(event, context): try: - response = table.query( + session_id = parse_and_validate(event) + + response = memory_table.query( KeyConditionExpression=Key("pk").eq(session_id) & Key("sk").begins_with("MESSAGE_") ) messages = response.get("Items", []) - except Exception as e: - return { - "statusCode": 500, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps( - {"message": "Failed to query messages", "error": str(e)} - ), - } - - # Step 2: Delete each message item - try: - with table.batch_writer() as batch: + + with memory_table.batch_writer() as batch: for message in messages: batch.delete_item(Key={"pk": message["pk"], "sk": message["sk"]}) + + body = {"message": "All messages deleted successfully!"} + return success_response(body) + except ValueError as e: + return error_response(str(e)) except Exception as e: - return { - "statusCode": 500, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps( - {"message": "Failed to delete messages", "error": str(e)} - ), - } - - return { - "statusCode": 200, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps({"message": "All messages deleted successfully"}), - } + return internal_server_error() diff --git a/serverless/services/memory/delete_session.py b/serverless/services/memory/delete_session.py index 785f8f4..85bb0db 100644 --- a/serverless/services/memory/delete_session.py +++ b/serverless/services/memory/delete_session.py @@ -4,23 +4,30 @@ from boto3.dynamodb.conditions import Key, Attr +from common.utils.lambda_utils import load_path_parameter_from_event +from common.utils.error_handler import ( + error_response, + internal_server_error, + not_found_error, +) +from common.utils.response_utils import success_response -def lambda_handler(event, context): - session_id = event.get("pathParameters", {}).get("session_id") - if not session_id: - return { - "statusCode": 400, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps({"message": "session_id is required"}), - } +table_name = os.getenv("MEMORY_TABLE_NAME") +memory_table = boto3.resource("dynamodb").Table(table_name) - # Initialize variables + +def parse_and_validate(event): + session_id = load_path_parameter_from_event(event, "session_id") + if not session_id: + raise ValueError("session_id is required") session_id = f"SESSION#{session_id}" - table_name = os.getenv("MEMORY_TABLE_NAME") - memory_table = boto3.resource("dynamodb").Table(table_name) + return session_id - # Delete it updating the is_deleted attribute + +def lambda_handler(event, context): try: + session_id = parse_and_validate(event) + metadata_response = memory_table.query( KeyConditionExpression=Key("pk").eq(session_id) & Key("sk").eq("METADATA"), FilterExpression=Attr("is_deleted").eq(False) & Attr("is_deleted").exists(), @@ -28,11 +35,7 @@ def lambda_handler(event, context): metadata_items = metadata_response.get("Items", []) if not metadata_items: - return { - "statusCode": 404, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps({"message": "Session not found"}), - } + return not_found_error() res = memory_table.update_item( Key={"pk": session_id, "sk": "METADATA"}, @@ -40,17 +43,13 @@ def lambda_handler(event, context): ExpressionAttributeValues={":val": True}, ) if res["ResponseMetadata"]["HTTPStatusCode"] != 200: - raise Exception("Error deleting session!") + raise boto3.exceptions.Boto3Error("Failed to delete session!") + else: + body = {"message": "Session deleted successfully"} + return success_response(body) + except boto3.exceptions.Boto3Error as e: + return error_response(str(e)) + except ValueError as e: + return error_response(str(e)) except Exception as e: - return { - "statusCode": 500, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps( - {"message": "Failed to delete session", "error": str(e)} - ), - } - return { - "statusCode": 200, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps({"message": "Session deleted successfully"}), - } + return internal_server_error() diff --git a/serverless/services/memory/get_session.py b/serverless/services/memory/get_session.py index a5dc8c0..7d04431 100644 --- a/serverless/services/memory/get_session.py +++ b/serverless/services/memory/get_session.py @@ -1,38 +1,46 @@ -from importlib import metadata import os -import json + import boto3 from boto3.dynamodb.conditions import Key, Attr -from decimal import Decimal +from common.utils.lambda_utils import load_path_parameter_from_event +from common.utils.error_handler import ( + error_response, + internal_server_error, + not_found_error, +) +from common.utils.response_utils import success_response -class DecimalEncoder(json.JSONEncoder): - def default(self, obj): - if isinstance(obj, Decimal): - if obj % 1 > 0: - return float(obj) - else: - return int(obj) - return super(DecimalEncoder, self).default(obj) +table_name = os.getenv("MEMORY_TABLE_NAME") +memory_table = boto3.resource("dynamodb").Table(table_name) -def lambda_handler(event, context): - # Extracting user_id from pathParameters safely - session_id = event.get("pathParameters", {}).get("session_id") +def parse_and_validate(event): + session_id = load_path_parameter_from_event(event, "session_id") if not session_id: - return { - "statusCode": 400, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps({"message": "session_id is required"}), - } + raise ValueError("session_id is required") session_id = f"SESSION#{session_id}" + return session_id + + +def clean_messages_and_metadata_items(message_items, metadata_items): + # Extract metadata while filtering out the 'pk' key + metadata = ( + {key: val for key, val in metadata_items[0].items() if key != "pk"} + if metadata_items + else {} + ) + + # Extract messages focusing only on the content inside the 'message' dictionary + messages = [msg["message"] for msg in message_items if "message" in msg] + + return messages, metadata - # Initialize DynamoDB resource - dynamodb = boto3.resource("dynamodb") - memory_table = dynamodb.Table(os.getenv("MEMORY_TABLE_NAME")) +def lambda_handler(event, context): try: - # Query DynamoDB for metadata + session_id = parse_and_validate(event) + metadata_response = memory_table.query( KeyConditionExpression=Key("pk").eq(session_id) & Key("sk").eq("METADATA"), FilterExpression=Attr("is_deleted").eq(False) & Attr("is_deleted").exists(), @@ -40,45 +48,25 @@ def lambda_handler(event, context): metadata_items = metadata_response.get("Items", []) if not metadata_items: - return { - "statusCode": 404, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps({"message": "Session not found"}), - } + return not_found_error() - # Query DynamoDB for messages message_response = memory_table.query( KeyConditionExpression=Key("pk").eq(session_id) & Key("sk").begins_with("MESSAGE_") ) message_items = message_response.get("Items", []) - # perform metadata cleanup - metadata = ( - {key: val for key, val in metadata_items[0].items() if key != "pk"} - if metadata_items - else {} + messages, metadata = clean_messages_and_metadata_items( + message_items, metadata_items ) - messages = [ - {key: val for key, val in msg.items() if key == "message" or key == "sk"} - for msg in message_items - ] - except Exception as e: - # Handle possible exceptions during the query operation - return { - "statusCode": 500, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps({"message": str(e)}), - } - # Return the query results as JSON - response_body = { - "session_id": session_id, - "metadata": metadata, - "messages": messages, - } - return { - "statusCode": 200, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps(response_body, cls=DecimalEncoder), - } + body = { + "session_id": session_id, + "metadata": metadata, + "messages": messages, + } + return success_response(body) + except ValueError as e: + return error_response(str(e)) + except Exception as e: + return internal_server_error() diff --git a/serverless/services/memory/get_sessions_user_id.py b/serverless/services/memory/get_sessions_user_id.py index 24e9bc9..1e182e2 100644 --- a/serverless/services/memory/get_sessions_user_id.py +++ b/serverless/services/memory/get_sessions_user_id.py @@ -1,25 +1,37 @@ import os -import json import boto3 from boto3.dynamodb.conditions import Attr, Key -from common.utils.encoders import DecimalEncoder +from common.utils.lambda_utils import load_path_parameter_from_event +from common.utils.error_handler import ( + error_response, + internal_server_error, + not_found_error, +) +from common.utils.response_utils import success_response -def lambda_handler(event, context): +table_name = os.getenv("MEMORY_TABLE_NAME") +memory_table = boto3.resource("dynamodb").Table(table_name) + + +def parse_and_validate(event): user_id = int(event.get("pathParameters", {}).get("user_id")) if not user_id: - return { - "statusCode": 400, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps({"message": "session_id is required"}), - } + raise ValueError("user_id is required") + user_id = int(user_id) + return user_id + + +def clean_messages_items(message_items): + messages = [msg["message"] for msg in message_items if "message" in msg] + return messages - table_name = os.getenv("MEMORY_TABLE_NAME") - memory_table = boto3.resource("dynamodb").Table(table_name) +def lambda_handler(event, context): try: - # Scan the table for all sessions where is_deleted is false + user_id = parse_and_validate(event) + response = memory_table.scan( FilterExpression=Attr("is_deleted").eq(False) & Attr("sk").eq("METADATA") @@ -27,15 +39,9 @@ def lambda_handler(event, context): ) sessions_items = response.get("Items", []) - # If no sessions are found, return a 404 if not sessions_items or len(sessions_items) == 0: - return { - "statusCode": 404, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps({"message": "No sessions found for this user"}), - } + return not_found_error() - # Optionally append messages for each session if needed for session in sessions_items: session_id = session["pk"] session.pop("user_id", None) @@ -44,14 +50,7 @@ def lambda_handler(event, context): & Key("sk").begins_with("MESSAGE_") ) messages_items = messages_response.get("Items", []) - messages = [ - { - key: val - for key, val in msg.items() - if key == "message" or key == "sk" - } - for msg in messages_items - ] + messages = clean_messages_items(messages_items) session["messages"] = messages data = ( @@ -63,21 +62,9 @@ def lambda_handler(event, context): if sessions_items else {} ) + body = {"user_id": user_id, "sessions": data} + return success_response(body) + except ValueError as e: + return error_response(str(e)) except Exception as e: - return { - "statusCode": 500, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps({"message": "Error fetching sessions: " + str(e)}), - } - - # Format the response body with the sessions data - response_body = json.dumps( - {"user_id": user_id, "sessions": data, "session_count": len(data)}, - cls=DecimalEncoder, - ) - - return { - "statusCode": 200, - "headers": {"Content-Type": "application/json"}, - "body": response_body, - } + return internal_server_error() diff --git a/serverless/services/memory/update_session_metadata.py b/serverless/services/memory/update_session_metadata.py index 3134713..8ecbf69 100644 --- a/serverless/services/memory/update_session_metadata.py +++ b/serverless/services/memory/update_session_metadata.py @@ -1,63 +1,58 @@ import os -import json import boto3 from boto3.dynamodb.conditions import Key from common.utils.lambda_utils import load_body_from_event +from common.utils.lambda_utils import load_path_parameter_from_event +from common.utils.error_handler import error_response, internal_server_error +from common.utils.response_utils import success_response + +table_name = os.getenv("MEMORY_TABLE_NAME") +memory_table = boto3.resource("dynamodb").Table(table_name) -def lambda_handler(event, context): - # Extract session_id from the request and validate - session_id = event.get("pathParameters", {}).get("session_id") - if not session_id: - return { - "statusCode": 400, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps({"message": "session_id is required"}), - } - # Initialize the DynamoDB resource - dynamodb = boto3.resource("dynamodb") - memory_table = dynamodb.Table(os.getenv("MEMORY_TABLE_NAME")) +def parse_and_validate(event): + session_id = load_path_parameter_from_event(event, "session_id") + if not session_id: + raise ValueError("session_id is required") + session_id = f"SESSION#{session_id}" - # Load body from the event body = load_body_from_event(event) if not body: - return { - "statusCode": 400, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps({"message": "Body is required"}), - } + raise ValueError("Body is required") + return session_id, body - session_id = f"SESSION#{session_id}" - # Construct the update expression and attribute values +def build_update_expression(body): update_expression = "SET " expression_attribute_values = {} for key, value in body.items(): update_expression += f"{key} = :{key}, " expression_attribute_values[f":{key}"] = value update_expression = update_expression.rstrip(", ") + return update_expression, expression_attribute_values + +def lambda_handler(event, context): try: - response = memory_table.update_item( + session_id, body = parse_and_validate(event) + update_expression, expression_attribute_values = build_update_expression(body) + + res = memory_table.update_item( Key={"pk": session_id, "sk": "METADATA"}, UpdateExpression=update_expression, ExpressionAttributeValues=expression_attribute_values, ReturnValues="UPDATED_NEW", ) + if res["ResponseMetadata"]["HTTPStatusCode"] != 200: + raise boto3.exceptions.Boto3Error("Failed to update session metadata!") + + body = {"message": "Session metadata updated successfully"} + return success_response(body) + except ValueError as e: + return error_response(str(e)) + except boto3.exceptions.Boto3Error as e: + return error_response(str(e)) except Exception as e: - return { - "statusCode": 500, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps( - {"message": "Error updating session metadata", "error": str(e)} - ), - } - - response = {"message": "Session metadata updated successfully"} - return { - "statusCode": 200, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps(response), - } + return internal_server_error()