From f57c555a4f57456787b46e61c25a5df83c13d50c Mon Sep 17 00:00:00 2001 From: 2881099 <2881099@qq.com> Date: Wed, 11 Nov 2020 04:27:57 +0800 Subject: [PATCH] finish client-side-caching --- .../Program.cs | 105 +++----- readme.md | 16 ++ src/FreeRedis/ClientSideCaching.cs | 242 ++++++++++++++++++ src/FreeRedis/RedisClient.cs | 4 +- src/FreeRedis/RedisClientAsync.cs | 4 +- src/FreeRedis/RedisClientEvents.cs | 8 +- test/Unit/FreeRedis.Tests/InterceptorTests.cs | 82 ++---- 7 files changed, 320 insertions(+), 141 deletions(-) create mode 100644 src/FreeRedis/ClientSideCaching.cs diff --git a/examples/console_netcore31_client_side_caching/Program.cs b/examples/console_netcore31_client_side_caching/Program.cs index c76ac6a8..d3a5e999 100644 --- a/examples/console_netcore31_client_side_caching/Program.cs +++ b/examples/console_netcore31_client_side_caching/Program.cs @@ -2,8 +2,10 @@ using FreeRedis.Internal; using Newtonsoft.Json; using System; +using System.Collections.Generic; using System.Collections.Concurrent; using System.Linq; +using System.Threading; namespace console_netcore31_client_side_caching { @@ -24,86 +26,43 @@ class Program static void Main(string[] args) { - cli.UseClientSideCaching(); - - cli.Set("Interceptor01", "123123"); - - var val1 = cli.Get("Interceptor01"); - var val2 = cli.Get("Interceptor01"); - var val3 = cli.Get("Interceptor01"); + cli.UseClientSideCaching(new ClientSideCachingOptions + { + //本地缓存的容量 + Capacity = 3, + //过滤哪些键能被本地缓存 + KeyFilter = key => key.StartsWith("Interceptor"), + //检查长期未使用的缓存 + CheckExpired = (key, dt) => DateTime.Now.Subtract(dt) > TimeSpan.FromSeconds(2) + }); - Console.ReadKey(); + cli.Set("Interceptor01", "123123"); //redis-server - var val4 = cli.Get("Interceptor01"); + var val1 = cli.Get("Interceptor01"); //redis-server + var val2 = cli.Get("Interceptor01"); //本地 + var val3 = cli.Get("Interceptor01"); //断点等3秒,redis-server - Console.ReadKey(); - } - } - - static class MemoryCacheAopExtensions - { - public static void UseClientSideCaching(this RedisClient cli) - { - var sub = cli.Subscribe("__redis__:invalidate", (chan, msg) => - { - var keys = msg as object[]; - foreach (var key in keys) - { - _dicStrings.TryRemove(string.Concat(key), out var old); - } - }) as IPubSubSubscriber; + cli.Set("Interceptor01", "234567"); //redis-server + var val4 = cli.Get("Interceptor01"); //redis-server + var val5 = cli.Get("Interceptor01"); //本地 - var context = new ClientSideCachingContext(cli, sub); - cli.Interceptors.Add(() => new MemoryCacheAop()); - cli.Unavailable += (_, e) => - { - _dicStrings.Clear(); - }; - cli.Connected += (_, e) => - { - e.Client.ClientTracking(true, context._sub.RedisSocket.ClientId, null, false, false, false, false); - }; - } + var val6 = cli.MGet("Interceptor01", "Interceptor02", "Interceptor03"); //redis-server + var val7 = cli.MGet("Interceptor01", "Interceptor02", "Interceptor03"); //本地 + var val8 = cli.MGet("Interceptor01", "Interceptor02", "Interceptor03"); //本地 - class ClientSideCachingContext - { - internal RedisClient _cli; - internal IPubSubSubscriber _sub; - public ClientSideCachingContext(RedisClient cli, IPubSubSubscriber sub) - { - _cli = cli; - _sub = sub; - } - } + cli.MSet("Interceptor01", "Interceptor01Value", "Interceptor02", "Interceptor02Value", "Interceptor03", "Interceptor03Value"); //redis-server + var val9 = cli.MGet("Interceptor01", "Interceptor02", "Interceptor03"); //redis-server + var val10 = cli.MGet("Interceptor01", "Interceptor02", "Interceptor03"); //本地 - static ConcurrentDictionary _dicStrings = new ConcurrentDictionary(); - class MemoryCacheAop : IInterceptor - { - public void After(InterceptorAfterEventArgs args) - { - switch (args.Command._command) - { - case "GET": - if (_iscached == false && args.Exception == null) - _dicStrings.TryAdd(args.Command.GetKey(0), args.Value); - break; - } - } + //以下 KeyFilter 返回 false,从而不使用本地缓存 + cli.Set("123Interceptor01", "123123"); //redis-server - bool _iscached = false; - public void Before(InterceptorBeforeEventArgs args) - { - switch (args.Command._command) - { - case "GET": - if (_dicStrings.TryGetValue(args.Command.GetKey(0), out var tryval)) - { - args.Value = tryval; - _iscached = true; - } - break; - } - } + var val11 = cli.Get("123Interceptor01"); //redis-server + var val12 = cli.Get("123Interceptor01"); //redis-server + var val23 = cli.Get("123Interceptor01"); //redis-server + Console.ReadKey(); } } + + } diff --git a/readme.md b/readme.md index ace1458d..a214797c 100644 --- a/readme.md +++ b/readme.md @@ -99,6 +99,22 @@ public static RedisClient cli = new RedisClient( ----- +#### ⚡ Client-side-cahing (本地缓存) + +> requires redis-server 6.0 and above + +```csharp +cli.UseClientSideCaching(new ClientSideCachingOptions +{ + //本地缓存的容量 + Capacity = 3, + //过滤哪些键能被本地缓存 + KeyFilter = key => key.StartsWith("Interceptor"), + //检查长期未使用的缓存 + CheckExpired = (key, dt) => DateTime.Now.Subtract(dt) > TimeSpan.FromSeconds(2) +}); +``` + #### 📡 Subscribe (订阅) ```csharp diff --git a/src/FreeRedis/ClientSideCaching.cs b/src/FreeRedis/ClientSideCaching.cs new file mode 100644 index 00000000..2dc8cb99 --- /dev/null +++ b/src/FreeRedis/ClientSideCaching.cs @@ -0,0 +1,242 @@ +using FreeRedis.Internal; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; + +namespace FreeRedis +{ + public class ClientSideCachingOptions + { + public int Capacity { get; set; } + + /// + /// true: cache + /// + public Func KeyFilter { get; set; } + + /// + /// true: expired + /// + public Func CheckExpired { get; set; } + } + + public static class ClientSideCachingExtensions + { + public static void UseClientSideCaching(this RedisClient cli, ClientSideCachingOptions options) + { + new ClientSideCachingContext(cli, options) + .Start(); + } + + class ClientSideCachingContext + { + readonly RedisClient _cli; + readonly ClientSideCachingOptions _options; + IPubSubSubscriber _sub; + + public ClientSideCachingContext(RedisClient cli, ClientSideCachingOptions options) + { + _cli = cli; + _options = options ?? new ClientSideCachingOptions(); + } + + public void Start() + { + _sub = _cli.Subscribe("__redis__:invalidate", InValidate) as IPubSubSubscriber; + _cli.Interceptors.Add(() => new MemoryCacheAop(this)); + _cli.Unavailable += (_, e) => + { + lock (_dictLock) _dictSort.Clear(); + _dict.Clear(); + }; + _cli.Connected += (_, e) => + { + e.Client.ClientTracking(true, _sub.RedisSocket.ClientId, null, false, false, false, false); + }; + } + + void InValidate(string chan, object msg) + { + var keys = msg as object[]; + foreach (var key in keys) + RemoveCache(string.Concat(key)); + } + + static readonly DateTime _dt2020 = new DateTime(2020, 1, 1); + static long GetTime() => (long)DateTime.Now.Subtract(_dt2020).TotalSeconds; + /// + /// key -> Type(string|byte[]|class) -> value + /// + readonly ConcurrentDictionary _dict = new ConcurrentDictionary(); + readonly SortedSet _dictSort = new SortedSet(); + readonly object _dictLock = new object(); + bool TryGetCacheValue(string key, Type valueType, out object value) + { + if (_dict.TryGetValue(key, out var trydictval) && trydictval.Values.TryGetValue(valueType, out var tryval) + //&& DateTime.Now.Subtract(_dt2020.AddSeconds(tryval.SetTime)) < TimeSpan.FromMinutes(5) + ) + { + if (_options.CheckExpired?.Invoke(key, _dt2020.AddSeconds(tryval.SetTime)) == true) + { + RemoveCache(key); + value = null; + return false; + } + var time = GetTime(); + if (_options.Capacity > 0) + { + lock (_dictLock) + { + _dictSort.Remove($"{trydictval.GetTime.ToString("X").PadLeft(16, '0')}{key}"); + _dictSort.Add($"{time.ToString("X").PadLeft(16, '0')}{key}"); + } + } + Interlocked.Exchange(ref trydictval.GetTime, time); + value = tryval.Value; + return true; + } + value = null; + return false; + } + void SetCacheValue(string command, string key, Type valueType, object value) + { + _dict.GetOrAdd(key, keyTmp => + { + var time = GetTime(); + if (_options.Capacity > 0) + { + string removeKey = null; + lock (_dictLock) + { + if (_dictSort.Count >= _options.Capacity) removeKey = _dictSort.First().Substring(16); + _dictSort.Add($"{time.ToString("X").PadLeft(16, '0')}{key}"); + } + if (removeKey != null) + RemoveCache(removeKey); + } + return new DictValue(command, time); + }).Values + .AddOrUpdate(valueType, new DictValue.ObjectValue(value), (oldkey, oldval) => new DictValue.ObjectValue(value)); + } + void RemoveCache(params string[] keys) + { + if (keys?.Any() != true) return; + foreach (var key in keys) + { + if (_dict.TryRemove(key, out var old)) + { + if (_options.Capacity > 0) + { + lock (_dictLock) + { + _dictSort.Remove($"{old.GetTime.ToString("X").PadLeft(16, '0')}{key}"); + } + } + } + } + } + class DictValue + { + public readonly ConcurrentDictionary Values = new ConcurrentDictionary(); + public readonly string Command; + public long GetTime; + public DictValue(string command, long gettime) + { + this.Command = command; + this.GetTime = gettime; + } + public class ObjectValue + { + public readonly object Value; + public readonly long SetTime = (long)DateTime.Now.Subtract(_dt2020).TotalSeconds; + public ObjectValue(object value) => this.Value = value; + } + } + + class MemoryCacheAop : IInterceptor + { + ClientSideCachingContext _cscc; + public MemoryCacheAop(ClientSideCachingContext cscc) + { + _cscc = cscc; + } + + bool _iscached = false; + public void Before(InterceptorBeforeEventArgs args) + { + switch (args.Command._command) + { + case "GET": + if (_cscc.TryGetCacheValue(args.Command.GetKey(0), args.ValueType, out var getval)) + { + args.Value = getval; + _iscached = true; + } + break; + case "MGET": + var mgetValType = args.ValueType.GetElementType(); + var mgetKeys = args.Command._keyIndexes.Select((item, index) => args.Command.GetKey(index)).ToArray(); + var mgetVals = mgetKeys.Select(a => _cscc.TryGetCacheValue(a, mgetValType, out var mgetval) ? + new DictGetResult { Value = mgetval, Exists = true } : new DictGetResult { Value = null, Exists = false }) + .Where(a => a.Exists).Select(a => a.Value).ToArray(); + if (mgetVals.Length == mgetKeys.Length) + { + args.Value = args.ValueType.FromObject(mgetVals); + _iscached = true; + } + break; + } + } + + public void After(InterceptorAfterEventArgs args) + { + switch (args.Command._command) + { + case "GET": + if (_iscached == false && args.Exception == null) + { + var getkey = args.Command.GetKey(0); + if (_cscc._options.KeyFilter?.Invoke(getkey) != false) + _cscc.SetCacheValue(args.Command._command, getkey, args.ValueType, args.Value); + } + break; + case "MGET": + if (_iscached == false && args.Exception == null) + { + if (args.Value is Array valueArr) + { + var valueArrElementType = args.ValueType.GetElementType(); + var sourceArrLen = valueArr.Length; + for (var a = 0; a < sourceArrLen; a++) + _cscc.SetCacheValue("GET", args.Command.GetKey(a), valueArrElementType, valueArr.GetValue(a)); + } + } + break; + default: + if (args.Command._keyIndexes.Any()) + { + var cmdset = CommandSets.Get(args.Command._command); + if (cmdset != null && + (cmdset.Flag & CommandSets.ServerFlag.write) == CommandSets.ServerFlag.write && + (cmdset.Tag & CommandSets.ServerTag.write) == CommandSets.ServerTag.write && + (cmdset.Tag & CommandSets.ServerTag.@string) == CommandSets.ServerTag.@string) + { + _cscc.RemoveCache(args.Command._keyIndexes.Select((item, index) => args.Command.GetKey(index)).ToArray()); + } + } + break; + } + } + + class DictGetResult + { + public object Value; + public bool Exists; + } + } + } + } +} diff --git a/src/FreeRedis/RedisClient.cs b/src/FreeRedis/RedisClient.cs index 48aef0b8..083d8fa1 100644 --- a/src/FreeRedis/RedisClient.cs +++ b/src/FreeRedis/RedisClient.cs @@ -98,7 +98,7 @@ internal T LogCall(CommandPacket cmd, Func func) aopsws[idx] = new Stopwatch(); aopsws[idx].Start(); aops[idx] = isnotice && idx == aops.Length - 1 ? new NoticeCallInterceptor(this) : this.Interceptors[idx]?.Invoke(); - var args = new InterceptorBeforeEventArgs(this, cmd); + var args = new InterceptorBeforeEventArgs(this, cmd, typeof(T)); aops[idx].Before(args); if (args.ValueIsChanged && args.Value is T argsValue) { @@ -121,7 +121,7 @@ internal T LogCall(CommandPacket cmd, Func func) for (var idx = 0; idx < aops.Length; idx++) { aopsws[idx].Stop(); - var args = new InterceptorAfterEventArgs(this, cmd, ret, exception, aopsws[idx].ElapsedMilliseconds); + var args = new InterceptorAfterEventArgs(this, cmd, typeof(T), ret, exception, aopsws[idx].ElapsedMilliseconds); aops[idx].After(args); } } diff --git a/src/FreeRedis/RedisClientAsync.cs b/src/FreeRedis/RedisClientAsync.cs index c217cd34..bee469e6 100644 --- a/src/FreeRedis/RedisClientAsync.cs +++ b/src/FreeRedis/RedisClientAsync.cs @@ -35,7 +35,7 @@ async internal Task LogCallAsync(CommandPacket cmd, Func> func) aopsws[idx] = new Stopwatch(); aopsws[idx].Start(); aops[idx] = isnotice && idx == aops.Length - 1 ? new NoticeCallInterceptor(this) : this.Interceptors[idx]?.Invoke(); - var args = new InterceptorBeforeEventArgs(this, cmd); + var args = new InterceptorBeforeEventArgs(this, cmd, typeof(T)); aops[idx].Before(args); if (args.ValueIsChanged && args.Value is T argsValue) { @@ -58,7 +58,7 @@ async internal Task LogCallAsync(CommandPacket cmd, Func> func) for (var idx = 0; idx < aops.Length; idx++) { aopsws[idx].Stop(); - var args = new InterceptorAfterEventArgs(this, cmd, ret, exception, aopsws[idx].ElapsedMilliseconds); + var args = new InterceptorAfterEventArgs(this, cmd, typeof(T), ret, exception, aopsws[idx].ElapsedMilliseconds); aops[idx].After(args); } } diff --git a/src/FreeRedis/RedisClientEvents.cs b/src/FreeRedis/RedisClientEvents.cs index 4b54cc6d..6f537879 100644 --- a/src/FreeRedis/RedisClientEvents.cs +++ b/src/FreeRedis/RedisClientEvents.cs @@ -58,11 +58,13 @@ public class InterceptorBeforeEventArgs { public RedisClient Client { get; } public CommandPacket Command { get; } + public Type ValueType { get; } - public InterceptorBeforeEventArgs(RedisClient cli, CommandPacket cmd) + public InterceptorBeforeEventArgs(RedisClient cli, CommandPacket cmd, Type valueType) { this.Client = cli; this.Command = cmd; + this.ValueType = valueType; } public object Value @@ -81,15 +83,17 @@ public class InterceptorAfterEventArgs { public RedisClient Client { get; } public CommandPacket Command { get; } + public Type ValueType { get; } public object Value { get; } public Exception Exception { get; } public long ElapsedMilliseconds { get; } - public InterceptorAfterEventArgs(RedisClient cli, CommandPacket cmd, object value, Exception exception, long elapsedMilliseconds) + public InterceptorAfterEventArgs(RedisClient cli, CommandPacket cmd, Type valueType, object value, Exception exception, long elapsedMilliseconds) { this.Client = cli; this.Command = cmd; + this.ValueType = valueType; this.Value = value; this.Exception = exception; this.ElapsedMilliseconds = elapsedMilliseconds; diff --git a/test/Unit/FreeRedis.Tests/InterceptorTests.cs b/test/Unit/FreeRedis.Tests/InterceptorTests.cs index 82854eaf..c5d866e6 100644 --- a/test/Unit/FreeRedis.Tests/InterceptorTests.cs +++ b/test/Unit/FreeRedis.Tests/InterceptorTests.cs @@ -18,7 +18,7 @@ public void Interceptor() { using (var cli = CreateClient()) { - cli.UseClientSideCaching(); + cli.Interceptors.Add(() => new MemoryCacheAop()); cli.Set("Interceptor01", "123123"); @@ -32,76 +32,34 @@ public void Interceptor() } } } - - static class MemoryCacheAopExtensions + + class MemoryCacheAop : IInterceptor { - public static void UseClientSideCaching(this RedisClient cli) - { - var context = new ClientSideCachingContext(cli); - cli.Subscribe("__redis__:invalidate", (chan, msg) => - { - }); - - cli.Interceptors.Add(() => new MemoryCacheAop(context)); - cli.Unavailable += (_, e) => - { - _dicStrings.Clear(); - }; - cli.Connected += (_, e) => - { - e.Client.ClientTracking(true, 100, null, false, false, false, false); - }; - } + static ConcurrentDictionary _dicStrings = new ConcurrentDictionary(); - class ClientSideCachingContext + public void After(InterceptorAfterEventArgs args) { - internal RedisClient _cli; - internal long _clientid; - public ClientSideCachingContext(RedisClient cli) + switch (args.Command._command) { - _cli = cli; + case "GET": + if (_iscached == false && args.Exception == null) + _dicStrings.TryAdd(args.Command.GetKey(0), args.Value); + break; } } - static ConcurrentDictionary _dicStrings = new ConcurrentDictionary(); - class MemoryCacheAop : IInterceptor + bool _iscached = false; + public void Before(InterceptorBeforeEventArgs args) { - ClientSideCachingContext _context; - public MemoryCacheAop(ClientSideCachingContext context) - { - _context = context; - } - - public void After(InterceptorAfterEventArgs args) - { - switch (args.Command._command) - { - case "GET": - if (_iscached == false && args.Exception == null) - _dicStrings.TryAdd(args.Command.GetKey(0), args.Value); - break; - case "SUBSCRIBLE": - if (args.Command._input.Where((a, b) => b > 0 && string.Compare(a as string, "__redis__:invalidate", true) == 0).Any()) - { - _context._clientid = _context._cli.ClientId(); - } - break; - } - } - - bool _iscached = false; - public void Before(InterceptorBeforeEventArgs args) + switch (args.Command._command) { - switch (args.Command._command) - { - case "GET": - if (_dicStrings.TryGetValue(args.Command.GetKey(0), out var tryval)) - { - args.Value = tryval; - _iscached = true; - } - break; - } + case "GET": + if (_dicStrings.TryGetValue(args.Command.GetKey(0), out var tryval)) + { + args.Value = tryval; + _iscached = true; + } + break; } } }