Skip to content

A simple API, and a Rich API

johnmcclean-aol edited this page Nov 23, 2016 · 9 revisions

SimpleReact Streams

LazyFutureStream has all the functionality in this page.

SimpleReactStream is a simpler, more focused API with only the SimpleReact Core API features (and some extensions).

SimpleReact Core API

The core SimpleReact API remains very simple. Although it has expanded slightly since the initial release it is today :-

  • with
  • then
  • doOnEach
  • retry
  • onFail
  • capture
  • block
  • allOf
  • anyOf
  • run
  • toQueue
  • flatMap
  • peek
  • filter
  • merge

These are the concurrent non-blocking operations (except for block!) that represent the core of the API.

#java.util.stream.Stream

With SimpleReact v0.3 we have also added all the methods of the Stream api to this 👍

  • filter(Predicate<? super T>)
  • map(Function<? super T, ? extends R>)
  • mapToInt(ToIntFunction<? super T>)
  • mapToLong(ToLongFunction<? super T>)
  • mapToDouble(ToDoubleFunction<? super T>)
  • flatMap(Function<? super T, ? extends Stream<? extends R>>)
  • flatMapToInt(Function<? super T, ? extends IntStream>)
  • flatMapToLong(Function<? super T, ? extends LongStream>)
  • flatMapToDouble(Function<? super T, ? extends DoubleStream>)
  • distinct()
  • sorted()
  • sorted(Comparator<? super T>)
  • peek(Consumer<? super T>)
  • limit(long)
  • skip(long)
  • forEach(Consumer<? super T>)
  • forEachOrdered(Consumer<? super T>)
  • toArray()
  • toArray(IntFunction<A[]>)
  • reduce(T, BinaryOperator)
  • reduce(BinaryOperator)
  • reduce(U, BiFunction<U, ? super T, U>, BinaryOperator)
  • collect(Supplier, BiConsumer<R, ? super T>, BiConsumer<R, R>)
  • collect(Collector<? super T, A, R>)
  • min(Comparator<? super T>)
  • max(Comparator<? super T>)
  • count()
  • anyMatch(Predicate<? super T>)
  • allMatch(Predicate<? super T>)
  • noneMatch(Predicate<? super T>)
  • findFirst()
  • findAny()
  • builder()
  • empty()
  • of(T)
  • of(T...)
  • iterate(T, UnaryOperator)
  • generate(Supplier)
  • concat(Stream<? extends T>, Stream<? extends T>)

#org.jooq.lambda.Seq

And we have also implemented Seq, which adds the following functions

  • stream()
  • concat(Stream)
  • concat(T)
  • concat(T...)
  • cycle()
  • zip(Seq)
  • zip(Seq, BiFunction<T, U, R>)
  • zipWithIndex()
  • foldLeft(U, BiFunction<U, ? super T, U>)
  • foldRight(U, BiFunction<? super T, U, U>)
  • scanLeft(U, BiFunction<U, ? super T, U>)
  • scanRight(U, BiFunction<? super T, U, U>)
  • reverse()
  • shuffle()
  • shuffle(Random)
  • skipWhile(Predicate<? super T>)
  • skipUntil(Predicate<? super T>)
  • limitWhile(Predicate<? super T>)
  • limitUntil(Predicate<? super T>)
  • intersperse(T)
  • duplicate()
  • partition(Predicate<? super T>)
  • splitAt(long)
  • splitAtHead()
  • slice(long, long)
  • toCollection(Supplier)
  • toList()
  • toSet()
  • toMap(Function<T, K>, Function<T, V>)
  • toString(String)
  • minBy(Function<T, U>)
  • minBy(Function<T, U>, Comparator<? super U>)
  • maxBy(Function<T, U>)
  • maxBy(Function<T, U>, Comparator<? super U>)
  • ofType(Class)
  • cast(Class)
  • groupBy(Function<? super T, ? extends K>)
  • groupBy(Function<? super T, ? extends K>, Collector<? super T, A, D>)
  • groupBy(Function<? super T, ? extends K>, Supplier, Collector<? super T, A, D>)
  • join()
  • join(CharSequence)
  • join(CharSequence, CharSequence, CharSequence)
  • of(T)
  • of(T...)
  • empty()
  • iterate(T, UnaryOperator)
  • generate()
  • generate(T)
  • generate(Supplier)
  • seq(Stream)
  • seq(Iterable)
  • seq(Iterator)
  • seq(Map<K, V>)
  • seq(Optional)
  • cycle(Stream)
  • unzip(Stream<Tuple2<T1, T2>>)
  • unzip(Stream<Tuple2<T1, T2>>, Function<T1, U1>, Function<T2, U2>)
  • unzip(Stream<Tuple2<T1, T2>>, Function<Tuple2<T1, T2>, Tuple2<U1, U2>>)
  • unzip(Stream<Tuple2<T1, T2>>, BiFunction<T1, T2, Tuple2<U1, U2>>)
  • zip(Stream, Stream)
  • zip(Stream, Stream, BiFunction<T1, T2, R>)
  • zipWithIndex(Stream)
  • foldLeft(Stream, U, BiFunction<U, ? super T, U>)
  • foldRight(Stream, U, BiFunction<? super T, U, U>)
  • scanLeft(Stream, U, BiFunction<U, ? super T, U>)
  • scanRight(Stream, U, BiFunction<? super T, U, U>)
  • unfold(U, Function<U, Optional<Tuple2<T, U>>>)
  • reverse(Stream)
  • shuffle(Stream)
  • shuffle(Stream, Random)
  • concat(Stream...)
  • duplicate(Stream)
  • toString(Stream<?>)
  • toString(Stream<?>, String)
  • toCollection(Stream, Supplier)
  • toList(Stream)
  • toSet(Stream)
  • toMap(Stream<Tuple2<K, V>>)
  • toMap(Stream, Function<T, K>, Function<T, V>)
  • slice(Stream, long, long)
  • skip(Stream, long)
  • skipWhile(Stream, Predicate<? super T>)
  • skipUntil(Stream, Predicate<? super T>)
  • limit(Stream, long)
  • limitWhile(Stream, Predicate<? super T>)
  • limitUntil(Stream, Predicate<? super T>)
  • intersperse(Stream, T)
  • partition(Stream, Predicate<? super T>)
  • splitAt(Stream, long)
  • splitAtHead(Stream)
  • ofType(Stream, Class)
  • cast(Stream, Class)
  • groupBy(Stream, Function<? super T, ? extends K>)
  • groupBy(Stream, Function<? super T, ? extends K>, Collector<? super T, A, D>)
  • groupBy(Stream, Function<? super T, ? extends K>, Supplier, Collector<? super T, A, D>)
  • join(Stream<?>)
  • join(Stream<?>, CharSequence)
  • join(Stream<?>, CharSequence, CharSequence, CharSequence)
  • filter(Predicate<? super T>)
  • map(Function<? super T, ? extends R>)
  • mapToInt(ToIntFunction<? super T>)
  • mapToLong(ToLongFunction<? super T>)
  • mapToDouble(ToDoubleFunction<? super T>)
  • flatMap(Function<? super T, ? extends Stream<? extends R>>)
  • flatMapToInt(Function<? super T, ? extends IntStream>)
  • flatMapToLong(Function<? super T, ? extends LongStream>)
  • flatMapToDouble(Function<? super T, ? extends DoubleStream>)
  • distinct()
  • sorted()
  • sorted(Comparator<? super T>)
  • peek(Consumer<? super T>)
  • limit(long)
  • skip(long)
  • onClose(Runnable)
  • close()
  • sequential()
  • parallel()
  • unordered()
  • spliterator()
  • forEach(Consumer<? super T>)

#com.aol.cyclops.sequence.ReactiveSeq

* flatten()
* Optional<List<T>> toOptional();
* CompletableFuture<List<T>> toCompletableFuture();
* cycle(int times)
* cycle()
* cycle(Monoid<T> m, int times) ;
* <R> ReactiveSeq<R> cycle(Class<R> monadC, int times);
* ReactiveSeq<T> cycleWhile(Predicate<? super T> predicate);
* ReactiveSeq<T> cycleUntil(Predicate<? super T> predicate);
* <U> ReactiveSeq<Tuple2<T, U>> zipStream(Stream<U> other);
* <S,U> ReactiveSeq<Tuple3<T,S,U>> zip3(Stream<? extends S> second,Stream<? extends U> third);
*  <T2,T3,T4> ReactiveSeq<Tuple4<T,T2,T3,T4>> zip4(Stream<T2> second,Stream<T3> third,Stream<T4> fourth);
* <S, R> ReactiveSeq<R> zipSequence(ReactiveSeq<? extends S> second,
* 			BiFunction<? super T, ? super S, ? extends R> zipper)
* <S, R> ReactiveSeq<R> zipAnyM(AnyM<? extends S> second,
* 			BiFunction<? super T, ? super S, ? extends R> zipper) ;
*  <S, R> ReactiveSeq<R> zipStream(BaseStream<? extends S,? extends BaseStream<? extends S,?>> second,
* 			BiFunction<? super T, ? super S, ? extends R> zipper);
* Tuple2<ReactiveSeq<T>,ReactiveSeq<T>> duplicateSequence();
* Tuple3<ReactiveSeq<T>,ReactiveSeq<T>,ReactiveSeq<T>> triplicate();
* Tuple4<ReactiveSeq<T>,ReactiveSeq<T>,ReactiveSeq<T>,ReactiveSeq<T>> quadruplicate();
* Tuple2<Optional<T>,ReactiveSeq<T>> splitSequenceAtHead();
* Tuple2<ReactiveSeq<T>,ReactiveSeq<T>> splitBy(Predicate<T> splitter);
* ReactiveSeq<List<T>> sliding(int windowSize);
* ReactiveSeq<List<T>> sliding(int windowSize,int increment);
* ReactiveSeq<List<T>> grouped(int groupSize);
* ReactiveSeq<T> scanLeft(Monoid<T> monoid);
* ReactiveSeq<T> scanRight(Monoid<T> monoid);
* boolean xMatch(int num, Predicate<? super T> c);
* HeadAndTail<T> headAndTail();
*  Optional<HeadAndTail<T>> headAndTailOptional();
* <R> R mapReduce(Monoid<R> reducer);
* <R> R mapReduce(Function<? super T,? extends R> mapper, Monoid<R> reducer);
*  List collectStream(Stream<Collector> collectors);
* <R> List<R> collectIterable(Iterable<Collector> collectors);
* T reduce(Monoid<T> reducer);
*  List<T> reduce(Stream<Monoid<T>> reducers);
*  List<T> reduce(Iterable<Monoid<T>> reducers);
*  T foldLeft(Monoid<T> reducer);
* <T> T foldLeftMapToType(Monoid<T> reducer);
* T foldRight(Monoid<T> reducer);
* public <T> T foldRightMapToType(Monoid<T> reducer);
*  Streamable<T> toStreamable();
* <T> Stream<T> toStream();
*  startsWith(Iterable<T> iterable);
* startsWith(Iterator<T> iterator);
* AnyM<T> anyM();
* flatMapAnyM(Function<? super T,AnyM<? extends R>> fn);
* flatMapCollection(Function<? super T,Collection<? extends R>> fn);
*  flatMapStream(Function<? super T,BaseStream<? extends R,?>> fn);
* flatMapOptional(Function<? super T,Optional<? extends R>> fn) ;
* flatMapCompletableFuture(Function<? super T,CompletableFuture<? extends R>> fn);
* flatMapCharSequence(Function<? super T,CharSequence> fn);
* flatMapFile(Function<? super T,File> fn);
* flatMapURL(Function<? super T, URL> fn) ;
* flatMapBufferedReader(Function<? super T,BufferedReader> fn);
* Collection<T> toLazyCollection();
*  Collection<T> toConcurrentLazyCollection();
*  Streamable<T> toConcurrentLazyStreamable();
* ReactiveSeq<T> appendStream(Stream<T> stream);
* ReactiveSeq<T> prependStream(Stream<T> stream);
* ReactiveSeq<T> append(T... values);
* ReactiveSeq<T> prepend(T... values) ;
* ReactiveSeq<T> insertAt(int pos, T... values);
* ReactiveSeq<T> deleteBetween(int start,int end);
* ReactiveSeq<T> insertStreamAt(int pos, Stream<T> stream);
* FutureOperations<T> futureOperations(Executor exec);
* boolean endsWith(Iterable<T> iterable);
* boolean endsWith(Stream<T> stream);
* ReactiveSeq<T> skip(long time, final TimeUnit unit);
* ReactiveSeq<T> limit(long time, final TimeUnit unit);
* ReactiveSeq<T> skipLast(int num);
* ReactiveSeq<T> limitLast(int num);
* HotStream<T> hotStream(Executor e);
* T firstValue();
* T single()
* Optional<T> elementAt(long index)
* Tuple2<T,ReactiveSeq<T>> get(long index)
* ReactiveSeq<Tuple2<T,Long>> elapsed()
* ReactiveSeq<Tuple2<T,Long>> timestamp()
* <T> CyclopsSubscriber<T> subscriber()
* ReactiveSeq<T> xPer(int x, long time, TimeUnit t);
* ReactiveSeq<T> onePer(long time, TimeUnit t);
* ReactiveSeq<T> fixedDelay(long l, TimeUnit unit);
* ReactiveSeq<T> jitter(long maxJitterPeriodInNanos);
* ReactiveSeq<T> debounce(long time, TimeUnit t);
* ReactiveSeq<List<T>> batchBySizeAndTime(int size, long time, TimeUnit t);
* <C extends Collection<T>> ReactiveSeq<C> batchBySizeAndTime(int size,long time, TimeUnit unit, Supplier<C> factory);
* ReactiveSeq<List<T>> batchByTime(long time, TimeUnit t);
* <C extends Collection<T>> ReactiveSeq<C> batchByTime(long time, TimeUnit unit, Supplier<C> factory);
* ReactiveSeq<List<T>> batchBySize(int size);
* <C extends Collection<T>>ReactiveSeq<C> batchBySize(int size, Supplier<C> supplier);
* ReactiveSeq<Streamable<T>> windowBySizeAndTime(int maxSize, long maxTime, TimeUnit maxTimeUnit);
* ReactiveSeq<Streamable<T>> windowWhile(Predicate<T> predicate);
* ReactiveSeq<Streamable<T>> windowUntil(Predicate<T> predicate);
* ReactiveSeq<Streamable<T>> windowStatefullyWhile(BiPredicate<Streamable<T>,T> predicate);
* ReactiveSeq<Streamable<T>> windowByTime(long time, TimeUnit t);
* ReactiveSeq<List<T>> batchUntil(Predicate<T> predicate);
* ReactiveSeq<List<T>> batchWhile(Predicate<T> predicate);
* <C extends Collection<T>>  ReactiveSeq<C> batchWhile(Predicate<T> predicate, Supplier<C> factory);
* <C extends Collection<T>>  ReactiveSeq<C> batchUntil(Predicate<T> predicate, Supplier<C> factory);
* ReactiveSeq<T> recover(final Function<Throwable, T> fn);
* <EX extends Throwable> ReactiveSeq<T> recover(Class<EX> exceptionClass, final Function<EX, T> fn);
* <R> ReactiveSeq<R> retry(Function<T,R> fn)

Other operators

(many of these are also available on ReactiveSeq also)


zipping operators

  • combineLatest
  • withLatest

sharding operators :

  • shard (map, fn)

Control operators -

  • debounce
  • onePer
  • xPer
  • control (fn)
  • skipUntil (stream)
  • takeUntil (stream)
  • jitter
  • fixedDelay

Batching operators

  • batchBySize
  • batchByTime
  • batch (fn)

Chunking operators

  • chunkSinceLastRead
  • chunkSinceLastReadIterator

Futures operators

  • limitFutures

  • skipFutures

  • sliceFutures

  • duplicateFutures

  • partitionFutures

  • splitAtFutures

  • zipFutures

  • zipFuturesWithIndex

  • firstOf

batchBySizeAndTime : batches results until a time or size limit is reached

e.g. batch in 10's or whatever has returned within 5 seconds

               lazyReact.from(urls)
                        .map(this::load)
                        .batchBySizeAndTime(10,5,TimeUnit.SECONDS)
                        .toList();

switchOnNextValue : creates a new stream that takes the lasted value from a number of streams

        	LazyFutureStream<Integer> fast =  ... //  [1,2,3,4,5,6,7..]
        	LazyFutureStream<Integer> slow =  ... //  [100,200,300,400,500,600..]
	  
        	LazyFutureStream<Integer> merged = fast.switchOnNextValue(Stream.of(slow));  
        	//[1,2,3,4,5,6,7,8,100,9,10,11,12,13,14,15,16,200..] 

copy : copies a Stream the specified number of times

            LazyFutureStream.of(1,2,3,4,5,6)
				.map(i->i+2)
				.copy(5)
				.forEach(s -> System.out.println(s.toList()));

toLazyCollection : creates a Collection placeholder but doesn't block. EagerFutureStreams and SimpleReactStreams can populate the Collection asynchronously immediately and LazyFutureStreams won't populate it until a method is invoked

             Collection<Integer> col = LazyFutureStream.of(1,2,3,4,5,6)
				.map(i->i+2)
				.toLazyCollection();

toConcurrentLazyCollection : creates a lazy collection that can be shared across threads

           Collection<Integer> col = LazyFutureStream.of(1,2,3,4,5,6)
				.map(i->i+2)
				.toConcurrentLazyCollection(); 

firstValue : return the first value in a stream, must be present - no optional

           int first = LazyFutureStream.of(1,2,3,4)
                                        .firstValue();
	   
	    //first is 1

single : return a single entry, exception if no entries or multiple

          int num  =	LazyFutureStream.of(1)
                                        .single();
	  
	   //num is 1

futureOperations() & futureOperations(executor) - access terminal operations that return a future and execute asyncrhonously

          CompletableFuture<Integer>  sum = LazyFutureStream.of(1,2,3,4,5)
												.map(it -> it*100)
												.futureOperations()
												.reduce( 50,(acc,next) -> acc+next);

         //sum is CompletableFuture[1550]

sliding(size) : creates a sliding window over the data in the stream

          //futureStream 
Clone this wiki locally