Skip to content

Commit

Permalink
i hate it here
Browse files Browse the repository at this point in the history
  • Loading branch information
sylveonnotdeko committed Apr 21, 2024
1 parent 87b57fc commit 3c6068b
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public Task RunAsync()
{
try
{
var allStreamData = CacheGetAllData();
var allStreamData = await CacheGetAllData();
var oldStreamDataDict = allStreamData
// group by type
Expand Down Expand Up @@ -223,10 +223,10 @@ public async Task<bool> CacheAddData(StreamDataKey streamDataKey, StreamData? da
/// Deletes stream data from the cache.
/// </summary>
/// <param name="streamdataKey">The stream data key.</param>
public void CacheDeleteData(StreamDataKey streamdataKey)
private async Task CacheDeleteData(StreamDataKey streamdataKey)
{
var db = multi.GetDatabase();
db.HashDelete(this.key, JsonConvert.SerializeObject(streamdataKey));
await db.HashDeleteAsync(this.key, JsonConvert.SerializeObject(streamdataKey));
}

/// <summary>
Expand All @@ -242,14 +242,16 @@ private async void CacheClearAllData()
/// Gets all stream data from the cache.
/// </summary>
/// <returns>A dictionary containing all cached stream data.</returns>
public Dictionary<StreamDataKey, StreamData?> CacheGetAllData()
private async Task<Dictionary<StreamDataKey, StreamData?>> CacheGetAllData()
{
var db = multi.GetDatabase();
if (!db.KeyExists(key))
if (!await db.KeyExistsAsync(key))
return new Dictionary<StreamDataKey, StreamData?>();

return db.HashGetAll(key)
.Select(redisEntry => (Key: JsonConvert.DeserializeObject<StreamDataKey>(redisEntry.Name),

var getAll = await db.HashGetAllAsync(key);

return getAll.Select(redisEntry => (Key: JsonConvert.DeserializeObject<StreamDataKey>(redisEntry.Name),
Value: JsonConvert.DeserializeObject<StreamData?>(redisEntry.Value)))
.Where(keyValuePair => keyValuePair.Key.Name is not null)
.ToDictionary(keyValuePair => keyValuePair.Key, entry => entry.Value);
Expand Down Expand Up @@ -305,6 +307,6 @@ private async Task EnsureTracked(StreamData? data)
/// Removes a stream from tracking.
/// </summary>
/// <param name="streamDataKey">The stream data key.</param>
public void UntrackStreamByKey(in StreamDataKey streamDataKey)
=> CacheDeleteData(streamDataKey);
public async Task UntrackStreamByKey(StreamDataKey streamDataKey)
=> await CacheDeleteData(streamDataKey);
}
106 changes: 59 additions & 47 deletions src/Mewdeko/Modules/Searches/Services/StreamNotificationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,19 +112,39 @@ public StreamNotificationService(
});
}

this.pubSub.Sub(streamsOfflineKey, HandleStreamsOffline);
this.pubSub.Sub(streamsOnlineKey, HandleStreamsOnline);
_ = Task.Run(async () =>
{
await this.pubSub.Sub(streamsOnlineKey, async data =>
{
await HandleStreamsOnline(data);
});
await this.pubSub.Sub(streamsOfflineKey, async data =>
{
await HandleStreamsOffline(data);
});
});

if (client.ShardId == 0)
{
// only shard 0 will run the tracker,
// and then publish updates with redis to other shards
streamTracker.OnStreamsOffline += OnStreamsOffline;
streamTracker.OnStreamsOnline += OnStreamsOnline;
_ = streamTracker.RunAsync();

this.pubSub.Sub(streamFollowKey, HandleFollowStream);
this.pubSub.Sub(streamUnfollowKey, HandleUnfollowStream);
_ = Task.Run(async () =>
{
// only shard 0 will run the tracker,
// and then publish updates with redis to other shards
streamTracker.OnStreamsOffline +=
async data => await OnStreamsOffline(data).ConfigureAwait(false);
streamTracker.OnStreamsOnline +=
async data => await OnStreamsOnline(data).ConfigureAwait(false);
_ = streamTracker.RunAsync();
await this.pubSub.Sub(streamFollowKey, async data =>
{
await HandleFollowStream(data);
});
await this.pubSub.Sub(streamUnfollowKey, async data =>
{
await HandleUnfollowStream(data);
});
});
}

bot.JoinedGuild += ClientOnJoinedGuild;
Expand Down Expand Up @@ -171,7 +191,7 @@ public async Task OnReadyAsync()
await uow.SaveChangesAsync().ConfigureAwait(false);

foreach (var loginToDelete in kvp.Value)
streamTracker.UntrackStreamByKey(new StreamDataKey(kvp.Key, loginToDelete));
await streamTracker.UntrackStreamByKey(new StreamDataKey(kvp.Key, loginToDelete));
}
}
catch (Exception ex)
Expand All @@ -186,20 +206,17 @@ public async Task OnReadyAsync()
/// When counter reaches 0, stream is removed from tracking because
/// that means no guilds are subscribed to that stream anymore
/// </summary>
private async ValueTask HandleFollowStream(FollowStreamPubData info)
private async Task HandleFollowStream(FollowStreamPubData info)
{
await streamTracker.CacheAddData(info.Key, null, false);
lock (shardLock)
var key = info.Key;
if (trackCounter.TryGetValue(key, out _))
{
var key = info.Key;
if (trackCounter.TryGetValue(key, out _))
{
trackCounter[key].Add(info.GuildId);
}
else
{
trackCounter[key] = [info.GuildId];
}
trackCounter[key].Add(info.GuildId);
}
else
{
trackCounter[key] = [info.GuildId];
}
}

Expand All @@ -208,32 +225,27 @@ private async ValueTask HandleFollowStream(FollowStreamPubData info)
/// When counter reaches 0, stream is removed from tracking because
/// that means no guilds are subscribed to that stream anymore
/// </summary>
private ValueTask HandleUnfollowStream(FollowStreamPubData info)
private async Task HandleUnfollowStream(FollowStreamPubData info)
{
lock (shardLock)
var key = info.Key;
if (!trackCounter.TryGetValue(key, out var set))
{
var key = info.Key;
if (!trackCounter.TryGetValue(key, out var set))
{
// it should've been removed already?
streamTracker.UntrackStreamByKey(in key);
return default;
}

set.Remove(info.GuildId);
if (set.Count != 0)
return default;

trackCounter.Remove(key);
// if no other guilds are following this stream
// untrack the stream
streamTracker.UntrackStreamByKey(in key);
// it should've been removed already?
await streamTracker.UntrackStreamByKey(key);
return;
}

return default;
set.Remove(info.GuildId);
if (set.Count != 0)
return;

trackCounter.Remove(key);
// if no other guilds are following this stream
// untrack the stream
await streamTracker.UntrackStreamByKey(key);
}

private async ValueTask HandleStreamsOffline(List<StreamData> offlineStreams)
private async Task HandleStreamsOffline(List<StreamData> offlineStreams)
{
foreach (var stream in offlineStreams)
{
Expand All @@ -252,7 +264,7 @@ await fss
}
}

private async ValueTask HandleStreamsOnline(List<StreamData> onlineStreams)
private async Task HandleStreamsOnline(List<StreamData> onlineStreams)
{
foreach (var stream in onlineStreams)
{
Expand Down Expand Up @@ -280,11 +292,11 @@ await fss.SelectMany(x => x.Value)
}
}

private Task OnStreamsOnline(List<StreamData> data)
=> pubSub.Pub(streamsOnlineKey, data);
private async Task OnStreamsOnline(List<StreamData> data)
=> await pubSub.Pub(streamsOnlineKey, data);

private Task OnStreamsOffline(List<StreamData> data)
=> pubSub.Pub(streamsOfflineKey, data);
private async Task OnStreamsOffline(List<StreamData> data)
=> await pubSub.Pub(streamsOfflineKey, data);

private Task ClientOnJoinedGuild(GuildConfig guildConfig)
{
Expand Down

0 comments on commit 3c6068b

Please sign in to comment.