From 387f7c1ce96fdab121addcf368f0f6612892dbbe Mon Sep 17 00:00:00 2001
From: Will Hunt
Date: Tue, 20 Feb 2024 22:21:19 +0000
Subject: [PATCH] Backoff failing RSS feeds (#890)

* Backoff RSS requests if a url repeatedly fails.
* Increase max backoff time to a day
* Add backoff for failing feeds.
* Remove unused finally
* Add this.feedLastBackoff
* Rewrite in rust.
* linting
* pop
* Optimise backoff function further
* Drop only!
* fix test
* lint
* lint further
* Better comments
* Fix urls calculation
* Remove testing URL
* Add some variance to speed up while loop
* correct comment
* Follow the advice and use a VecDeque as it's slightly faster.
* Vastly better shuffle method
* Speed up checking for previous guids.
* fix hasher function
* lint
* Content doesn't need to be calculated twice.
* Slightly more efficient iteration
* Improve performance of backoff insertion
* Configure feed reader
* lint
* Ensure appending and removing from the queue works as expected.
* Ensure we do keep urls that have been removed.
* lint
* Inc/dec metrics as queue items are added/deleted.
* Add comment
* tidy up
---
 Cargo.lock                          |   1 +
 Cargo.toml                          |   1 +
 changelog.d/890.misc                |   1 +
 spec/github.spec.ts                 |   2 +-
 src/Connections/FeedConnection.ts   |  17 ++--
 src/Stores/MemoryStorageProvider.ts |   5 +-
 src/Stores/RedisStorageProvider.ts  |  20 ++--
 src/Stores/StorageProvider.ts       |   6 +-
 src/feeds/FeedReader.ts             | 122 ++++++++++++++---------
 src/feeds/parser.rs                 |   3 +-
 src/lib.rs                          |   1 +
 src/util/mod.rs                     | 148 ++++++++++++++++++++++
 tests/connections/GithubRepoTest.ts |   3 +-
 13 files changed, 263 insertions(+), 67 deletions(-)
 create mode 100644 changelog.d/890.misc
 create mode 100644 src/util/mod.rs

diff --git a/Cargo.lock b/Cargo.lock
index 7d8fdad65..e844d1cc9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -681,6 +681,7 @@ dependencies = [
  "napi",
  "napi-build",
  "napi-derive",
+ "rand",
  "reqwest",
  "rgb",
  "rss",
diff --git a/Cargo.toml b/Cargo.toml
index 4d75e4089..ab6e33d66 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,6 +21,7 @@ rss = "2.0"
 atom_syndication = "0.12"
 ruma = { version = "0.9", features = ["events", "html"] }
 reqwest = "0.11"
+rand = "0.8.5"
 
 [build-dependencies]
 napi-build = "2"
diff --git a/changelog.d/890.misc b/changelog.d/890.misc
new file mode 100644
index 000000000..23b8fb1c1
--- /dev/null
+++ b/changelog.d/890.misc
@@ -0,0 +1 @@
+Failing RSS/atom feeds are now backed off before being retried. This should result in a speedup for large public deployments where failing feeds may result in a slowdown.
\ No newline at end of file
diff --git a/spec/github.spec.ts b/spec/github.spec.ts
index 57a431c5b..132674298 100644
--- a/spec/github.spec.ts
+++ b/spec/github.spec.ts
@@ -58,7 +58,7 @@ describe('GitHub', () => {
         return testEnv?.tearDown();
     });
 
-    it.only('should be able to handle a GitHub event', async () => {
+    it('should be able to handle a GitHub event', async () => {
         const user = testEnv.getUser('user');
         const bridgeApi = await getBridgeApi(testEnv.opts.config?.widgets?.publicUrl!, user);
         const testRoomId = await user.createRoom({ name: 'Test room', invite:[testEnv.botMxid] });
diff --git a/src/Connections/FeedConnection.ts b/src/Connections/FeedConnection.ts
index d1aa9b248..26f427f38 100644
--- a/src/Connections/FeedConnection.ts
+++ b/src/Connections/FeedConnection.ts
@@ -220,15 +220,16 @@ export class FeedConnection extends BaseConnection implements IConnection {
         // We want to retry these sends, because sometimes the network / HS
         // craps out.
+        const content = {
+            msgtype: 'm.notice',
+            format: "org.matrix.custom.html",
+            formatted_body: md.renderInline(message),
+            body: message,
+            external_url: entry.link ?? undefined,
+            "uk.half-shot.matrix-hookshot.feeds.item": entry,
+        };
         await retry(
-            () => this.intent.sendEvent(this.roomId, {
-                msgtype: 'm.notice',
-                format: "org.matrix.custom.html",
-                formatted_body: md.renderInline(message),
-                body: message,
-                external_url: entry.link ?? undefined,
-                "uk.half-shot.matrix-hookshot.feeds.item": entry,
-            }),
+            () => this.intent.sendEvent(this.roomId, content),
             SEND_EVENT_MAX_ATTEMPTS,
             SEND_EVENT_INTERVAL_MS,
             // Filter for showstopper errors like 4XX errors, but otherwise
diff --git a/src/Stores/MemoryStorageProvider.ts b/src/Stores/MemoryStorageProvider.ts
index 257c1da3c..52b5bf545 100644
--- a/src/Stores/MemoryStorageProvider.ts
+++ b/src/Stores/MemoryStorageProvider.ts
@@ -35,8 +35,9 @@ export class MemoryStorageProvider extends MSP implements IBridgeStorageProvider
         return this.feedGuids.has(url);
     }
 
-    async hasSeenFeedGuid(url: string, guid: string): Promise<boolean> {
-        return this.feedGuids.get(url)?.includes(guid) ?? false;
+    async hasSeenFeedGuids(url: string, ...guids: string[]): Promise<string[]> {
+        const existing = this.feedGuids.get(url);
+        return existing ? guids.filter((existingGuid) => existing.includes(existingGuid)) : [];
     }
 
     public async setGithubIssue(repo: string, issueNumber: string, data: IssuesGetResponseData, scope = "") {
diff --git a/src/Stores/RedisStorageProvider.ts b/src/Stores/RedisStorageProvider.ts
index 4f2343ce5..e1dc77433 100644
--- a/src/Stores/RedisStorageProvider.ts
+++ b/src/Stores/RedisStorageProvider.ts
@@ -29,11 +29,10 @@ const WIDGET_USER_TOKENS = "widgets.user-tokens.";
 
 const FEED_GUIDS = "feeds.guids.";
 
-
-
 const log = new Logger("RedisASProvider");
 
 export class RedisStorageContextualProvider implements IStorageProvider {
+
     constructor(protected readonly redis: Redis, protected readonly contextSuffix = '') { }
 
     public setSyncToken(token: string|null){
@@ -216,9 +215,9 @@ export class RedisStorageProvider extends RedisStorageContextualProvider impleme
         await this.redis.set(key, JSON.stringify(value));
     }
 
-    public async storeFeedGuids(url: string, ...guid: string[]): Promise<void> {
+    public async storeFeedGuids(url: string, ...guids: string[]): Promise<void> {
         const feedKey = `${FEED_GUIDS}${url}`;
-        await this.redis.lpush(feedKey, ...guid);
+        await this.redis.lpush(feedKey, ...guids);
         await this.redis.ltrim(feedKey, 0, MAX_FEED_ITEMS);
     }
 
@@ -226,7 +225,16 @@ export class RedisStorageProvider extends RedisStorageContextualProvider impleme
         return (await this.redis.exists(`${FEED_GUIDS}${url}`)) === 1;
     }
 
-    public async hasSeenFeedGuid(url: string, guid: string): Promise<boolean> {
-        return (await this.redis.lpos(`${FEED_GUIDS}${url}`, guid)) != null;
+    public async hasSeenFeedGuids(url: string, ...guids: string[]): Promise<string[]> {
+        let multi = this.redis.multi();
+        for (const guid of guids) {
+            multi = multi.lpos(`${FEED_GUIDS}${url}`, guid);
+        }
+        const res = await multi.exec();
+        if (res === null) {
+            // Just assume we've seen none.
+            return [];
+        }
+        return guids.filter((_guid, index) => res[index][1] !== null);
     }
 }
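The storage providers above swap the per-GUID `hasSeenFeedGuid` lookup for a batched `hasSeenFeedGuids` call that returns the subset of GUIDs already seen (one Redis MULTI of LPOS commands instead of a round trip per GUID). A rough sketch of the intended call pattern — `filterNewGuids` is a hypothetical helper, not code from this patch:

```typescript
// Hypothetical helper showing the batched API: one storage call instead of
// one lookup per GUID.
async function filterNewGuids(
    storage: { hasSeenFeedGuids(url: string, ...guids: string[]): Promise<string[]> },
    url: string,
    guids: string[],
): Promise<string[]> {
    // The provider returns only the GUIDs it has seen before.
    const seen = await storage.hasSeenFeedGuids(url, ...guids);
    return guids.filter((guid) => !seen.includes(guid));
}
```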
diff --git a/src/Stores/StorageProvider.ts b/src/Stores/StorageProvider.ts
index 73790ff95..50175d75e 100644
--- a/src/Stores/StorageProvider.ts
+++ b/src/Stores/StorageProvider.ts
@@ -25,7 +25,7 @@ export interface IBridgeStorageProvider extends IAppserviceStorageProvider, ISto
     setStoredTempFile(key: string, value: string): Promise<void>;
     getGitlabDiscussionThreads(connectionId: string): Promise<SerializedGitlabDiscussionThreads>;
     setGitlabDiscussionThreads(connectionId: string, value: SerializedGitlabDiscussionThreads): Promise<void>;
-    storeFeedGuids(url: string, ...guid: string[]): Promise<void>;
-    hasSeenFeed(url: string, ...guid: string[]): Promise<boolean>;
-    hasSeenFeedGuid(url: string, guid: string): Promise<boolean>;
+    storeFeedGuids(url: string, ...guids: string[]): Promise<void>;
+    hasSeenFeed(url: string): Promise<boolean>;
+    hasSeenFeedGuids(url: string, ...guids: string[]): Promise<string[]>;
 }
\ No newline at end of file
diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts
index 46ccadb2c..699598ad6 100644
--- a/src/feeds/FeedReader.ts
+++ b/src/feeds/FeedReader.ts
@@ -9,8 +9,14 @@ import { randomUUID } from "crypto";
 import { readFeed } from "../libRs";
 import { IBridgeStorageProvider } from "../Stores/StorageProvider";
 import UserAgent from "../UserAgent";
+import { QueueWithBackoff } from "../libRs";
 
 const log = new Logger("FeedReader");
+
+const BACKOFF_TIME_MAX_MS = 24 * 60 * 60 * 1000;
+const BACKOFF_POW = 1.05;
+const BACKOFF_TIME_MS = 5 * 1000;
+
 export class FeedError extends Error {
     constructor(
         public url: string,
@@ -73,21 +79,11 @@ function normalizeUrl(input: string): string {
     return url.toString();
 }
 
-function shuffle<T>(array: T[]): T[] {
-    for (let i = array.length - 1; i > 0; i--) {
-        const j = Math.floor(Math.random() * (i + 1));
-        [array[i], array[j]] = [array[j], array[i]];
-    }
-    return array;
-}
-
 export class FeedReader {
 
     private connections: FeedConnection[];
-    // ts should notice that we do in fact initialize it in constructor, but it doesn't (in this version)
-    private observedFeedUrls: Set<string> = new Set();
-    private feedQueue: string[] = [];
+    private feedQueue = new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS);
 
     // A set of last modified times for each url.
     private cacheTimes: Map<string, { etag?: string, lastModified?: string }> = new Map();
@@ -100,11 +96,12 @@ export class FeedReader {
     private shouldRun = true;
     private readonly timeouts: (NodeJS.Timeout|undefined)[];
+    private readonly feedsToRetain = new Set<string>();
 
     get sleepingInterval() {
         return (
             // Calculate the number of MS to wait in between feeds.
-            (this.config.pollIntervalSeconds * 1000) / (this.feedQueue.length || 1)
+            (this.config.pollIntervalSeconds * 1000) / (this.feedQueue.length() || 1)
             // And multiply by the number of concurrent readers
         ) * this.config.pollConcurrency;
     }
 
@@ -120,22 +117,49 @@ export class FeedReader {
         this.timeouts.fill(undefined);
         Object.seal(this.timeouts);
         this.connections = this.connectionManager.getAllConnectionsOfType(FeedConnection);
-        this.calculateFeedUrls();
-        connectionManager.on('new-connection', c => {
-            if (c instanceof FeedConnection) {
-                log.debug('New connection tracked:', c.connectionId);
-                this.connections.push(c);
-                this.calculateFeedUrls();
+        const feeds = this.calculateInitialFeedUrls();
+        connectionManager.on('new-connection', newConnection => {
+            if (!(newConnection instanceof FeedConnection)) {
+                return;
+            }
+            const normalisedUrl = normalizeUrl(newConnection.feedUrl);
+            if (!feeds.has(normalisedUrl)) {
+                log.info(`Connection added, adding "${normalisedUrl}" to queue`);
+                this.feedQueue.push(normalisedUrl);
+                feeds.add(normalisedUrl);
+                Metrics.feedsCount.inc();
+                Metrics.feedsCountDeprecated.inc();
             }
         });
         connectionManager.on('connection-removed', removed => {
-            if (removed instanceof FeedConnection) {
-                this.connections = this.connections.filter(c => c.connectionId !== removed.connectionId);
-                this.calculateFeedUrls();
+            if (!(removed instanceof FeedConnection)) {
+                return;
             }
+            let shouldKeepUrl = false;
+            const normalisedUrl = normalizeUrl(removed.feedUrl);
+            this.connections = this.connections.filter(c => {
+                // Cheeky reuse of iteration to determine if we should remove this URL.
+                if (c.connectionId !== removed.connectionId) {
+                    shouldKeepUrl = shouldKeepUrl || normalizeUrl(c.feedUrl) === normalisedUrl;
+                    return true;
+                }
+                return false;
+            });
+            if (shouldKeepUrl) {
+                log.info(`Connection removed, but not removing "${normalisedUrl}" as it is still in use`);
+                return;
+            }
+            log.info(`Connection removed, removing "${normalisedUrl}" from queue`);
+            this.feedsToRetain.delete(normalisedUrl);
+            this.feedQueue.remove(normalisedUrl);
+            feeds.delete(normalisedUrl);
+            this.feedsFailingHttp.delete(normalisedUrl);
+            this.feedsFailingParsing.delete(normalisedUrl);
+            Metrics.feedsCount.dec();
+            Metrics.feedsCountDeprecated.dec();
         });
-        log.debug('Loaded feed URLs:', this.observedFeedUrls);
+        log.debug('Loaded feed URLs:', [...feeds].join(', '));
 
         for (let i = 0; i < config.pollConcurrency; i++) {
             void this.pollFeeds(i);
@@ -147,21 +171,24 @@ export class FeedReader {
         this.timeouts.forEach(t => clearTimeout(t));
     }
 
-    private calculateFeedUrls(): void {
+    /**
+     * Calculate the initial feed set for the reader. Should never
+     * be called twice.
+     */
+    private calculateInitialFeedUrls(): Set<string> {
         // just in case we got an invalid URL somehow
-        const normalizedUrls = [];
+        const observedFeedUrls = new Set<string>();
         for (const conn of this.connections) {
             try {
-                normalizedUrls.push(normalizeUrl(conn.feedUrl));
+                observedFeedUrls.add(normalizeUrl(conn.feedUrl));
             } catch (err: unknown) {
                 log.error(`Invalid feedUrl for connection ${conn.connectionId}: ${conn.feedUrl}. It will not be tracked`);
             }
         }
-        this.observedFeedUrls = new Set(normalizedUrls);
-        this.feedQueue = shuffle([...this.observedFeedUrls.values()]);
-
-        Metrics.feedsCount.set(this.observedFeedUrls.size);
-        Metrics.feedsCountDeprecated.set(this.observedFeedUrls.size);
+        this.feedQueue.populate([...observedFeedUrls]);
+        Metrics.feedsCount.set(observedFeedUrls.size);
+        Metrics.feedsCountDeprecated.set(observedFeedUrls.size);
+        return observedFeedUrls;
     }
 
     /**
@@ -173,6 +200,11 @@ export class FeedReader {
      * @returns A boolean that returns if we saw any changes on the feed since the last poll time.
      */
     public async pollFeed(url: string): Promise<boolean> {
+        // If a feed is deleted while it is being polled, we need
+        // to remember NOT to add it back to the queue. This
+        // set keeps track of all the feeds that *should* be
+        // requeued.
+        this.feedsToRetain.add(url);
         let seenEntriesChanged = false;
         const fetchKey = randomUUID();
         const { etag, lastModified } = this.cacheTimes.get(url) || {};
@@ -203,22 +235,20 @@ export class FeedReader {
             if (feed) {
                 // If undefined, we got a not-modified.
                 log.debug(`Found ${feed.items.length} entries in ${url}`);
-
+                const seenItems = await this.storage.hasSeenFeedGuids(url, ...feed.items.filter(item => !!item.hashId).map(item => item.hashId!))
                 for (const item of feed.items) {
                     // Some feeds have a nasty habit of leading a empty tag there, making us parse it as garbage.
                     if (!item.hashId) {
                         log.error(`Could not determine guid for entry in ${url}, skipping`);
                         continue;
                     }
-                    const hashId = `md5:${item.hashId}`;
-                    newGuids.push(hashId);
-
-                    if (initialSync) {
-                        log.debug(`Skipping entry ${item.id ?? hashId} since we're performing an initial sync`);
+                    if (seenItems.includes(item.hashId)) {
                         continue;
                     }
-                    if (await this.storage.hasSeenFeedGuid(url, hashId)) {
-                        log.debug('Skipping already seen entry', item.id ?? hashId);
+                    newGuids.push(item.hashId);
+
+                    if (initialSync) {
+                        log.debug(`Skipping entry ${item.id ?? item.hashId} since we're performing an initial sync`);
                         continue;
                     }
                     const entry = {
@@ -243,12 +273,15 @@ export class FeedReader {
                 if (seenEntriesChanged && newGuids.length) {
                     await this.storage.storeFeedGuids(url, ...newGuids);
                 }
-
             }
             this.queue.push({ eventName: 'feed.success', sender: 'FeedReader', data: { url } });
             // Clear any feed failures
             this.feedsFailingHttp.delete(url);
             this.feedsFailingParsing.delete(url);
+            if (this.feedsToRetain.has(url)) {
+                // Only requeue the feed if it hasn't been removed while we were polling it.
+                this.feedQueue.push(url);
+            }
         } catch (err: unknown) {
             // TODO: Proper Rust Type error.
             if ((err as Error).message.includes('Failed to fetch feed due to HTTP')) {
                 this.feedsFailingHttp.add(url);
             } else {
                 this.feedsFailingParsing.add(url);
             }
+            const backoffDuration = this.feedQueue.backoff(url);
             const error = err instanceof Error ? err : new Error(`Unknown error ${err}`);
             const feedError = new FeedError(url.toString(), error, fetchKey);
-            log.error("Unable to read feed:", feedError.message);
+            log.error("Unable to read feed:", feedError.message, `backing off for ${backoffDuration}ms`);
             this.queue.push({ eventName: 'feed.error', sender: 'FeedReader', data: feedError});
-        } finally {
-            this.feedQueue.push(url);
         }
         return seenEntriesChanged;
     }
@@ -277,11 +309,11 @@ export class FeedReader {
         Metrics.feedsFailingDeprecated.set({ reason: "http" }, this.feedsFailingHttp.size );
         Metrics.feedsFailingDeprecated.set({ reason: "parsing" }, this.feedsFailingParsing.size);
 
-        log.debug(`Checking for updates in ${this.observedFeedUrls.size} RSS/Atom feeds (worker: ${workerId})`);
+        log.debug(`Checking for updates in ${this.feedQueue.length()} RSS/Atom feeds (worker: ${workerId})`);
 
         const fetchingStarted = Date.now();
 
-        const [ url ] = this.feedQueue.splice(0, 1);
+        const url = this.feedQueue.pop();
         let sleepFor = this.sleepingInterval;
 
         if (url) {
@@ -298,7 +330,7 @@ export class FeedReader {
                 log.warn(`It took us longer to update the feeds than the configured pool interval`);
             }
         } else {
-            // It may be possible that we have more workers than feeds. This will cause the worker to just sleep.
+            // It is possible that we have more workers than feeds. This will cause the worker to just sleep.
             log.debug(`No feeds available to poll for worker ${workerId}`);
         }
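With this change the queue drives the polling loop: each worker pops a URL, polls it, and either requeues it on success or backs it off on failure, while `feedsToRetain` stops feeds that were deleted mid-poll from being requeued. A simplified model of one worker iteration — `pollOnce` and `BackoffQueue` are illustrative only; the real `pollFeed`/`pollFeeds` also handle caching, metrics and event emission:

```typescript
// Minimal model of one worker iteration, assuming the pop/push/backoff/length
// surface that QueueWithBackoff (src/util/mod.rs) exposes to JS.
interface BackoffQueue {
    pop(): string | null;
    push(item: string): void;
    backoff(item: string): number;
    length(): number;
}

async function pollOnce(
    queue: BackoffQueue,
    retainedFeeds: Set<string>,
    poll: (url: string) => Promise<void>,
): Promise<void> {
    const url = queue.pop();
    if (!url) {
        // More workers than feeds; this worker simply sleeps this round.
        return;
    }
    retainedFeeds.add(url);
    try {
        await poll(url);
        if (retainedFeeds.has(url)) {
            // Requeue only if the feed wasn't removed while we were polling it.
            queue.push(url);
        }
    } catch (err) {
        // Failed feeds are not requeued directly; they re-enter the queue
        // once their backoff interval has elapsed.
        const delayMs = queue.backoff(url);
        console.debug(`Backing off ${url} for ${delayMs}ms`, err);
    }
}
```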
diff --git a/src/feeds/parser.rs b/src/feeds/parser.rs
index 0dcbc7d16..7d2add886 100644
--- a/src/feeds/parser.rs
+++ b/src/feeds/parser.rs
@@ -70,7 +70,8 @@ fn parse_channel_to_js_result(channel: &Channel) -> JsRssChannel {
                 .map(|f| f.value)
                 .or(item.link.clone())
                 .or(item.title.clone())
-                .and_then(|f| hash_id(f).ok()),
+                .and_then(|f| hash_id(f).ok())
+                .map(|f| format!("md5:{}", f)),
         })
         .collect(),
     }
diff --git a/src/lib.rs b/src/lib.rs
index 1d03f680a..3a15bd657 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -3,6 +3,7 @@ pub mod feeds;
 pub mod format_util;
 pub mod github;
 pub mod jira;
+pub mod util;
 
 #[macro_use]
 extern crate napi_derive;
diff --git a/src/util/mod.rs b/src/util/mod.rs
new file mode 100644
index 000000000..593b1f51d
--- /dev/null
+++ b/src/util/mod.rs
@@ -0,0 +1,148 @@
+use rand::prelude::*;
+use std::collections::BTreeMap;
+use std::collections::HashMap;
+use std::collections::VecDeque;
+use std::time::{SystemTime, UNIX_EPOCH};
+
+const DEFAULT_BACKOFF_TIME_MAX_MS: f64 = 24f64 * 60f64 * 60f64 * 1000f64;
+const DEFAULT_BACKOFF_POW: f64 = 1.05f64;
+const DEFAULT_BACKOFF_TIME_MS: f64 = 5f64 * 1000f64;
+
+#[napi]
+
+pub struct QueueWithBackoff {
+    queue: VecDeque<String>,
+    /**
+     * A map of absolute backoff timestamps mapped to the value.
+     */
+    backoff: BTreeMap<u64, String>,
+    /**
+     * The last duration applied when a value was backed off.
+ */ + last_backoff_duration: HashMap, + backoff_time: f64, + backoff_exponent: f64, + backoff_max: f64, +} + +impl Default for QueueWithBackoff { + fn default() -> Self { + Self::new( + DEFAULT_BACKOFF_TIME_MS, + DEFAULT_BACKOFF_POW, + DEFAULT_BACKOFF_TIME_MAX_MS, + ) + } +} +#[napi] + +impl QueueWithBackoff { + #[napi(constructor)] + pub fn new(backoff_time: f64, backoff_exponent: f64, backoff_max: f64) -> Self { + QueueWithBackoff { + queue: VecDeque::new(), + backoff: BTreeMap::new(), + last_backoff_duration: HashMap::new(), + backoff_time, + backoff_exponent, + backoff_max, + } + } + + #[napi] + pub fn pop(&mut self) -> Option { + let start = SystemTime::now(); + let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap().as_millis() as u64; + + // We only need to check this once, as we won't be adding to the backoff queue + // as often as we pull from it. + if let Some(item) = self.backoff.first_entry() { + if *item.key() < since_the_epoch { + let v = item.remove(); + self.queue.push_back(v); + } + } + + self.queue.pop_front() + } + + #[napi] + pub fn remove(&mut self, item: String) -> bool { + // Remove from the queue + if let Ok(index) = self.queue.binary_search(&item) { + self.queue.remove(index); + return true; + } else { + // We didn't find the key queued, so let's ensure we delete it + // from any backoff. + // This is *expensive* but hopefully called rarely. + let mut found_key: u64 = 0; + for (key, value) in self.backoff.iter() { + if *value == item { + found_key = *key; + } + } + if found_key != 0 { + self.backoff.remove(&found_key); + return true; + } + } + // Always remove the duration on removal. + self.last_backoff_duration.remove(&item); + false + } + + #[napi] + pub fn push(&mut self, item: String) { + self.last_backoff_duration.remove(&item); + self.queue.push_back(item); + } + + #[napi] + pub fn backoff(&mut self, item: String) -> u32 { + let last_backoff = (*self.last_backoff_duration.get(&item).unwrap_or(&0)) as f64; + + let mut rng = rand::thread_rng(); + let y: f64 = rng.gen::() + 0.5f64; // generates a float between 0.5 and 1.1 + + let backoff_duration = ((y * self.backoff_time) + last_backoff.powf(self.backoff_exponent)) + .min(self.backoff_max) as u32; + let backoff_item = item.clone(); + self.last_backoff_duration.insert(item, backoff_duration); + + let start = SystemTime::now(); + let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap(); + + let mut time = since_the_epoch.as_millis() as u64 + backoff_duration as u64; + + // If the backoff queue contains this time (likely) + // then we want to increase the backoff time slightly + // to allow for it. + let incr: f64 = (rng.gen::() * 2f64) + 2f64; + while self.backoff.contains_key(&time) { + time += (incr * self.backoff_time) as u64; + } + + self.backoff.insert(time, backoff_item); + backoff_duration + } + + #[napi] + pub fn length(&self) -> u32 { + self.queue.len() as u32 + } + + fn shuffle(&mut self) { + let mut rng = rand::thread_rng(); + self.queue.make_contiguous().shuffle(&mut rng); + } + + #[napi] + pub fn populate(&mut self, values: Vec) { + // This assumes an empty queue. 
diff --git a/tests/connections/GithubRepoTest.ts b/tests/connections/GithubRepoTest.ts
index 85af40c62..e686f2a5c 100644
--- a/tests/connections/GithubRepoTest.ts
+++ b/tests/connections/GithubRepoTest.ts
@@ -23,6 +23,7 @@ const GITHUB_ISSUE = {
     },
     html_url: `https://github.com/${GITHUB_ORG_REPO.org}/${GITHUB_ORG_REPO.repo}/issues/1234`,
     title: "My issue",
+    assignees: []
 };
 
 const GITHUB_ISSUE_CREATED_PAYLOAD = {
@@ -137,7 +138,7 @@ describe("GitHubRepoConnection", () => {
         intent.expectEventBodyContains(GITHUB_ISSUE_CREATED_PAYLOAD.issue.html_url, 0);
         intent.expectEventBodyContains(GITHUB_ISSUE_CREATED_PAYLOAD.issue.title, 0);
     });
-    it.only("will handle assignees on issue creation", async () => {
+    it("will handle assignees on issue creation", async () => {
         const { connection, intent } = createConnection();
         await connection.onIssueCreated({
             ...GITHUB_ISSUE_CREATED_PAYLOAD,