test: Make the ReadRows service in tests more modular #1462

Merged Sep 23, 2024 (10 commits)
test/readrows.ts: 216 changes (125 additions, 91 deletions)
@@ -13,26 +13,50 @@
// limitations under the License.

import {before, describe, it} from 'mocha';
-import {Bigtable, protos, Row, Table} from '../src';
+import {Bigtable, Row, Table} from '../src';
import * as assert from 'assert';
import {Transform, PassThrough, pipeline} from 'stream';

import {GoogleError} from 'google-gax';
import {MockServer} from '../src/util/mock-servers/mock-server';
import {BigtableClientMockService} from '../src/util/mock-servers/service-implementations/bigtable-client-mock-service';
import {MockService} from '../src/util/mock-servers/mock-service';
-import {debugLog, readRowsImpl} from './utils/readRowsImpl';
-import {ServerWritableStream, UntypedHandleCall} from '@grpc/grpc-js';
-import {readRowsImpl2} from './utils/readRowsImpl2';
+import {ReadRowsImpl} from './utils/readRowsImpl';

+import {
+  ReadRowsServiceParameters,
+  ReadRowsWritableStream,
+} from '../test/utils/readRowsServiceParameters';
+import * as mocha from 'mocha';

+const DEBUG = process.env.BIGTABLE_TEST_DEBUG === 'true';

+function debugLog(text: string) {
+  if (DEBUG) {
+    console.log(text);
+  }
+}

+// Define parameters for a standard Bigtable Mock service
+const VALUE_SIZE = 1024 * 1024;
+// we want each row to be split into 2 chunks of different sizes
+const CHUNK_SIZE = 1023 * 1024 - 1;
+const CHUNKS_PER_RESPONSE = 10;
+const STANDARD_KEY_FROM = 0;
+// 1000 rows must be enough to reproduce issues with losing the data and to create backpressure
+const STANDARD_KEY_TO = 1000;
+const STANDARD_SERVICE_WITHOUT_ERRORS: ReadRowsServiceParameters = {
+  keyFrom: STANDARD_KEY_FROM,
+  keyTo: STANDARD_KEY_TO,
+  valueSize: VALUE_SIZE,
+  chunkSize: CHUNK_SIZE,
+  chunksPerResponse: CHUNKS_PER_RESPONSE,
+  debugLog,
+};
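For orientation when reading the tests below, the parameters object passed to ReadRowsImpl.createService implies a shape roughly like the following sketch. This is a hedged reconstruction inferred only from the fields used in this file; the actual ReadRowsServiceParameters interface is defined in test/utils/readRowsServiceParameters.ts and may differ in names and optionality.

// Hypothetical sketch of ReadRowsServiceParameters, inferred from usage in
// this file only; not the actual definition.
interface ReadRowsServiceParametersSketch {
  keyFrom?: number; // first generated row key (inclusive)
  keyTo?: number; // row key bound (exclusive); optional, derived from the request when omitted
  valueSize: number; // bytes of cell value generated per row
  chunkSize: number; // max bytes per CellChunk; valueSize > chunkSize splits a row
  chunksPerResponse: number; // CellChunks batched into each ReadRowsResponse
  errorAfterChunkNo?: number; // if set, the server errors after sending this many chunks
  debugLog: (text: string) => void; // gated by the BIGTABLE_TEST_DEBUG env variable
}

The constants work out as follows: with VALUE_SIZE = 1024 * 1024 and CHUNK_SIZE = 1023 * 1024 - 1, each 1 MiB row value splits into two chunks of unequal size (1,047,551 bytes and 1,025 bytes), and 1000 such rows push roughly 1 GiB through the stream, which is what creates the backpressure these tests depend on.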

type PromiseVoid = Promise<void>;
interface ServerImplementationInterface {
-  (
-    server: ServerWritableStream<
-      protos.google.bigtable.v2.IReadRowsRequest,
-      protos.google.bigtable.v2.IReadRowsResponse
-    >
-  ): PromiseVoid;
+  (server: ReadRowsWritableStream): PromiseVoid;
}
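ServerImplementationInterface is simply the call signature of a gRPC ReadRows handler: a function that receives the server-side writable stream for one call and returns a promise. As a minimal, hypothetical illustration (not the mock's actual implementation; ReadRowsImpl.createService builds a far richer handler that generates rows, splits them into chunks, and injects errors), a conforming handler could be as small as:

// Hypothetical minimal handler satisfying ServerImplementationInterface.
// It writes a single empty ReadRowsResponse and completes the call.
const emptyReadRows: ServerImplementationInterface = async (
  server: ReadRowsWritableStream
) => {
  server.write({chunks: []}); // a response carrying no cell chunks
  server.end(); // close the server-side stream
};

A test would register it the same way the tests below register the real mock, i.e. service.setService({ReadRows: emptyReadRows});.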

describe('Bigtable/ReadRows', () => {
@@ -53,15 +77,21 @@ describe('Bigtable/ReadRows', () => {
service = new BigtableClientMockService(server);
});

-  it('should create read stream and read synchronously', function (done) {
-    this.timeout(60000);
+  // helper function because some tests run slower
+  // on Windows and need a longer timeout
+  function setWindowsTestTimeout(test: mocha.Context) {
+    if (process.platform === 'win32') {
+      test.timeout(60000); // it runs much slower on Windows!
+    }
+  }

-    // 1000 rows must be enough to reproduce issues with losing the data and to create backpressure
-    const keyFrom = 0;
-    const keyTo = 1000;
+  it('should create read stream and read synchronously', function (done) {
+    setWindowsTestTimeout(this);

    service.setService({
-      ReadRows: readRowsImpl(keyFrom, keyTo) as any,
+      ReadRows: ReadRowsImpl.createService(
+        STANDARD_SERVICE_WITHOUT_ERRORS
+      ) as ServerImplementationInterface,
    });

let receivedRowCount = 0;
@@ -81,19 +111,17 @@ describe('Bigtable/ReadRows', () => {
debugLog(`received row key ${key}`);
});
    readStream.on('end', () => {
-      assert.strictEqual(receivedRowCount, keyTo - keyFrom);
-      assert.strictEqual(lastKeyReceived, keyTo - 1);
+      assert.strictEqual(receivedRowCount, STANDARD_KEY_TO - STANDARD_KEY_FROM);
+      assert.strictEqual(lastKeyReceived, STANDARD_KEY_TO - 1);
      done();
    });
});

  it('should create read stream and read synchronously using Transform stream', done => {
-    // 1000 rows must be enough to reproduce issues with losing the data and to create backpressure
-    const keyFrom = 0;
-    const keyTo = 1000;
-
    service.setService({
-      ReadRows: readRowsImpl(keyFrom, keyTo) as any,
+      ReadRows: ReadRowsImpl.createService(
+        STANDARD_SERVICE_WITHOUT_ERRORS
+      ) as ServerImplementationInterface,
    });

let receivedRowCount = 0;
@@ -128,25 +156,20 @@
debugLog(`received row key ${key}`);
});
    passThrough.on('end', () => {
-      assert.strictEqual(receivedRowCount, keyTo - keyFrom);
-      assert.strictEqual(lastKeyReceived, keyTo - 1);
+      assert.strictEqual(receivedRowCount, STANDARD_KEY_TO - STANDARD_KEY_FROM);
+      assert.strictEqual(lastKeyReceived, STANDARD_KEY_TO - 1);
      done();
    });

pipeline(readStream, transform, passThrough, () => {});
});

  it('should create read stream and read asynchronously using Transform stream', function (done) {
-    if (process.platform === 'win32') {
-      this.timeout(60000); // it runs much slower on Windows!
-    }
-
-    // 1000 rows must be enough to reproduce issues with losing the data and to create backpressure
-    const keyFrom = 0;
-    const keyTo = 1000;
-
+    setWindowsTestTimeout(this);
    service.setService({
-      ReadRows: readRowsImpl(keyFrom, keyTo) as any,
+      ReadRows: ReadRowsImpl.createService(
+        STANDARD_SERVICE_WITHOUT_ERRORS
+      ) as ServerImplementationInterface,
    });

let receivedRowCount = 0;
@@ -183,23 +206,22 @@
debugLog(`received row key ${key}`);
});
    passThrough.on('end', () => {
-      assert.strictEqual(receivedRowCount, keyTo - keyFrom);
-      assert.strictEqual(lastKeyReceived, keyTo - 1);
+      assert.strictEqual(receivedRowCount, STANDARD_KEY_TO - STANDARD_KEY_FROM);
+      assert.strictEqual(lastKeyReceived, STANDARD_KEY_TO - 1);
      done();
    });

pipeline(readStream, transform, passThrough, () => {});
});

  it('should be able to stop reading from the read stream', done => {
-    // 1000 rows must be enough to reproduce issues with losing the data and to create backpressure
-    const keyFrom = 0;
-    const keyTo = 1000;
    // pick any key to stop after
    const stopAfter = 42;

    service.setService({
-      ReadRows: readRowsImpl(keyFrom, keyTo) as any,
+      ReadRows: ReadRowsImpl.createService(
+        STANDARD_SERVICE_WITHOUT_ERRORS
+      ) as ServerImplementationInterface,
    });

let receivedRowCount = 0;
@@ -232,18 +254,14 @@

// TODO: enable after https://github.com/googleapis/nodejs-bigtable/issues/1286 is fixed
  it('should be able to stop reading from the read stream when reading asynchronously', function (done) {
-    if (process.platform === 'win32') {
-      this.timeout(600000); // it runs much slower on Windows!
-    }
-
-    // 1000 rows must be enough to reproduce issues with losing the data and to create backpressure
-    const keyFrom = 0;
-    const keyTo = 1000;
+    setWindowsTestTimeout(this);
    // pick any key to stop after
    const stopAfter = 420;

    service.setService({
-      ReadRows: readRowsImpl(keyFrom, keyTo) as any,
+      ReadRows: ReadRowsImpl.createService(
+        STANDARD_SERVICE_WITHOUT_ERRORS
+      ) as ServerImplementationInterface,
    });

let receivedRowCount = 0;
@@ -294,61 +312,77 @@
pipeline(readStream, transform, passThrough, () => {});
});

-  it('should silently resume after server or network error', done => {
-    // 1000 rows must be enough to reproduce issues with losing the data and to create backpressure
-    const keyFrom = 0;
-    const keyTo = 1000;
-    // the server will error after sending this chunk (not row)
-    const errorAfterChunkNo = 423;
-
-    service.setService({
-      ReadRows: readRowsImpl(keyFrom, keyTo, errorAfterChunkNo) as any,
-    });
-
-    let receivedRowCount = 0;
-    let lastKeyReceived: number | undefined;
-
-    const readStream = table.createReadStream();
-    readStream.on('error', (err: GoogleError) => {
-      done(err);
-    });
-    readStream.on('data', (row: Row) => {
-      ++receivedRowCount;
-      const key = parseInt(row.id);
-      if (lastKeyReceived && key <= lastKeyReceived) {
-        done(new Error('Test error: keys are not in order'));
-      }
-      lastKeyReceived = key;
-      debugLog(`received row key ${key}`);
+  describe('should silently resume after server or network error', () => {
+    function runTest(done: Mocha.Done, errorAfterChunkNo: number) {
+      service.setService({
+        ReadRows: ReadRowsImpl.createService({
+          keyFrom: STANDARD_KEY_FROM,
+          keyTo: STANDARD_KEY_TO,
+          valueSize: VALUE_SIZE,
+          chunkSize: CHUNK_SIZE,
+          chunksPerResponse: CHUNKS_PER_RESPONSE,
+          errorAfterChunkNo,
+          debugLog,
+        }) as ServerImplementationInterface,
+      });
+      let receivedRowCount = 0;
+      let lastKeyReceived: number | undefined;
+
+      const readStream = table.createReadStream();
+      readStream.on('error', (err: GoogleError) => {
+        done(err);
+      });
+      readStream.on('data', (row: Row) => {
+        ++receivedRowCount;
+        const key = parseInt(row.id);
+        if (lastKeyReceived && key <= lastKeyReceived) {
+          done(new Error('Test error: keys are not in order'));
+        }
+        lastKeyReceived = key;
+        debugLog(`received row key ${key}`);
+      });
+      readStream.on('end', () => {
+        assert.strictEqual(
+          receivedRowCount,
+          STANDARD_KEY_TO - STANDARD_KEY_FROM
+        );
+        assert.strictEqual(lastKeyReceived, STANDARD_KEY_TO - 1);
+        done();
+      });
+    }
+    it('with an error at a fixed position', function (done) {
+      setWindowsTestTimeout(this);
+      // Emits an error after enough chunks have been pushed to create back pressure
+      runTest(done, 423);
+    });
-    readStream.on('end', () => {
-      assert.strictEqual(receivedRowCount, keyTo - keyFrom);
-      assert.strictEqual(lastKeyReceived, keyTo - 1);
-      done();
+    it('with an error at a random position', function (done) {
+      this.timeout(200000);
+      // Emits an error after a random number of chunks.
+      const errorAfterChunkNo = Math.floor(Math.random() * 1000);
+      runTest(done, errorAfterChunkNo);
    });
  });

-  it('should return row data in the right order', done => {
-    // 150 rows must be enough to reproduce issues with losing the data and to create backpressure
-    const keyFrom = undefined;
-    const keyTo = undefined;
-    // the server will error after sending this chunk (not row)
-    const errorAfterChunkNo = 100;
+  it('should return row data in the right order', function (done) {
+    setWindowsTestTimeout(this);
    const dataResults = [];

-    // TODO: Do not use `any` here, make it a more specific type and address downstream implications on the mock server.
+    // keyTo and keyFrom are not provided so they will be determined from
+    // the request that is passed in.
    service.setService({
-      ReadRows: readRowsImpl2(
-        keyFrom,
-        keyTo,
-        errorAfterChunkNo
-      ) as ServerImplementationInterface,
+      ReadRows: ReadRowsImpl.createService({
+        errorAfterChunkNo: 100, // the server will error after sending this chunk (not row)
+        valueSize: 1,
+        chunkSize: 1,
+        chunksPerResponse: 1,
+        debugLog,
+      }) as ServerImplementationInterface,
    });
    const sleep = (ms: number) => {
      return new Promise(resolve => setTimeout(resolve, ms));
    };
    (async () => {
      try {
+        // 150 rows must be enough to reproduce issues with losing the data and to create backpressure
        const stream = table.createReadStream({
          start: '00000000',
          end: '00000150',