
Support for a cache operator supporting asynchronous refresh #3573

Open
keddie opened this issue Sep 11, 2023 · 8 comments
Labels
for/user-attention This issue needs user attention (feedback, rework, etc...) status/need-user-input This needs user input to proceed type/enhancement A general enhancement

Comments


keddie commented Sep 11, 2023

Caffeine supports a concept called "refresh": after a value is written into the cache, it will be refreshed if it's referenced after a certain duration. While the refresh is in progress, the original value is returned. This keeps latency low while still serving a relatively up-to-date value. I propose adding an operator Mono.refreshCache(Duration refreshAfterWrite, Duration expireAfterWrite) with this functionality.
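For readers unfamiliar with refresh-ahead caching, here is a minimal stdlib sketch of the semantics (the class and names are hypothetical, not Caffeine's API): reads always return immediately, and a read that finds the value older than the refresh threshold triggers a background reload while the stale value keeps being served.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Minimal sketch of refresh-ahead semantics (hypothetical class, not Caffeine's API).
// Reads always return the current value immediately; a read that finds the value
// older than refreshAfterWrite kicks off a background reload while the stale
// value keeps being served.
final class RefreshAheadValue<T> {
    private final Supplier<T> loader;
    private final long refreshAfterWriteNanos;
    private final AtomicBoolean refreshing = new AtomicBoolean();
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r);
        t.setDaemon(true); // don't keep the JVM alive for background refreshes
        return t;
    });
    private volatile T value;
    private volatile long writtenAtNanos;

    RefreshAheadValue(Supplier<T> loader, Duration refreshAfterWrite) {
        this.loader = loader;
        this.refreshAfterWriteNanos = refreshAfterWrite.toNanos();
        this.value = loader.get();            // first load is synchronous
        this.writtenAtNanos = System.nanoTime();
    }

    T get() {
        T current = value;                    // snapshot before triggering a refresh
        long age = System.nanoTime() - writtenAtNanos;
        if (age > refreshAfterWriteNanos && refreshing.compareAndSet(false, true)) {
            executor.submit(() -> {           // refresh in the background
                try {
                    value = loader.get();
                    writtenAtNanos = System.nanoTime();
                } finally {
                    refreshing.set(false);
                }
            });
        }
        return current;                       // stale reads still return instantly
    }
}
```

Note that a refresh is only ever triggered by a read, so the number of upstream loads never exceeds the number of accesses.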

Motivation

For example, suppose you have a method takes60Minutes() that takes 60 minutes to return. Callers to

Mono.fromSupplier(() -> takes60Minutes()).cache(Duration.ofHours(2))

will wait for an hour every 2 hours, when the cached value expires. On the other hand, an operator supporting refresh

Mono.fromSupplier(() -> takes60Minutes()).refreshCache(refresh: Duration.ofMinutes(30), expire: Duration.ofHours(2))

will always have a value immediately available, as long as there is at least one read per half hour.

Desired solution

Add an implementation of the operator above.

Considered alternatives

One approach people might take is to periodically poll the slow emitter with a Flux and just grab the latest value. A key advantage of the refreshCache() structure is that the upstream call is only made when/if there are further subscriptions, so expensive operations are not re-run excessively. Note that the refresh duration also rate-limits the upstream.

I feel like there ought to be some combination of existing operators that could build up this behavior (e.g. two stacked cache() calls), but I don't see it.

Another option would be to provide said operator as a transformation in some suitable extensions library, used like:
sourceMono.transform(Extensions.refreshCache(ofHours(1), ofHours(3)))

A clear downside of adding any new operator is that the proliferation of methods on Mono is already fairly baffling, and comes with a support cost :) This would argue for just providing a transformation.

Additional context

I made a standalone implementation of Mono.refreshCache() and I'm happy to contribute it. But since I made it with a few things specific to a project for my current employer (who are supportive), I thought I'd check on your interest in such an operator before going to the trouble of making a PR. I did not yet implement a corresponding operator for Flux, but it would be straightforward to do, and I think I should, just for consistency.

@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Sep 11, 2023
@OlegDokuka OlegDokuka added type/enhancement A general enhancement and removed ❓need-triage This issue needs triage, hasn't been looked at by a team member yet labels Dec 14, 2023
@MikkelHJuul

Wouldn't it be a lot easier for you to have a single Sinks.many().replay().latest(), return it as Mono.from(sink.asFlux()), and spawn a background process to update it?

Or Flux.interval(howOftenIWantToFetch).map(costlyFetch).cache()

A Mono that changes is no Mono.

@Akshay-Sundarraj

@MikkelHJuul Can you please provide more info on how to use Sinks.many, or a better way to handle the following situation?
I'm facing a similar issue to the one the OP mentioned. In my case I need to get a bearer token from a server on application startup and refresh it at its half-life to reduce latency. During the refresh, the application should continue to use the cached token; once the refresh completes, the new token should be used and the old one discarded. The refresh should happen in the background and should not affect the main path.
I'm currently using Mono.cache(ttl) to cache the token for a specified duration, and when the cache expires it triggers the call to the server, which increases latency for some of the requests. TIA.

@MikkelHJuul

> @MikkelHJuul Can you please provide more info on how to use Sinks.many? or better way to handle following situation?
>
> I'm facing the similar issue mentioned by op. In my case I need to get bearer token from server on application startup and refresh it at half life to reduce latency. During refresh, application should continue to use the cached token and once refresh completes new token should be used and old token to be discarded. The refresh should happen in the background and should not affect main path.
>
> I'm currently using Mono.cache(ttl) to cache the token for specified duration and when caching expires it is triggering the call to server which is increasing latency for some of the requests. TIA.

So, if I had an app designed around Reactor anyway, I would use a Flux directly, since the value may change over time. If you have other flux-like input data you can choose either source as primary and use #zip or #combineLatest to combine the fluxes' content.

But depending on your use case it may be nice to have the token requested on subscribe, since errors during its use could warrant refreshing the token. But if you can promise a time-based token that is sure not to become rejected, I would use Flux.interval(..).cache(1), which makes it a hot source that, if resubscribed, will re-emit the most recent token (new pipelines making requests will still use the most recent token).

The cache(1) operator is essentially the same as Sinks.many().replay().latest(), by the way. The sink just gives you the inverted control of being able to "send" messages on demand.

@keddie

keddie commented May 11, 2024

@MikkelHJuul - the semantics are different.

The problem is that background processes are expensive, as is storing a value which isn't actually being accessed. If you make more calls to the backing service to get the data than the number of times the data is accessed, that's a bad thing. If you do either Sinks.many()... or Flux.interval()... then there can be more calls to the source of the data than access attempts. A refresh cache guarantees that the number of upstream fetches is less than or equal to the number of accesses.

As a thought experiment, it might help to imagine creating 10,000 of these, with the signals being 100 MB of data extracted and transformed from different databases.

Take a look at Caffeine to get a fuller picture. If anyone seems interested I could just open a PR with the implementation I made.

@ben-manes

@keddie is correct. A common misunderstanding with Caffeine's refresh is when users think of it as a periodic reload of the content at a fixed delay. That's an (unbounded) periodic replica of the source data, whereas a cache is more often a bounded subset of that data. In those cases, users can simply use a ScheduledExecutorService with a periodic task that reloads the data, e.g. building an immutable snapshot and setting a volatile field to make it visible for readers. Your case, @MikkelHJuul, sounds similar and I believe interval(duration) provides that functionality.
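The periodic-reload approach described above can be sketched with just the JDK (the class and names here are hypothetical): a scheduled task rebuilds a snapshot and publishes it through a volatile field, so readers never block.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Sketch of the ScheduledExecutorService approach: reload the data periodically
// and publish each immutable snapshot via a volatile field for readers.
final class PeriodicSnapshot<T> {
    private volatile T snapshot;

    PeriodicSnapshot(Supplier<T> loader, long period, TimeUnit unit) {
        snapshot = loader.get();                       // initial synchronous load
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);                         // don't keep the JVM alive
            return t;
        });
        ses.scheduleAtFixedRate(() -> snapshot = loader.get(), period, period, unit);
    }

    T current() {
        return snapshot;                               // never blocks on a reload
    }
}
```

For the bearer-token case above, the period would be the token's half-life. Note this reloads whether or not anyone reads the value, which is exactly the trade-off @keddie points out for cache-sized data sets.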

@MikkelHJuul

@keddie / @ben-manes - now I'm not sure if you agree or disagree. It sounds like yes, except for the fact that you want the source to be cancelled if no one listens, so as to not cache data when it is unneeded. #refCount springs to mind.

We actually do that as well where all subscribers use the same cached data (Flux#cache) and the cached flux is cancelled with Flux#refCount.

Now it may be nicer to do something more specialized, sure. My initial thought, however, is that a data source that changes like this is logically not a Mono but a Flux. So it should not be covered by the Mono API but by the Flux API, if any.

But I guess that is only slightly related to whether to add such a construct or not :)

@chemicL

chemicL commented Aug 19, 2024

I played with the idea a little and I think it would be interesting functionality, although I do have some reservations, similar to the ones from @MikkelHJuul.

With current Mono.cache variants, once the value is invalidated, all subscriptions that happen after the TTL will be coordinated to fetch the source Mono's result. The vehicle for this is some Thread that calls one of the winning Subscribers' methods.

In this proposal, the lifecycle can span between a batch of Subscribers that triggered the first resolution and another batch that can potentially see a refreshed value after the first batch retrieved a result. Let's say that once the first retrieval completes, current Subscribers are served and a refresh is triggered. What Thread carries out the refresh? And how does the Mono know it will be reused? If it's not reused, then we're performing unnecessary work and holding a reference.

Anyway, here's my experiment, to understand this better; let me know if I caught the idea. Below the code is the output:

Refreshing Cache implementation

```java
public static void main(String[] args) throws InterruptedException {
    Scheduler scheduler = Schedulers.boundedElastic();

    AtomicInteger counter = new AtomicInteger();

    Callable<Integer> refresh = () -> {
        try {
            print("Refreshing value");
            Thread.sleep(1000);
            print("Refreshed value");
            return counter.incrementAndGet();
        } catch (InterruptedException e) {
            throw new RuntimeException("interrupted");
        }
    };

    CountDownLatch batch1 = new CountDownLatch(3);

    RefreshingCache<Integer> cache = new RefreshingCache<>(refresh, Duration.ofSeconds(5), Duration.ofSeconds(10));

    print("Subscribing first 3");
    cache.getCached().doOnNext(i -> print("sub1: " + i)).subscribe(i -> batch1.countDown());
    cache.getCached().doOnNext(i -> print("sub2: " + i)).subscribe(i -> batch1.countDown());
    cache.getCached().doOnNext(i -> print("sub3: " + i)).subscribe(i -> batch1.countDown());

    print("Waiting for 3 to complete");
    batch1.await();
    print("3 complete. Waiting 7s");

    Thread.sleep(7000);

    print("Subscriber 4 starts");
    cache.getCached().doOnNext(i -> print("sub4: " + i)).subscribe();

    print("Waiting 7s");
    Thread.sleep(7000);

    print("Subscriber 5 starts");
    cache.getCached().doOnNext(i -> print("sub5: " + i)).subscribe();

    print("Waiting 20s");
    Thread.sleep(20_000);

    print("Resource refresh count: " + counter.get());
}

static final class RefreshingCache<R> {

    private final Callable<R> resource;
    private final Duration refresh;
    private final Duration expiry;
    private final Scheduler scheduler = Schedulers.boundedElastic();
    private final AtomicReference<R> currentValue = new AtomicReference<>();
    private final Mono<R> source;
    private final AtomicReference<Disposable> disposable = new AtomicReference<>();

    public RefreshingCache(Callable<R> resource, Duration refresh, Duration expiry) {
        this.resource = resource;
        this.refresh = refresh;
        this.expiry = expiry;
        this.source = Mono.<R>create(s -> {
            final R cV = currentValue.get();
            // TODO: can also check retrieval time and compare with expiry to wait
            //   for the result
            if (cV == null) {
                scheduler.schedule(() -> {
                    try {
                        R r = resource.call();
                        currentValue.set(r);
                        s.success(r);
                    }
                    catch (Exception e) {
                        s.error(e);
                    }
                });
            } else {
                s.success(cV);
            }
        })
        .doOnSuccess(r -> {
            if (disposable.get() == null) {
                disposable.set(scheduler.schedule(() -> {
                    disposable.set(null);
                    try {
                        currentValue.set(resource.call());
                    } catch (Exception e) {
                        // ?
                    }
                }, refresh.toMillis(), TimeUnit.MILLISECONDS));
            }
        })
        .cacheInvalidateIf(r -> !r.equals(currentValue.get()));
    }

    Mono<R> getCached() {
        return source;
    }
}

private static void print(String what) {
    System.out.println(Instant.now() + " [" + Thread.currentThread().getName() + "] " + what);
}
```
Output

```
2024-08-19T16:50:33.223838Z [main] Subscribing first 3
2024-08-19T16:50:33.229510Z [boundedElastic-1] Refreshing value
2024-08-19T16:50:33.229903Z [main] Waiting for 3 to complete
2024-08-19T16:50:34.234884Z [boundedElastic-1] Refreshed value
2024-08-19T16:50:34.240232Z [boundedElastic-1] sub1: 1
2024-08-19T16:50:34.240734Z [boundedElastic-1] sub2: 1
2024-08-19T16:50:34.240844Z [boundedElastic-1] sub3: 1
2024-08-19T16:50:34.241046Z [main] 3 complete. Waiting 7s
2024-08-19T16:50:39.247949Z [boundedElastic-2] Refreshing value
2024-08-19T16:50:40.253289Z [boundedElastic-2] Refreshed value
2024-08-19T16:50:41.245015Z [main] Subscriber 4 starts
2024-08-19T16:50:41.246529Z [main] sub4: 2
2024-08-19T16:50:41.246628Z [main] Waiting 7s
2024-08-19T16:50:46.251564Z [boundedElastic-2] Refreshing value
2024-08-19T16:50:47.256825Z [boundedElastic-2] Refreshed value
2024-08-19T16:50:48.251806Z [main] Subscriber 5 starts
2024-08-19T16:50:48.254611Z [main] sub5: 3
2024-08-19T16:50:48.255182Z [main] Waiting 20s
2024-08-19T16:50:53.258787Z [boundedElastic-2] Refreshing value
2024-08-19T16:50:54.262319Z [boundedElastic-2] Refreshed value
2024-08-19T16:51:08.257524Z [main] Resource refresh count: 4
```

@chemicL chemicL added for/user-attention This issue needs user attention (feedback, rework, etc...) status/need-user-input This needs user input to proceed labels Aug 20, 2024
@chemicL

chemicL commented Aug 27, 2024

@keddie @ben-manes I'm really interested in your input. I'm not sure the above idea is generic enough to incorporate as an operator, but I think we could add a FAQ section to our reference documentation for users to tailor it to their use cases. Do you think this is near something you'd recommend? WDYT?
