-
Notifications
You must be signed in to change notification settings - Fork 51
ReactiveSeq : Examples
This is a list of some example Stream operations available in Cyclops. ReactiveSeq provides an interface over an augmented java.util.stream.Stream with a host of additional operators (it also implements org.jooq.lambda.Seq). Most of the same operations are available on standard JDK 8 Streams via the static utility methods in StreamUtils. There is also an equivalent StreamUtils class that can augment Javaslang Streams. Used in conjunction with Lombok ExtensionMethods this can even add methods directly to the Javaslang or JDK Stream interface.
These methods are available via ReactiveSeq or to plain JDK 8 Streams via com.aol.cyclops.streams.StreamUtils, for Javaslang Streams use com.aol.cyclops.javaslang.streams.StreamUtils.
A HotStream starts emitting values imediatately. To write to the console, in a similar manner to forEach, but still be 'connnectable' or non-terminal.
Executor exec = Executors.newFixedThreadPool(1);
ReactiveSeq.of(1,2,3)
.peek(v->v+1)
.peek(System.out::println)
.hotStream(exec);
In the example below 5,000 entries will be written out on the HotStreams executing thread, and 100 of those will also be written out on the current thread.
ReactiveSeq.range(0,Integer.MAX_VALUE)
.limit(5000)
.peek(System.out::println)
.hotStream(exec)
.connect()
.limit(100)
.forEach(next->System.out.println("Current thread : " + next);
The futureOperations operator moves Stream execution to another thread.
ReactiveSeq.of(1,2,3,4,5).futureOperations(exec).max((t1,t2) -> t1-t2);
//CompletableFuture[5]
ReactiveSeq.of(1,2,3,4,5).futureOperations(exec).min((t1,t2) -> t1-t2);
//CompletableFuture[1]
ReactiveSeq.of(1,2,3,4,5).map(it -> it*100).futureOperations(exec)
.reduce( (acc,next) -> acc+next);
//CompletableFuture[1500]
ReactiveSeq.of(1,2,3,4,5).filter(it -> it <3).futureOperations(exec)
.findFirst();
//CompletableFuture[1]
ReactiveSeq.of(1,2,3,5,6,7).futureOperations(exec).xMatch(3, i-> i>4 );
//true
ReactiveSeq.of(1,2,3,4,5,6)
.groupedWhile(i->i%3!=0)
.toList()
//[1,2,3],[4,5,6]
ReactiveSeq.of(1,2,3,4,5,6)
.groupedUntil(i->i%3==0,()->new ArrayList<>())
.toList().size()
//[1,2,3],[4,5,6]
ReactiveSeq.of(1,2,3,4,5, 6)
.map(n-> n==6? sleep(1) : n)
.groupedByTime(10,TimeUnit.MICROSECONDS)
.toList()
//[1,2,3,4,5],[6]
ReactiveSeq.of(1,2,3,4,5, 6)
.map(n-> n==6? sleep(1) : n)
.groupedBySize(4)
.toList()
//[1,2,3,4],[5,6]
Create a sliding list over the data in a Stream (unlike window which produces a Streamable, sliding produces a List)
ReactiveSeq.of(1, 2, 3, 4, 5, 6).sliding(2).collect(Collectors.toList())
//[1,2],[2,3],[3,4],[4,5],[5,6]
List<List<Integer>> list = ReactiveSeq.of(1, 2, 3, 4, 5, 6).sliding(3, 2).collect(Collectors.toList());
//[[1, 2, 3], [3, 4, 5], [5, 6]]
ReactiveSeq.of(1, 2, 3, 4, 5, 6).grouped(3).collect(Collectors.toList());
//[[1, 2, 3], [4, 5, 6]]
ReactiveSeq.of(1,2,3,4,5,6).zip(ReactiveSeqof(100,200,300,400))
.collect(Collectors.toList());
//[(1, 100), (2, 200), (3, 300), (4, 400)]
ReactiveSeq.of(1,2,3,4,5,6).zip3(ReactiveSeq.of(100,200,300,400),ReactiveSeq.of('a','b','c'))
.collect(Collectors.toList());
//[(1, 100, a), (2, 200, b), (3, 300, c)]
ReactiveSeq.of(1,2,3,4,5,6).zip4(ReactiveSeq.of(100,200,300,400),ReactiveSeq.of('a','b','c'),ReactiveSeq.of("hello","world"))
.collect(Collectors.toList());
//[(1, 100, a, hello), (2, 200, b, world)]
ReactiveSeq.of(new Tuple2(1, "a"), new Tuple2(2, "b"), new Tuple2(3, "c"));
//Tuple2[ReactiveSeq[1,2,3],ReactiveSeq[a,b,c]]
ReactiveSeq.of('a','b','c').zipWithIndex()
//ReactiveSeq[Tuple['a',0],Tuple['b',1],Tuple['c',2]]
ReactiveSeq.fromIntStream(IntStream.range(0, 1000))
.map(it -> System.currentTimeMillis())
.jitter(10_000l)
.forEach(System.out::println);
//random wait up to 10 seconds between each value being printed
ReactiveSeq.fromIntStream(IntStream.range(0, 1000))
.fixedDelay(1l, TimeUnit.MICROSECONDS)
.forEach(System.out::println)
//wait 1 second between each value being printed
ReactiveSeq.iterate(0, it -> it + 1)
.limit(100)
.onePer(1, TimeUnit.MICROSECONDS)
.map(seconds -> "hello!")
.peek(System.out::println)
.toList();
//one value emitted per second
ReactiveSeq<String> helloWorld = anyM("hello","world","last").toSequence();
HeadAndTail<String> headAndTail = helloWorld.headAndTail();
String head = headAndTail.head();
//hello
ReactiveSeq<String> tail = headAndTail.tail();
//[world,last]
Get at 0, this extracts the first value and returns a Stream of the remaining values (as a Tuple2)
ReactiveSeq.of(1,2,3,4).get(0)
//[1],ReactiveSeq[2,3,4]
Get at 1
ReactiveSeq.of(1,2,3,4).get(1)
//[2],ReactiveSeq[1,3,4]
Returns an optional containing the element at index (if exists) otherwise optional empty
ReactiveSeq.of(1).elementAt(0)
//Optional[1]
ReactiveSeq.of().elementAt(0).isPresent()
//false
ReactiveSeq.of(1,2,3,4)
.map(u->{throw new RuntimeException();})
.recover(e->"hello")
.firstValue()
//hello
ReactiveSeq.of(1,2,3,4)
.map(i->i+2)
.map(u->{ExceptionSoftener.throwSoftenedException( new IOException()); return null;})
.recover(IOException.class,e->"hello")
.firstValue()
//hello
ReactiveSeq.of( 1, 2, 3)
.retry(this::remoteCall)
.map(this::continueProcessing)
//if remote call fails, it will be retried with a backoff
ReactiveSeq.range(1,1_000_000)
.peek(i->sleep(i*100))
.limit(1000,TimeUnit.MILLISECONDS)
.toList()
//takes from the range for 1,000ms (1 sec)
ReactiveSeq.range(1,Integer.MAX_VALUE)
.peek(i->sleep(i*100))
.skip(1000,TimeUnit.MILLISECONDS)
.toList()
//skips values from the range until 1 second has elapsed, then accept values
Skip the specified number of entries from the end of the stream
ReactiveSeq.of(1,2,3,4,5)
.skipLast(2)
.collect(Collectors.toList());
//List[1,2,3]
ReactiveSeq.of(1,2,3,4,5)
.limitLast(2)
.collect(Collectors.toList())
//List[4,5]
ReactiveSeq.of(1, 2, 3, 4, 5).skipWhile(i->i<5);
//ReactiveSeq[5]
ReactiveSeq.of(1, 2, 3, 4, 5).limitWhile(i->i<5);
//ReactiveSeq[1,2,3,4]
ReactiveSeq.of(1, 2, 3, 4, 5).skipUntil(i->i==4);
//ReactiveSeq[4,5]
ReactiveSeq.of(1, 2, 3, 4, 5).limitWhile(i->i==4);
//ReactiveSeq[1,2,3]
ReactiveSeq.of(1,2,3,4,5,6)
.endsWith(Arrays.asList(5,6))
//true
ReactiveSeq.of(1,2,3,4,5,6)
.endsWith(Stream.of(5,6))
//true
ReactiveSeq.of(1,2,3,4,5,6)
.startsWith(Arrays.asList(5,6))
//false
ReactiveSeq.of(1,2,3,4,5,6)
.startsWith(Stream.of(1,2))
//true
Streamable<Integer> repeat = ReactiveSeq.of(1,2,3,4,5,6)
.map(i->i*2)
.toStreamable();
repeat.ReactiveSeq().toList(); //List[2,4,6,8,10,12]
repeat.ReactiveSeq().toList()//List[2,4,6,8,10,12]
Streamable<Integer> repeat = ReactiveSeq.of(1,2,3,4,5,6)
.map(i->i*2)
.toConcurrentLazyStreamable();
Thread1 : repeat.ReactiveSeq().toList() //List[2,4,6,8,10,12]
Thread2 : repeat.ReactiveSeq().toList() //List[2,4,6,8,10,12]
Lazy collection only materializes the Stream when the returned collection is accessed. If it is never accessed, the Stream is never executed, if it is only partially accessed - the Stream is only partially processed.
Collection<Integer> col = ReactiveSeq.of(1,2,3,4,5)
.peek(System.out::println)
.toLazyCollection();
for(Integer next : col){
System.out.println(next); //stream is executed here.
}
Involves a synchronization overhead, but the Lazy Collection can be shared across threads.
Collection<Integer> col = ReactiveSeq.of(1,2,3,4,5)
.peek(System.out::println)
.toConcurrentLazyCollection();
ReactiveSeq.of(1,2,3,4,5)
.timestamp()
//[1,timestampInMillis],[2,timestampInMillis],[3,timestampInMillis] etc
(Elasped time between each emission)
ReactiveSeq.of(1,2,3,4,5).elapsed().noneMatch(t->t.v2<0)
Tuple2<ReactiveSeq<Integer>, ReactiveSeq<Integer>> copies =ReactiveSeq.of(1,2,3,4,5,6).duplicateSequence();
Tuple3<ReactiveSeq<Integer>, ReactiveSeq<Integer>, ReactiveSeq<Integer>> copies =ReactiveSeq.of(1,2,3,4,5,6).triplicate();
Tuple4<ReactiveSeq<Integer>, ReactiveSeq<Integer>, ReactiveSeq<Integer>,ReactiveSeq<Integer>> copies =ReactiveSeq.of(1,2,3,4,5,6).quadruplicate();
List<String> result = ReactiveSeq.of(1,2,3).prepend(100,200,300)
.map(it ->it+"!!").collect(Collectors.toList());
List<String> result = ReactiveSeq.of(1,2,3).prependStream(ReactiveSeq.of(100,200,300))
.map(it ->it+"!!").collect(Collectors.toList());
//["100!!","200!!","300!!","1!!","2!!","3!!"]
List<String> result = ReactiveSeq.of(1,2,3).append(100,200,300)
.map(it ->it+"!!").collect(Collectors.toList());
List<String> result = ReactiveSeq.of(1,2,3).appendStream(ReactiveSeq.of(100,200,300))
.map(it ->it+"!!").collect(Collectors.toList());
//["1!!","2!!","3!!","100!!","200!!","300!!"]
List<String> result = ReactiveSeq.of(1,2,3).insertAt(1,100,200,300)
.map(it ->it+"!!").collect(Collectors.toList());
List<String> result = ReactiveSeq.of(1,2,3).insertStreamAt(1,ReactiveSeq.of(100,200,300))
.map(it ->it+"!!").collect(Collectors.toList());
//["1!!","100!!","200!!","300!!","2!!","3!!"]
List<String> result = ReactiveSeq.of(1,2,3,4,5,6).deleteBetween(2,4)
.map(it ->it+"!!").collect(Collectors.toList());
//["1!!","2!!","5!!","6!!"]
ReactiveSeq.of(1, 2, 3, 4, 5, 6).splitBy(i -> i % 2 != 0)
//tuple[ReactiveSeq[1,3,5],ReactiveSeq[2,4,6]]
ReactiveSeq.of(1, 2, 3, 4, 5, 6).splitAt(2)
//tuple[ReactiveSeq[1,2,3],ReactiveSeq[4,5,6]]
ReactiveSeq.of("a", "b", "c").scanLeft("", String::concat).toList()
//List("", "a", "ab", "abc")
ReactiveSeq.of("a", "ab", "abc").map(str -> str.length()).scanLeft(0, (u, t) -> u + t).toList(),
//List(0, 1, 3, 6)))
ReactiveSeq.of("a", "b", "c").scanLeft(Reducers.toString("")).toList()
//List("", "a", "ab", "abc")
ReactiveSeq.of("a", "ab", "abc").map(str -> str.length()).scanLeft(Reducers.toTotalInt()).toList()
//List(0, 1, 3, 6)));
ReactiveSeq.of("a", "b", "c").scanRight("", String::concat).toList()
//List("", "c", "bc", "abc")
ReactiveSeq.of("a", "ab", "abc").map(str -> str.length()).scanRight(0, (t, u) -> u + t).toList()
//List(0, 3, 5, 6)
ReactiveSeq.of("a", "b", "c").scanRight(Reducers.toString("")).toList()
//List("", "c", "bc", "abc")
ReactiveSeq.of("a", "ab", "abc").map(str -> str.length()).scanRight(Reducers.toTotalInt()).toList()
//List(0, 3, 5, 6)
ReactiveSeq.of("hello","2","world","4").mapReduce(Reducers.toCountInt())
//4
ReactiveSeq.of("one","two","three","four").mapReduce(this::toInt,Reducers.toTotalInt())
//10
ReactiveSeq.of("hello","2","world","4").join(",");
ReactiveSeq.of("hello","2","world","4").reduce(Reducers.toString(",");
//",hello,2,world,4"
List result = ReactiveSeq.of(1,2,3)
.collectStream(Stream.of(Collectors.toList(),Collectors.summingInt(Integer::intValue),Collectors.averagingInt(Integer::intValue)));
//List[List(1,2,3),6.2.0]
ReactiveSeq.of(1, 2, 3).join()
//"123"
ReactiveSeq.of(1, 2, 3).join(", ")
//"1, 2, 3"
ReactiveSeq.of(1, 2, 3).join("|", "^", "$")
"^1|2|3$"
Monoid<Integer> sum = Monoid.of(0,(a,b)->a+b);
Monoid<Integer> mult = Monoid.of(1,(a,b)->a*b);
List<Integer> result = ReactiveSeq.of(1,2,3,4)).reduce(Arrays.asList(sum,mult) );
//List[10,24]
ReactiveSeq.of(1,2,3).toList()
//List[1,2,3]
ReactiveSeq.of(1,2,3,1,2,3).toSet()
//Set[1,2,3]
ReactiveSeq.of(1,2,3).toMap(v->"key:"+v,v->v)
//Map["key:1":1,"key:2":2,"key:3":3]
ReactiveSeq.of("a","b","c").foldRight(Reducers.toString(""))
//"cba"
ReactiveSeq.of("a","b","c").foldLeft(Reducers.toString(""))
//"abc"
ReactiveSeq.range(0,10).skip(8).reverse()
//ReactiveSeq[10,9]
ReactiveSeq.reversedOf(1,2)
.toList()
//List[2,1]
List<Integer> list= Arrays.asList(1,2);
ReactiveSeq.reversedListOf(list)
.toList()
//List[2,1]
file://input.file ={
hello
world
}
ReactiveSeq.of("input.file")
.map(getClass().getClassLoader()::getResource)
.peek(System.out::println)
.map(URL::getFile)
.flatMapFile(File::new)
.toList();
//List["hello","world"]
ReactiveSeq.of("input.file")
.flatMapURL(getClass().getClassLoader()::getResource)
.toList();
//List["hello","world"]
ReactiveSeq.of("input.file")
.flatMapCharSequence(i->"hello world")
.toList()
//List['h','e','l','l','o',' ','w','o','r','l','d']
ReactiveSeq.of("input.file")
.map(getClass().getClassLoader()::getResourceAsStream)
.map(InputStreamReader::new)
.flatMapBufferedReader(BufferedReader::new)
.toList()
//List["hello","world"]
ReactiveSeq.of(1,2,3,null)
.flatMapOptional(Optional::ofNullable)
.collect(Collectors.toList())
//List[1,2,3]
ReactiveSeq.of(1,2,3)
.flatMapCompletableFuture(i->CompletableFuture.completedFuture(i+2))
.collect(Collectors.toList())
//List[1,2,3]
ReactiveSeq.of(1,2,3,null)
.flatMapCollection(i->Arrays.asList(1,2,i))
.collect(Collectors.toList())
//List[1,2,1,1,2,2,1,2,3]
//Optional[1]
ReactiveSeq.of(1).singleOptional();
//Optional.empty
ReactiveSeq.of().singleOpional();
//Optional.empty
ReactiveSeq.of(1,2,3).singleOptional();
//1
ReactiveSeq.of(1).single();
//NoSuchElementException
ReactiveSeq.of().single();
//NoSuchElementException
ReactiveSeq.of(1,2,3).single();