Introduction to RxJava Java 11.12.2016

Imperative programming and functional programming

As you may already know, Java is an imperative programming language. Typically, a Java program consists of a sequence of instructions. Each of these instructions is executed in the same order in which you write them, and the execution leads to changes in the state of the program.

For example, the following code builds a list containing the even numbers from an input list:

List<Integer> input = Arrays.asList(1, 2, 3, 4, 5);
List<Integer> output = new ArrayList<>();
for (Integer x : input) {
    if (x % 2 == 0) {
        output.add(x);
    }
}

In order to produce the desired output, you define every step that the program has to take to build the result list, and each step is defined sequentially.

One alternative to imperative programming is functional programming. In functional programming, the result of the program derives from the evaluation of functions, without changing the internal program state.

So, using functional programming, the example above can be rewritten with the following pseudocode:

Observable.from(input).filter(x -> x % 2 == 0);
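Since Java 8, the same declarative style is also available in the JDK through the Stream API. The following sketch is plain Java with no RxJava involved (the class name EvenNumbers is illustrative); it filters the even numbers without a mutable output list:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class EvenNumbers {
    // Describe *what* we want (the even numbers) instead of *how* to build the list
    static List<Integer> evens(List<Integer> input) {
        return input.stream()
                .filter(x -> x % 2 == 0)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(evens(input)); // prints [2, 4]
    }
}
```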

Functional code is often shorter and easier to understand than the corresponding imperative code. You can do the same work with less code, and less code usually means fewer bugs.

In imperative programming, implementing abstraction requires you to define interfaces and split code into components that implement those interfaces; functional languages make it easier to create abstractions.

The features of a functional language are the following:

  • Higher-order functions. Higher-order functions are functions that take other functions as arguments or return functions as results.
  • Immutable data. Data is immutable by default; instead of modifying existing values, functional languages often operate on a copy of the original values to preserve them (in Java, primitive values are immutable, but objects are mutable unless their class is designed so that their state cannot change after creation).
  • Concurrency. Concurrency is supported and is safer to implement, thanks also to immutability by default.
  • Referential transparency. An expression can be replaced by its value without changing the program's behavior: a function called with the same arguments always produces the same result and has no side effects (like a static Java method that depends only on its parameters).
  • Lazy evaluation. Values can be computed only when needed (lazily) because functions can be evaluated at any time, always giving the same result (these functions do not depend on the program’s internal state).
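Higher-order functions and referential transparency can be sketched with Java 8's java.util.function.Function. The twice method is a hypothetical example: it takes a function and returns a new function that applies it two times. Because the functions involved are pure, the same input always produces the same output.

```java
import java.util.function.Function;

class HigherOrder {
    // Takes a function as an argument and returns a function: a higher-order function
    static Function<Integer, Integer> twice(Function<Integer, Integer> f) {
        return f.andThen(f);
    }

    public static void main(String[] args) {
        Function<Integer, Integer> addThree = x -> x + 3;
        Function<Integer, Integer> addSix = twice(addThree);
        // Pure functions: the same input always gives the same output
        System.out.println(addSix.apply(1)); // prints 7
        System.out.println(addSix.apply(1)); // prints 7 again
    }
}
```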

Java 8 added some functional programming constructs, such as lambda expressions and streams. RxJava 1.x, however, also runs on Java 6 and Java 7, so it lets you use functional programming concepts on those older versions too (with anonymous classes in place of lambdas).

Lambda expressions are anonymous functions; the lambda operator is indicated using an arrow symbol pointing to the right (->). Inputs are placed at the left of the operator, and the function body is placed at the right.
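As a small illustration using the JDK's java.util.function.Predicate (the class and field names are illustrative), here is the same even-number test written first as an anonymous class, the style needed before lambdas existed, and then as a lambda:

```java
import java.util.function.Predicate;

class LambdaDemo {
    // Pre-lambda style: an anonymous class implementing a one-method interface
    static final Predicate<Integer> IS_EVEN_ANONYMOUS = new Predicate<Integer>() {
        @Override
        public boolean test(Integer x) {
            return x % 2 == 0;
        }
    };

    // Lambda style: the input (x) on the left of ->, the function body on the right
    static final Predicate<Integer> IS_EVEN_LAMBDA = x -> x % 2 == 0;

    public static void main(String[] args) {
        System.out.println(IS_EVEN_ANONYMOUS.test(4)); // prints true
        System.out.println(IS_EVEN_LAMBDA.test(3));    // prints false
    }
}
```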

Reactive Programming

Reactive programming takes functional programming a little bit further, by adding the concept of data flows and propagation of data changes.

Functional reactive programming is a new programming paradigm; it was made popular by Erik Meijer and it’s based on two concepts:

  • Code "reacts" to events.
  • Code handles values as they vary in time, propagating changes to every part of the code that uses those values.

The key to understanding reactive programming is to think of it as operating on a stream of data. A stream is a sequence of events, where an event could be user input (like a tap on a button), a response from an API request (like a Facebook feed), data contained in a collection, or even a single variable.

Reactive programming is a programming paradigm based on the concept of an asynchronous data flow. A data flow is like a river: it can be observed, filtered, manipulated, or merged with a second flow to create a new flow for a new consumer.

In reactive programming, there's often a component that acts as the source, emitting a sequence of items (or a stream of data), and some other components that observe this flow of items and react to each emitted item (they "react" to item emission).

In reactive programming, the consumer reacts to the data as it comes in instead of waiting for it. This is why reactive programming is so closely tied to asynchronous programming. Reactive programming propagates changes to the registered observers.

The Observer Pattern

The Observer pattern is a design pattern in which there are two kinds of objects: observers and subjects. An observer is an object that observes the changes of one or more subjects; a subject is an object that keeps a list of its observers and automatically notifies them when it changes its state.


The "Gang of Four" book defines the intent of the Observer pattern as follows:

Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically.

This pattern is the core of reactive programming: it fits the concept perfectly by providing the structures to implement the produce/react mechanism.

Java SDK implements the Observer pattern with the class java.util.Observable and the interface java.util.Observer.

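import java.util.Observable;
import java.util.Observer;

class Rose extends Observable {
    public void doWinkAndNotify() {
        Object wink = doWink();
        setChanged();          // mark the state as changed, otherwise notifyObservers() does nothing
        notifyObservers(wink); // push the wink to every registered observer
    }

    private Object doWink() {
        return "wink";
    }
}

class Billy implements Observer {
    @Override
    public void update(Observable obs, Object item) {
        doRun(item);
    }

    private void doRun(Object item) {
        System.out.println("Billy runs away from the " + item);
    }
}

class Jack implements Observer {
    @Override
    public void update(Observable obs, Object item) {
        doFlirt(item);
    }

    private void doFlirt(Object item) {
        System.out.println("Jack answers the " + item);
    }
}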

The Rose class extends java.util.Observable and is responsible for producing an object and notifying the observers as soon as the item has been produced. Note that java.util.Observable only notifies its observers if setChanged() has been called first.

Billy and Jack implement Observer and are responsible for observing Rose and consuming every 'wink' that Rose produces.

Putting it all together:

Billy billy = new Billy();
Jack jack = new Jack();
Rose rose = new Rose();
rose.addObserver(billy);
rose.addObserver(jack);
rose.doWinkAndNotify();

In practice you won't use this implementation; with RxJava you use the library's own reactive types instead.

What's RxJava?

ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming. It's a library that implements functional reactive programming in many languages. It uses observables to represent asynchronous data streams, and it abstracts away the details related to threading, concurrency, and synchronization.

Rx makes well-known concepts, such as the push approach, easy to implement and consume. Instead of asking for a result and waiting for it, the developer asks for a result and gets notified when the result is available. The developer provides a clear reaction to each event that is going to happen. For example, the user is asked to log in and submit a form with their username and password. The application executes the network call for the login and states what is going to happen: either show a success message and store the user's profile, or show an error message.

ReactiveX has been implemented as a library for the most used programming languages: Java, JavaScript, C#, Scala, Clojure, C++, Ruby, Python, Groovy, JRuby, Kotlin, Swift, and more.

RxJava is a library that implements the concepts of ReactiveX in Java. You can rewrite the imperative code that filters even numbers using RxJava:

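import rx.Observable;
import rx.functions.Func1;

List<Integer> input = Arrays.asList(1, 2, 3, 4, 5);
// Func1<Integer, Boolean>: takes an Integer, returns true for items to keep
Observable.from(input).filter(new Func1<Integer, Boolean>() {
    @Override
    public Boolean call(Integer x) {
        return x % 2 == 0;
    }
});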

Or, using a lambda expression:

Observable.from(input).filter(x -> x % 2 == 0);

The resulting object (the instance of rx.Observable) will generate a sequence of the even numbers contained in the input sequence: 2 and 4.

The basic building blocks of reactive code are Observable and Subscriber. An Observable emits items; a Subscriber consumes those items.

Observables (and Subjects) are the "producing" entities; Observers (and Subscribers) are the "consuming" entities.

Whenever you want to emit a single scalar value or a sequence of values, or even an infinite value stream, you can use an Observable.

An Observable may emit any number of items (including zero items), then it terminates either by successfully completing, or due to an error. It represents a push-based collection, which is a collection in which events are pushed when they are created. For each Subscriber it has, an Observable calls Subscriber.onNext() any number of times, followed by either Subscriber.onComplete() or Subscriber.onError().


A Subscriber is an object that subscribes to an Observable. It listens and reacts to whatever sequence of items the Observable emits. The Subscriber does not block while waiting for new items; it is simply called back whenever a new item is emitted.

The following pseudocode defines the method that a Subscriber uses to react to the Observable's items:

onNext = { it -> doSomething }

Here, the method is defined, but nothing is invoked. To start reacting, you need to subscribe to the Observable:

observable.subscribe(onNext)

Now the Subscriber is listening for items and will react to every new item that will be emitted.

Notice that a Subscriber reacts to three types of events:

  • Item emission by the Observable. It occurs zero, one, or more times. If the sequence completes correctly, the onNext method will be invoked as many times as the number of items in the sequence. If an error occurs at a certain point, the onNext method won’t be invoked any further.
  • The completion of item emission. The onComplete method is invoked only when all items in the sequence have been emitted correctly. It's invoked only once, after the last item has been emitted, and it may never be invoked at all if you're working with an infinite sequence.
  • An error. An error can occur at any point in the sequence, and the sequence stops immediately. In this case, the method onError is invoked, passing the error as a Throwable object. After that, neither onNext nor onComplete is invoked.

In RxJava 1.x, the onComplete() event is actually called onCompleted().

This approach looks a lot like standard Observer pattern, but it differs in one key way - Observables often don't start emitting items until someone explicitly subscribes to them. In other words: if no one is there to listen, the tree won't fall in the forest.
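To make the three-event contract and this lazy behavior concrete without depending on RxJava, here is a minimal hand-rolled sketch. MiniObservable, MiniSubscriber, and RecordingSubscriber are hypothetical names, not RxJava types, and the sketch ignores threading, unsubscription, and backpressure. The source pushes onNext zero or more times, then exactly one of onCompleted or onError, and only when subscribe() is called:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ContractDemo {
    // Hypothetical consumer interface mirroring the three RxJava 1.x callbacks
    interface MiniSubscriber<T> {
        void onNext(T item);
        void onCompleted();
        void onError(Throwable e);
    }

    // Hypothetical cold source: nothing is emitted until subscribe() is called
    static class MiniObservable<T> {
        private final List<T> items;
        private final Throwable error; // if non-null, signalled after the items instead of completion

        MiniObservable(List<T> items, Throwable error) {
            this.items = items;
            this.error = error;
        }

        void subscribe(MiniSubscriber<T> subscriber) {
            for (T item : items) {
                subscriber.onNext(item);   // zero or more times
            }
            if (error != null) {
                subscriber.onError(error); // terminal event: nothing follows it
            } else {
                subscriber.onCompleted();  // terminal event: once, after the last item
            }
        }
    }

    // Records every callback so the order of events can be inspected
    static class RecordingSubscriber<T> implements MiniSubscriber<T> {
        final List<String> events = new ArrayList<>();

        public void onNext(T item) { events.add("onNext(" + item + ")"); }
        public void onCompleted() { events.add("onCompleted"); }
        public void onError(Throwable e) { events.add("onError(" + e.getMessage() + ")"); }
    }

    public static void main(String[] args) {
        MiniObservable<Integer> evens = new MiniObservable<>(Arrays.asList(2, 4), null);
        RecordingSubscriber<Integer> subscriber = new RecordingSubscriber<>();
        // Creating the source pushed nothing; subscribing starts the emission
        evens.subscribe(subscriber);
        System.out.println(subscriber.events); // prints [onNext(2), onNext(4), onCompleted]
    }
}
```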

In RxJava, rx.Observable adds two semantics to the Gang of Four's Observer pattern, on top of the basic one of emitting items (such as the items 2 and 4 in the filtering example):

  • The producer can notify the consumer that there is no more data available.
  • The producer can notify the consumer that an error has occurred.

The easiest way to create an observable is to use the factory methods that are implemented in the RxJava library.

  • Observable.just() creates an Observable that emits the object(s) passed to it and then completes.
  • Observable.from() takes a collection or an array and emits its elements in the order in which they appear in the data structure.
  • Observable.fromCallable() creates an Observable that invokes a Callable when a Subscriber subscribes and emits the value it returns.

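Observable.just() creates an Observable such that, when a Subscriber subscribes, the Subscriber's onNext() is immediately called with the argument provided to Observable.just(). If that argument is a List, the whole list is emitted as one single item, as in the listObservable example below.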

Observable.range(a, n) creates an Observable that emits a range of n consecutive integers starting from a. Observable.just(1, 2, 3, 4, 5) and Observable.range(1, 5) will emit the same sequence.
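The second argument of Observable.range is a count, not an upper bound. The hypothetical helper below mimics that (start, count) contract with a plain loop over a List; it is a synchronous sketch, not RxJava code. For instance, range(5, 10) yields the ten values 5 through 14:

```java
import java.util.ArrayList;
import java.util.List;

class RangeSketch {
    // Mirrors the (start, count) contract of Observable.range: count values from start
    static List<Integer> range(int start, int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count must be >= 0");
        }
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            result.add(start + i);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(range(1, 5));  // prints [1, 2, 3, 4, 5]
        System.out.println(range(5, 10)); // prints the ten values 5 through 14
    }
}
```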

import java.util.Arrays;
import java.util.List;

import rx.Observable;
import rx.Observer;

public class SandBox {
    public static void main(String[] args) {
        List<String> list = Arrays.asList("Android", "Ubuntu", "Mac OS");
        // just() emits the whole list as a single item
        Observable<List<String>> listObservable = Observable.just(list);
        listObservable.subscribe(new Observer<List<String>>() {

            @Override
            public void onCompleted() {}  // RxJava 1.x name of the completion callback

            @Override
            public void onError(Throwable e) {}

            @Override
            public void onNext(List<String> list) {
                System.out.println(list);
            }
        });
    }
}

To add RxJava to a simple project in IntelliJ IDEA: File -> Project Structure... -> Libraries -> click "+" -> choose "From Maven..." -> type io.reactivex:rxjava and press Enter -> choose io.reactivex:rxjava:x.x.x -> click "OK".

The following is a simple starting point for RxJava:

import rx.Observable;
import java.util.Arrays;
import java.util.List;

public class SandBox {
    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5);
        List<Integer> output;

        // print
        Observable.from(input).filter(x -> x % 2 == 0).subscribe(System.out::println);

        // convert to List
        output = Observable.from(input).filter(x -> x % 2 == 0).toList().toBlocking().single();
        System.out.println(output);
    }
}

Operators

Operators can be used between the source Observable and the final Subscriber to manipulate the emitted items. With the operators defined in the library, you can compose and transform sequences of data with little code, which makes the result less prone to errors.

Operator | Description
-------- | -----------
map      | transforms the items emitted by an Observable by applying a function to each item
zip      | combines the emissions of multiple Observables together via a specified function, then emits a single item for each combination based on the results of this function
filter   | emits only those items from an Observable that pass a predicate test
flatMap  | transforms the items emitted by an Observable into Observables, then flattens the emissions from those into a single Observable
take     | emits only the first n items emitted by an Observable
reduce   | applies a function to each item emitted by an Observable, sequentially, and emits the final value
skip     | suppresses the first n items emitted by an Observable
buffer   | periodically gathers items emitted by an Observable into bundles and emits these bundles rather than emitting the items one at a time
concat   | emits the emissions from two or more Observables without interleaving them
replay   | ensures that all observers see the same sequence of emitted items, even if they subscribe after the Observable has begun emitting items
merge    | combines multiple Observables into one by merging their emissions

Let's look at some of these operators, along with a few others, in detail.

map

The map() operator is one of the most common operators in reactive programming. It lets you transform every item of the emitted sequence with a specified function.

In the following snippet, the input sequence is a series of integers (1 to 5), and every item of the sequence is multiplied by 10. Applying the map operator to an observable creates a new observable, which emits x*10 every time the first observable emits x.

Observable.just(1, 2, 3, 4, 5)
    .map(i -> i * 10)
    .subscribe(i -> System.out.println(i));

The map() operator returns a new Observable that transforms each item. You can chain as many map() calls as you want, polishing the data into a perfect, consumable form for the final Subscriber.

flatMap

The flatMap operator performs two types of actions: the map action that transforms the emitted items into observables and a flatten action that converts those observables into one observable. So, you can use flatMap() to convert the input observable into any other observable (or, in other words, to convert the input stream into a different stream).

So, like the map() operator, flatMap() applies a function to each emitted item, but this function must return an observable. The items emitted by these observables are then merged into one observable, so items from different observables may be interleaved. If you need the observables not to be interleaved, use the concatMap operator.

For example, suppose that you have a list of integers as input and you want to convert it into a sequence of strings in the format "Number x", where x is the next integer in the input list. You can combine flatMap and map as follows:

List<Integer> input = Arrays.asList(1, 2, 3, 4, 5);
Observable.just(input)
    .flatMap(list -> Observable.from(list))
    .map(i -> "Number " + i)
    .subscribe(i -> System.out.println(i));

map vs flatMap

map and flatMap are two commonly used ReactiveX operators. Both apply a transformation function to each item emitted by an Observable. However, map emits exactly one item for each source item, whereas flatMap can emit zero or more items for each source item.


Consider an Observable that emits strings of text, and a function that splits a string into words. The map operator applies the split function to each string and emits one item per string: an array of words. Use map when you want to transform one emitted item into another.

Sometimes, the function you apply returns multiple items, and you want to add them all to a single stream. In this case, flatMap is a good candidate: in the split example, flatMap "flattens" the arrays of words into a single sequence of words.
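The split example can be reproduced with the JDK's Stream API, whose map and flatMap follow the same one-to-one versus one-to-many rule. This is a synchronous analogue for illustration, not RxJava code, and the sample sentences are made up:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class MapVsFlatMap {
    // map: one sentence in, exactly one String[] out
    static List<String[]> withMap(List<String> sentences) {
        return sentences.stream()
                .map(s -> s.split(" "))
                .collect(Collectors.toList());
    }

    // flatMap: one sentence in, zero or more words out, flattened into one sequence
    static List<String> withFlatMap(List<String> sentences) {
        return sentences.stream()
                .flatMap(s -> Arrays.stream(s.split(" ")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> sentences = Arrays.asList("hello reactive world", "hi Rx");
        System.out.println(withMap(sentences).size()); // prints 2 (one array per sentence)
        System.out.println(withFlatMap(sentences));    // prints [hello, reactive, world, hi, Rx]
    }
}
```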

concatMap

The concatMap operator behaves like flatMap, except that it ensures that the observables are not interleaved but concatenated, keeping their order.

zip

The zip() operator takes multiple observables as inputs and combines each emission via a specified function and emits the results of this function as a new sequence.

The function is applied in strict sequence. If there are two sequences as input, zip waits until both sequences have emitted their first item, applies the function to these two items, and emits the result. It then waits for the second item from each sequence, applies the function to them, and emits the result. And so on. It stops when the shortest sequence completes.

Here’s an example:

Observable<Integer> streamA = Observable.range(1, 3);
Observable<Integer> streamB = Observable.range(5, 10);

Observable.zip(streamA, streamB, (a, b) -> {
    return a + ":" + b;
})
.subscribe(i -> System.out.println(i));

In this example, the zip operator takes two sequences as inputs and a function that simply takes two integers and builds a string.

As you can see, the emitted sequence stops when the shortest sequence stops, and in this case the shortest sequence is the first one (1,2,3).
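The pairing rule can be sketched on plain lists. zipLists is a hypothetical helper written for illustration, not an RxJava method, and it works on lists that are already available rather than on asynchronous streams: it combines items index by index and stops at the end of the shorter input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

class ZipSketch {
    static <A, B, R> List<R> zipLists(List<A> a, List<B> b, BiFunction<A, B, R> f) {
        int n = Math.min(a.size(), b.size()); // the shorter input decides the length
        List<R> result = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            result.add(f.apply(a.get(i), b.get(i))); // i-th item of a with i-th item of b
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> a = Arrays.asList(1, 2, 3);
        List<Integer> b = Arrays.asList(5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
        System.out.println(zipLists(a, b, (x, y) -> x + ":" + y)); // prints [1:5, 2:6, 3:7]
    }
}
```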

concat

The concat operator concatenates two or more Observables, generating one sequence in which all the items from the first source appear before the items from the second source. The concat operator also waits for each source to complete before subscribing to the next one.

In the following example, you concatenate two sequences of strings:

Observable<String> streamA = Observable.just("a", "b");
Observable<String> streamB = Observable.just("z", "x", "y");

Observable.concat(streamA, streamB)
    .subscribe(i -> System.out.println(i));

Note that you can only concatenate sequences of objects of the same type (i.e., you cannot concatenate an observable that emits strings with one that emits integers).

filter

The filter operator uses a specified function to allow only some items of the source sequence to be emitted.

Observable.from(new Integer[]{2, 30, 22, 5, 60, 1})
    .filter(i -> i > 10)
    .subscribe(i -> System.out.println(i));

distinct

The distinct operator applies a filter to the source sequence. If an item is emitted more than once, only the first occurrence will be emitted.
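The JDK's Stream.distinct() applies the same keep-the-first-occurrence rule to an ordered stream, so it makes a quick synchronous illustration (plain Java, not RxJava; the input values are arbitrary):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class DistinctDemo {
    static List<Integer> distinct(List<Integer> items) {
        // Duplicates are dropped; the first occurrence of each value keeps its position
        return items.stream().distinct().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(distinct(Arrays.asList(2, 30, 2, 5, 30, 1))); // prints [2, 30, 5, 1]
    }
}
```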

first

The first operator emits only the first item of a sequence. If a function is specified, it will be used to filter the items, so only the first item of the sequence that meets the conditions will be emitted.

You can take the example written for the filter operator and change it by applying the operator first instead of filter:

Observable.from(new Integer[]{2, 30, 22, 5, 60, 1})
    .first(i -> i > 10)
    .subscribe(i -> System.out.println(i));

last

If you can apply a filter to the beginning of the sequence with the operator first, you can also filter the end of the sequence with the operator last.

The last operator can take a predicate as a parameter; in that case, only the last item of the source sequence for which the predicate evaluates to true is emitted.

Observable.from(new Integer[]{2, 30, 22, 5, 60, 1})
    .last(i -> i > 10)
    .subscribe(i -> System.out.println(i));
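The results of the first and last examples can be checked with a stream-based sketch in plain JDK Java (not RxJava). One difference to keep in mind: when nothing matches, RxJava's first and last signal a NoSuchElementException through onError, while this sketch returns an empty Optional.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class FirstLastDemo {
    static Optional<Integer> firstMatching(List<Integer> items) {
        return items.stream().filter(i -> i > 10).findFirst();
    }

    static Optional<Integer> lastMatching(List<Integer> items) {
        // Keep the later element of each pair, so the reduction ends with the last match
        return items.stream().filter(i -> i > 10).reduce((a, b) -> b);
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(2, 30, 22, 5, 60, 1);
        System.out.println(firstMatching(items).get()); // prints 30
        System.out.println(lastMatching(items).get());  // prints 60
    }
}
```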

take

The operator first is good for filtering the beginning of a sequence, but what if you are interested not only in the first item but you want the first n items? Here’s where the take operator comes in handy. It takes an integer n as a parameter, allowing only the first n items to be emitted.

Observable.from(new Integer[]{2, 30, 22, 5, 60, 1})
    .take(2)
    .subscribe(i -> System.out.println(i));

startWith

The startWith operator takes the input sequence and prepends a given item to it, so that item is emitted before any item of the source. It can be useful if you want to force your sequence to begin with a default value, or with a cached one.
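A synchronous analogue built on the JDK's Stream.concat shows the effect of prepending a default value. The helper name and the default 0 are only illustrative, and this is not the RxJava implementation:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StartWithDemo {
    // Prepend a default item, as startWith does, without changing the source items
    static List<Integer> startWith(Integer first, List<Integer> items) {
        return Stream.concat(Stream.of(first), items.stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(startWith(0, Arrays.asList(1, 2, 3))); // prints [0, 1, 2, 3]
    }
}
```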

scan

The scan operator applies an accumulator function to each item of a sequence: the function receives the previous result and the next item, and scan emits every intermediate result.

Observable<Integer> streamA = Observable.range(1, 5);

streamA.scan((a, b) -> a + b)
    .subscribe(i -> System.out.println(i));

When streamA emits its first item, scan emits it unchanged. For each following item, scan applies the function to the previous result and the new item and emits the result, so this example prints 1, 3, 6, 10, and 15.
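The accumulation rule can be sketched on a plain list. scan here is a hypothetical helper mirroring the behavior described above, not RxJava's implementation. Unlike reduce, which emits only the final value, it keeps every intermediate result:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

class ScanSketch {
    // Emits the first item unchanged, then f(previousResult, nextItem) for each later item
    static <T> List<T> scan(List<T> items, BinaryOperator<T> f) {
        List<T> results = new ArrayList<>();
        T acc = null;
        boolean first = true;
        for (T item : items) {
            acc = first ? item : f.apply(acc, item);
            first = false;
            results.add(acc);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(scan(Arrays.asList(1, 2, 3, 4, 5), (a, b) -> a + b)); // prints [1, 3, 6, 10, 15]
    }
}
```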

There are many other operators available to transform, filter, or combine observables.