Contents

Overview

Helidon has its own set of reactive operators that have no dependencies outside the Helidon ecosystem. These operators can be used with java.util.concurrent.Flow based reactive streams.

Maven Coordinates

To enable Reactive Engine, add the following dependency to your project’s pom.xml (see Managing Dependencies).

<dependency>
    <groupId>io.helidon.common</groupId>
    <artifactId>helidon-common-reactive</artifactId>
</dependency>

Usage

The stream processing operator chain can be easily constructed by io.helidon.common.reactive.Multi, or io.helidon.common.reactive.Single for streams with single value.

Example of Multi usage:
AtomicInteger sum = new AtomicInteger();

Multi.just("1", "2", "3", "4", "5")
        .limit(3)
        .map(Integer::parseInt)
        .forEach(sum::addAndGet);

System.out.println("Sum: " + sum.get());

// >Sum: 6
Example of Single usage:
Single.just("1")
        .map(Integer::parseInt)
        .map(i -> i + 5)
        .toStage()
        .whenComplete((i, t) -> System.out.println("Result: " + i));

// >Result: 6
Table 1. Operators

defer

Call the given supplier function for each individual downstream Subscriber to return a Flow.Publisher to subscribe to.

map

Map this Multi instance to a new Multi of another type using the given Mapper.

defaultIfEmpty

Signals the default item if the upstream is empty.

switchIfEmpty

Switch to the other publisher if the upstream is empty.

peek

Invoke provided consumer for every item in stream.

distinct

Filter out all duplicates.

filter

Filter stream items with provided predicate.

takeWhile

Take the longest prefix of elements from this stream that satisfy the given predicate. As long as predicate returns true, items from upstream are sent to downstream, when predicate returns false stream is completed.

dropWhile

Drop the longest prefix of elements from this stream that satisfy the given predicate. As long as predicate returns true, items from upstream are NOT sent to downstream but being dropped, predicate is never called again after it returns false for the first time.

limit

Limit stream to allow only specified number of items to pass.

skip

Skip first n items, all the others are emitted.

flatMap

Transform each upstream item with the supplied function into a Flow.Publisher, subscribe to them and then flatten their items into a single sequence of items emitted to the downstream.

flatMap

Transform each upstream item with the supplied function and flatten the resulting Flow.Publisher to downstream while limiting the maximum number of concurrent inner `Flow.Publisher`s and their in-flight item count, optionally aggregating and delaying all errors until all sources terminate.

flatMapCompletionStage

Transform each upstream item with the supplied function and flatten the resulting CompletionStage results to downstream.

flatMapIterable

Transform each upstream item with the supplied function and flatten the resulting Iterable to the downstream.

flatMapOptional

Transform each upstream item with the supplied function and flatten the resulting Optional to the downstream as item if present.

observeOn

Re-emit the upstream’s signals to the downstream on the given executor’s thread using a default buffer size of 32 and errors skipping ahead of items.

observeOn

Re-emit the upstream’s signals to the downstream on the given executor’s thread.

forEach

Terminal stage, invokes provided consumer for every item in the stream with no backpressure.

forEachCompletionStage

Terminal stage, invokes provided function for every item in the stream with strict backpressure, requests another item only when previous operation is finished.

collectList

Collect the items of this Multi instance into a Single of List.

collect

Collect the items of this Multi instance into a Single.

collect

Collect the items of this Multi into a collection provided via a Supplier and mutated by a BiConsumer callback.

collectStream

Collects up upstream items with the help of the callbacks of a java.util.stream.Collector.

reduce

Combine subsequent items via a callback function and emit the final value result as a Single.

reduce

Combine every upstream item with an accumulator value to produce a new accumulator value and emit the final accumulator value as a Single.

first

Get the first item of this Multi instance as a Single.

from

Wrap a CompletionStage into a Multi and signal its outcome non-blockingly.

from

Wrap a CompletionStage into a Multi and signal its outcome non-blockingly.

from

Create a Multi instance wrapped around the given publisher.

from

Create a Multi instance that publishes the given iterable.

from

Create a Multi instance that publishes the given Stream.

just

Create a Multi instance that publishes the given items to a single subscriber.

just

Create a Multi instance that publishes the given items to a single subscriber.

singleton

Create a Multi that emits a pre-existing item and then completes.

error

Create a Multi instance that reports the given exception to its subscriber(s). The exception is reported by invoking Subscriber#onError(java.lang.Throwable) when Publisher#subscribe(Subscriber) is called.

empty

Get a Multi instance that completes immediately.

never

Get a Multi instance that never completes.

concat

Concat streams to one.

onTerminate

Executes given java.lang.Runnable when any of signals onComplete, onCancel or onError is received.

ifEmpty

Executes given java.lang.Runnable when stream is finished without value(empty stream).

onComplete

Executes given java.lang.Runnable when onComplete signal is received.

onError

Executes the given java.util.function.Consumer when an onError signal is received.

onCancel

Executes given java.lang.Runnable when a cancel signal is received.

takeUntil

Relay upstream items until the other source signals an item or completes.

range

Emits a range of ever-increasing integers.

rangeLong

Emits a range of ever-increasing longs.

timer

Signal 0L and complete the sequence after the given time elapsed.

interval

Signal 0L, 1L and so on periodically to the downstream.

interval

Signal 0L after an initial delay, then 1L, 2L and so on periodically to the downstream.

timeout

Signals a TimeoutException if the upstream doesn’t signal the next item, error or completion within the specified time.

timeout

Switches to a fallback source if the upstream doesn’t signal the next item, error or completion within the specified time.

onErrorResume

java.util.function.Function providing one item to be submitted as onNext in case of onError signal is received.

onErrorResumeWith

Resume stream from supplied publisher if onError signal is intercepted.

retry

Retry a failing upstream at most the given number of times before giving up.

retry

Retry a failing upstream if the predicate returns true.

retryWhen

Retry a failing upstream when the given function returns a publisher that signals an item.

Operator Chains Composition

In the situations when part of the operator chain needs to be prepared in advance, compose and to operators are at hand.

Combining operator chains:
// Assembly of stream, nothing is streamed yet
Multi<String> publisherStage =
        Multi.just("foo", "bar")
                .map(String::trim);

Function<Multi<String>, Multi<?>> processorStage =
        upstream -> upstream.map(String::toUpperCase);

// Execution of pre-prepared stream
publisherStage
        .compose(processorStage)
        .map(s -> "Item received: " + s)
        .forEach(System.out::println);

// > Item received: FOO
// > Item received: BAR