Contents
Overview
Helidon has its own set of reactive operators that have no dependencies outside the Helidon ecosystem.
These operators can be used with java.util.concurrent.Flow based reactive streams.
Maven Coordinates
To enable Reactive Engine,
add the following dependency to your project’s pom.xml (see
Managing Dependencies).
<dependency>
<groupId>io.helidon.common</groupId>
<artifactId>helidon-common-reactive</artifactId>
</dependency>
Usage
The stream processing operator chain can be easily constructed by io.helidon.common.reactive.Multi, or
io.helidon.common.reactive.Single for streams with single value.
AtomicInteger sum = new AtomicInteger();
Multi.just("1", "2", "3", "4", "5")
.limit(3)
.map(Integer::parseInt)
.forEach(sum::addAndGet);
System.out.println("Sum: " + sum.get());
// >Sum: 6
Single.just("1")
.map(Integer::parseInt)
.map(i -> i + 5)
.toStage()
.whenComplete((i, t) -> System.out.println("Result: " + i));
// >Result: 6
defer |
Call the given supplier function for each individual downstream Subscriber to return a Flow.Publisher to subscribe to. |
map |
Map this |
defaultIfEmpty |
Signals the default item if the upstream is empty. |
switchIfEmpty |
Switch to the other publisher if the upstream is empty. |
peek |
Invoke provided consumer for every item in stream. |
distinct |
Filter out all duplicates. |
filter |
Filter stream items with provided predicate. |
takeWhile |
Take the longest prefix of elements from this stream that satisfy the given predicate. As long as predicate returns true, items from upstream are sent to downstream, when predicate returns false stream is completed. |
dropWhile |
Drop the longest prefix of elements from this stream that satisfy the given predicate. As long as predicate returns true, items from upstream are NOT sent to downstream but being dropped, predicate is never called again after it returns false for the first time. |
limit |
Limit stream to allow only specified number of items to pass. |
skip |
Skip first n items, all the others are emitted. |
flatMap |
Transform each upstream item with the supplied function into a |
flatMap |
Transform each upstream item with the supplied function and flatten the resulting |
flatMapCompletionStage |
Transform each upstream item with the supplied function and flatten the resulting |
flatMapIterable |
Transform each upstream item with the supplied function and flatten the resulting |
flatMapOptional |
Transform each upstream item with the supplied function and flatten the resulting |
observeOn |
Re-emit the upstream’s signals to the downstream on the given executor’s thread using a default buffer size of 32 and errors skipping ahead of items. |
observeOn |
Re-emit the upstream’s signals to the downstream on the given executor’s thread. |
forEach |
Terminal stage, invokes provided consumer for every item in the stream with no backpressure. |
forEachCompletionStage |
Terminal stage, invokes provided function for every item in the stream with strict backpressure, requests another item only when previous operation is finished. |
collectList |
Collect the items of this |
collect |
Collect the items of this |
collect |
Collect the items of this |
collectStream |
Collects up upstream items with the help of the callbacks of a |
reduce |
Combine subsequent items via a callback function and emit the final value result as a Single. |
reduce |
Combine every upstream item with an accumulator value to produce a new accumulator value and emit the final accumulator value as a Single. |
first |
Get the first item of this |
from |
Wrap a CompletionStage into a Multi and signal its outcome non-blockingly. |
from |
Wrap a CompletionStage into a Multi and signal its outcome non-blockingly. |
from |
Create a |
from |
Create a |
from |
Create a |
just |
Create a |
just |
Create a |
singleton |
Create a |
error |
Create a |
empty |
Get a |
never |
Get a |
concat |
Concat streams to one. |
onTerminate |
Executes given |
ifEmpty |
Executes given |
onComplete |
Executes given |
onError |
Executes the given java.util.function.Consumer when an onError signal is received. |
onCancel |
Executes given |
takeUntil |
Relay upstream items until the other source signals an item or completes. |
range |
Emits a range of ever-increasing integers. |
rangeLong |
Emits a range of ever-increasing longs. |
timer |
Signal 0L and complete the sequence after the given time elapsed. |
interval |
Signal 0L, 1L and so on periodically to the downstream. |
interval |
Signal 0L after an initial delay, then 1L, 2L and so on periodically to the downstream. |
timeout |
Signals a |
timeout |
Switches to a fallback source if the upstream doesn’t signal the next item, error or completion within the specified time. |
onErrorResume |
|
onErrorResumeWith |
Resume stream from supplied publisher if onError signal is intercepted. |
retry |
Retry a failing upstream at most the given number of times before giving up. |
retry |
Retry a failing upstream if the predicate returns true. |
retryWhen |
Retry a failing upstream when the given function returns a publisher that signals an item. |
Operator Chains Composition
In the situations when part of the operator chain needs to be prepared in advance,
compose and to operators are at hand.
// Assembly of stream, nothing is streamed yet
Multi<String> publisherStage =
Multi.just("foo", "bar")
.map(String::trim);
Function<Multi<String>, Multi<?>> processorStage =
upstream -> upstream.map(String::toUpperCase);
// Execution of pre-prepared stream
publisherStage
.compose(processorStage)
.map(s -> "Item received: " + s)
.forEach(System.out::println);
// > Item received: FOO
// > Item received: BAR