It is not easy to chain a CancelableOperation following other work #210

Open
natebosch opened this issue Mar 17, 2022 · 10 comments
Labels
type-enhancement A request for a change that isn't a bug

Comments

@natebosch
Member

natebosch commented Mar 17, 2022

There is no equivalent of async and await, and the chaining we have through .then requires FutureOr returns from the callbacks - we can't do any cancelable work inside the callback. These factors mean that some idioms which feel easy with Future feel clunky, and require careful use of CancelableCompleter.
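To make the limitation concrete, here is a minimal sketch that uses only the existing `fromFuture`, `then`, `value`, and `cancel` members from `package:async`. The helper `slowIncrement` and the durations are made up for illustration.

```dart
import 'dart:async';

import 'package:async/async.dart';

/// Hypothetical cancelable step: yields [n] + 1 after a short delay.
CancelableOperation<int> slowIncrement(int n) => CancelableOperation.fromFuture(
      Future.delayed(const Duration(milliseconds: 50), () => n + 1),
      onCancel: () => print('canceled increment of $n'),
    );

Future<void> main() async {
  // `then` expects a FutureOr<R> callback, so the second operation has to be
  // unwrapped to its plain `value` future before it can be returned.
  final chained = slowIncrement(1).then((v) => slowIncrement(v).value);

  // Wait until the first step is done and the second one is in flight.
  await Future.delayed(const Duration(milliseconds: 75));

  // Only the Future was handed back to `then`, so `chained` holds no
  // reference to the second CancelableOperation and cannot cancel it.
  await chained.cancel();
}
```

Because the callback can only return the second operation's `value` future, the cancellation of `chained` has no path to the work started inside the callback.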

For example, I tried to write a retry when a CancelableOperation results in an error. The nicest I could come up with is:

CancelableOperation<T> retryOperation<T>(
    CancelableOperation<T> Function() callback,
    {int allowedTries = 3}) {
  late CancelableOperation<T> operation;
  final completer = CancelableCompleter<T>(onCancel: () => operation.cancel());
  _retry(() => operation = callback(), completer, allowedTries);
  return completer.operation;
}

Future<void> _retry<T>(CancelableOperation<T> Function() callback,
    CancelableCompleter<T> completer, int remainingTries) async {
  for (;;) {
    try {
      final current = callback();
      var result = await current.valueOrCancellation();
      if (completer.isCanceled) return;
      if (current.isCompleted && !current.isCanceled) {
        completer.complete(result);
      } else {
        completer.operation.cancel();
      }
      return;
    } on Object catch (error, stackTrace) {
      if (--remainingTries > 0) continue;
      if (completer.isCanceled) return;
      completer.completeError(error, stackTrace);
      return; // Stop looping once the final error has been reported.
    }
  }
}

In another place I tried to wrap a CancelableOperation Function() where it needs some async work before forwarding to a delegate, which is equivalent to flattening a Future<CancelableOperation> to a CancelableOperation.

CancelableOperation<T> flatten<T>(
    Future<CancelableOperation<T>> eventualOperation) {
  CancelableOperation<T>? operation;
  final completer = CancelableCompleter<T>(onCancel: () => operation?.cancel());
  eventualOperation.then((result) {
    operation = result;
    return result.valueOrCancellation().then((innerResult) {
      if (completer.isCanceled) return;
      if (result.isCompleted && !result.isCanceled) {
        completer.complete(innerResult);
      } else {
        completer.operation.cancel();
      }
    });
  }, onError: (Object error, StackTrace stackTrace) {
    if (completer.isCompleted) return;
    completer.completeError(error, stackTrace);
  });
  return completer.operation;
}

This is coming up because I'm exploring the impact of making BaseClient.send return CancelableOperation instead of Future for dart-lang/http#424, and many of the clients that wrap a delegate feel more complex after the change.

I think the flatten utility could be good to add to this package. The retry here is simplified and wouldn't work for what we need - I'm not sure it's wise to try to write a general enough retry though.

cc @lrhn - Can you think of any simpler way to translate these patterns? Can you think of any other building blocks we could add to CancelableOperation to make them easier? I wonder if something similar to .then taking a CancelableOperation<R> Function(T) onValue instead of a FutureOr<R> Function(T) onValue would help?

@lrhn
Member

lrhn commented Mar 17, 2022

Ideally, you should never have a Future<CancelableOperation> to begin with, like you should never have a Future<Future>.
Can you say something about the code that needed it? (Why could it not just return a CancelableOperation directly?)

I think returning CancelableOperation<R> from the callbacks of a cancelableThen would make sense.
That would make it possible to cancel not just before the callback, but also during it. (I guess the callback then needs to find a way to not complete).
We don't have any CancelableOperationOr type, so we should probably have CancelableOperation.value, CancelableOperation.error and CancelableOperation.canceled constructors, along with CancelableOperation.fromFuture that we already have. That should make it easy (but verbose) to inject results into CancelableOperation.

Or maybe it should be:

CancelableOperation<R> thenOperation<R>(void Function(T, CancelableCompleter<R>) onValue,
  {void Function(Object, StackTrace, CancelableCompleter<R>) onError, 
    void Function(CancelableCompleter<R>) onCancel});

because having three outcomes means representing them by return or throw is not sufficient. That works for then where the callbacks can return value or throw (or return a future which completes with a value or error). Here, the operations can also be cancelled, which means not returning at all, which is tricky. Not calling the completer seems easier.

It's never going to be as convenient as async/await.

@lrhn
Member

lrhn commented Mar 17, 2022

My flatten would be very similar:

CancelableOperation<T> flatten<T>(Future<CancelableOperation<T>> nested) {
  CancelableOperation? operation;
  var completer = CancelableCompleter<T>(onCancel: () {
     operation?.cancel();
  });
  nested.then<void>((o) {
    if (completer.isCanceled) {
      o.cancel();
      o.value.ignore(); // In case it has already completed with an error.
      return;
    }
    operation = o;
    //  // if only we could do this, then the `operation` variable wouldn't be needed:
    //  completer.onCancel = o.cancel;
    o.then<void>(completer.complete, onError: completer.completeError, onCancel: completer.operation.cancel);
  }, onError: (e, s) {
    completer.completeError(e, s);
  });
  return completer.operation;
}

What we need might be a void completeOperation(CancelableOperation operation) on CancelableCompleter.
Something like (untested, obviously):

class CancelableCompleter<T> ...  {
  /// Makes this [CancelableCompleter.operation] complete with the same result as [operation].
  ///
  /// If the operation of this completer is cancelled before [operation] completes,
  /// then if [propagateCancel] is set to true, [operation] is also cancelled.
  void completeOperation(CancelableOperation operation, {bool propagateCancel = false}) {
    if (!_mayComplete) throw StateError("Already completed");
    _mayComplete = false;
    if (isCanceled) {
      if (propagateCancel) operation.cancel();
      operation.value.ignore();
      return;
    }
    operation.then<void>((value) {
      _inner?.complete(value); // _inner is set to null if this.operation is cancelled.
    }, onError: (error, stack) {
      _inner?.completeError(error, stack);
    }, onCancel: () {
      this.operation.cancel();
    });
    if (propagateCancel) {
      _cancelCompleter.future.whenComplete(operation.cancel);
    }
  }
}

Then flatten would be:

CancelableOperation<T> flatten<T>(Future<CancelableOperation<T>> operation) {
  var completer = CancelableCompleter<T>();
  operation.then(completer.completeOperation);
  return completer.operation;
}

@natebosch
Member Author

Ideally, you should never have a Future<CancelableOperation> to begin with, like you should never have a Future<Future>.
Can you say something about the code that needed it? (Why could it not just return a CancelableOperation directly?)

I modeled it as a Future<CancelableOperation> because that seemed like an easier utility to document and it's equivalent to any situation where you need to do some async work before a cancelable operation, but want to return a synchronous cancelable operation.

The real situation was a custom Client implementation that checks for fresh auth credentials before each request. Refreshing auth credentials is async. When send returns a Future<StreamedResponse> the implementation is trivial. (Race condition while creating the client is an existing issue that I'm ignoring here).

  @override
  Future<StreamedResponse> send(BaseRequest request) async {
    if (!credentials.accessToken.hasExpired) {
      return _authClient.send(request);
    }
    _authClient = await _createAuthorizedClient();
    return _authClient.send(request); 
  }

Without await it's still very readable.

  @override
  Future<StreamedResponse> send(BaseRequest request) {
    if (!credentials.accessToken.hasExpired) {
      return _authClient.send(request);
    }
    return _createAuthorizedClient().then((client) {
      _authClient = client;
      return client.send(request);
    });
  }

Once I make it return CancelableOperation the complexity jumps.

  @override
  CancelableOperation<StreamedResponse> send(BaseRequest request) {
    if (!credentials.accessToken.hasExpired) {
      return _authClient.send(request);
    }
    CancelableOperation<StreamedResponse>? operation;
    final completer = CancelableCompleter<StreamedResponse>(
        onCancel: () => operation?.cancel());
    _createAuthorizedClient().then((client) {
      _authClient = client;
      operation = client.send(request);
      return operation!.valueOrCancellation().then((response) {
        if (completer.isCanceled) return;
        if (operation!.isCompleted && !operation!.isCanceled) {
          completer.complete(response);
        } else {
          completer.operation.cancel();
        }
      });
    }, onError: (Object error, StackTrace stackTrace) {
      if (completer.isCanceled) return;
      completer.completeError(error, stackTrace);
    });
    return completer.operation;
  }

@natebosch
Member Author

natebosch commented Mar 17, 2022

I do like the combination of

  CancelableOperation<R> thenOperation<R>(
      void Function(T, CancelableCompleter<R>) onValue,
      {void Function(Object, StackTrace, CancelableCompleter<R>)? onError,
      void Function(CancelableCompleter<R>)? onCancel}) {

With Completer.completeOperation.

With these the solution feels pretty similar to the Future.then version.

  @override
  CancelableOperation<StreamedResponse> send(BaseRequest request) {
    if (!credentials.accessToken.hasExpired) {
      return _authClient.send(request);
    }
    return CancelableOperation.fromFuture(
            _createAuthorizedClient().then((client) => _authClient = client))
        .thenOperation((client, completer) {
      completer.completeOperation(client.send(request));
    });
  }

@natebosch
Member Author

I was just thinking that it's too bad the more general version can't be named .then instead of .thenOperation... it turns out I'm the one to blame 😢
#81 (comment)

I do think that the void Function(T, CancelableCompleter<R>) signature is likely to be nicer than CancelableOperation<R> Function(T) in practice, as long as we add CancelableCompleter.completeOperation.

natebosch added a commit that referenced this issue Mar 18, 2022
Towards #210

Once combined with `Completer.completeOperation` this signature adds
significant flexibility for chaining Cancelable work following other
async work.

Move the existing `.then` operation to `.thenOperation` since the latter
is more general.
@natebosch
Member Author

We have a PR in RetryClient, which is interesting to consider.
dart-lang/http#679

I wondered what it would look like to migrate this to a retry around CancelableOperation and take each of those new awaits into account, where each could be interrupted by cancellation. (I think the _onRetry call would be OK with no await, and it might not be crucial that all of these are points where cancellation could interrupt work.)

It isn't nice to read, but I don't think it's too far off of what I'd write without await over Future, and the proposed two new APIs are the crucial point to make it easier.

  @override
  CancelableOperation<StreamedResponse> send(BaseRequest request) =>
      _send(request, StreamSplitter(request.finalize()), 0);

  CancelableOperation<StreamedResponse> _send(
      BaseRequest request, StreamSplitter<List<int>> splitter, int attempt) {

    void retry(StreamedResponse? response,
        CancelableCompleter<StreamedResponse> completer) {
      final work = _onRetry?.call(request, response, attempt);
      if (work is Future<void>) {
        work.then((_) {
          completer.completeOperation(_send(request, splitter, ++attempt));
        });
      } else {
        completer.completeOperation(_send(request, splitter, ++attempt));
      }
    }

    return _inner.send(_copyRequest(request, splitter.split())).thenOperation(
        (response, completer) {
      if (attempt == _retries) {
        completer.complete(response);
        return;
      }
      void act(bool when) {
        if (when) {
          retry(response, completer);
        } else {
          completer.complete(response);
        }
      }

      final when = _when(response);
      if (when is Future<bool>) {
        when.then(act);
      } else {
        act(when);
      }
    }, onError: (error, stackTrace, completer) {
      if (attempt == _retries) {
        completer.completeError(error, stackTrace);
        return;
      }
      void act(bool when) {
        if (when) {
          retry(null, completer);
        } else {
          completer.completeError(error, stackTrace);
        }
      }

      final when = _whenError(error, stackTrace);
      if (when is Future<bool>) {
        when.then(act);
      } else {
        act(when);
      }
    });
  }

@lrhn
Member

lrhn commented Mar 21, 2022

The issue here is that a cancelable operation can complete in three ways, not just two like a Future. So, we can't just use await which is hard-wired to future's two options of "value" or "throw".

An interesting thought is that Future combines two monads: The "delayed" monad and the error monad. Or, seen differently, it's a delayed sum type (of VALUE(v) | ERROR(e, s)) where the await reifies the VALUE as a value and the ERROR as a throw.

The cancelable operation is also a delayed sum type, just with three summands (value/error/cancel), and the then callback therefore has three cases to handle.

What if we had a lower-level "wait" operation on a DelayedValue-type which only has DelayedValue<R> then<R>(DelayedValue<R> Function(T) handler), and which can only just wait for a result, not an error. Then await e is really defined as let x = wait e in switch x {case Value(v) => v; case Error(e, s) => Error.throwWithStackTrace(e, s)} on a DelayedValue<ValueOrError<T>>.
(Using let, sum types, pattern matching and general-wait, so all new syntax 😁).
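As a rough illustration of the three-summand view, the sketch below models the outcome as a sealed class hierarchy. It assumes Dart 3 sealed classes and switch expressions, which were released after this comment was written. The names `Outcome`, `Value`, `Failure`, `Canceled`, and `describe` are hypothetical and not part of `package:async`.

```dart
/// Hypothetical reified result of a cancelable operation: one of three cases.
sealed class Outcome<T> {
  const Outcome();
}

final class Value<T> extends Outcome<T> {
  final T value;
  const Value(this.value);
}

final class Failure<T> extends Outcome<T> {
  final Object error;
  final StackTrace stackTrace;
  const Failure(this.error, this.stackTrace);
}

final class Canceled<T> extends Outcome<T> {
  const Canceled();
}

/// Handles all three summands; the compiler checks the switch is exhaustive.
String describe<T>(Outcome<T> outcome) => switch (outcome) {
      Value(:final value) => 'value: $value',
      Failure(:final error) => 'error: $error',
      Canceled() => 'canceled',
    };

Future<void> main() async {
  // A plain Future can deliver the reified outcome, so awaiting it only
  // "waits" and never throws; the caller decides what each case means.
  final Future<Outcome<int>> delayed = Future.value(const Value(42));
  print(describe(await delayed)); // value: 42
  print(describe<int>(const Canceled())); // canceled
}
```

With the outcome reified like this, cancellation becomes just another case to match on rather than something that needs its own callback.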

Anyway, for the retry, I'd consider just ignoring the synchronous case and always await the FutureOr.
Then you can also combine the two calls to completer.complete*:

    void retry(StreamedResponse? response,
        CancelableCompleter<StreamedResponse> completer) async {
      await _onRetry?.call(request, response, attempt);
      completer.completeOperation(_send(request, splitter, ++attempt));
    }

    return _inner.send(_copyRequest(request, splitter.split())).thenOperation(
        (response, completer) async {
      if (attempt != _retries && await _when(response)) {
        await retry(response, completer);
      } else {
        completer.complete(response);
      }
    }, onError: (error, stackTrace, completer) async {
      if (attempt != _retries && await _whenError(error, stackTrace)) {
        await retry(null, completer);
      } else {
        completer.completeError(error, stackTrace);
      }
    });

If something allows returning either a future or a value, I treat it as an asynchronous computation which allows the user to omit wrapping the result in a future. I won't do extra work to ensure that everything runs synchronously if the return value is not a future.
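A small sketch of that convention, using only `dart:async`: the caller always awaits the `FutureOr`, so a synchronous and an asynchronous callback go through the same code path. The names `whenSync`, `whenAsync`, and `decide`, and the status codes, are invented for illustration.

```dart
import 'dart:async';

/// Hypothetical predicate that answers synchronously.
FutureOr<bool> whenSync(int status) => status == 503;

/// Hypothetical predicate that answers asynchronously.
Future<bool> whenAsync(int status) async => status == 503;

/// Always awaits the FutureOr, so both kinds of predicate share one path.
Future<String> decide(FutureOr<bool> Function(int) when, int status) async =>
    (await when(status)) ? 'retry' : 'complete';

Future<void> main() async {
  print(await decide(whenSync, 503)); // retry
  print(await decide(whenAsync, 200)); // complete
}
```

The cost is that the synchronous case is delayed by the await, which is the trade-off described above.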

I made retry use await too, because otherwise an error in

        work.then((_) {
          completer.completeOperation(_send(request, splitter, ++attempt));

would be uncaught. If that's deliberate and documented, then it's fine.

@natebosch
Member Author

Anyway, for the retry, I'd consider just ignoring the synchronous case and always await the FutureOr.
Then you can also combine the two calls to completer.complete*:

Yeah, we can simplify the implementation if we don't allow cancelling work in between each of the callbacks. I was curious whether there are other building blocks that would make even an extreme case more readable.

For now I'll plan on pushing through with the two APIs we've discussed so far.

@natebosch
Member Author

natebosch commented Mar 31, 2022

Another use case that came up is wanting to add an extra onCancel behavior to an existing CancelableOperation. There is a usage of CancelableOperation.then where it seems they expect the onCancel callback to be called when the returned operation is canceled, in order to update the UI.

We should consider adding a CancelableOperation.onCancel(void Function()) method.

@natebosch natebosch added the type-enhancement A request for a change that isn't a bug label Apr 13, 2022
natebosch added a commit that referenced this issue Apr 18, 2022
Towards #210

Once combined with `Completer.completeOperation` this signature adds
significant flexibility for chaining Cancelable work following other
async work.

Move the existing `.then` implementation to `.thenOperation` since the latter
is more general.

Co-authored-by: Lasse R.H. Nielsen <[email protected]>
natebosch added a commit that referenced this issue Apr 18, 2022
Towards #210

Combined with `CancelableOperation.thenOperation` this allows chaining
cancelable work that can be canceled at multiple points.
natebosch added a commit that referenced this issue Apr 19, 2022
Towards #210

Combined with `CancelableOperation.thenOperation` this allows chaining
cancelable work that can be canceled at multiple points.
@dnys1

dnys1 commented Oct 12, 2022

Thanks for the great discussion here. I've been looking into using CancelableOperation and ran into many of the same challenges. @natebosch - are the two proposed APIs finalized, do you think? Would there be a possibility of publishing them, if so? Thanks again.
