diff --git a/CHANGELOG.md b/CHANGELOG.md index 05ffb5f41..4e52385d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ All notable changes to the Imperative package will be documented in this file. +## Recent Changes + +- BugFix: Fixed normalization on stream chunk boundaries [#1815](https://github.com/zowe/zowe-cli/issues/1815) + ## `5.18.1` - BugFix: Fixed merging of profile properties in `ProfileInfo.createSession`. [#1008](https://github.com/zowe/imperative/issues/1008) diff --git a/packages/rest/__tests__/client/AbstractRestClient.test.ts b/packages/rest/__tests__/client/AbstractRestClient.test.ts index 72b590239..d777e6f34 100644 --- a/packages/rest/__tests__/client/AbstractRestClient.test.ts +++ b/packages/rest/__tests__/client/AbstractRestClient.test.ts @@ -561,6 +561,46 @@ describe("AbstractRestClient tests", () => { expect(caughtError).toBeUndefined(); }); + it("should not error when streaming normalized data", async () => { + const fakeRequestStream = new PassThrough(); + const emitter = new MockHttpRequestResponse(); + const receivedArray: string[] = []; + jest.spyOn(emitter, "write").mockImplementation((data) => { + receivedArray.push(data.toString()); + }); + const requestFnc = jest.fn((options, callback) => { + ProcessUtils.nextTick(async () => { + const newEmit = new MockHttpRequestResponse(); + callback(newEmit); + await ProcessUtils.nextTick(() => { + newEmit.emit("end"); + }); + }); + return emitter; + }); + (https.request as any) = requestFnc; + let caughtError; + try { + await ProcessUtils.nextTick(() => { + fakeRequestStream.write(Buffer.from("ChunkOne\r", "utf8")); + }); + await ProcessUtils.nextTick(() => { + fakeRequestStream.write(Buffer.from("\nChunkTwo\r", "utf8")); + }); + await ProcessUtils.nextTick(() => { + fakeRequestStream.end(); + }); + await RestClient.putStreamed(new Session({ + hostname: "test", + }), "/resource", [Headers.APPLICATION_JSON], null, fakeRequestStream, false, true); + } catch (error) { + caughtError = error; + } + expect(caughtError).toBeUndefined(); + expect(receivedArray.length).toEqual(3); + expect(receivedArray).toEqual(["ChunkOne", "\nChunkTwo", "\r"]); + }); + it("should return full response when requested", async () => { const emitter = new MockHttpRequestResponse(); const requestFnc = jest.fn((options, callback) => { diff --git a/packages/rest/src/client/AbstractRestClient.ts b/packages/rest/src/client/AbstractRestClient.ts index e2cee7496..612e370db 100644 --- a/packages/rest/src/client/AbstractRestClient.ts +++ b/packages/rest/src/client/AbstractRestClient.ts @@ -335,11 +335,21 @@ export abstract class AbstractRestClient { // if the user requested streaming write of data to the request, // write the data chunk by chunk to the server let bytesUploaded = 0; + let heldByte: string; options.requestStream.on("data", (data: Buffer) => { this.log.debug("Writing data chunk of length %d from requestStream to clientRequest", data.byteLength); if (this.mNormalizeRequestNewlines) { this.log.debug("Normalizing new lines in request chunk to \\n"); - data = Buffer.from(data.toString().replace(/\r?\n/g, "\n")); + let dataString = data.toString(); + if (heldByte != null) { + dataString = heldByte + dataString; + heldByte = undefined; + } + if (dataString.charAt(dataString.length - 1) === "\r") { + heldByte = dataString.charAt(dataString.length - 1); + dataString = dataString.slice(0,-1); + } + data = Buffer.from(dataString.replace(/\r?\n/g, "\n")); } if (this.mTask != null) { bytesUploaded += data.byteLength; @@ -361,6 +371,10 @@ export abstract class AbstractRestClient { })); }); options.requestStream.on("end", () => { + if (heldByte != null) { + clientRequest.write(Buffer.from(heldByte)); + heldByte = undefined; + } this.log.debug("Finished reading requestStream"); // finish the request clientRequest.end();