Skip to content

Commit

Permalink
Fix permissions for non-superusers to use lantern.async_task
Browse files Browse the repository at this point in the history
- Grant necessary permissions to pg_cron resources and lantern.tasks
  table
- Refactor pg_cron unscheduling logic to bypass pg_cron unscheduling
related issue: citusdata/pg_cron#320
citusdata/pg_cron#320
  • Loading branch information
Ngalstyan4 committed Apr 26, 2024
1 parent c217ab6 commit 9e2b535
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 3 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ set (_update_files
sql/updates/0.2.1--0.2.2.sql
sql/updates/0.2.2--0.2.3.sql
sql/updates/0.2.3--0.2.4.sql
sql/updates/0.2.4--0.2.5.sql
)

# Generate version information for the binary
Expand Down
46 changes: 43 additions & 3 deletions sql/lantern.sql
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ BEGIN
RAISE NOTICE 'pg_cron extension not found. Skipping lantern async task setup';
RETURN;
END IF;
GRANT USAGE ON SCHEMA cron TO PUBLIC;

CREATE TABLE lantern.tasks (
jobid bigserial primary key,
Expand All @@ -500,7 +501,8 @@ BEGIN
error_message text
);

GRANT SELECT ON lantern.tasks TO public;
GRANT SELECT, INSERT, UPDATE, DELETE ON lantern.tasks TO public;
GRANT USAGE, SELECT ON SEQUENCE lantern.tasks_jobid_seq TO public;
ALTER TABLE lantern.tasks ENABLE ROW LEVEL SECURITY;
CREATE POLICY lantern_tasks_policy ON lantern.tasks USING (username OPERATOR(pg_catalog.=) current_user);

Expand All @@ -521,7 +523,7 @@ BEGIN
-- Get the job name from the jobid
-- Call the job finalizer if corresponding job exists BOTH in lantern async tasks AND
-- active cron jobs
UPDATE lantern.tasks t SET
UPDATE lantern.tasks t SET
(duration, status, error_message, pg_cron_job_name) = (run.end_time - t.started_at, NEW.status,
CASE WHEN NEW.status = 'failed' THEN return_message ELSE NULL END,
c.jobname )
Expand All @@ -532,9 +534,16 @@ BEGIN
t.pg_cron_job_name = c.jobname AND
c.jobid = NEW.jobid
-- using returning as a trick to run the unschedule function as a side effect
RETURNING cron.unschedule(t.pg_cron_job_name) INTO res;
-- Note: have to unschedule by jobid because of pg_cron#320 https://github.com/citusdata/pg_cron/issues/320
RETURNING cron.unschedule(t.jobid) INTO res;

RETURN NEW;

EXCEPTION
WHEN OTHERS THEN
RAISE WARNING 'Lantern Async tasks: Unknown job failure in % % %', NEW, SQLERRM, SQLSTATE;
PERFORM cron.unschedule(NEW.jobid);
RETURN NEW;
END
$$ LANGUAGE plpgsql;

Expand Down Expand Up @@ -887,3 +896,34 @@ $weighted_vector_search$ LANGUAGE plpgsql;

SELECT _lantern_internal.maybe_setup_weighted_vector_search();
DROP FUNCTION _lantern_internal.maybe_setup_weighted_vector_search;

-- helper function to mask large vectors in explain outputs of queries containing vectors
CREATE OR REPLACE FUNCTION lantern.masked_explain(
query text,
do_analyze boolean = true,
buffers boolean = true,
costs boolean = true,
timing boolean = true
) RETURNS text AS $$
DECLARE
explain_query text;
explain_output jsonb;
flags text = '';
BEGIN
IF do_analyze THEN
flags := flags || 'ANALYZE, ';
END IF;
IF buffers THEN
flags := flags || 'BUFFERS, ';
END IF;
IF costs THEN
flags := flags || 'COSTS, ';
END IF;
IF timing THEN
flags := flags || 'TIMING ';
END IF;
explain_query := format('EXPLAIN (%s, FORMAT JSON) %s', flags, query);
EXECUTE explain_query INTO explain_output;
RETURN jsonb_pretty(_lantern_internal.mask_order_by_in_plan(explain_output));
END $$ LANGUAGE plpgsql;

97 changes: 97 additions & 0 deletions sql/updates/0.2.4--0.2.5.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@

DROP TRIGGER IF EXISTS status_change_trigger ON cron.job_run_details;
DROP FUNCTION IF EXISTS _lantern_internal.async_task_finalizer_trigger();

DO $async_update$

BEGIN
IF NOT (SELECT EXISTS (SELECT 1 FROM information_schema.schemata WHERE schema_name = 'cron'))
THEN
RAISE NOTICE 'pg_cron extension not found. Skipping lantern async task setup';
RETURN;
END IF;

GRANT USAGE ON SCHEMA cron TO PUBLIC;
GRANT SELECT, INSERT, UPDATE, DELETE ON lantern.tasks TO public;
GRANT USAGE, SELECT ON SEQUENCE lantern.tasks_jobid_seq TO public;

-- create a trigger and added to cron.job_run_details
CREATE OR REPLACE FUNCTION _lantern_internal.async_task_finalizer_trigger() RETURNS TRIGGER AS $$
DECLARE
res RECORD;
BEGIN
-- if NEW.status is one of "starting", "running", "sending, "connecting", return
IF NEW.status IN ('starting', 'running', 'sending', 'connecting') THEN
RETURN NEW;
END IF;

IF NEW.status NOT IN ('succeeded', 'failed') THEN
RAISE WARNING 'Lantern Async tasks: Unexpected status %', NEW.status;
END IF;

-- Get the job name from the jobid
-- Call the job finalizer if corresponding job exists BOTH in lantern async tasks AND
-- active cron jobs
UPDATE lantern.tasks t SET
(duration, status, error_message, pg_cron_job_name) = (run.end_time - t.started_at, NEW.status,
CASE WHEN NEW.status = 'failed' THEN return_message ELSE NULL END,
c.jobname )
FROM cron.job c
LEFT JOIN cron.job_run_details run
ON c.jobid = run.jobid
WHERE
t.pg_cron_job_name = c.jobname AND
c.jobid = NEW.jobid
-- using returning as a trick to run the unschedule function as a side effect
-- Note: have to unschedule by jobid because of pg_cron#320 https://github.com/citusdata/pg_cron/issues/320
RETURNING cron.unschedule(t.jobid) INTO res;

RETURN NEW;

EXCEPTION
WHEN OTHERS THEN
RAISE WARNING 'Lantern Async tasks: Unknown job failure in % % %', NEW, SQLERRM, SQLSTATE;
PERFORM cron.unschedule(NEW.jobid);
RETURN NEW;
END
$$ LANGUAGE plpgsql;

CREATE TRIGGER status_change_trigger
AFTER UPDATE OF status
ON cron.job_run_details
FOR EACH ROW
WHEN (OLD.status IS DISTINCT FROM NEW.status)
EXECUTE FUNCTION _lantern_internal.async_task_finalizer_trigger();

$async_update$
LANGUAGE plpgsql;

-- helper function to mask large vectors in explain outputs of queries containing vectors
CREATE OR REPLACE FUNCTION lantern.masked_explain(
query text,
do_analyze boolean = true,
buffers boolean = true,
costs boolean = true,
timing boolean = true
) RETURNS text AS $$
DECLARE
explain_query text;
explain_output jsonb;
flags text = '';
BEGIN
IF do_analyze THEN
flags := flags || 'ANALYZE, ';
END IF;
IF buffers THEN
flags := flags || 'BUFFERS, ';
END IF;
IF costs THEN
flags := flags || 'COSTS, ';
END IF;
IF timing THEN
flags := flags || 'TIMING ';
END IF;
explain_query := format('EXPLAIN (%s, FORMAT JSON) %s', flags, query);
EXECUTE explain_query INTO explain_output;
RETURN jsonb_pretty(_lantern_internal.mask_order_by_in_plan(explain_output));
END $$ LANGUAGE plpgsql;
48 changes: 48 additions & 0 deletions test/expected/async_tasks.out
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,51 @@ SELECT jobid, query, pg_cron_job_name, job_name, duration IS NOT NULL AS is_done
(4 rows)

-- NOTE: the test finishes but the async index creation may still be in progress
-- create non superuser and test the function
SET client_min_messages = WARNING;
-- suppress NOTICE: role "test_user" does not exist, skipping
DROP USER IF EXISTS test_user_async;
SET client_min_messages = NOTICE;
CREATE USER test_user_async WITH PASSWORD 'test_password';
GRANT SELECT ON "sift_base1k_UpperCase" TO test_user_async;
GRANT SELECT ON sift_base1k_id_seq TO test_user_async;
SET ROLE test_user_async;
SELECT lantern.async_task($$SELECT 1$$, 'simple job');
NOTICE: Job scheduled with pg_cron name: 'async_task_5'
async_task
------------
5
(1 row)

SELECT lantern.async_task($$CREATE INDEX idx2 ON "sift_base1k_UpperCase" USING lantern_hnsw (v) WITH (dim=128, M=6);$$, 'Indexing Job');
NOTICE: Job scheduled with pg_cron name: 'async_task_6'
async_task
------------
6
(1 row)

-- this should fail since test_user does not have permission to drop the table
-- sql line for do not stop on error
SELECT lantern.async_task('DROP TABLE "sift_base1k_UpperCase";', 'Dropping Table Job');
NOTICE: Job scheduled with pg_cron name: 'async_task_7'
async_task
------------
7
(1 row)

SELECT pg_sleep(4);
pg_sleep
----------

(1 row)

SELECT jobid, query, pg_cron_job_name, job_name, duration IS NOT NULL AS is_done, status, error_message FROM lantern.tasks ORDER BY jobid;
jobid | query | pg_cron_job_name | job_name | is_done | status | error_message
-------+------------------------------------------------------------------------------------------+------------------+--------------------+---------+-----------+------------------------------------------------------
5 | SELECT 1 | async_task_5 | simple job | t | succeeded |
6 | CREATE INDEX idx2 ON "sift_base1k_UpperCase" USING lantern_hnsw (v) WITH (dim=128, M=6); | async_task_6 | Indexing Job | t | failed | ERROR: must be owner of table sift_base1k_UpperCase+
| | | | | |
7 | DROP TABLE "sift_base1k_UpperCase"; | async_task_7 | Dropping Table Job | t | failed | ERROR: must be owner of table sift_base1k_UpperCase+
| | | | | |
(3 rows)

20 changes: 20 additions & 0 deletions test/sql/async_tasks.sql
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,23 @@ SELECT _lantern_internal.validate_index('idx', false);

SELECT jobid, query, pg_cron_job_name, job_name, duration IS NOT NULL AS is_done, status, error_message FROM lantern.tasks;
-- NOTE: the test finishes but the async index creation may still be in progress

-- create non superuser and test the function
SET client_min_messages = WARNING;
-- suppress NOTICE: role "test_user" does not exist, skipping
DROP USER IF EXISTS test_user_async;
SET client_min_messages = NOTICE;
CREATE USER test_user_async WITH PASSWORD 'test_password';
GRANT SELECT ON "sift_base1k_UpperCase" TO test_user_async;
GRANT SELECT ON sift_base1k_id_seq TO test_user_async;

SET ROLE test_user_async;

SELECT lantern.async_task($$SELECT 1$$, 'simple job');

SELECT lantern.async_task($$CREATE INDEX idx2 ON "sift_base1k_UpperCase" USING lantern_hnsw (v) WITH (dim=128, M=6);$$, 'Indexing Job');
-- this should fail since test_user does not have permission to drop the table
-- sql line for do not stop on error
SELECT lantern.async_task('DROP TABLE "sift_base1k_UpperCase";', 'Dropping Table Job');
SELECT pg_sleep(4);
SELECT jobid, query, pg_cron_job_name, job_name, duration IS NOT NULL AS is_done, status, error_message FROM lantern.tasks ORDER BY jobid;

0 comments on commit 9e2b535

Please sign in to comment.