diff --git a/Sources/ClientRuntime/Networking/Http/CRT/TLSConfiguration.swift b/Sources/ClientRuntime/Networking/Http/CRT/TLSConfiguration.swift new file mode 100644 index 000000000..d4ed04ece --- /dev/null +++ b/Sources/ClientRuntime/Networking/Http/CRT/TLSConfiguration.swift @@ -0,0 +1,29 @@ +// +// Copyright Amazon.com Inc. or its affiliates. +// All Rights Reserved. +// +// SPDX-License-Identifier: Apache-2.0 +// + +/** + * Configuration settings about TLS set up. + * All settings are optional. + * Not specifying them will use the SDK defaults + */ +public protocol TLSConfiguration { + + // Optional path to a PEM certificate + var certificate: String? { get set } + + // Optional path to certificate directory + var certificateDir: String? { get set } + + // Optional path to a PEM format private key + var privateKey: String? { get set } + + // Optional path to PKCS #12 certificate , in PEM format + var pkcs12Path: String? { get set } + + // Optional PKCS#12 password + var pkcs12Password: String? { get set } +} diff --git a/Sources/ClientRuntime/Networking/Http/HttpClientConfiguration.swift b/Sources/ClientRuntime/Networking/Http/HttpClientConfiguration.swift index cd6debecb..6b267c4ea 100644 --- a/Sources/ClientRuntime/Networking/Http/HttpClientConfiguration.swift +++ b/Sources/ClientRuntime/Networking/Http/HttpClientConfiguration.swift @@ -6,14 +6,22 @@ // import struct Foundation.TimeInterval +import AwsCommonRuntimeKit public class HttpClientConfiguration { - /// The timeout for a request, in seconds. + /// The timeout for establishing a connection, in seconds. /// /// If none is provided, the client will use default values based on the platform. public var connectTimeout: TimeInterval? + /// The timeout for socket, in seconds. + /// Sets maximum time to wait between two data packets. + /// Used to close stale connections that have no activity. + /// + /// Defaults to 60 seconds if no value is provided. + public var socketTimeout: TimeInterval + /// HTTP headers to be submitted with every HTTP request. /// /// If none is provided, defaults to no extra headers. @@ -27,21 +35,33 @@ public class HttpClientConfiguration { /// If none is provided, the default protocol for the operation will be used public var protocolType: ProtocolType? + /// Custom TLS configuration for HTTPS connections. + /// + /// Enables specifying client certificates and trust stores for secure communication. + /// Defaults to system's TLS settings if `nil`. + public var tlsConfiguration: (any TLSConfiguration)? + /// Creates a configuration object for a SDK HTTP client. /// /// Not all configuration settings may be followed by all clients. /// - Parameters: - /// - connectTimeout: The maximum time to wait for a response without receiving any data. + /// - connectTimeout: The maximum time to wait for a connection to be established. + /// - socketTimeout: The maximum time to wait between data packets. /// - defaultHeaders: HTTP headers to be included with every HTTP request. /// Note that certain headers may cause your API request to fail. Defaults to no headers. /// - protocolType: The HTTP scheme (`http` or `https`) to be used for API requests. Defaults to the operation's standard configuration. + /// - tlsConfiguration: Optional custom TLS configuration for HTTPS requests. If `nil`, defaults to a standard configuration. public init( connectTimeout: TimeInterval? = nil, + socketTimeout: TimeInterval = 60.0, protocolType: ProtocolType = .https, - defaultHeaders: Headers = Headers() + defaultHeaders: Headers = Headers(), + tlsConfiguration: (any TLSConfiguration)? = nil ) { + self.socketTimeout = socketTimeout self.protocolType = protocolType self.defaultHeaders = defaultHeaders self.connectTimeout = connectTimeout + self.tlsConfiguration = tlsConfiguration } } diff --git a/Sources/ClientRuntime/Networking/Http/URLSession/FoundationStreamBridge.swift b/Sources/ClientRuntime/Networking/Http/URLSession/FoundationStreamBridge.swift index df95b5b36..aad956734 100644 --- a/Sources/ClientRuntime/Networking/Http/URLSession/FoundationStreamBridge.swift +++ b/Sources/ClientRuntime/Networking/Http/URLSession/FoundationStreamBridge.swift @@ -19,75 +19,130 @@ import class Foundation.Timer import struct Foundation.TimeInterval import protocol Foundation.StreamDelegate -/// Reads data from a smithy-swift native `ReadableStream` and streams the data to a Foundation `InputStream`. +/// Reads data from a smithy-swift native `ReadableStream` and streams the data through to a Foundation `InputStream`. +/// +/// A pair of Foundation "bound streams" is created. Data from the `ReadableStream` is transferred into the Foundation +/// `OutputStream` until the `ReadableStream` is closed and all data has been read from it. The Foundation +/// `InputStream` is exposed as a property, and may be used to stream the data to other components. /// /// Used to permit SDK streaming request bodies to be used with `URLSession`-based HTTP requests. class FoundationStreamBridge: NSObject, StreamDelegate { - /// The max number of bytes to buffer internally (and transfer) at any given time. - let bufferSize: Int + /// The max number of bytes to buffer between the `ReadableStream` and the Foundation `OutputStream` + /// at any given time. + let bridgeBufferSize: Int + + /// The max number of bytes to buffer between the Foundation `OutputStream` and the Foundation `InputStream` + /// at any given time. + let boundStreamBufferSize: Int - /// A buffer to hold data that has been read from the ReadableStream but not yet written to the OutputStream. + /// A buffer to hold data that has been read from the `ReadableStream` but not yet written to the + /// Foundation `OutputStream`. At most, it will contain `bridgeBufferSize` bytes. private var buffer: Data /// The `ReadableStream` that will serve as the input to this bridge. /// The bridge will read bytes from this stream and dump them to the Foundation stream - /// pair as they become available. + /// pair as they become available. let readableStream: ReadableStream /// A Foundation stream that will carry the bytes read from the readableStream as they become available. - let inputStream: InputStream + /// + /// May be replaced if needed by calling the `replaceStreams(_:)` method. + var inputStream: InputStream - /// A Foundation `OutputStream` that will read from the `ReadableStream` - private let outputStream: OutputStream + /// A Foundation `OutputStream` that will read from the `ReadableStream`. + /// + /// Will be replaced when `replaceStreams(_:)` is called to replace the input stream. + private var outputStream: OutputStream /// A Logger for logging events. private let logger: LogAgent - /// Actor used to ensure writes are performed in series. - actor WriteCoordinator { + /// Actor used to ensure writes are performed in series, one at a time. + private actor WriteCoordinator { var task: Task? - /// `true` if the readable stream has been found to be empty, `false` otherwise. Will flip to `true` if the readable stream is read, - /// and `nil` is returned. - var readableStreamIsEmpty = false - - /// Sets stream status to indicate the stream is empty. - func setReadableStreamIsEmpty() async { - readableStreamIsEmpty = true - } - /// Creates a new concurrent Task that executes the passed block, ensuring that the previous Task /// finishes before this task starts. /// /// Acts as a sort of "serial queue" of Swift concurrency tasks. /// - Parameter block: The code to be performed in this task. - func perform(_ block: @escaping @Sendable (WriteCoordinator) async throws -> Void) { - self.task = Task { [task] in + func perform(_ block: @escaping @Sendable () async throws -> Void) async throws { + let task = Task { [task] in _ = await task?.result - try await block(self) + try await block() } + self.task = task + _ = try await task.value } } /// Actor used to enforce the order of multiple concurrent stream writes. private let writeCoordinator = WriteCoordinator() - /// A shared serial DispatchQueue to run the stream operations. - /// Performing operations on an async queue allows Swift concurrency tasks to not block. + /// A serial `DispatchQueue` to run the stream operations for the Foundation `OutputStream`. + /// + /// Operations performed on the queue include: + /// - Opening the stream + /// - Closing the stream + /// - Writing to the stream + /// - Receiving `StreamDelegate` callbacks + /// + /// Queue operations are run in the order they are placed on the queue, and only one operation + /// runs at a time (i.e. this is a "serial queue".) private let queue = DispatchQueue(label: "AWSFoundationStreamBridge") + /// `true` if the readable stream has been closed, `false` otherwise. Will be flipped to `true` once the readable stream is read, + /// and `nil` is returned. + /// + /// Access this variable only during a write operation to ensure exclusive access. + private var readableStreamIsClosed = false + // MARK: - init & deinit - /// Creates a stream bridge taking the passed `ReadableStream` as its input + /// Creates a stream bridge taking the passed `ReadableStream` as its input., and exposing a Foundation `InputStream` + /// that may be used for streaming data on to Foundation components. /// /// Data will be buffered in an internal, in-memory buffer. The Foundation `InputStream` that exposes `readableStream` /// is exposed by the `inputStream` property after creation. /// - Parameters: /// - readableStream: The `ReadableStream` that serves as the input to the bridge. - /// - bufferSize: The number of bytes in the in-memory buffer. The buffer is allocated for this size no matter if in use or not. - /// Defaults to 65536 bytes. - init(readableStream: ReadableStream, bufferSize: Int = 65_536, logger: LogAgent) { + /// - bridgeBufferSize: The number of bytes in the in-memory buffer. The buffer is allocated for this size no matter if in use or not. + /// Defaults to 65536 bytes (64 kb). + /// - boundStreamBufferSize: The number of bytes in the buffer between the bound Foundation streams. If `nil`, uses the + /// same size as `bridgeBufferSize`. Defaults to `nil`. Primary use of this parameter is for testing. + /// - logger: A logger that can be used to log stream events. + init( + readableStream: ReadableStream, + bridgeBufferSize: Int = 65_536, + boundStreamBufferSize: Int? = nil, + logger: LogAgent + ) { + self.bridgeBufferSize = bridgeBufferSize + self.boundStreamBufferSize = boundStreamBufferSize ?? bridgeBufferSize + self.buffer = Data(capacity: bridgeBufferSize) + self.readableStream = readableStream + self.logger = logger + (inputStream, outputStream) = Self.makeStreams(boundStreamBufferSize: self.boundStreamBufferSize, queue: queue) + } + + func replaceStreams(completion: @escaping (InputStream?) -> Void) async { + // Close the current output stream, since it will accept no more data and is about to + // be replaced. + await close() + + // Replace the bound stream pair with new bound streams. + (inputStream, outputStream) = Self.makeStreams(boundStreamBufferSize: boundStreamBufferSize, queue: queue) + + // Call the completion block. When this method is called from `urlSession(_:task:needNewBodyStream:)`, + // the completion block will be that method's completion handler. + completion(inputStream) + + // Re-open the `OutputStream` for writing. + await open() + } + + private static func makeStreams(boundStreamBufferSize: Int, queue: DispatchQueue) -> (InputStream, OutputStream) { var inputStream: InputStream? var outputStream: OutputStream? @@ -95,28 +150,30 @@ class FoundationStreamBridge: NSObject, StreamDelegate { // Data written into the output stream will automatically flow to the inputStream for reading. // The bound streams have a buffer between them of size equal to the buffer held by this bridge. Foundation.Stream.getBoundStreams( - withBufferSize: bufferSize, inputStream: &inputStream, outputStream: &outputStream + withBufferSize: boundStreamBufferSize, + inputStream: &inputStream, + outputStream: &outputStream ) guard let inputStream, let outputStream else { // Fail with fatalError since this is not a failure that would happen in normal operation. fatalError("Get pair of bound streams failed. Please file a bug with AWS SDK for Swift.") } - self.bufferSize = bufferSize - self.buffer = Data(capacity: bufferSize) - self.readableStream = readableStream - self.inputStream = inputStream - self.outputStream = outputStream - self.logger = logger - // The stream is configured to deliver its callbacks on the dispatch queue. + // The Foundation `OutputStream` is configured to deliver its callbacks on the dispatch queue. // This precludes the need for a Thread with RunLoop. + // For safety, all interactions with the output stream will be performed on this queue. CFWriteStreamSetDispatchQueue(outputStream, queue) + + // Return the input & output streams to the caller in a tuple + return (inputStream, outputStream) } // MARK: - Opening & closing - /// Schedule the output stream on the queue for stream callbacks. - /// Do not wait to complete opening before returning. + /// Open the output stream and schedule this bridge to receive stream delegate callbacks. + /// + /// Stream operations are performed on the stream's queue. + /// Stream opening is completed before asynchronous return to the caller. func open() async { await withCheckedContinuation { continuation in queue.async { @@ -127,8 +184,10 @@ class FoundationStreamBridge: NSObject, StreamDelegate { } } - /// Unschedule the output stream on the special stream callback thread. - /// Do not wait to complete closing before returning. + /// Close the output stream and unschedule this bridge from receiving stream delegate callbacks. + /// + /// Stream operations are performed on the stream's queue. + /// Stream closing is completed before asynchronous return to the caller. func close() async { await withCheckedContinuation { continuation in queue.async { @@ -141,50 +200,97 @@ class FoundationStreamBridge: NSObject, StreamDelegate { // MARK: - Writing to bridge - /// Tries to read from the readable stream if possible, then transfer the data to the output stream. + /// Writes buffered data to the output stream. + /// If the buffer is empty, the `ReadableStream` will be read first to replenish the buffer. + /// + /// If the buffer is empty and the readable stream is closed, there is no more data to bridge, and the output stream is closed. private func writeToOutput() async throws { - await writeCoordinator.perform { [self] writeCoordinator in - var data = Data() - if await !writeCoordinator.readableStreamIsEmpty { - if let newData = try await readableStream.readAsync(upToCount: bufferSize) { - data = newData + + // Perform the write on the `WriteCoordinator` to ensure that writes happen in-order + // and one at a time. + // + // Note that it is safe to access `buffer` and `readableStreamIsClosed` instance vars + // from inside the block passed to `perform()` because this is the only place + // these instance vars are accessed, and the code in the `perform()` block runs + // in series with any other calls to `perform()`. + try await writeCoordinator.perform { [self] in + + // If there is no data in the buffer and the `ReadableStream` is still open, + // attempt to read the stream. Otherwise, skip reading the `ReadableStream` and + // write what's in the buffer immediately. + if !readableStreamIsClosed && buffer.isEmpty { + if let newData = try await readableStream.readAsync(upToCount: bridgeBufferSize - buffer.count) { + buffer.append(newData) } else { - await writeCoordinator.setReadableStreamIsEmpty() - await close() + readableStreamIsClosed = true } } - try await writeToOutputStream(data: data) + + // Write the previously buffered data and/or newly read data, if any, to the Foundation `OutputStream`. + // Capture the error from the stream write, if any. + var streamError: Error? + if !buffer.isEmpty { + streamError = await writeToOutputStream() + } + + // If the readable stream has closed and there is no data in the buffer, + // there is nothing left to forward to the output stream, so close it. + if readableStreamIsClosed && buffer.isEmpty { + await close() + } + + // If the output stream write produced an error, throw it now, else just return. + if let streamError { throw streamError } } } - /// Write the passed data to the output stream, using the reserved thread. - private func writeToOutputStream(data: Data) async throws { - try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + /// Using the output stream's callback queue, write the buffered data to the Foundation `OutputStream`. + /// + /// After writing, remove the written data from the buffer. + /// - Returns: The error resulting from the write to the Foundation `OutputStream`, or `nil` if no error occurred. + private func writeToOutputStream() async -> Error? { + + // Suspend the caller while the write is performed on the Foundation `OutputStream`'s queue. + await withCheckedContinuation { continuation in + + // Perform the write to the Foundation `OutputStream` on its queue. queue.async { [self] in - guard !buffer.isEmpty || !data.isEmpty else { continuation.resume(); return } - buffer.append(data) + + // Write to the output stream. It may not accept all data, so get the number of bytes + // it accepted in `writeCount`. var writeCount = 0 buffer.withUnsafeBytes { bufferPtr in guard let bytePtr = bufferPtr.bindMemory(to: UInt8.self).baseAddress else { return } writeCount = outputStream.write(bytePtr, maxLength: buffer.count) } + + // `writeCount` will be a positive number if bytes were written. + // Remove the written bytes from the front of the buffer. if writeCount > 0 { logger.info("FoundationStreamBridge: wrote \(writeCount) bytes to request body") buffer.removeFirst(writeCount) } - if let error = outputStream.streamError { - continuation.resume(throwing: error) - } else { - continuation.resume() - } + + // Resume the caller now that the write is complete, returning the stream error, if any. + continuation.resume(returning: outputStream.streamError) } } } // MARK: - StreamDelegate protocol - /// The stream places this callback when appropriate. Call will be delivered on the GCD queue for stream callbacks. - /// `.hasSpaceAvailable` prompts this type to query the readable stream for more data. + /// The stream places this callback when an event happens. + /// + /// The `FoundationStreamBridge` sets itself as the delegate of the Foundation `OutputStream` whenever the + /// `OutputStream` is open. Stream callbacks will be delivered on the GCD serial queue. + /// + /// `.hasSpaceAvailable` is the only event where the `FoundationStreamBridge` takes action; in response to + /// this event, the `FoundationStreamBridge` will write data to the `OutputStream`. + /// + /// This method is implemented for the Foundation `StreamDelegate` protocol. + /// - Parameters: + /// - aStream: The stream which experienced the event. + /// - eventCode: A code describing the type of event that happened. @objc func stream(_ aStream: Foundation.Stream, handle eventCode: Foundation.Stream.Event) { switch eventCode { case .openCompleted: @@ -198,7 +304,7 @@ class FoundationStreamBridge: NSObject, StreamDelegate { Task { try await writeToOutput() } case .errorOccurred: logger.info("FoundationStreamBridge: .errorOccurred event") - logger.info("FoundationStreamBridge: Stream error: \(String(describing: aStream.streamError))") + logger.info("FoundationStreamBridge: Stream error: \(aStream.streamError.debugDescription)") case .endEncountered: break default: diff --git a/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionConfiguration+HTTPClientConfiguration.swift b/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionConfiguration+HTTPClientConfiguration.swift index 9689fd084..9a683cee3 100644 --- a/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionConfiguration+HTTPClientConfiguration.swift +++ b/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionConfiguration+HTTPClientConfiguration.swift @@ -13,9 +13,10 @@ extension URLSessionConfiguration { public static func from(httpClientConfiguration: HttpClientConfiguration) -> URLSessionConfiguration { let config = URLSessionConfiguration.default - if let connectTimeout = httpClientConfiguration.connectTimeout { - config.timeoutIntervalForRequest = connectTimeout - } + config.timeoutIntervalForRequest = httpClientConfiguration.socketTimeout + config.httpShouldSetCookies = false + config.httpCookieAcceptPolicy = .never + config.httpCookieStorage = nil return config } } diff --git a/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionHTTPClient.swift b/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionHTTPClient.swift index 2db4d7065..433409b50 100644 --- a/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionHTTPClient.swift +++ b/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionHTTPClient.swift @@ -7,19 +7,26 @@ #if os(iOS) || os(macOS) || os(watchOS) || os(tvOS) || os(visionOS) +import class Foundation.Bundle import class Foundation.InputStream import class Foundation.NSObject import class Foundation.NSRecursiveLock +import var Foundation.NSURLAuthenticationMethodClientCertificate +import var Foundation.NSURLAuthenticationMethodServerTrust +import class Foundation.URLAuthenticationChallenge import struct Foundation.URLComponents +import class Foundation.URLCredential import struct Foundation.URLQueryItem import struct Foundation.URLRequest import class Foundation.URLResponse import class Foundation.HTTPURLResponse +import struct Foundation.TimeInterval import class Foundation.URLSession import class Foundation.URLSessionConfiguration import class Foundation.URLSessionTask import class Foundation.URLSessionDataTask import protocol Foundation.URLSessionDataDelegate +import Security import AwsCommonRuntimeKit /// A client that can be used to make requests to AWS services using `Foundation`'s `URLSession` HTTP client. @@ -46,8 +53,29 @@ public final class URLSessionHTTPClient: HTTPClient { /// The continuation for the asynchronous call that was made to initiate this request. /// /// Once the initial response is received, the continuation is called, and is subsequently set to `nil` so its - /// resources may be deallocated. - var continuation: CheckedContinuation? + /// resources may be deallocated and to prevent it from being resumed twice. + private var continuation: CheckedContinuation? + + /// Returns `true` once the continuation is set to `nil`, which will happen once it has been resumed. + var hasBeenResumed: Bool { continuation == nil } + + /// Resumes the continuation, returning the passed value. + /// + /// Calling this method and/or `resume(throwing:)` more than once has no effect. + /// - Parameter httpResponse: The HTTP response to be asynchronously returned to the caller. + func resume(returning httpResponse: HttpResponse) { + continuation?.resume(returning: httpResponse) + continuation = nil + } + + /// Resumes the continuation, throwing the passed error. + /// + /// Calling this method and/or `resume(returning:)` more than once has no effect. + /// - Parameter error: The error to be asynchronously thrown to the caller. + func resume(throwing error: Error) { + continuation?.resume(throwing: error) + continuation = nil + } /// Any error received during a delegate callback for this request. /// @@ -66,19 +94,23 @@ public final class URLSessionHTTPClient: HTTPClient { self.streamBridge = streamBridge self.continuation = continuation } - } - /// Provides thread-safe associative storage of `Connection`s keyed by their `URLSessionDataTask`. - private final class Storage: @unchecked Sendable { - - /// Ensure all continuations are resumed before deallocation. + /// Ensure continuation is resumed and stream is closed before deallocation. /// /// This should never happen in practice but is being done defensively. deinit { - connections.values.forEach { - $0.continuation?.resume(throwing: URLSessionHTTPClientError.unresumedConnection) + if let continuation { + continuation.resume(throwing: URLSessionHTTPClientError.unresumedConnection) + responseStream.close() + } else { + // This has no effect if the response stream was already closed + responseStream.closeWithError(URLSessionHTTPClientError.unclosedResponseStream) } } + } + + /// Provides thread-safe associative storage of `Connection`s keyed by their `URLSessionDataTask`. + private final class Storage: @unchecked Sendable { /// Lock used to enforce exclusive access to this `Storage` object. private let lock = NSRecursiveLock() @@ -120,8 +152,93 @@ public final class URLSessionHTTPClient: HTTPClient { /// Logger for HTTP-related events. let logger: LogAgent - init(logger: LogAgent) { + /// TLS options + let tlsOptions: URLSessionTLSOptions? + + init(logger: LogAgent, tlsOptions: URLSessionTLSOptions?) { self.logger = logger + self.tlsOptions = tlsOptions + } + + /// Handles server trust challenges by validating against a custom certificate. + func didReceive( + serverTrustChallenge challenge: URLAuthenticationChallenge, + completionHandler: @escaping (URLSession.AuthChallengeDisposition, URLCredential?) -> Void + ) { + guard let tlsOptions = tlsOptions, tlsOptions.useSelfSignedCertificate, + let certFile = tlsOptions.certificate, + let serverTrust = challenge.protectionSpace.serverTrust else { + logger.error( + "Either TLSOptions not set or missing values! Using default trust store." + ) + completionHandler(.performDefaultHandling, nil) + return + } + + guard let customRoot = Bundle.main.certificate(named: certFile) else { + logger.error("Certificate not found! Using default trust store.") + completionHandler(.performDefaultHandling, nil) + return + } + + do { + if try serverTrust.evaluateAllowing(rootCertificates: [customRoot]) { + completionHandler(.useCredential, URLCredential(trust: serverTrust)) + } else { + logger.error("Trust evaluation failed, cancelling authentication challenge.") + completionHandler(.cancelAuthenticationChallenge, nil) + } + } catch { + logger.error("Trust evaluation threw an error: \(error.localizedDescription)") + completionHandler(.cancelAuthenticationChallenge, nil) + } + } + + /// Handles client identity challenges by presenting a client certificate. + func didReceive( + clientIdentityChallenge challenge: URLAuthenticationChallenge, + completionHandler: @escaping (URLSession.AuthChallengeDisposition, URLCredential?) -> Void + ) { + guard let tlsOptions, tlsOptions.useProvidedKeystore, + let keystoreName = tlsOptions.pkcs12Path, + let keystorePasword = tlsOptions.pkcs12Password else { + logger.error( + "Either TLSOptions not set or missing values! Using default keystore." + ) + completionHandler(.performDefaultHandling, nil) + return + } + + guard let identity = Bundle.main.identity(named: keystoreName, password: keystorePasword) else { + logger.error( + "Error accessing keystore! Ensure keystore file exists and password is correct!" + + " Using default keystore." + ) + completionHandler(.performDefaultHandling, nil) + return + } + + completionHandler( + .useCredential, + URLCredential(identity: identity, certificates: nil, persistence: .forSession) + ) + } + + /// The URLSession delegate method where authentication challenges are handled. + func urlSession( + _ session: URLSession, + task: URLSessionTask, + didReceive challenge: URLAuthenticationChallenge, + completionHandler: @escaping (URLSession.AuthChallengeDisposition, URLCredential?) -> Void + ) { + switch challenge.protectionSpace.authenticationMethod { + case NSURLAuthenticationMethodServerTrust: + self.didReceive(serverTrustChallenge: challenge, completionHandler: completionHandler) + case NSURLAuthenticationMethodClientCertificate: + self.didReceive(clientIdentityChallenge: challenge, completionHandler: completionHandler) + default: + completionHandler(.performDefaultHandling, nil) + } } /// Called when the initial response to a HTTP request is received. @@ -130,13 +247,12 @@ public final class URLSessionHTTPClient: HTTPClient { func urlSession( _ session: URLSession, dataTask: URLSessionDataTask, didReceive response: URLResponse ) async -> URLSession.ResponseDisposition { - logger.debug("urlSession(_:dataTask:didReceive:) called") + logger.debug("urlSession(_:dataTask:didReceive response:) called") storage.modify(dataTask) { connection in guard let httpResponse = response as? HTTPURLResponse else { logger.error("Received non-HTTP urlResponse") let error = URLSessionHTTPClientError.responseNotHTTP - connection.continuation?.resume(throwing: error) - connection.continuation = nil + connection.resume(throwing: error) return } let statusCode = HttpStatusCode(rawValue: httpResponse.statusCode) ?? .insufficientStorage @@ -147,19 +263,41 @@ public final class URLSessionHTTPClient: HTTPClient { let headers = Headers(httpHeaders: httpHeaders) let body = ByteStream.stream(connection.responseStream) let response = HttpResponse(headers: headers, body: body, statusCode: statusCode) - connection.continuation?.resume(returning: response) - connection.continuation = nil + connection.resume(returning: response) } return .allow } + /// Called when the task needs a new `InputStream` to continue streaming the request body. + /// + /// The `FoundationStreamBridge` is called and told to replace its bound streams; the new `InputStream` is then passed + /// back through this method's `completionHandler` block. + /// + /// In practice, this seems to get called when multiple requests are made concurrently. + /// - Parameters: + /// - session: The `URLSession` the task belongs to. + /// - task: The `URLSessionTask` that needs a new body stream. + /// - completionHandler: A block to be called with the new `InputStream` when it is ready. + func urlSession( + _ session: URLSession, + task: URLSessionTask, + needNewBodyStream completionHandler: @escaping (InputStream?) -> Void + ) { + storage.modify(task) { connection in + guard let streamBridge = connection.streamBridge else { completionHandler(nil); return } + Task { await streamBridge.replaceStreams(completion: completionHandler) } + } + } + /// Called when response data is received. func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) { - logger.debug("urlSession(_:dataTask:didReceive:) called with \(data.count) bytes") + logger.debug("urlSession(_:dataTask:didReceive data:) called (\(data.count) bytes)") storage.modify(dataTask) { connection in do { try connection.responseStream.write(contentsOf: data) } catch { + // If the response stream errored on write, save the error for later return & + // cancel the HTTP request. connection.error = error dataTask.cancel() } @@ -171,26 +309,41 @@ public final class URLSessionHTTPClient: HTTPClient { /// If the error is returned prior to the initial response, the request fails with an error. /// If the error is returned after the initial response, the error is used to fail the response stream. func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) { - logger.debug("urlSession(_:task:didCompleteWithError:) called. \(error == nil ? "Success" : "Failure")") - if let error { logger.debug(" Error: \(error.localizedDescription)") } + if let error { + logger.error("urlSession(_:task:didCompleteWithError:) failed. Error: \(error.localizedDescription)") + } else { + logger.debug("urlSession(_:task:didCompleteWithError:) called. Success") + } + + // This connection is complete. No further data will be sent, and none will be received. + // Below, we ensure that, successful or not, before disposing of the connection: + // - The continuation has been resumed. + // - The response stream is closed. + // - The stream bridge is closed. + // This ensures that resources are freed and stream readers/writers are continued. storage.modify(task) { connection in if let error = connection.error ?? error { - if let continuation = connection.continuation { - continuation.resume(throwing: error) - connection.continuation = nil - } else { + if connection.hasBeenResumed { connection.responseStream.closeWithError(error) + } else { + connection.resume(throwing: error) + connection.responseStream.close() } } else { + if !connection.hasBeenResumed { + connection.resume(throwing: URLSessionHTTPClientError.closedBeforeResponse) + } connection.responseStream.close() } // Close the stream bridge so that its resources are deallocated - Task { await connection.streamBridge?.close() } - } + Task { + await connection.streamBridge?.close() - // Task is complete & no longer needed. Remove it from storage. - storage.remove(task) + // Task is complete & no longer needed. Remove it from storage. + storage.remove(task) + } + } } } @@ -206,20 +359,44 @@ public final class URLSessionHTTPClient: HTTPClient { /// The logger for this HTTP client. private var logger: LogAgent + /// The TLS options for this HTTP client. + private let tlsConfiguration: URLSessionTLSOptions? + + /// The initial connection timeout for this HTTP client. + let connectionTimeout: TimeInterval + // MARK: - init & deinit /// Creates a new `URLSessionHTTPClient`. /// /// The client is created with its own internal `URLSession`, which is configured with system defaults and with a private delegate for handling /// URL task lifecycle events. - /// - Parameter urlsessionConfiguration: The configuration to use for the client's `URLSession`. - public init(httpClientConfiguration: HttpClientConfiguration) { + /// - Parameters: + /// - httpClientConfiguration: The configuration to use for the client's `URLSession`. + public convenience init( + httpClientConfiguration: HttpClientConfiguration + ) { + self.init(httpClientConfiguration: httpClientConfiguration, sessionType: URLSession.self) + } + + /// Creates a new `URLSessionHTTPClient`. + /// + /// The client is created with its own internal `URLSession`. A mocked subclass may be substituted. + /// - Parameters: + /// - httpClientConfiguration: The configuration to use for the client's `URLSession`. + /// - SessionType: The type for the URLSession to be created. Used for testing. Defaults to `URLSession`. + init( + httpClientConfiguration: HttpClientConfiguration, + sessionType SessionType: URLSession.Type + ) { self.config = httpClientConfiguration self.logger = SwiftLogger(label: "URLSessionHTTPClient") - self.delegate = SessionDelegate(logger: logger) + self.tlsConfiguration = config.tlsConfiguration as? URLSessionTLSOptions + self.delegate = SessionDelegate(logger: logger, tlsOptions: tlsConfiguration) + self.connectionTimeout = httpClientConfiguration.connectTimeout ?? 60.0 var urlsessionConfiguration = URLSessionConfiguration.default urlsessionConfiguration = URLSessionConfiguration.from(httpClientConfiguration: httpClientConfiguration) - self.session = URLSession(configuration: urlsessionConfiguration, delegate: delegate, delegateQueue: nil) + self.session = SessionType.init(configuration: urlsessionConfiguration, delegate: delegate, delegateQueue: nil) } /// On deallocation, finish any in-process tasks before disposing of the `URLSession`. @@ -252,19 +429,27 @@ public final class URLSessionHTTPClient: HTTPClient { // If needed, create a stream bridge that streams data from a SDK stream to a Foundation InputStream // that URLSession can stream its request body from. - let streamBridge = requestStream.map { FoundationStreamBridge(readableStream: $0, bufferSize: 4096, logger: logger) } + // Allow 16kb of in-memory buffer for request body streaming + let streamBridge = requestStream.map { + FoundationStreamBridge(readableStream: $0, bridgeBufferSize: 16_384, logger: logger) + } // Create the request (with a streaming body when needed.) - let urlRequest = self.makeURLRequest(from: request, httpBodyStream: streamBridge?.inputStream) - - // Create the data task and associated connection object, then place them in storage. - let dataTask = session.dataTask(with: urlRequest) - let connection = Connection(streamBridge: streamBridge, continuation: continuation) - delegate.storage.set(connection, for: dataTask) + do { + let urlRequest = try self.makeURLRequest(from: request, httpBodyStream: streamBridge?.inputStream) + // Create the data task and associated connection object, then place them in storage. + let dataTask = session.dataTask(with: urlRequest) + let connection = Connection(streamBridge: streamBridge, continuation: continuation) + delegate.storage.set(connection, for: dataTask) + + // Start the HTTP connection and start streaming the request body data + dataTask.resume() + logger.info("start URLRequest(\(urlRequest.url?.absoluteString ?? "")) called") + Task { await streamBridge?.open() } + } catch { + continuation.resume(throwing: error) + } - // Start the HTTP connection and start streaming the request body data - dataTask.resume() - Task { await streamBridge?.open() } } } @@ -275,7 +460,7 @@ public final class URLSessionHTTPClient: HTTPClient { /// - request: The SDK-native, signed `SdkHttpRequest` ready to be transmitted. /// - httpBodyStream: A Foundation `InputStream` carrying the HTTP body for this request. /// - Returns: A `URLRequest` ready to be transmitted by `URLSession` for this operation. - private func makeURLRequest(from request: SdkHttpRequest, httpBodyStream: InputStream?) -> URLRequest { + private func makeURLRequest(from request: SdkHttpRequest, httpBodyStream: InputStream?) throws -> URLRequest { var components = URLComponents() components.scheme = config.protocolType?.rawValue ?? request.endpoint.protocolType?.rawValue ?? "https" components.host = request.endpoint.host @@ -286,8 +471,8 @@ public final class URLSessionHTTPClient: HTTPClient { Foundation.URLQueryItem(name: $0.name, value: $0.value) } } - guard let url = components.url else { fatalError("Invalid HTTP request. Please file a bug to report this.") } - var urlRequest = URLRequest(url: url) + guard let url = components.url else { throw URLSessionHTTPClientError.incompleteHTTPRequest } + var urlRequest = URLRequest(url: url, timeoutInterval: self.connectionTimeout) urlRequest.httpMethod = request.method.rawValue urlRequest.httpBodyStream = httpBodyStream for header in request.headers.headers + config.defaultHeaders.headers { @@ -312,13 +497,26 @@ public final class URLSessionHTTPClient: HTTPClient { /// Errors that are particular to the URLSession-based Smithy HTTP client. public enum URLSessionHTTPClientError: Error { + /// A URL could not be formed from the `SdkHttpRequest`. + /// Please file a bug with aws-sdk-swift if you experience this error. + case incompleteHTTPRequest + /// A non-HTTP response was returned by the server. /// Please file a bug with aws-sdk-swift if you experience this error. case responseNotHTTP - /// A connection was not ended + /// A HTTP connection was closed before a response could be returned, + /// and there was no Foundation error returned. + /// Please file a bug with aws-sdk-swift if you experience this error. + case closedBeforeResponse + + /// A connection was not ended before disposing the connection. /// Please file a bug with aws-sdk-swift if you experience this error. case unresumedConnection + + /// A response stream was not closed before disposing the connection. + /// Please file a bug with aws-sdk-swift if you experience this error. + case unclosedResponseStream } #endif diff --git a/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionTLSOptions.swift b/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionTLSOptions.swift new file mode 100644 index 000000000..a0553632a --- /dev/null +++ b/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionTLSOptions.swift @@ -0,0 +1,44 @@ +// +// Copyright Amazon.com Inc. or its affiliates. +// All Rights Reserved. +// +// SPDX-License-Identifier: Apache-2.0 +// + +public struct URLSessionTLSOptions: TLSConfiguration { + + /// Filename of the turst certificate file in main bundle (.cer) + public var certificate: String? + + /// Not supported for URLSession HTTP Client + public var certificateDir: String? + + /// Not supported for URLSession HTTP Client + public var privateKey: String? + + /// Optional path to PKCS #12 certificate , in PEM format + public var pkcs12Path: String? + + /// Optional PKCS#12 password + public var pkcs12Password: String? + + /// Information is provided to use custom trust store + public var useSelfSignedCertificate: Bool { + return certificate != nil + } + + /// Information is provided to use custom key store + public var useProvidedKeystore: Bool { + return pkcs12Path != nil && pkcs12Password != nil + } + + public init( + certificate: String? = nil, // .cer + pkcs12Path: String? = nil, // .p12 + pkcs12Password: String? = nil + ) { + self.certificate = certificate + self.pkcs12Path = pkcs12Path + self.pkcs12Password = pkcs12Password + } +} diff --git a/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionTLSResolverUtils.swift b/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionTLSResolverUtils.swift new file mode 100644 index 000000000..81afe191b --- /dev/null +++ b/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionTLSResolverUtils.swift @@ -0,0 +1,80 @@ +// +// Copyright Amazon.com Inc. or its affiliates. +// All Rights Reserved. +// +// SPDX-License-Identifier: Apache-2.0 +// + +#if os(iOS) || os(macOS) || os(watchOS) || os(tvOS) || os(visionOS) + +import class Foundation.Bundle +import Security + +extension Bundle { + func certificate(named name: String) -> SecCertificate? { + guard let cerURL = self.url(forResource: name, withExtension: "cer"), + let cerData = try? Data(contentsOf: cerURL) else { + return nil + } + return SecCertificateCreateWithData(nil, cerData as CFData) + } + + func identity(named name: String, password: String) -> SecIdentity? { + guard let p12URL = self.url(forResource: name, withExtension: "p12"), + let p12Data = try? Data(contentsOf: p12URL) else { + return nil + } + + let options = [kSecImportExportPassphrase as String: password] as CFDictionary + var items: CFArray? + let status = SecPKCS12Import(p12Data as CFData, options, &items) + + guard status == errSecSuccess, let itemsArray = items as? [[String: AnyObject]], + let firstItem = itemsArray.first, + let identity = firstItem[kSecImportItemIdentity as String] else { + return nil + } + + // Explanation of cross-platform behavior differences: + // - Linux and macOS treat Core Foundation types differently. + // - The `as? SecIdentity` casting causes a compiler error on Apple platforms as the cast is guaranteed. + // - Directly returning `identity` works on Linux but not on macOS due to strict type expectations. + // SwiftLint is temporarily disabled for the next line to allow a force cast, acknowledging the platform-specific behavior. + // swiftlint:disable:next force_cast + return (identity as! SecIdentity) + } +} + +extension SecTrust { + enum TrustEvaluationError: Error { + case evaluationFailed(error: CFError?) + case evaluationIssue(error: String) + } + + /// Evaluates the trust object synchronously and returns a Boolean value indicating whether the trust evaluation succeeded. + func evaluate() throws -> Bool { + var error: CFError? + let evaluationSucceeded = SecTrustEvaluateWithError(self, &error) + guard evaluationSucceeded else { + throw TrustEvaluationError.evaluationFailed(error: error) + } + return evaluationSucceeded + } + + /// Evaluates the trust object allowing custom root certificates, and returns a Boolean value indicating whether the evaluation succeeded. + func evaluateAllowing(rootCertificates: [SecCertificate]) throws -> Bool { + // Set the custom root certificates as trusted anchors. + let status = SecTrustSetAnchorCertificates(self, rootCertificates as CFArray) + guard status == errSecSuccess else { + throw TrustEvaluationError.evaluationIssue(error: "Failed to set anchor certificates!") + } + + // Consider any built-in anchors in the evaluation. + SecTrustSetAnchorCertificatesOnly(self, false) + + // Evaluate the trust object. + return try evaluate() + } +} + +#endif diff --git a/Tests/ClientRuntimeTests/NetworkingTests/URLSession/FoundationStreamBridgeTests.swift b/Tests/ClientRuntimeTests/NetworkingTests/URLSession/FoundationStreamBridgeTests.swift index d8c77670a..3fca1b579 100644 --- a/Tests/ClientRuntimeTests/NetworkingTests/URLSession/FoundationStreamBridgeTests.swift +++ b/Tests/ClientRuntimeTests/NetworkingTests/URLSession/FoundationStreamBridgeTests.swift @@ -5,13 +5,10 @@ // SPDX-License-Identifier: Apache-2.0 // -// Run this unit test on macOS only. -// -// `FoundationStreamBridge` is not usable on Linux because it uses ObjC-interop features. -// -// Unit tests of types that spawn threads are unreliable on Apple-platform simulators. -// Mac tests "run on metal", not a simulator, so they will run reliably. -#if os(macOS) +// `FoundationStreamBridge` is not usable on Linux because it uses ObjC-interop features, +// so this test is disabled there. + +#if os(iOS) || os(macOS) || os(watchOS) || os(tvOS) || os(visionOS) import Foundation import XCTest @@ -22,10 +19,7 @@ class FoundationStreamBridgeTests: XCTestCase { func test_open_streamsAllDataToOutputBuffer() async throws { // The maximum size of input streaming data in the tests - let maxDataSize = 16_384 - - // The max size of the buffer for data being streamed. - let maxBufferSize = 2 * maxDataSize + let maxDataSize = 65_536 // 64 kb // Create & fill a buffer with random bytes, for use in later test setup // Random buffer is reused because creating random data is slow @@ -39,7 +33,8 @@ class FoundationStreamBridgeTests: XCTestCase { // Create a temp buffer we can use to copy the input stream bytes // We are responsible for deallocating it - let tempBuffer = UnsafeMutablePointer.allocate(capacity: maxDataSize) + let tempBufferSize = maxDataSize + let tempBuffer = UnsafeMutablePointer.allocate(capacity: tempBufferSize) defer { tempBuffer.deallocate() } // FoundationStreamBridge is susceptible to spurious bugs due to data races & other @@ -52,15 +47,21 @@ class FoundationStreamBridgeTests: XCTestCase { // Run a test for every possible data size up to the maximum let dataSize = min(run, maxDataSize) - // The buffer may be as small as 1 byte, up to 2x as big as the data size capped by maxBufferSize - let bufferSize = Int.random(in: 1...min(2 * dataSize, maxBufferSize)) + // Sizes of the following buffers within the `FoundationStreamBridge` under test are randomized to + // try and elicit errors in the streaming process. + + // The bridge buffer may be as small as 1 byte, up to 2x as big as the data size + let bridgeBufferSize = Int.random(in: 1...(2 * dataSize)) + + // The bound stream buffer may be as small as 1 byte, up to 2x as big as the data size + let boundStreamBufferSize = Int.random(in: 1...(2 * dataSize)) // Fill a data buffer with dataSize random numbers let originalData = Data(bytes: randomBuffer, count: dataSize) // Create a stream bridge with our original data & open it let bufferedStream = BufferedStream(data: originalData, isClosed: true) - let subject = FoundationStreamBridge(readableStream: bufferedStream, bufferSize: bufferSize, logger: TestLogger()) + let subject = FoundationStreamBridge(readableStream: bufferedStream, bridgeBufferSize: bridgeBufferSize, boundStreamBufferSize: boundStreamBufferSize, logger: TestLogger()) await subject.open() // This will hold the data that is bridged from the ReadableStream to the Foundation InputStream @@ -71,7 +72,7 @@ class FoundationStreamBridgeTests: XCTestCase { while ![.atEnd, .error].contains(subject.inputStream.streamStatus) { // Copy the input stream to the temp buffer. When count is positive, bytes were read - let count = subject.inputStream.read(tempBuffer, maxLength: bufferSize) + let count = subject.inputStream.read(tempBuffer, maxLength: tempBufferSize) if count > 0 { // Add the read bytes onto the bridged data bridgedData.append(tempBuffer, count: count) @@ -89,7 +90,7 @@ class FoundationStreamBridgeTests: XCTestCase { XCTAssertNil(subject.inputStream.streamError, "Stream failed with error: \(subject.inputStream.streamError?.localizedDescription ?? "")") // Verify data was all bridged - XCTAssertEqual(bridgedData, originalData, "Run \(run) failed (dataSize: \(dataSize), bufferSize: \(bufferSize)") + XCTAssertEqual(bridgedData, originalData, "Run \(run) failed (dataSize: \(dataSize), bridgeBufferSize: \(bridgeBufferSize), boundStreamBufferSize: \(boundStreamBufferSize)") } } }