Skip to content

Commit

Permalink
PQ Api improvements #288
Browse files Browse the repository at this point in the history
- Add drop_quantization API that deletes all PQ-quantization resources associated with a table
- Make sure all quantization resources and table hooks are deleted when the lantern extension is deleted
- Current api has quantize_vector but decompress_vector. switch to quantize/dequantize language
- Make quantize_table work with all kinds of table names and formattings
- Allow quantizing subset of a table
  • Loading branch information
var77 committed Feb 20, 2024
1 parent 9ffc347 commit 5d7da85
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 46 deletions.
62 changes: 47 additions & 15 deletions sql/lantern.sql
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ CREATE FUNCTION _lantern_internal.failure_point_enable(func TEXT, name TEXT, don
CREATE FUNCTION _lantern_internal.continue_blockmap_group_initialization(index regclass) RETURNS VOID
AS 'MODULE_PATHNAME', 'lantern_internal_continue_blockmap_group_initialization' LANGUAGE C STABLE STRICT PARALLEL UNSAFE;

CREATE FUNCTION _lantern_internal.create_pq_codebook(REGCLASS, NAME, INT, INT, TEXT) RETURNS REAL[][][]
CREATE FUNCTION _lantern_internal.create_pq_codebook(REGCLASS, NAME, INT, INT, TEXT, INT) RETURNS REAL[][][]
AS 'MODULE_PATHNAME', 'create_pq_codebook' LANGUAGE C STABLE STRICT PARALLEL UNSAFE;
-- operator classes
CREATE OR REPLACE FUNCTION _lantern_internal._create_ldb_operator_classes(access_method_name TEXT) RETURNS BOOLEAN AS $$
Expand Down Expand Up @@ -235,9 +235,11 @@ END;
$$
LANGUAGE plpgsql;

CREATE OR REPLACE FUNCTION create_pq_codebook(tbl REGCLASS, col NAME, cluster_cnt INT, subvector_count INT, distance_metric TEXT)
CREATE OR REPLACE FUNCTION create_pq_codebook(p_tbl REGCLASS, p_col NAME, cluster_cnt INT, subvector_count INT, distance_metric TEXT, dataset_size_limit INT DEFAULT 0)
RETURNS NAME AS $$
DECLARE
tbl NAME;
col NAME;
stmt TEXT;
res REAL[];
codebooks REAL[][][];
Expand All @@ -246,12 +248,14 @@ DECLARE
codebook_table NAME;
dim INT;
BEGIN

tbl := regexp_replace(trim(both '"' FROM p_tbl::TEXT), '^.*\.', '');
col := trim(both '"' FROM p_col);

stmt := format('SELECT array_length(%I, 1) FROM %I WHERE %1$I IS NOT NULL LIMIT 1', col, tbl);
EXECUTE stmt INTO dim;

-- Get codebooks
codebooks := _lantern_internal.create_pq_codebook(tbl, col, cluster_cnt, subvector_count, distance_metric);
codebooks := _lantern_internal.create_pq_codebook(p_tbl, col, cluster_cnt, subvector_count, distance_metric, dataset_size_limit);

-- Create codebook table
codebook_table := format('_lantern_internal."_codebook_%s_%s"', tbl, col);
Expand Down Expand Up @@ -338,8 +342,8 @@ BEGIN
END;
$$ LANGUAGE plpgsql;

-- Decompress vector using codebook
CREATE OR REPLACE FUNCTION decompress_vector(v pqvec, codebook regclass)
-- Dequantize vector using codebook
CREATE OR REPLACE FUNCTION dequantize_vector(v pqvec, codebook regclass)
RETURNS REAL[] AS $$
DECLARE
res REAL[];
Expand Down Expand Up @@ -371,12 +375,14 @@ END;
$$ LANGUAGE plpgsql;

-- Quantize table
CREATE OR REPLACE FUNCTION quantize_table(tbl regclass, col NAME, cluster_cnt INT,subvector_count INT, distance_metric TEXT)
CREATE OR REPLACE FUNCTION quantize_table(p_tbl regclass, p_col NAME, cluster_cnt INT,subvector_count INT, distance_metric TEXT, dataset_size_limit INT DEFAULT 0)
RETURNS VOID AS $$
DECLARE
subvector REAL[];
id INT;
stmt TEXT;
tbl NAME;
col NAME;
pq_col_name NAME;
codebook_table NAME;
trigger_func_name NAME;
Expand All @@ -385,16 +391,19 @@ DECLARE
pg_version INT;
column_exists BOOLEAN;
BEGIN
tbl := regexp_replace(trim(both '"' FROM p_tbl::TEXT), '^.*\.', '');
col := trim(both '"' FROM p_col);

pg_version := (SELECT setting FROM pg_settings WHERE name = 'server_version_num');
pq_col_name := format('%I_pq', col);
pq_col_name := format('%s_pq', col);

column_exists := (SELECT true FROM pg_attribute WHERE attrelid = tbl AND attname = pq_col_name AND NOT attisdropped);
column_exists := (SELECT true FROM pg_attribute WHERE attrelid = p_tbl AND attname = pq_col_name AND NOT attisdropped);

IF column_exists THEN
RAISE EXCEPTION 'Column % already exists in table', pq_col_name;
END IF;
-- Create codebook
codebook_table := create_pq_codebook(tbl, col, cluster_cnt, subvector_count, distance_metric);
codebook_table := create_pq_codebook(p_tbl, col, cluster_cnt, subvector_count, distance_metric, dataset_size_limit);

-- Compress vectors
RAISE INFO 'Compressing vectors...';
Expand All @@ -406,13 +415,13 @@ BEGIN
stmt := format('ALTER TABLE %I ADD COLUMN %I PQVEC', tbl, pq_col_name);
EXECUTE stmt;

stmt := format('UPDATE %1$I SET %2$I_pq=_lantern_internal.quantize_vector(%2$I, %3$L, %4$L::regclass, %5$L)', tbl, col, subvector_count, codebook_table, distance_metric);
stmt := format('UPDATE %1$I SET "%2$s_pq"=_lantern_internal.quantize_vector(%2$I, %3$L, %4$L::regclass, %5$L)', tbl, col, subvector_count, codebook_table, distance_metric);
EXECUTE stmt;

-- Create trigger to update pq values based on vector value
trigger_func_name := format('_set_pq_col_%s', md5(tbl || col));
trigger_func_name := format('"_lantern_internal"._set_pq_col_%s', md5(tbl || col));
stmt := format('
CREATE OR REPLACE FUNCTION %I()
CREATE OR REPLACE FUNCTION %s()
RETURNS trigger
LANGUAGE plpgsql AS
$body$
Expand All @@ -435,7 +444,7 @@ BEGIN
stmt := format('DROP TRIGGER IF EXISTS %I ON %I', update_trigger_name, tbl);
EXECUTE stmt;

stmt := format('CREATE TRIGGER %I BEFORE INSERT ON %I FOR EACH ROW WHEN (NEW.%I IS NOT NULL) EXECUTE FUNCTION %I()',
stmt := format('CREATE TRIGGER %I BEFORE INSERT ON %I FOR EACH ROW WHEN (NEW.%I IS NOT NULL) EXECUTE FUNCTION %s()',
insert_trigger_name,
tbl,
col,
Expand All @@ -444,7 +453,7 @@ BEGIN

EXECUTE stmt;

stmt := format('CREATE TRIGGER %1$I BEFORE UPDATE OF %2$I ON %3$I FOR EACH ROW WHEN (NEW.%2$I IS NOT NULL) EXECUTE FUNCTION %4$I()',
stmt := format('CREATE TRIGGER %1$I BEFORE UPDATE OF %2$I ON %3$I FOR EACH ROW WHEN (NEW.%2$I IS NOT NULL) EXECUTE FUNCTION %4$s()',
update_trigger_name,
col,
tbl,
Expand All @@ -454,3 +463,26 @@ BEGIN
END IF;
END;
$$ LANGUAGE plpgsql;

CREATE FUNCTION drop_quantization(p_tbl regclass, p_col NAME)
RETURNS VOID AS $$
DECLARE
tbl NAME;
col NAME;
pq_col_name NAME;
codebook_table NAME;
trigger_func_name NAME;
BEGIN
tbl := regexp_replace(trim(both '"' FROM p_tbl::TEXT), '^.*\.', '');
col := trim(both '"' FROM p_col);
codebook_table := format('_lantern_internal."_codebook_%s_%s"', tbl, col);
pq_col_name := format('%s_pq', col);
trigger_func_name := format('"_lantern_internal"._set_pq_col_%s', md5(tbl || col));

EXECUTE format('DROP TABLE IF EXISTS %s CASCADE', codebook_table);

EXECUTE format('ALTER TABLE %I DROP COLUMN IF EXISTS %I', tbl, pq_col_name);

EXECUTE format('DROP FUNCTION IF EXISTS %s CASCADE', trigger_func_name);
END;
$$ LANGUAGE plpgsql;
1 change: 0 additions & 1 deletion src/hnsw.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ PGDLLEXPORT Datum cos_dist(PG_FUNCTION_ARGS);
PGDLLEXPORT Datum cos_dist_with_guard(PG_FUNCTION_ARGS);
PGDLLEXPORT Datum vector_cos_dist(PG_FUNCTION_ARGS);
PGDLLEXPORT Datum lantern_reindex_external_index(PG_FUNCTION_ARGS);
PGDLLEXPORT Datum create_pq_codebook(PG_FUNCTION_ARGS);

HnswColumnType GetColumnTypeFromOid(Oid oid);
HnswColumnType GetIndexColumnType(Relation index);
Expand Down
12 changes: 12 additions & 0 deletions src/hnsw/pqtable.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ Datum create_pq_codebook(PG_FUNCTION_ARGS)
uint32 cluster_cnt = PG_GETARG_UINT32(2);
uint32 subvector_cnt = PG_GETARG_UINT32(3);
text *distance_metric_text = PG_GETARG_TEXT_P(4);
uint32 dataset_size_limit = PG_GETARG_UINT32(5);
usearch_metric_kind_t distance_metric;
// -----------------
// Dataset variables
Expand Down Expand Up @@ -80,6 +81,10 @@ Datum create_pq_codebook(PG_FUNCTION_ARGS)
elog(ERROR, "Cluster count can not be greater than %d", 1 << 8);
}

if(dataset_size_limit > 0 && dataset_size_limit < cluster_cnt) {
elog(ERROR, "Dataset size limit should be greater or equal to cluster_cnt count");
}

distance_metric = GetMetricKindFromStr(text_to_cstring(distance_metric_text));

table = relation_open(tablerelid, AccessShareLock);
Expand All @@ -106,15 +111,22 @@ Datum create_pq_codebook(PG_FUNCTION_ARGS)
dataset_dim = current_tuple_dim;
// TODO:: this check can be removed as soon as we resolve return type issue from this function
if(dataset_dim % subvector_cnt != 0) {
heap_endscan(scan);
relation_close(table, AccessShareLock);
elog(ERROR, "Dataset dimensions should be divisible by subvector count");
}
} else if(current_tuple_dim != dataset_dim) {
heap_endscan(scan);
relation_close(table, AccessShareLock);
elog(ERROR, "Table should have equally sized array: expected %d got %d", dataset_dim, current_tuple_dim);
}

dataset[ dataset_size++ ] = (float4 *)ARR_DATA_PTR(array);

if(dataset_size_limit > 0 && dataset_size == dataset_size_limit) {
break;
}

if(estimated_row_count == dataset_size - 1) {
dataset = repalloc(dataset, estimated_row_count * 2 * sizeof(size_t));
estimated_row_count *= 2;
Expand Down
5 changes: 3 additions & 2 deletions test/expected/ext_relocation.out
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ ORDER BY 1, 3, 2;
schema1 | validate_index | _lantern_internal
schema1 | cos_dist | schema1
schema1 | create_pq_codebook | schema1
schema1 | decompress_vector | schema1
schema1 | dequantize_vector | schema1
schema1 | drop_quantization | schema1
schema1 | hamming_dist | schema1
schema1 | hnsw_handler | schema1
schema1 | l2sq_dist | schema1
Expand All @@ -59,7 +60,7 @@ ORDER BY 1, 3, 2;
schema1 | ldb_pqvec_send | schema1
schema1 | quantize_table | schema1
schema1 | quantize_vector | schema1
(25 rows)
(26 rows)

-- show all the extension operators
SELECT ne.nspname AS extschema, op.oprname, np.nspname AS proschema
Expand Down
123 changes: 109 additions & 14 deletions test/expected/hnsw_pq.out
Original file line number Diff line number Diff line change
Expand Up @@ -96,21 +96,23 @@ SELECT '{84,1,4,128,255}'::INT[]::pqvec;
SELECT '{84,1,4,128,256}'::pqvec;
ERROR: Compressed vector element can not be bigger than 255 at character 8
-- Verify wrong argument assertions
SELECT _lantern_internal.create_pq_codebook('sift_base1k'::regclass, 'nonexistant', 10, 32, 'l2sq');
SELECT _lantern_internal.create_pq_codebook('sift_base1k'::regclass, 'nonexistant', 10, 32, 'l2sq', 0);
ERROR: Column nonexistant not found in table
SELECT _lantern_internal.create_pq_codebook('sift_base1k'::regclass, 'v', 1001, 32, 'l2sq');
SELECT _lantern_internal.create_pq_codebook('sift_base1k'::regclass, 'v', 1001, 32, 'l2sq', 0);
ERROR: Cluster count can not be greater than 256
SELECT _lantern_internal.create_pq_codebook('sift_base1k'::regclass, 'v', 10, 33, 'l2sq');
SELECT _lantern_internal.create_pq_codebook('sift_base1k'::regclass, 'v', 10, 33, 'l2sq', 0);
ERROR: Dataset dimensions should be divisible by subvector count
SELECT _lantern_internal.create_pq_codebook('sift_base1k'::regclass, 'v', 10, 32, 'l2sqz');
SELECT _lantern_internal.create_pq_codebook('sift_base1k'::regclass, 'v', 10, 32, 'l2sqz', 0);
ERROR: Unsupported metric kind: l2sqz . Should be one of (l2sq, cos, hamming)
SELECT _lantern_internal.create_pq_codebook('sift_base1k'::regclass, 'v', 257, 32, 'l2sq');
SELECT _lantern_internal.create_pq_codebook('sift_base1k'::regclass, 'v', 257, 32, 'l2sq', 0);
ERROR: Cluster count can not be greater than 256
SELECT _lantern_internal.create_pq_codebook('sift_base1k'::regclass, 'v', 257, 0, 'l2sq');
SELECT _lantern_internal.create_pq_codebook('sift_base1k'::regclass, 'v', 257, 0, 'l2sq', 0);
ERROR: Subvector count can not be zero
SELECT _lantern_internal.create_pq_codebook('sift_base1k'::regclass, 'v', 256, 0, 'l2sq', 10);
ERROR: Subvector count can not be zero
\set ON_ERROR_STOP on
-- This should create codebook[1][1][128]
SELECT _lantern_internal.create_pq_codebook('sift_base1k'::regclass, 'v', 1, 1, 'l2sq') as codebook \gset
SELECT _lantern_internal.create_pq_codebook('sift_base1k'::regclass, 'v', 1, 1, 'l2sq', 0) as codebook \gset
INFO: Table scanned. Dataset size 1000
INFO: Starting k-means over dataset with (subvectors=1, clusters=1)
INFO: Codebooks created
Expand All @@ -133,7 +135,7 @@ SELECT array_length(:'codebook'::REAL[][][], 3);
(1 row)

-- This should create codebook[1][10][128]
SELECT _lantern_internal.create_pq_codebook('sift_base1k'::regclass, 'v', 10, 1, 'l2sq') as codebook \gset
SELECT _lantern_internal.create_pq_codebook('sift_base1k'::regclass, 'v', 10, 1, 'l2sq', 0) as codebook \gset
INFO: Table scanned. Dataset size 1000
INFO: Starting k-means over dataset with (subvectors=1, clusters=10)
INFO: Codebooks created
Expand All @@ -156,7 +158,7 @@ SELECT array_length(:'codebook'::REAL[][][], 3);
(1 row)

-- This should create codebook[32][10][4]
SELECT _lantern_internal.create_pq_codebook('sift_base1k'::regclass, 'v', 10, 32, 'l2sq') as codebook \gset
SELECT _lantern_internal.create_pq_codebook('sift_base1k'::regclass, 'v', 10, 32, 'l2sq', 0) as codebook \gset
INFO: Table scanned. Dataset size 1000
INFO: Starting k-means over dataset with (subvectors=32, clusters=10)
INFO: Codebooks created
Expand Down Expand Up @@ -223,18 +225,18 @@ ERROR: Cannot modify readonly table.
INSERT INTO _lantern_internal._codebook_sift_base1k_v (subvector_id, centroid_id, c) VALUES (1, 1, '{1,2,3,4}');
ERROR: Cannot modify readonly table.
-- Validate that compressing invalid vector raises an error
SELECT decompress_vector('{}'::pqvec, '_lantern_internal._codebook_sift_base1k_v'::regclass);
SELECT dequantize_vector('{}'::pqvec, '_lantern_internal._codebook_sift_base1k_v'::regclass);
ERROR: pqvector can not be empty at character 26
SELECT decompress_vector('{1,2,3}'::pqvec, '_lantern_internal._codebook_sift_base1k_v'::regclass);
SELECT dequantize_vector('{1,2,3}'::pqvec, '_lantern_internal._codebook_sift_base1k_v'::regclass);
ERROR: Codebook has 32 subvectors, but vector is quantized in 3 subvectors
\set ON_ERROR_STOP on
-- Compression and Decompression
-- Verify that vector was compressed correctly when generating quantized column
SELECT v as v1 FROM sift_base1k WHERE id=1 \gset
SELECT v_pq as v1_pq FROM sift_base1k WHERE id=1 \gset
SELECT quantize_vector(:'v1', '_lantern_internal._codebook_sift_base1k_v'::regclass, 'l2sq') as compressed \gset
SELECT decompress_vector(:'v1_pq', '_lantern_internal._codebook_sift_base1k_v'::regclass) as decompressed_1 \gset
SELECT decompress_vector(:'compressed', '_lantern_internal._codebook_sift_base1k_v'::regclass) as decompressed_2 \gset
SELECT dequantize_vector(:'v1_pq', '_lantern_internal._codebook_sift_base1k_v'::regclass) as decompressed_1 \gset
SELECT dequantize_vector(:'compressed', '_lantern_internal._codebook_sift_base1k_v'::regclass) as decompressed_2 \gset
SELECT l2sq_dist(:'decompressed_1', :'decompressed_2');
l2sq_dist
-----------
Expand All @@ -243,7 +245,7 @@ SELECT l2sq_dist(:'decompressed_1', :'decompressed_2');

-- Test recall for quantized vs non quantized vectors
ALTER TABLE sift_base1k ADD COLUMN v_pq_dec REAL[];
UPDATE sift_base1k SET v_pq_dec=decompress_vector(v_pq, '_lantern_internal._codebook_sift_base1k_v');
UPDATE sift_base1k SET v_pq_dec=dequantize_vector(v_pq, '_lantern_internal._codebook_sift_base1k_v');
-- Calculate recall over original vector
SELECT (calculate_table_recall('sift_base1k', 'sift_query1k', 'sift_truth1k', 'v', 10, 100) -
calculate_table_recall('sift_base1k', 'sift_query1k', 'sift_truth1k', 'v_pq_dec', 10, 100)) as recall_diff \gset
Expand Down Expand Up @@ -317,3 +319,96 @@ SELECT array_length(c, 1) FROM _lantern_internal._codebook_sift_base1k_v_pq_dec
4
(1 row)

-- Test that resources are being cleared correctly
SELECT drop_quantization('sift_base1k'::regclass, 'v');
drop_quantization
-------------------

(1 row)

SELECT drop_quantization('sift_base1k'::regclass, 'v_pq_dec');
drop_quantization
-------------------

(1 row)

SELECT column_name FROM information_schema.columns WHERE table_schema = 'public' AND table_name = 'sift_base1k';
column_name
-------------
id
v
v_pq_dec
(3 rows)

SELECT table_name FROM information_schema.tables WHERE table_schema = '_lantern_internal';
table_name
------------
(0 rows)

-- Test quantization over subset of data
SELECT quantize_table('sift_base1k'::regclass, 'v', 10, 32, 'l2sq', 500);
INFO: Table scanned. Dataset size 500
INFO: Starting k-means over dataset with (subvectors=32, clusters=10)
INFO: Codebooks created
INFO: Compressing vectors...
quantize_table
----------------

(1 row)

SELECT COUNT(DISTINCT subvector_id) FROM _lantern_internal._codebook_sift_base1k_v;
count
-------
32
(1 row)

SELECT COUNT(DISTINCT centroid_id) FROM _lantern_internal._codebook_sift_base1k_v;
count
-------
10
(1 row)

SELECT COUNT(*) FROM _lantern_internal._codebook_sift_base1k_v;
count
-------
320
(1 row)

SELECT array_length(c, 1) FROM _lantern_internal._codebook_sift_base1k_v LIMIT 1;
array_length
--------------
4
(1 row)

-- Test quantization with mixed case and schema qualified table name
SELECT id, v AS "v_New" into "sift_Base1k_NEW" FROM sift_base1k;
SELECT quantize_table('"public"."sift_Base1k_NEW"'::regclass, 'v_New', 10, 32, 'l2sq');
INFO: Table scanned. Dataset size 1001
INFO: Starting k-means over dataset with (subvectors=32, clusters=10)
INFO: Codebooks created
INFO: Compressing vectors...
quantize_table
----------------

(1 row)

SELECT array_length(
dequantize_vector(
quantize_vector(
(SELECT "v_New" FROM "sift_Base1k_NEW" WHERE id=1),
'_lantern_internal."_codebook_sift_Base1k_NEW_v_New"'::regclass,
'l2sq'),
'_lantern_internal."_codebook_sift_Base1k_NEW_v_New"'::regclass),
1
);
array_length
--------------
128
(1 row)

SELECT drop_quantization('"sift_Base1k_NEW"'::regclass, 'v_New');
drop_quantization
-------------------

(1 row)

Loading

0 comments on commit 5d7da85

Please sign in to comment.