Skip to content

Commit

Permalink
Merge pull request #365 from Uniswap/zachyang/proto-361-update-rfq-en…
Browse files Browse the repository at this point in the history
…dpoint-to-let-fillers-know-theyve-hit-circuit

feat: notify blocked fillers of circuit breaker status
  • Loading branch information
ConjunctiveNormalForm authored Aug 26, 2024
2 parents 34c1fed + b213eb3 commit 842d1f5
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 265 deletions.
35 changes: 26 additions & 9 deletions lib/providers/circuit-breaker/dynamo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient } from '@aws-sdk/lib-dynamodb';
import Logger from 'bunyan';

import { CircuitBreakerConfigurationProvider } from '.';
import { CircuitBreakerConfigurationProvider, EndpointStatuses } from '.';
import { BaseTimestampRepository, FillerTimestampMap, TimestampRepository } from '../../repositories';
import { WebhookConfiguration } from '../webhook';

Expand Down Expand Up @@ -47,8 +47,9 @@ export class DynamoCircuitBreakerConfigurationProvider implements CircuitBreaker
this.timestamps = await this.timestampDB.getFillerTimestampsMap(this.fillerEndpoints);
}

/* add filler if it's not blocked until a future timestamp */
async getEligibleEndpoints(endpoints: WebhookConfiguration[]): Promise<WebhookConfiguration[]> {
/* add filler to `enabled` array if it's not blocked until a future timestamp
add disabled fillers and the `blockUntilTimestamp`s to disabled array */
async getEndpointStatuses(endpoints: WebhookConfiguration[]): Promise<EndpointStatuses> {
try {
const now = Math.floor(Date.now() / 1000);
const fillerTimestamps = await this.getConfigurations();
Expand All @@ -57,20 +58,36 @@ export class DynamoCircuitBreakerConfigurationProvider implements CircuitBreaker
const enabledEndpoints = endpoints.filter((e) => {
return !(fillerTimestamps.has(e.endpoint) && fillerTimestamps.get(e.endpoint)!.blockUntilTimestamp > now);
});
const disabledEndpoints = endpoints.filter((e) => {
return fillerTimestamps.has(e.endpoint) && fillerTimestamps.get(e.endpoint)!.blockUntilTimestamp > now;
});
const disabledEndpoints = endpoints
.filter((e) => {
return fillerTimestamps.has(e.endpoint) && fillerTimestamps.get(e.endpoint)!.blockUntilTimestamp > now;
})
.map((e) => {
return {
webhook: e,
blockUntil: fillerTimestamps.get(e.endpoint)!.blockUntilTimestamp,
};
});

this.log.info({ num: enabledEndpoints.length, endpoints: enabledEndpoints }, `Endpoints enabled`);
this.log.info({ num: disabledEndpoints.length, endpoints: disabledEndpoints }, `Endpoints disabled`);

return enabledEndpoints;
return {
enabled: enabledEndpoints,
disabled: disabledEndpoints,
};
}

return endpoints;
return {
enabled: endpoints,
disabled: [],
};
} catch (e) {
this.log.error({ error: e }, `Error getting eligible endpoints, default to returning all`);
return endpoints;
return {
enabled: endpoints,
disabled: [],
};
}
}
}
10 changes: 9 additions & 1 deletion lib/providers/circuit-breaker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,16 @@ export interface CircuitBreakerConfiguration {
enabled: boolean;
}

export interface EndpointStatuses {
enabled: WebhookConfiguration[];
disabled: {
webhook: WebhookConfiguration;
blockUntil: number;
}[];
}

export interface CircuitBreakerConfigurationProvider {
allow_list?: Set<string>;
getConfigurations(): Promise<CircuitBreakerConfiguration[] | FillerTimestampMap>;
getEligibleEndpoints(endpoints: WebhookConfiguration[]): Promise<WebhookConfiguration[]>;
getEndpointStatuses(endpoints: WebhookConfiguration[]): Promise<EndpointStatuses>;
}
52 changes: 21 additions & 31 deletions lib/providers/circuit-breaker/mock.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { CircuitBreakerConfiguration, CircuitBreakerConfigurationProvider } from '.';
import { CircuitBreakerConfigurationProvider, EndpointStatuses } from '.';
import { FillerTimestampMap } from '../../repositories';
import { WebhookConfiguration } from '../webhook';

Expand All @@ -9,42 +9,32 @@ export class MockV2CircuitBreakerConfigurationProvider implements CircuitBreaker
return this.timestamps;
}

async getEligibleEndpoints(endpoints: WebhookConfiguration[]): Promise<WebhookConfiguration[]> {
async getEndpointStatuses(endpoints: WebhookConfiguration[]): Promise<EndpointStatuses> {
const now = Math.floor(Date.now() / 1000);
const fillerTimestamps = await this.getConfigurations();
if (fillerTimestamps.size) {
const enabledEndpoints = endpoints.filter((e) => {
return !(fillerTimestamps.has(e.endpoint) && fillerTimestamps.get(e.endpoint)!.blockUntilTimestamp > now);
});
return enabledEndpoints;
}
return endpoints;
}
}
const disabledEndpoints = endpoints
.filter((e) => {
return fillerTimestamps.has(e.endpoint) && fillerTimestamps.get(e.endpoint)!.blockUntilTimestamp > now;
})
.map((e) => {
return {
webhook: e,
blockUntil: fillerTimestamps.get(e.endpoint)!.blockUntilTimestamp,
};
});

export class MockCircuitBreakerConfigurationProvider implements CircuitBreakerConfigurationProvider {
allow_list: Set<string>;

constructor(private config: CircuitBreakerConfiguration[], _allow_list: Set<string> = new Set<string>([])) {
this.allow_list = _allow_list;
}

async getConfigurations(): Promise<CircuitBreakerConfiguration[]> {
return this.config;
}

async getEligibleEndpoints(endpoints: WebhookConfiguration[]): Promise<WebhookConfiguration[]> {
const fillerToConfigMap = new Map(this.config.map((c) => [c.hash, c]));
const enabledEndpoints: WebhookConfiguration[] = [];
endpoints.forEach((e) => {
if (
this.allow_list.has(e.hash) ||
(fillerToConfigMap.has(e.hash) && fillerToConfigMap.get(e.hash)?.enabled) ||
!fillerToConfigMap.has(e.hash) // default to allowing fillers not in the config
) {
enabledEndpoints.push(e);
}
});
return enabledEndpoints;
return {
enabled: enabledEndpoints,
disabled: disabledEndpoints,
};
}
return {
enabled: endpoints,
disabled: [],
};
}
}
42 changes: 33 additions & 9 deletions lib/quoters/WebhookQuoter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
} from '../entities';
import { ProtocolVersion, WebhookConfiguration, WebhookConfigurationProvider } from '../providers';
import { FirehoseLogger } from '../providers/analytics';
import { CircuitBreakerConfigurationProvider } from '../providers/circuit-breaker';
import { CircuitBreakerConfigurationProvider, EndpointStatuses } from '../providers/circuit-breaker';
import { FillerComplianceConfigurationProvider } from '../providers/compliance';
import { FillerAddressRepository } from '../repositories/filler-address-repository';
import { timestampInMstoISOString } from '../util/time';
Expand All @@ -36,33 +36,42 @@ export class WebhookQuoter implements Quoter {
private webhookProvider: WebhookConfigurationProvider,
private circuitBreakerProvider: CircuitBreakerConfigurationProvider,
private complianceProvider: FillerComplianceConfigurationProvider,
private repository: FillerAddressRepository,
_allow_list: Set<string> = new Set<string>(['22a23abb38e0612e58ebdd15756b18110e6aac078645210afe0c60f8220307b0'])
private repository: FillerAddressRepository
) {
this.log = _log.child({ quoter: 'WebhookQuoter' });
}

public async quote(request: QuoteRequest): Promise<QuoteResponse[]> {
let endpoints = await this.getEligibleEndpoints();
const statuses = await this.getEndpointStatuses();
const endpointToAddrsMap = await this.complianceProvider.getEndpointToExcludedAddrsMap();
endpoints = endpoints.filter(
const enabledEndpoints = statuses.enabled.filter(
(e) =>
passFillerCompliance(e, endpointToAddrsMap, request.swapper) &&
getEndpointSupportedProtocols(e).includes(request.protocol)
);

this.log.info({ endpoints }, `Fetching quotes from ${endpoints.length} endpoints`);
const quotes = await Promise.all(endpoints.map((e) => this.fetchQuote(e, request)));
const disabledEndpoints = statuses.disabled;

this.log.info(
{ enabled: enabledEndpoints, disabled: disabledEndpoints },
`Fetching quotes from ${enabledEndpoints.length} endpoints and notifying disabled endpoints`
);

const quotes = await Promise.all(enabledEndpoints.map((e) => this.fetchQuote(e, request)));

// should not await and block
Promise.all(disabledEndpoints.map((e) => this.notifyBlock(e)));

return quotes.filter((q) => q !== null) as QuoteResponse[];
}

public type(): QuoterType {
return QuoterType.RFQ;
}

private async getEligibleEndpoints(): Promise<WebhookConfiguration[]> {
private async getEndpointStatuses(): Promise<EndpointStatuses> {
const endpoints = await this.webhookProvider.getEndpoints();
return this.circuitBreakerProvider.getEligibleEndpoints(endpoints);
return this.circuitBreakerProvider.getEndpointStatuses(endpoints);
}

private async fetchQuote(config: WebhookConfiguration, request: QuoteRequest): Promise<QuoteResponse | null> {
Expand Down Expand Up @@ -299,6 +308,21 @@ export class WebhookQuoter implements Quoter {
return null;
}
}

private async notifyBlock(status: { webhook: WebhookConfiguration; blockUntil: number }): Promise<void> {
const timeoutOverride = status.webhook.overrides?.timeout;
const axiosConfig = {
timeout: timeoutOverride ? Number(timeoutOverride) : WEBHOOK_TIMEOUT_MS,
...(!!status.webhook.headers && { headers: status.webhook.headers }),
};
axios.post(
status.webhook.endpoint,
{
blockUntilTimestamp: status.blockUntil,
},
axiosConfig
);
}
}

// returns true if the given hook response is an explicit non-quote
Expand Down
15 changes: 15 additions & 0 deletions test/fixtures.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { MockV2CircuitBreakerConfigurationProvider } from '../lib/providers/circuit-breaker/mock';

const now = Math.floor(Date.now() / 1000);

export const WEBHOOK_URL = 'https://uniswap.org';
export const WEBHOOK_URL_ONEINCH = 'https://1inch.io';
export const WEBHOOK_URL_SEARCHER = 'https://searcher.com';

export const MOCK_V2_CB_PROVIDER = new MockV2CircuitBreakerConfigurationProvider(
[WEBHOOK_URL, WEBHOOK_URL_ONEINCH, WEBHOOK_URL_SEARCHER],
new Map([
[WEBHOOK_URL_ONEINCH, { blockUntilTimestamp: now + 100000, lastPostTimestamp: now - 10, consecutiveBlocks: 0 }],
[WEBHOOK_URL_SEARCHER, { blockUntilTimestamp: now - 10, lastPostTimestamp: now - 100, consecutiveBlocks: NaN }],
])
);
42 changes: 9 additions & 33 deletions test/handlers/quote/handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import {
import { QuoteHandler } from '../../../lib/handlers/quote/handler';
import { MockWebhookConfigurationProvider, ProtocolVersion } from '../../../lib/providers';
import { FirehoseLogger } from '../../../lib/providers/analytics';
import { MockCircuitBreakerConfigurationProvider } from '../../../lib/providers/circuit-breaker/mock';
import { MockFillerComplianceConfigurationProvider } from '../../../lib/providers/compliance';
import { MockQuoter, MOCK_FILLER_ADDRESS, Quoter, WebhookQuoter } from '../../../lib/quoters';
import { MockFillerAddressRepository } from '../../../lib/repositories/filler-address-repository';
import { MOCK_V2_CB_PROVIDER } from '../../fixtures';

jest.mock('axios');
const mockedAxios = axios as jest.Mocked<typeof axios>;
Expand Down Expand Up @@ -237,15 +237,12 @@ describe('Quote handler', () => {
{ endpoint: 'https://uniswap.org', headers: {}, name: 'uniswap', hash: '0xuni' },
]);

const circuitBreakerProvider = new MockCircuitBreakerConfigurationProvider([
{ fadeRate: 0.02, enabled: true, hash: '0xuni' },
]);
const quoters = [
new WebhookQuoter(
logger,
mockFirehoseLogger,
webhookProvider,
circuitBreakerProvider,
MOCK_V2_CB_PROVIDER,
emptyMockComplianceProvider,
repository
),
Expand Down Expand Up @@ -312,15 +309,12 @@ describe('Quote handler', () => {
hash: '0xuni',
},
]);
const circuitBreakerProvider = new MockCircuitBreakerConfigurationProvider([
{ hash: '0xuni', fadeRate: 0.02, enabled: true },
]);
const quoters = [
new WebhookQuoter(
logger,
mockFirehoseLogger,
webhookProvider,
circuitBreakerProvider,
MOCK_V2_CB_PROVIDER,
emptyMockComplianceProvider,
repository
),
Expand Down Expand Up @@ -369,15 +363,12 @@ describe('Quote handler', () => {
const webhookProvider = new MockWebhookConfigurationProvider([
{ name: 'uniswap', endpoint: 'https://uniswap.org', headers: {}, hash: '0xuni' },
]);
const circuitBreakerProvider = new MockCircuitBreakerConfigurationProvider([
{ hash: '0xuni', fadeRate: 0.02, enabled: true },
]);
const quoters = [
new WebhookQuoter(
logger,
mockFirehoseLogger,
webhookProvider,
circuitBreakerProvider,
MOCK_V2_CB_PROVIDER,
emptyMockComplianceProvider,
repository
),
Expand All @@ -404,15 +395,12 @@ describe('Quote handler', () => {
const webhookProvider = new MockWebhookConfigurationProvider([
{ name: 'uniswap', endpoint: 'https://uniswap.org', headers: {}, hash: '0xuni' },
]);
const circuitBreakerProvider = new MockCircuitBreakerConfigurationProvider([
{ hash: '0xuni', fadeRate: 0.02, enabled: true },
]);
const quoters = [
new WebhookQuoter(
logger,
mockFirehoseLogger,
webhookProvider,
circuitBreakerProvider,
MOCK_V2_CB_PROVIDER,
emptyMockComplianceProvider,
repository
),
Expand Down Expand Up @@ -440,15 +428,12 @@ describe('Quote handler', () => {
const webhookProvider = new MockWebhookConfigurationProvider([
{ name: 'uniswap', endpoint: 'https://uniswap.org', headers: {}, hash: '0xuni' },
]);
const circuitBreakerProvider = new MockCircuitBreakerConfigurationProvider([
{ hash: '0xuni', fadeRate: 0.02, enabled: true },
]);
const quoters = [
new WebhookQuoter(
logger,
mockFirehoseLogger,
webhookProvider,
circuitBreakerProvider,
MOCK_V2_CB_PROVIDER,
emptyMockComplianceProvider,
repository
),
Expand Down Expand Up @@ -479,15 +464,12 @@ describe('Quote handler', () => {
const webhookProvider = new MockWebhookConfigurationProvider([
{ name: 'uniswap', endpoint: 'https://uniswap.org', headers: {}, hash: '0xuni' },
]);
const circuitBreakerProvider = new MockCircuitBreakerConfigurationProvider([
{ hash: '0xuni', fadeRate: 0.02, enabled: true },
]);
const quoters = [
new WebhookQuoter(
logger,
mockFirehoseLogger,
webhookProvider,
circuitBreakerProvider,
MOCK_V2_CB_PROVIDER,
emptyMockComplianceProvider,
repository
),
Expand Down Expand Up @@ -542,15 +524,12 @@ describe('Quote handler', () => {
const webhookProvider = new MockWebhookConfigurationProvider([
{ name: 'uniswap', endpoint: 'https://uniswap.org', headers: {}, hash: '0xuni' },
]);
const circuitBreakerProvider = new MockCircuitBreakerConfigurationProvider([
{ hash: '0xuni', fadeRate: 0.02, enabled: true },
]);
const quoters = [
new WebhookQuoter(
logger,
mockFirehoseLogger,
webhookProvider,
circuitBreakerProvider,
MOCK_V2_CB_PROVIDER,
emptyMockComplianceProvider,
repository
),
Expand Down Expand Up @@ -590,15 +569,12 @@ describe('Quote handler', () => {
const webhookProvider = new MockWebhookConfigurationProvider([
{ name: 'uniswap', endpoint: 'https://uniswap.org', headers: {}, hash: '0xuni' },
]);
const circuitBreakerProvider = new MockCircuitBreakerConfigurationProvider([
{ hash: '0xuni', fadeRate: 0.02, enabled: true },
]);
const quoters = [
new WebhookQuoter(
logger,
mockFirehoseLogger,
webhookProvider,
circuitBreakerProvider,
MOCK_V2_CB_PROVIDER,
mockComplianceProvider,
repository
),
Expand Down
Loading

0 comments on commit 842d1f5

Please sign in to comment.