Skip to content

Commit

Permalink
SQS Support
Browse files Browse the repository at this point in the history
  • Loading branch information
proggeramlug committed Oct 23, 2021
1 parent 01d6c4f commit 13de56d
Show file tree
Hide file tree
Showing 2 changed files with 231 additions and 0 deletions.
152 changes: 152 additions & 0 deletions Sources/VaporAWSLambdaRuntime/SQS.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
//
// File.swift
//
//
// Created by Ralph Küpper on 10/23/21.
//


import AWSLambdaEvents
import AWSLambdaRuntimeCore
import ExtrasBase64
import NIO
import NIOHTTP1
import Vapor

// MARK: - Handler -

struct SQSHandler: EventLoopLambdaHandler {

typealias In = SQS.Event
typealias Out = SQSResponse

private let application: Application
private let responder: Responder

init(application: Application, responder: Responder) {
self.application = application
print("responder: ", responder)
self.responder = responder
}

public func handle(context: Lambda.Context, event: SQS.Event)
-> EventLoopFuture<SQSResponse>
{
let vaporRequest: Vapor.Request
do {
vaporRequest = try Vapor.Request(req: event, in: context, for: self.application)
} catch {
return context.eventLoop.makeFailedFuture(error)
}

return self.responder.respond(to: vaporRequest).flatMap { SQSResponse.from(response: $0, in: context) }
}
}

// MARK: - Request -

extension Vapor.Request {
private static let bufferAllocator = ByteBufferAllocator()

convenience init(req: SQS.Event, in ctx: Lambda.Context, for application: Application) throws {
let event = req.records.first!
/*var buffer: NIO.ByteBuffer?
switch (req.body, req.isBase64Encoded) {
case (let .some(string), true):
let bytes = try string.base64decoded()
buffer = Vapor.Request.bufferAllocator.buffer(capacity: bytes.count)
buffer!.writeBytes(bytes)
case (let .some(string), false):
buffer = Vapor.Request.bufferAllocator.buffer(capacity: string.utf8.count)
buffer!.writeString(string)
case (.none, _):
break
}
var nioHeaders = NIOHTTP1.HTTPHeaders()
req.headers?.forEach { key, value in
nioHeaders.add(name: key, value: value)
}
/*if let cookies = req., cookies.count > 0 {
nioHeaders.add(name: "Cookie", value: cookies.joined(separator: "; "))
}*/
var url: String = req.path
if req.queryStringParameters.count > 0 {
url += "?"
for key in req.queryStringParameters.keys {
// It leaves an ampersand (&) at the end, but who cares?
url += key + "=" + (req.queryStringParameters[key] ?? "") + "&"
}
}*/
var buffer: NIO.ByteBuffer?
buffer = Vapor.Request.bufferAllocator.buffer(capacity: event.body.utf8.count)
buffer!.writeString(event.body)

let url = "/sqs"

ctx.logger.debug("The constructed URL is: \(url)")

self.init(
application: application,
method: NIOHTTP1.HTTPMethod.POST,
url: Vapor.URI(path: url),
version: HTTPVersion(major: 1, minor: 1),
headers: [:],
collectedBody: buffer,
remoteAddress: nil,
logger: ctx.logger,
on: ctx.eventLoop
)

storage[SQS.Event] = req
}
}

extension SQS.Event: Vapor.StorageKey {
public typealias Value = SQS.Event
}

// MARK: - Response -

struct SQSResponse: Codable {
public var statusCode: HTTPResponseStatus
public var statusDescription: String?
public var headers: HTTPHeaders?
public var multiValueHeaders: HTTPMultiValueHeaders?
public var body: String
public var isBase64Encoded: Bool

public init(
statusCode: HTTPResponseStatus,
statusDescription: String? = nil,
headers: HTTPHeaders? = nil,
multiValueHeaders: HTTPMultiValueHeaders? = nil,
body: String = "",
isBase64Encoded: Bool = false
) {
self.statusCode = statusCode
self.statusDescription = statusDescription
self.headers = headers
self.multiValueHeaders = multiValueHeaders
self.body = body
self.isBase64Encoded = isBase64Encoded
}

static func from(response: Vapor.Response, in context: Lambda.Context) -> EventLoopFuture<SQSResponse> {
// Create the headers
var headers: HTTPHeaders = [:]

// Can we access the body right away?
let string = response.body.string ?? ""
return context.eventLoop.makeSucceededFuture(.init(
statusCode: HTTPResponseStatus.ok,
headers: headers,
body: string,
isBase64Encoded: false
))
}
}
79 changes: 79 additions & 0 deletions Tests/VaporAWSLambdaRuntimeTests/SQSTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
//
// File.swift
//
//
// Created by Ralph Küpper on 10/23/21.
//

import AWSLambdaEvents
@testable import AWSLambdaRuntimeCore
import Logging
import NIO
import Vapor
@testable import VaporAWSLambdaRuntime
import XCTest

final class SQSTests: XCTestCase {
func testSQSRequest() throws {
let requestdata = """
{
"Records": [
{
"messageId": "19dd0b57-b21e-4ac1-bd88-01bbb068cb78",
"receiptHandle": "MessageReceiptHandle",
"body": "Hello from SQS!",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1523232000000",
"SenderId": "123456789012",
"ApproximateFirstReceiveTimestamp": "1523232000001"
},
"messageAttributes": {},
"md5OfBody": "{{{md5_of_body}}}",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:MyQueue",
"awsRegion": "us-east-1"
}
]
}
"""
let decoder = JSONDecoder()
let request = try decoder.decode(SQS.Event.self, from: requestdata.data(using: .utf8)!)
print("F: ", request)
}

func testCreateALBResponse() {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
let eventLoop = eventLoopGroup.next()
let allocator = ByteBufferAllocator()
let logger = Logger(label: "test")

let body = #"{"hello": "world"}"#
let vaporResponse = Vapor.Response(
status: .ok,
headers: HTTPHeaders([
("Content-Type", "application/json"),
]),
body: .init(string: body)
)

let context = Lambda.Context(
requestID: "abc123",
traceID: AmazonHeaders.generateXRayTraceID(),
invokedFunctionARN: "function-arn",
deadline: .now() + .seconds(3),
logger: logger,
eventLoop: eventLoop,
allocator: allocator
)

var response: ALB.TargetGroupResponse?
XCTAssertNoThrow(response = try ALB.TargetGroupResponse.from(response: vaporResponse, in: context).wait())

XCTAssertEqual(response?.body, body)
XCTAssertEqual(response?.headers?.count, 2)
XCTAssertEqual(response?.headers?["Content-Type"], "application/json")
XCTAssertEqual(response?.headers?["content-length"], String(body.count))
}
}

0 comments on commit 13de56d

Please sign in to comment.