Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BFD-3566: Add materialized view for tracking the current beneficiary for an xref group #2398

Draft
wants to merge 10 commits into
base: BFD-3550
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
-- This view tracks the current version of a beneficiary tied to each xref group.
-- This process is inexact due to the nature of the data, but we make our best guess based on the information we have available.

-- Our best guess is stored in a field called xref_rank which is defined as follows:
-- If we have an MBI and the xref_sw is set to 'N' (meaning this record is NOT merged into another), assign a rank of 1 (1st place)
-- If we have an MBI and the xref_sw is set to 'Y' (meaning this record IS merged into another), assign a rank of 2 (2nd place)
-- Otherwise, assign a rank of 3 (last place). This case should only be hit if the MBI is null
-- because each bene with an xref id will have an xref_sw value of 'Y' or 'N'

-- For each xref group, we take the record with the lowest rank
-- If there's a tie, we take the record with the highest bene_id

CREATE MATERIALIZED VIEW IF NOT EXISTS ccw.current_beneficiaries AS (
    -- Score every beneficiary that belongs to an xref group; a lower score is a better candidate
    WITH scored_benes AS (
        SELECT
            b.bene_id,
            b.xref_grp_id,
            CASE
                -- No MBI: least preferred, regardless of xref_sw
                WHEN b.mbi_num IS NULL THEN 3
                -- Has an MBI and is NOT merged into another record
                WHEN b.xref_sw = 'N' THEN 1
                -- Has an MBI and IS merged into another record
                WHEN b.xref_sw = 'Y' THEN 2
                -- Has an MBI but xref_sw is neither 'N' nor 'Y' (including NULL)
                ELSE 3
            END AS xref_rank
        FROM ccw.beneficiaries AS b
        WHERE b.xref_grp_id IS NOT NULL
    )
    -- DISTINCT ON keeps only the first row of each xref_grp_id under the ordering below:
    -- best (lowest) score first, ties broken by the highest bene_id
    SELECT DISTINCT ON (s.xref_grp_id)
        s.bene_id,
        s.xref_grp_id
    FROM scored_benes AS s
    ORDER BY
        s.xref_grp_id ASC,
        s.xref_rank ASC,
        s.bene_id DESC
);

-- REFRESH MATERIALIZED VIEW CONCURRENTLY requires at least one unique index on the view
CREATE UNIQUE INDEX IF NOT EXISTS current_beneficiaries_bene_id_idx
    ON ccw.current_beneficiaries (bene_id);

-- Care must be taken to create "security definer" functions safely
-- see https://www.postgresql.org/docs/current/sql-createfunction.html
--
-- Refreshes the ccw.current_beneficiaries materialized view and then records the refresh
-- timestamp as a JSON comment on the view. Takes no arguments and returns nothing.
CREATE OR REPLACE FUNCTION ccw.refresh_current_beneficiaries()
RETURNS VOID AS $$
DECLARE comment_sql TEXT;
BEGIN
    -- Using "concurrently" will make the refresh slower, but it will not block any reads
    -- on the view while the refresh is in progress
    REFRESH MATERIALIZED VIEW CONCURRENTLY ccw.current_beneficiaries;
    -- There's no implicit way to know when a materialized view was last updated
    -- add a comment on the object in case we need to verify that it's being updated as expected.
    -- COMMENT ON only accepts a string literal, so the statement is built and run dynamically.
    -- Note: now() is the start time of the enclosing transaction, not the moment the refresh finished.
    comment_sql := 'COMMENT ON MATERIALIZED VIEW ccw.current_beneficiaries is '
        || quote_literal('{"last_refreshed": "' || now() || '"}');
    EXECUTE comment_sql;
END;
$$
LANGUAGE plpgsql
-- Only the owner of the view may refresh it, we need to set "security definer" so the function
-- can execute in the context of the creator
SECURITY DEFINER
-- search_path is the order in which schemas are searched when a name is referenced with no schema specified.
-- Postgres recommends setting this on functions marked as "security definer" to prevent malicious users from
-- creating an object that shadows an existing one on a globally writable schema.
-- It is set as part of CREATE FUNCTION so the function never exists without a pinned search_path, and
-- pg_temp is listed last because the (normally world-writable) temporary schema is otherwise searched first.
SET search_path = ccw, pg_temp;
-- Execute privilege is granted to PUBLIC by default
REVOKE ALL ON FUNCTION ccw.refresh_current_beneficiaries() FROM PUBLIC;
-- This only needs to be executed by the pipeline
GRANT EXECUTE ON FUNCTION ccw.refresh_current_beneficiaries() TO api_pipeline_svcs;
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ public final class RifLoader {
*/
private static final Duration MAX_INTERRUPTED_WAIT_TIME = Duration.ofMinutes(5);

/** Query for refreshing the current beneficiaries materialized view. */
private static final String REFRESH_CURRENT_BENEFICIARIES_VIEW_SQL =
"SELECT ccw.refresh_current_beneficiaries()";

/**
* Constructs a new {@link RifLoader} instance.
*
Expand Down Expand Up @@ -235,7 +239,13 @@ public Flux<RifRecordLoadResult> processAsync(
// Mark active record as complete so progress can be updated.
.doOnNext(result -> progressTracker.recordComplete(result.getRecordNumber()))
// Update progress with final result when all records have been processed
.doOnComplete(() -> progressTracker.writeProgress())
.doOnComplete(
() -> {
progressTracker.writeProgress();
if (fileType == RifFileType.BENEFICIARY) {
meliGuzman marked this conversation as resolved.
Show resolved Hide resolved
refreshCurrentBeneficiariesView();
}
})
// clean up when the flux terminates (either by error or completion)
.doFinally(
ignored -> {
Expand All @@ -251,6 +261,21 @@ public Flux<RifRecordLoadResult> processAsync(
});
}

/** Refreshes the current beneficiaries materialized view. */
private void refreshCurrentBeneficiariesView() {
try (EntityManager entityManager = appState.getEntityManagerFactory().createEntityManager()) {
try (final Timer.Context timerRefreshCurrentBeneficiaries =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A filter prevents these metrics from being published to CloudWatch unless we need to create alarms from them, so we don't incur the additional cost. We can view this data in New Relic instead (the unit is nanoseconds, so the value shown here is 0.28 seconds).
image

appState
.getMetrics()
.timer(
MetricRegistry.name(
getClass().getSimpleName(), "refreshCurrentBeneficiariesView"))
.time()) {
entityManager.createNativeQuery(REFRESH_CURRENT_BENEFICIARIES_VIEW_SQL).getResultList();
}
}
}

/**
* Creates a {@link Flux} that, when subscribed to in a scheduler, processes a batch of records
* and publishes the result for each record.
Expand Down Expand Up @@ -443,7 +468,7 @@ private void tweakIfBeneficiary(
Optional<Beneficiary> oldBeneficiaryRecord = Optional.empty();

/*
* Grab the the previous/current version of the Beneficiary (if any, as it exists in the
* Grab the previous/current version of the Beneficiary (if any, as it exists in the
* database before applying the specified RifRecordEvent).
*/
if (rifRecordEvent.getRecordAction() == RecordAction.UPDATE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,14 @@ public void loadSamplesWithXref() {
assertEquals(beneficiaryFromDb2.getXrefSwitch().get(), 'Y');
assertEquals(beneficiaryFromDb2.getXrefGroupId().get(), 1);

// Ensure the current beneficiaries view has been refreshed
Long currentBenes =
(Long)
entityManager
.createNativeQuery("SELECT count(*) from ccw.current_beneficiaries")
.getSingleResult();
assertEquals(currentBenes, 2);

} finally {
if (entityManager != null) {
entityManager.close();
Expand Down