Backoff failing RSS feeds (#890)
* Backoff RSS requests if a url repeatedly fails.

* Increase max backoff time to a day

* Add backoff for failing feeds.

* Remove unused finally

* Add this.feedLastBackoff

* Rewrite in rust.

* linting

* pop

* Optimise backoff function further

* Drop only!

* fix test

* lint

* lint further

* Better comments

* Fix urls calculation

* Remove testing URL

* Add some variance to speed up while loop

* correct comment

* Follow the advice and use a VecDeque as it's slightly faster.

* Vastly better shuffle method

* Speed up checking for previous guids.

* fix hasher function

* lint

* Content doesn't need to be calculated twice.

* Slightly more efficient iteration

* Improve performance of backoff insertion

* Configure feed reader

* lint

* Ensure appending and removing from the queue works as expected.

* Ensure we do keep urls that have been removed.

* lint

* Inc/dec metrics as queue items are added/deleted.

* Add comment

* tidy up
Half-Shot authored Feb 20, 2024
1 parent 3ff87b7 commit 387f7c1
Showing 13 changed files with 263 additions and 67 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -21,6 +21,7 @@ rss = "2.0"
atom_syndication = "0.12"
ruma = { version = "0.9", features = ["events", "html"] }
reqwest = "0.11"
rand = "0.8.5"

[build-dependencies]
napi-build = "2"
1 change: 1 addition & 0 deletions changelog.d/890.misc
@@ -0,0 +1 @@
Failing RSS/Atom feeds are now backed off before being retried. This should speed up large public deployments, where failing feeds previously slowed polling down for everyone.
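
The backoff constants in src/feeds/FeedReader.ts (below) bound this behaviour: retries start around five seconds apart and are capped at one day. As a minimal sketch of the mechanism (the real logic lives in the Rust QueueWithBackoff introduced in this commit, which per the commit log also mixes in some variance, so the helper name and exact formula here are illustrative assumptions only):

```ts
// Constants mirrored from src/feeds/FeedReader.ts.
const BACKOFF_TIME_MS = 5 * 1000;                 // initial retry delay
const BACKOFF_POW = 1.05;                         // gentle exponential growth
const BACKOFF_TIME_MAX_MS = 24 * 60 * 60 * 1000;  // never wait more than a day

// Hypothetical helper: grow the delay with each consecutive failure, capped.
function nextBackoffMs(consecutiveFailures: number): number {
    const delay = BACKOFF_TIME_MS * Math.pow(BACKOFF_POW, consecutiveFailures);
    return Math.min(delay, BACKOFF_TIME_MAX_MS);
}
```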
2 changes: 1 addition & 1 deletion spec/github.spec.ts
@@ -58,7 +58,7 @@ describe('GitHub', () => {
return testEnv?.tearDown();
});

it.only('should be able to handle a GitHub event', async () => {
it('should be able to handle a GitHub event', async () => {
const user = testEnv.getUser('user');
const bridgeApi = await getBridgeApi(testEnv.opts.config?.widgets?.publicUrl!, user);
const testRoomId = await user.createRoom({ name: 'Test room', invite:[testEnv.botMxid] });
17 changes: 9 additions & 8 deletions src/Connections/FeedConnection.ts
@@ -220,15 +220,16 @@ export class FeedConnection extends BaseConnection implements IConnection {

// We want to retry these sends, because sometimes the network / HS
// craps out.
const content = {
msgtype: 'm.notice',
format: "org.matrix.custom.html",
formatted_body: md.renderInline(message),
body: message,
external_url: entry.link ?? undefined,
"uk.half-shot.matrix-hookshot.feeds.item": entry,
};
await retry(
() => this.intent.sendEvent(this.roomId, {
msgtype: 'm.notice',
format: "org.matrix.custom.html",
formatted_body: md.renderInline(message),
body: message,
external_url: entry.link ?? undefined,
"uk.half-shot.matrix-hookshot.feeds.item": entry,
}),
() => this.intent.sendEvent(this.roomId, content),
SEND_EVENT_MAX_ATTEMPTS,
SEND_EVENT_INTERVAL_MS,
// Filter for showstopper errors like 4XX errors, but otherwise
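
The rewrite above also hoists `content` out of the retry closure, so the event body is built once rather than on every attempt. Only the call site of `retry` is visible in this hunk; a plausible sketch of such a helper, with `shouldGiveUp` standing in for the showstopper filter mentioned in the comment (all names here are assumptions, not hookshot's actual implementation):

```ts
// Sketch of a generic retry helper matching the call shape above.
// `shouldGiveUp` lets callers treat e.g. 4XX responses as showstoppers
// that abort immediately instead of being retried.
async function retry<T>(
    fn: () => Promise<T>,
    maxAttempts: number,
    intervalMs: number,
    shouldGiveUp?: (err: unknown) => boolean,
): Promise<T> {
    let lastErr: unknown;
    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
            return await fn();
        } catch (err) {
            lastErr = err;
            if (shouldGiveUp?.(err)) {
                throw err;
            }
            // Wait before the next attempt.
            await new Promise(resolve => setTimeout(resolve, intervalMs));
        }
    }
    throw lastErr;
}
```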
5 changes: 3 additions & 2 deletions src/Stores/MemoryStorageProvider.ts
@@ -35,8 +35,9 @@ export class MemoryStorageProvider extends MSP implements IBridgeStorageProvider
return this.feedGuids.has(url);
}

async hasSeenFeedGuid(url: string, guid: string): Promise<boolean> {
return this.feedGuids.get(url)?.includes(guid) ?? false;
async hasSeenFeedGuids(url: string, ...guids: string[]): Promise<string[]> {
const existing = this.feedGuids.get(url);
return existing ? guids.filter((existingGuid) => existing.includes(existingGuid)) : [];
}

public async setGithubIssue(repo: string, issueNumber: string, data: IssuesGetResponseData, scope = "") {
20 changes: 14 additions & 6 deletions src/Stores/RedisStorageProvider.ts
@@ -29,11 +29,10 @@ const WIDGET_USER_TOKENS = "widgets.user-tokens.";

const FEED_GUIDS = "feeds.guids.";



const log = new Logger("RedisASProvider");

export class RedisStorageContextualProvider implements IStorageProvider {

constructor(protected readonly redis: Redis, protected readonly contextSuffix = '') { }

public setSyncToken(token: string|null){
@@ -216,17 +215,26 @@ export class RedisStorageProvider extends RedisStorageContextualProvider impleme
await this.redis.set(key, JSON.stringify(value));
}

public async storeFeedGuids(url: string, ...guid: string[]): Promise<void> {
public async storeFeedGuids(url: string, ...guids: string[]): Promise<void> {
const feedKey = `${FEED_GUIDS}${url}`;
await this.redis.lpush(feedKey, ...guid);
await this.redis.lpush(feedKey, ...guids);
await this.redis.ltrim(feedKey, 0, MAX_FEED_ITEMS);
}

public async hasSeenFeed(url: string): Promise<boolean> {
return (await this.redis.exists(`${FEED_GUIDS}${url}`)) === 1;
}

public async hasSeenFeedGuid(url: string, guid: string): Promise<boolean> {
return (await this.redis.lpos(`${FEED_GUIDS}${url}`, guid)) != null;
public async hasSeenFeedGuids(url: string, ...guids: string[]): Promise<string[]> {
let multi = this.redis.multi();
for (const guid of guids) {
multi = multi.lpos(`${FEED_GUIDS}${url}`, guid);
}
const res = await multi.exec();
if (res === null) {
// Just assume we've seen none.
return [];
}
return guids.filter((_guid, index) => res[index][1] !== null);
}
}
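
Batching is the point of this change: the old `hasSeenFeedGuid` cost one `LPOS` round trip per guid, while the new method pipelines one `LPOS` per guid inside a single `MULTI`/`EXEC`. With ioredis, `exec()` resolves to one `[error, result]` tuple per queued command (or `null` if the whole transaction was discarded), which is why the filter reads `res[index][1]`. A hypothetical usage sketch, with the feed URL and guids invented for illustration:

```ts
// Suppose "md5:aaa" was stored via storeFeedGuids() and "md5:bbb" was not.
const seen = await storage.hasSeenFeedGuids(
    "https://example.org/feed.xml",
    "md5:aaa",
    "md5:bbb",
);
// seen === ["md5:aaa"]; the caller then skips only already-sent items.
```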
6 changes: 3 additions & 3 deletions src/Stores/StorageProvider.ts
@@ -25,7 +25,7 @@ export interface IBridgeStorageProvider extends IAppserviceStorageProvider, ISto
setStoredTempFile(key: string, value: string): Promise<void>;
getGitlabDiscussionThreads(connectionId: string): Promise<SerializedGitlabDiscussionThreads>;
setGitlabDiscussionThreads(connectionId: string, value: SerializedGitlabDiscussionThreads): Promise<void>;
storeFeedGuids(url: string, ...guid: string[]): Promise<void>;
hasSeenFeed(url: string, ...guid: string[]): Promise<boolean>;
hasSeenFeedGuid(url: string, guid: string): Promise<boolean>;
storeFeedGuids(url: string, ...guids: string[]): Promise<void>;
hasSeenFeed(url: string): Promise<boolean>;
hasSeenFeedGuids(url: string, ...guids: string[]): Promise<string[]>;
}
122 changes: 77 additions & 45 deletions src/feeds/FeedReader.ts
@@ -9,8 +9,14 @@ import { randomUUID } from "crypto";
import { readFeed } from "../libRs";
import { IBridgeStorageProvider } from "../Stores/StorageProvider";
import UserAgent from "../UserAgent";
import { QueueWithBackoff } from "../libRs";

const log = new Logger("FeedReader");

const BACKOFF_TIME_MAX_MS = 24 * 60 * 60 * 1000;
const BACKOFF_POW = 1.05;
const BACKOFF_TIME_MS = 5 * 1000;

export class FeedError extends Error {
constructor(
public url: string,
@@ -73,21 +79,11 @@ function normalizeUrl(input: string): string {
return url.toString();
}

function shuffle<T>(array: T[]): T[] {
for (let i = array.length - 1; i > 0; i--) {
const j = Math.floor(Math.random() * (i + 1));
[array[i], array[j]] = [array[j], array[i]];
}
return array;
}

export class FeedReader {

private connections: FeedConnection[];
// ts should notice that we do in fact initialize it in constructor, but it doesn't (in this version)
private observedFeedUrls: Set<string> = new Set();

private feedQueue: string[] = [];
private feedQueue = new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS);

// A set of last modified times for each url.
private cacheTimes: Map<string, { etag?: string, lastModified?: string}> = new Map();
@@ -100,11 +96,12 @@ export class FeedReader {

private shouldRun = true;
private readonly timeouts: (NodeJS.Timeout|undefined)[];
private readonly feedsToRetain = new Set();

get sleepingInterval() {
return (
// Calculate the number of MS to wait in between feeds.
(this.config.pollIntervalSeconds * 1000) / (this.feedQueue.length || 1)
(this.config.pollIntervalSeconds * 1000) / (this.feedQueue.length() || 1)
// And multiply by the number of concurrent readers
) * this.config.pollConcurrency;
}
Expand All @@ -120,22 +117,49 @@ export class FeedReader {
this.timeouts.fill(undefined);
Object.seal(this.timeouts);
this.connections = this.connectionManager.getAllConnectionsOfType(FeedConnection);
this.calculateFeedUrls();
connectionManager.on('new-connection', c => {
if (c instanceof FeedConnection) {
log.debug('New connection tracked:', c.connectionId);
this.connections.push(c);
this.calculateFeedUrls();
const feeds = this.calculateInitialFeedUrls();
connectionManager.on('new-connection', newConnection => {
if (!(newConnection instanceof FeedConnection)) {
return;
}
const normalisedUrl = normalizeUrl(newConnection.feedUrl);
if (!feeds.has(normalisedUrl)) {
log.info(`Connection added, adding "${normalisedUrl}" to queue`);
this.feedQueue.push(normalisedUrl);
feeds.add(normalisedUrl);
Metrics.feedsCount.inc();
Metrics.feedsCountDeprecated.inc();
}
});
connectionManager.on('connection-removed', removed => {
if (removed instanceof FeedConnection) {
this.connections = this.connections.filter(c => c.connectionId !== removed.connectionId);
this.calculateFeedUrls();
if (!(removed instanceof FeedConnection)) {
return;
}
let shouldKeepUrl = false;
const normalisedUrl = normalizeUrl(removed.feedUrl);
this.connections = this.connections.filter(c => {
// Cheeky reuse of iteration to determine if we should remove this URL.
if (c.connectionId !== removed.connectionId) {
shouldKeepUrl = shouldKeepUrl || normalizeUrl(c.feedUrl) === normalisedUrl;
return true;
}
return false;
});
if (shouldKeepUrl) {
log.info(`Connection removed, but not removing "${normalisedUrl}" as it is still in use`);
return;
}
log.info(`Connection removed, removing "${normalisedUrl}" from queue`);
this.feedsToRetain.delete(normalisedUrl);
this.feedQueue.remove(normalisedUrl);
feeds.delete(normalisedUrl);
this.feedsFailingHttp.delete(normalisedUrl);
this.feedsFailingParsing.delete(normalisedUrl);
Metrics.feedsCount.dec();
Metrics.feedsCountDeprecated.dec();
});

log.debug('Loaded feed URLs:', this.observedFeedUrls);
log.debug('Loaded feed URLs:', [...feeds].join(', '));

for (let i = 0; i < config.pollConcurrency; i++) {
void this.pollFeeds(i);
Expand All @@ -147,21 +171,24 @@ export class FeedReader {
this.timeouts.forEach(t => clearTimeout(t));
}

private calculateFeedUrls(): void {
/**
* Calculate the initial feed set for the reader. Should never
* be called twice.
*/
private calculateInitialFeedUrls(): Set<string> {
// just in case we got an invalid URL somehow
const normalizedUrls = [];
const observedFeedUrls = new Set<string>();
for (const conn of this.connections) {
try {
normalizedUrls.push(normalizeUrl(conn.feedUrl));
observedFeedUrls.add(normalizeUrl(conn.feedUrl));
} catch (err: unknown) {
log.error(`Invalid feedUrl for connection ${conn.connectionId}: ${conn.feedUrl}. It will not be tracked`);
}
}
this.observedFeedUrls = new Set(normalizedUrls);
this.feedQueue = shuffle([...this.observedFeedUrls.values()]);

Metrics.feedsCount.set(this.observedFeedUrls.size);
Metrics.feedsCountDeprecated.set(this.observedFeedUrls.size);
this.feedQueue.populate([...observedFeedUrls]);
Metrics.feedsCount.set(observedFeedUrls.size);
Metrics.feedsCountDeprecated.set(observedFeedUrls.size);
return observedFeedUrls;
}

/**
@@ -173,6 +200,11 @@ export class FeedReader {
* @returns A boolean that returns if we saw any changes on the feed since the last poll time.
*/
public async pollFeed(url: string): Promise<boolean> {
// If a feed is deleted while it is being polled, we need
// to remember NOT to add it back to the queue. This
// set keeps track of all the feeds that *should* be
// requeued.
this.feedsToRetain.add(url);
let seenEntriesChanged = false;
const fetchKey = randomUUID();
const { etag, lastModified } = this.cacheTimes.get(url) || {};
@@ -203,22 +235,20 @@ export class FeedReader {
if (feed) {
// If undefined, we got a not-modified.
log.debug(`Found ${feed.items.length} entries in ${url}`);

const seenItems = await this.storage.hasSeenFeedGuids(url, ...feed.items.filter(item => !!item.hashId).map(item => item.hashId!))
for (const item of feed.items) {
// Some feeds have a nasty habit of leaving an empty tag there, making us parse it as garbage.
if (!item.hashId) {
log.error(`Could not determine guid for entry in ${url}, skipping`);
continue;
}
const hashId = `md5:${item.hashId}`;
newGuids.push(hashId);

if (initialSync) {
log.debug(`Skipping entry ${item.id ?? hashId} since we're performing an initial sync`);
if (seenItems.includes(item.hashId)) {
continue;
}
if (await this.storage.hasSeenFeedGuid(url, hashId)) {
log.debug('Skipping already seen entry', item.id ?? hashId);
newGuids.push(item.hashId);

if (initialSync) {
log.debug(`Skipping entry ${item.id ?? item.hashId} since we're performing an initial sync`);
continue;
}
const entry = {
@@ -243,25 +273,27 @@ export class FeedReader {
if (seenEntriesChanged && newGuids.length) {
await this.storage.storeFeedGuids(url, ...newGuids);
}

}
this.queue.push<FeedSuccess>({ eventName: 'feed.success', sender: 'FeedReader', data: { url } });
// Clear any feed failures
this.feedsFailingHttp.delete(url);
this.feedsFailingParsing.delete(url);
if (this.feedsToRetain.has(url)) {
// Only requeue the feed if it hasn't been removed while we were polling it.
this.feedQueue.push(url);
}
} catch (err: unknown) {
// TODO: Proper Rust Type error.
if ((err as Error).message.includes('Failed to fetch feed due to HTTP')) {
this.feedsFailingHttp.add(url);
} else {
this.feedsFailingParsing.add(url);
}
const backoffDuration = this.feedQueue.backoff(url);
const error = err instanceof Error ? err : new Error(`Unknown error ${err}`);
const feedError = new FeedError(url.toString(), error, fetchKey);
log.error("Unable to read feed:", feedError.message);
log.error("Unable to read feed:", feedError.message, `backing off for ${backoffDuration}ms`);
this.queue.push<FeedError>({ eventName: 'feed.error', sender: 'FeedReader', data: feedError});
} finally {
this.feedQueue.push(url);
}
return seenEntriesChanged;
}
Expand All @@ -277,11 +309,11 @@ export class FeedReader {
Metrics.feedsFailingDeprecated.set({ reason: "http" }, this.feedsFailingHttp.size );
Metrics.feedsFailingDeprecated.set({ reason: "parsing" }, this.feedsFailingParsing.size);

log.debug(`Checking for updates in ${this.observedFeedUrls.size} RSS/Atom feeds (worker: ${workerId})`);
log.debug(`Checking for updates in ${this.feedQueue.length()} RSS/Atom feeds (worker: ${workerId})`);

const fetchingStarted = Date.now();

const [ url ] = this.feedQueue.splice(0, 1);
const url = this.feedQueue.pop();
let sleepFor = this.sleepingInterval;

if (url) {
Expand All @@ -298,7 +330,7 @@ export class FeedReader {
log.warn(`It took us longer to update the feeds than the configured poll interval`);
}
} else {
// It may be possible that we have more workers than feeds. This will cause the worker to just sleep.
// It is possible that we have more workers than feeds. This will cause the worker to just sleep.
log.debug(`No feeds available to poll for worker ${workerId}`);
}

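
The queue itself is now an opaque `QueueWithBackoff` imported from `libRs` (rewritten in Rust and, per the commit log, backed by a `VecDeque`). Its Rust source is not part of the hunks shown here, so the following TypeScript sketch only illustrates the interface FeedReader.ts relies on (`populate`, `push`, `pop`, `remove`, `backoff`, `length`), not the actual implementation:

```ts
// Illustrative in-process equivalent of the Rust QueueWithBackoff.
// Delayed urls are parked with an expiry timestamp and drained back
// into the ready queue on pop().
class QueueWithBackoffSketch {
    private queue: string[] = [];
    private delayedUntil = new Map<string, number>(); // url -> epoch ms
    private failures = new Map<string, number>();     // url -> consecutive failures

    constructor(
        private readonly baseMs: number,
        private readonly pow: number,
        private readonly maxMs: number,
    ) {}

    populate(urls: string[]) { this.queue = [...urls]; }
    length() { return this.queue.length + this.delayedUntil.size; }

    /** Requeue a url after a successful poll, clearing its failure count. */
    push(url: string) {
        this.failures.delete(url);
        this.queue.push(url);
    }

    /** Forget a url entirely, e.g. when its last connection is removed. */
    remove(url: string) {
        this.queue = this.queue.filter(u => u !== url);
        this.delayedUntil.delete(url);
        this.failures.delete(url);
    }

    /** Park a failing url and return the chosen delay in milliseconds. */
    backoff(url: string): number {
        const count = (this.failures.get(url) ?? 0) + 1;
        this.failures.set(url, count);
        const delay = Math.min(this.baseMs * Math.pow(this.pow, count), this.maxMs);
        this.delayedUntil.set(url, Date.now() + delay);
        return delay;
    }

    /** Take the next ready url, draining any expired backoffs first. */
    pop(): string | undefined {
        const now = Date.now();
        for (const [url, until] of this.delayedUntil) {
            if (until <= now) {
                this.delayedUntil.delete(url);
                this.queue.push(url);
            }
        }
        return this.queue.shift();
    }
}
```

Note how `sleepingInterval` divides by `feedQueue.length()`: with, say, `pollIntervalSeconds` of 600, 200 queued feeds and `pollConcurrency` of 4 (numbers invented for illustration), each worker sleeps (600000 / 200) * 4 = 12000 ms between polls, so the pool as a whole still visits every feed roughly once per interval.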
3 changes: 2 additions & 1 deletion src/feeds/parser.rs
@@ -70,7 +70,8 @@ fn parse_channel_to_js_result(channel: &Channel) -> JsRssChannel {
.map(|f| f.value)
.or(item.link.clone())
.or(item.title.clone())
.and_then(|f| hash_id(f).ok()),
.and_then(|f| hash_id(f).ok())
.map(|f| format!("md5:{}", f)),
})
.collect(),
}
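
This moves the `md5:` prefix into the Rust parser, matching FeedReader.ts above, which now uses `item.hashId` directly instead of prepending `md5:` itself. Assuming `hash_id` is an MD5 digest (as the prefix suggests) and that the first fallback source is the item's guid, a TypeScript equivalent would look like this sketch:

```ts
import { createHash } from "crypto";

// Hypothetical TS counterpart of the Rust hash_id + prefix logic:
// fall back from guid to link to title, then hash.
function hashId(item: { guid?: string, link?: string, title?: string }): string | undefined {
    const source = item.guid ?? item.link ?? item.title;
    if (source === undefined) {
        return undefined;
    }
    return `md5:${createHash("md5").update(source).digest("hex")}`;
}
```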
1 change: 1 addition & 0 deletions src/lib.rs
@@ -3,6 +3,7 @@ pub mod feeds;
pub mod format_util;
pub mod github;
pub mod jira;
pub mod util;

#[macro_use]
extern crate napi_derive;