diff --git a/js/gulp/argv.js b/js/gulp/argv.js
index 5c49fc5155a80..967644eabc736 100644
--- a/js/gulp/argv.js
+++ b/js/gulp/argv.js
@@ -22,6 +22,7 @@ export const argv = args([
     { name: `target`, type: String, defaultValue: `` },
     { name: `module`, type: String, defaultValue: `` },
     { name: `coverage`, type: Boolean, defaultValue: false },
+    { name: `tests`, type: String, multiple: true, defaultValue: [`test/unit/`] },
     { name: `targets`, alias: `t`, type: String, multiple: true, defaultValue: [] },
     { name: `modules`, alias: `m`, type: String, multiple: true, defaultValue: [] },
 ], { partial: true });
diff --git a/js/gulp/test-task.js b/js/gulp/test-task.js
index e2263049b3d76..f5126ffb8d4c4 100644
--- a/js/gulp/test-task.js
+++ b/js/gulp/test-task.js
@@ -36,19 +36,6 @@ import { createRequire } from 'node:module';
 const require = createRequire(import.meta.url);
 
 const jestArgv = [];
-const testFiles = [
-    `test/unit/`,
-    // `test/unit/bit-tests.ts`,
-    // `test/unit/int-tests.ts`,
-    // `test/unit/bn-tests.ts`,
-    // `test/unit/math-tests.ts`,
-    // `test/unit/table-tests.ts`,
-    // `test/unit/generated-data-tests.ts`,
-    // `test/unit/builders/`,
-    // `test/unit/recordbatch/`,
-    // `test/unit/table/`,
-    // `test/unit/ipc/`,
-];
 
 if (argv.verbose) {
     jestArgv.push(`--verbose`);
@@ -80,8 +67,10 @@ export const testTask = ((cache, execArgv, testOptions) => memoizeTask(cache, fu
         args.push(`-c`, `jestconfigs/jest.coverage.config.js`);
     } else {
         const cfgname = [target, format].filter(Boolean).join('.');
-        args.push(`-c`, `jestconfigs/jest.${cfgname}.config.js`, ...testFiles);
+        args.push(`-c`, `jestconfigs/jest.${cfgname}.config.js`);
     }
+    args.push(...(argv._unknown || []).filter((x) => x !== 'test'));
+    args.push(...argv.tests);
     opts.env = {
         ...opts.env,
         TEST_TARGET: target,
diff --git a/js/src/ipc/reader.ts b/js/src/ipc/reader.ts
index 0b9b3fbe42a96..f84fe83db08d4 100644
--- a/js/src/ipc/reader.ts
+++ b/js/src/ipc/reader.ts
@@ -363,14 +363,11 @@ abstract class RecordBatchReaderImpl implements RecordB
         const { id, isDelta } = header;
         const { dictionaries, schema } = this;
         const dictionary = dictionaries.get(id);
-        if (isDelta || !dictionary) {
-            const type = schema.dictionaries.get(id)!;
-            const data = this._loadVectors(header.data, body, [type]);
-            return (dictionary && isDelta ? dictionary.concat(
-                new Vector(data)) :
-                new Vector(data)).memoize() as Vector;
-        }
-        return dictionary.memoize();
+        const type = schema.dictionaries.get(id)!;
+        const data = this._loadVectors(header.data, body, [type]);
+        return (dictionary && isDelta ? dictionary.concat(
+            new Vector(data)) :
+            new Vector(data)).memoize() as Vector;
     }
     protected _loadVectors(header: metadata.RecordBatch, body: any, types: (Field | DataType)[]) {
         return new VectorLoader(body, header.nodes, header.buffers, this.dictionaries, this.schema.metadataVersion).visitMany(types);
diff --git a/js/src/ipc/writer.ts b/js/src/ipc/writer.ts
index 8d924ab64c8f9..cb74fe6fb6705 100644
--- a/js/src/ipc/writer.ts
+++ b/js/src/ipc/writer.ts
@@ -84,6 +84,7 @@ export class RecordBatchWriter extends ReadableInterop<
     protected _schema: Schema | null = null;
     protected _dictionaryBlocks: FileBlock[] = [];
     protected _recordBatchBlocks: FileBlock[] = [];
+    protected _seenDictionaries = new Map<number, Vector>();
     protected _dictionaryDeltaOffsets = new Map<number, number>();
 
     public toString(sync: true): string;
@@ -144,6 +145,7 @@
         this._started = false;
         this._dictionaryBlocks = [];
         this._recordBatchBlocks = [];
+        this._seenDictionaries = new Map();
         this._dictionaryDeltaOffsets = new Map();
 
         if (!schema || !(compareSchemas(schema, this._schema))) {
@@ -259,7 +261,6 @@
     }
 
     protected _writeDictionaryBatch(dictionary: Data, id: number, isDelta = false) {
-        this._dictionaryDeltaOffsets.set(id, dictionary.length + (this._dictionaryDeltaOffsets.get(id) || 0));
         const { byteLength, nodes, bufferRegions, buffers } = VectorAssembler.assemble(new Vector([dictionary]));
         const recordBatch = new metadata.RecordBatch(dictionary.length, nodes, bufferRegions);
         const dictionaryBatch = new metadata.DictionaryBatch(recordBatch, id, isDelta);
@@ -284,14 +285,21 @@
     }
 
     protected _writeDictionaries(batch: RecordBatch) {
-        for (let [id, dictionary] of batch.dictionaries) {
-            let offset = this._dictionaryDeltaOffsets.get(id) || 0;
-            if (offset === 0 || (dictionary = dictionary?.slice(offset)).length > 0) {
-                for (const data of dictionary.data) {
-                    this._writeDictionaryBatch(data, id, offset > 0);
-                    offset += data.length;
-                }
+        for (const [id, dictionary] of batch.dictionaries) {
+            const chunks = dictionary?.data ?? [];
+            const prevDictionary = this._seenDictionaries.get(id);
+            const offset = this._dictionaryDeltaOffsets.get(id) ?? 0;
+            // * If no previous dictionary was written, write an initial DictionaryMessage.
+            // * If the current dictionary does not share chunks with the previous dictionary, write a replacement DictionaryMessage.
+            if (!prevDictionary || prevDictionary.data[0] !== chunks[0]) {
+                // * If `index > 0`, then `isDelta` is true.
+                // * If `index = 0`, then `isDelta` is false, because this is either the initial or a replacement DictionaryMessage.
+                for (const [index, chunk] of chunks.entries()) this._writeDictionaryBatch(chunk, id, index > 0);
+            } else if (offset < chunks.length) {
+                for (const chunk of chunks.slice(offset)) this._writeDictionaryBatch(chunk, id, true);
             }
+            this._seenDictionaries.set(id, dictionary);
+            this._dictionaryDeltaOffsets.set(id, chunks.length);
         }
         return this;
     }
@@ -342,6 +350,13 @@ export class RecordBatchFileWriter extends RecordBatchW
         return this._writeMagic()._writePadding(2);
     }
 
+    protected _writeDictionaryBatch(dictionary: Data, id: number, isDelta = false) {
+        if (!isDelta && this._seenDictionaries.has(id)) {
+            throw new Error('The Arrow File format does not support replacement dictionaries. ');
+        }
+        return super._writeDictionaryBatch(dictionary, id, isDelta);
+    }
+
     protected _writeFooter(schema: Schema) {
         const buffer = Footer.encode(new Footer(
             schema, MetadataVersion.V5,
@@ -369,13 +384,13 @@ export class RecordBatchJSONWriter extends RecordBatchW
     }
 
     private _recordBatches: RecordBatch[];
-    private _dictionaries: RecordBatch[];
+    private _recordBatchesWithDictionaries: RecordBatch[];
 
     constructor() {
         super();
        this._autoDestroy = true;
         this._recordBatches = [];
-        this._dictionaries = [];
+        this._recordBatchesWithDictionaries = [];
     }
 
     protected _writeMessage() { return this; }
@@ -386,12 +401,11 @@
     }
     protected _writeDictionaries(batch: RecordBatch) {
         if (batch.dictionaries.size > 0) {
-            this._dictionaries.push(batch);
+            this._recordBatchesWithDictionaries.push(batch);
         }
         return this;
     }
     protected _writeDictionaryBatch(dictionary: Data, id: number, isDelta = false) {
-        this._dictionaryDeltaOffsets.set(id, dictionary.length + (this._dictionaryDeltaOffsets.get(id) || 0));
         this._write(this._dictionaryBlocks.length === 0 ? ` ` : `,\n `);
         this._write(dictionaryBatchToJSON(dictionary, id, isDelta));
         this._dictionaryBlocks.push(new FileBlock(0, 0, 0));
@@ -403,9 +417,9 @@
         return this;
     }
     public close() {
-        if (this._dictionaries.length > 0) {
+        if (this._recordBatchesWithDictionaries.length > 0) {
             this._write(`,\n "dictionaries": [\n`);
-            for (const batch of this._dictionaries) {
+            for (const batch of this._recordBatchesWithDictionaries) {
                 super._writeDictionaries(batch);
             }
             this._write(`\n ]`);
@@ -424,7 +438,7 @@
             this._write(`\n}`);
         }
 
-        this._dictionaries = [];
+        this._recordBatchesWithDictionaries = [];
         this._recordBatches = [];
 
         return super.close();
diff --git a/js/test/unit/ipc/message-reader-tests.ts b/js/test/unit/ipc/message-reader-tests.ts
index 741f4e7aa9009..6b770674d122c 100644
--- a/js/test/unit/ipc/message-reader-tests.ts
+++ b/js/test/unit/ipc/message-reader-tests.ts
@@ -24,11 +24,16 @@ for (const table of generateRandomTables([10, 20, 30])) {
 
     const io = ArrowIOTestHelper.stream(table);
     const name = `[\n ${table.schema.fields.join(',\n ')}\n]`;
-    const numMessages = table.batches.reduce((numMessages, batch) => {
-        return numMessages +
-            /* recordBatch message */ 1 +
-            /* dictionary messages */ batch.dictionaries.size;
-    }, /* schema message */ 1);
+
+    const numDictionaries = table.batches.reduce((dictionaries, batch) => {
+        return [...batch.dictionaries.values()]
+            .flatMap((dictionary) => dictionary.data)
+            .reduce((dictionaries, data) => dictionaries.add(data), dictionaries);
+    }, new Set()).size;
+
+    const numMessages = /* schema message */ 1 +
+        /* recordBatch messages */ table.batches.length +
+        /* dictionary messages */ numDictionaries;
 
     const validate = validateMessageReader.bind(0, numMessages);
     const validateAsync = validateAsyncMessageReader.bind(0, numMessages);
diff --git a/js/test/unit/ipc/writer/file-writer-tests.ts b/js/test/unit/ipc/writer/file-writer-tests.ts
index 7b75dd8d4fab3..2b99d0f725f2f 100644
--- a/js/test/unit/ipc/writer/file-writer-tests.ts
+++ b/js/test/unit/ipc/writer/file-writer-tests.ts
@@ -18,9 +18,20 @@
 import {
     generateDictionaryTables, generateRandomTables
 } from '../../../data/tables.js';
 
+import * as generate from '../../../generate-test-data.js';
 import { validateRecordBatchIterator } from '../validate.js';
-import { RecordBatchFileWriter, RecordBatchReader, Table } from 'apache-arrow';
+import {
+    builderThroughIterable,
+    Dictionary,
+    Int32,
+    RecordBatch,
+    RecordBatchFileWriter,
+    RecordBatchReader,
+    Table,
+    Uint32,
+    Vector
+} from 'apache-arrow';
 
 describe('RecordBatchFileWriter', () => {
     for (const table of generateRandomTables([10, 20, 30])) {
@@ -29,6 +40,55 @@ describe('RecordBatchFileWriter', () => {
     for (const table of generateDictionaryTables([10, 20, 30])) {
         testFileWriter(table, `${table.schema.fields[0]}`);
     }
+
+    it('should throw if attempting to write replacement dictionary batches', async () => {
+        const type = new Dictionary(new Uint32, new Int32, 0);
+        const writer = new RecordBatchFileWriter();
+        writer.write(new RecordBatch({
+            // Clone the data with the original Dictionary type so the cloned chunk has id 0
+            dictionary_encoded_uint32: generate.dictionary(50, 20, new Uint32, new Int32).vector.data[0].clone(type)
+        }));
+        expect(() => {
+            writer.write(new RecordBatch({
+                // Clone the data with the original Dictionary type so the cloned chunk has id 0
+                dictionary_encoded_uint32: generate.dictionary(50, 20, new Uint32, new Int32).vector.data[0].clone(type)
+            }));
+        }).toThrow();
+    });
+
+    it('should write delta dictionary batches', async () => {
+
+        const name = 'dictionary_encoded_uint32';
+        const resultChunks: Vector<Dictionary<Uint32, Int32>>[] = [];
+        const {
+            vector: sourceVector, values: sourceValues,
+        } = generate.dictionary(1000, 20, new Uint32(), new Int32());
+
+        const writer = RecordBatchFileWriter.writeAll((function* () {
+            const transform = builderThroughIterable({
+                type: sourceVector.type, nullValues: [null],
+                queueingStrategy: 'count', highWaterMark: 50,
+            });
+            for (const chunk of transform(sourceValues())) {
+                resultChunks.push(chunk);
+                yield new RecordBatch({ [name]: chunk.data[0] });
+            }
+        })());
+
+        expect(new Vector(resultChunks)).toEqualVector(sourceVector);
+
+        type T = { [name]: Dictionary<Uint32, Int32> };
+        const sourceTable = new Table({ [name]: sourceVector });
+        const resultTable = new Table(RecordBatchReader.from<T>(await writer.toUint8Array()));
+
+        const child = resultTable.getChild(name)!;
+        const dicts = child.data.map(({ dictionary }) => dictionary!);
+        const dictionary = dicts[child.data.length - 1];
+
+        expect(resultTable).toEqualTable(sourceTable);
+        expect(dictionary).toBeInstanceOf(Vector);
+        expect(dictionary.data).toHaveLength(20);
+    });
 });
 
 function testFileWriter(table: Table, name: string) {
diff --git a/js/test/unit/ipc/writer/stream-writer-tests.ts b/js/test/unit/ipc/writer/stream-writer-tests.ts
index 8425395393bf2..11bbe7362d88e 100644
--- a/js/test/unit/ipc/writer/stream-writer-tests.ts
+++ b/js/test/unit/ipc/writer/stream-writer-tests.ts
@@ -25,6 +25,7 @@ import { validateRecordBatchIterator } from '../validate.js';
 import type { RecordBatchStreamWriterOptions } from 'apache-arrow/ipc/writer';
 import {
     builderThroughIterable,
+    Data,
     Dictionary,
     Field,
     Int32,
@@ -81,10 +82,46 @@ describe('RecordBatchStreamWriter', () => {
         await validate;
     });
 
+    it('should write replacement dictionary batches', async () => {
+
+        const name = 'dictionary_encoded_uint32';
+        const type = new Dictionary(new Uint32, new Int32, 0);
+        const sourceChunks: Data<Dictionary<Uint32, Int32>>[] = [];
+        const resultChunks: Data<Dictionary<Uint32, Int32>>[] = [];
+
+        const writer = RecordBatchStreamWriter.writeAll((function* () {
+            for (let i = 0; i < 1000; i += 50) {
+                const { vector: { data: [chunk] } } = generate.dictionary(50, 20, type.dictionary, type.indices);
+                sourceChunks.push(chunk);
+                // Clone the data with the original Dictionary type so the cloned chunk has id 0
+                resultChunks.push(chunk.clone(type));
+                yield new RecordBatch({ [name]: resultChunks.at(-1)! });
+            }
+        })());
+
+        expect(new Vector(resultChunks)).toEqualVector(new Vector(sourceChunks));
+
+        type T = { [name]: Dictionary<Uint32, Int32> };
+        const sourceTable = new Table({ [name]: new Vector(sourceChunks) });
+        const resultTable = new Table(RecordBatchReader.from<T>(await writer.toUint8Array()));
+
+        // 1000 / 50 = 20
+        expect(sourceTable.batches).toHaveLength(20);
+        expect(resultTable.batches).toHaveLength(20);
+        expect(resultTable).toEqualTable(sourceTable);
+
+        for (const batch of resultTable.batches) {
+            for (const [_, dictionary] of batch.dictionaries) {
+                expect(dictionary).toBeInstanceOf(Vector);
+                expect(dictionary.data).toHaveLength(1);
+            }
+        }
+    });
+
     it('should write delta dictionary batches', async () => {
 
         const name = 'dictionary_encoded_uint32';
-        const chunks: Vector<Dictionary<Uint32, Int32>>[] = [];
+        const resultChunks: Vector<Dictionary<Uint32, Int32>>[] = [];
         const {
             vector: sourceVector, values: sourceValues,
         } = generate.dictionary(1000, 20, new Uint32(), new Int32());
@@ -95,12 +132,12 @@ describe('RecordBatchStreamWriter', () => {
                 queueingStrategy: 'count', highWaterMark: 50,
             });
             for (const chunk of transform(sourceValues())) {
-                chunks.push(chunk);
+                resultChunks.push(chunk);
                 yield new RecordBatch({ [name]: chunk.data[0] });
            }
         })());
 
-        expect(new Vector(chunks)).toEqualVector(sourceVector);
+        expect(new Vector(resultChunks)).toEqualVector(sourceVector);
 
         type T = { [name]: Dictionary<Uint32, Int32> };
         const sourceTable = new Table({ [name]: sourceVector });
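
Note (reading aid, not part of the patch): the per-dictionary bookkeeping that `_writeDictionaries` gains in js/src/ipc/writer.ts above can be summarized by the standalone TypeScript sketch below. `Chunk`, `DictionaryMessage`, and `planDictionaryMessages` are hypothetical names introduced only for illustration; the real implementation works directly on arrow `Data` chunks and the writer's `_seenDictionaries` / `_dictionaryDeltaOffsets` maps.

// Hypothetical sketch of the decision logic RecordBatchWriter#_writeDictionaries applies per dictionary id.
// Only reference identity of the first chunk matters for the replacement check.
type Chunk = object;
interface DictionaryMessage { chunk: Chunk; isDelta: boolean }

function planDictionaryMessages(
    prevChunks: Chunk[] | undefined, // chunks written for this id by a previous batch, if any
    chunks: Chunk[],                 // chunks of the dictionary attached to the current batch
    offset: number                   // number of chunks already written for this id
): DictionaryMessage[] {
    // No dictionary written yet, or the current dictionary does not share its first chunk
    // with the previous one: write every chunk, the first as a non-delta (initial or
    // replacement) message and the rest as deltas.
    if (!prevChunks || prevChunks[0] !== chunks[0]) {
        return chunks.map((chunk, index) => ({ chunk, isDelta: index > 0 }));
    }
    // Same dictionary as before: only the chunks appended since the last batch are written, all as deltas.
    return chunks.slice(offset).map((chunk) => ({ chunk, isDelta: true }));
}

RecordBatchFileWriter additionally rejects any non-delta message for an id it has already seen (the `_writeDictionaryBatch` override above), since the Arrow File format cannot express dictionary replacement, while the stream writer accepts both replacement and delta messages, as the new tests exercise.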