Introduction to ReactiveX on Android

ReactiveX, also known as Reactive Extensions or RX, is a library for composing asynchronous and event-based programs by using observable sequences. This is a perfect fit for Android, which is an event-driven, user-focused platform.

ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming.

The library is available for many programming languages. There's even a dedicated Android version, which we're going to use in this tutorial (although we could also manage with the plain Java flavor). Please refer to the website to learn more about the ideas and reasoning behind the project.

Introduction to ReactiveX

The two main actors in the RX world are:

  • Observable -- it emits events (in the form of data)
  • Observer -- it subscribes to an Observable in order to receive its events

Many Observers can subscribe to a single Observable, and data streams can be manipulated with operators (more on these later), which is the real strength of the RX approach. The functional style lets us decouple dependencies and isolate business logic, so that it operates within the clearly defined boundaries of simple streams of data objects.
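To make the two roles concrete before touching the library, here is a minimal plain-Java sketch of the idea. It is not RxJava's implementation: `ObserverSketch`, `Listener`, `Source` and `Recorder` are hypothetical names invented for this illustration, and the source simply replays a fixed list to whoever subscribes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ObserverSketch {

    /** Local stand-in for an Rx-style Observer: zero or more items, then one terminal event. */
    interface Listener<T> {
        void onNext(T item);
        void onCompleted();
        void onError(Throwable e);
    }

    /** Local stand-in for an Observable that replays a fixed sequence to every subscriber. */
    static final class Source<T> {
        private final List<T> items;

        private Source(List<T> items) {
            this.items = items;
        }

        @SafeVarargs
        static <T> Source<T> just(T... items) {
            return new Source<>(Arrays.asList(items));
        }

        /** Nothing is emitted until subscribe() is called; each subscriber gets the whole sequence. */
        void subscribe(Listener<? super T> listener) {
            for (T item : items) {
                listener.onNext(item);
            }
            listener.onCompleted();
        }
    }

    /** Listener that records what it receives so the sequence can be inspected. */
    static final class Recorder<T> implements Listener<T> {
        final List<String> events = new ArrayList<>();

        @Override public void onNext(T item) { events.add("onNext: " + item); }
        @Override public void onCompleted() { events.add("onCompleted"); }
        @Override public void onError(Throwable e) { events.add("onError: " + e); }
    }

    public static void main(String[] args) {
        Source<Integer> source = Source.just(4, 8, 15);
        Recorder<Integer> first = new Recorder<>();
        Recorder<Integer> second = new Recorder<>();
        source.subscribe(first);   // two independent subscribers,
        source.subscribe(second);  // one shared source
        System.out.println(first.events);
        System.out.println(second.events);
    }
}
```

Both recorders end up with the same sequence, which is the "many Observers, one Observable" relationship in miniature. The real library adds operators, schedulers and unsubscription on top of this basic shape.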

Although ReactiveX has well-defined concepts and a mature API, everyone struggles a bit at first with the more advanced ones. It's not easy to shift your mind from classic imperative programming to functional programming, or to intuitively pick the right RX operators from the vast array available. With RX, live examples are far better than theory. Familiarity with Java 8 Streams helps a lot, but everything you need to understand the basics of RX is here anyway.

The following tutorial is specifically tailored to Android, but the concepts and techniques explained hold for Reactive Extensions in general -- regardless of the actual implementation, whether it's .NET, Java, JavaScript or others.

Prerequisites

Please refer to the base tutorial How To Get Started With Android Programming for an explanation of basic Android concepts. This tutorial directly expands on that base project -- you can download the starting source code here.

Project setup

Open your app's build.gradle file and add the RxAndroid library to the dependencies section, then sync the project.

dependencies {  
    compile fileTree(dir: 'libs', include: ['*.jar'])
    testCompile 'junit:junit:4.12'
    compile 'com.android.support:appcompat-v7:23.1.1'

    compile 'io.reactivex:rxandroid:1.1.0'
}

Basic Observable & Observer in action

1. Define an Observable

We're going to use one of the many available helper operators to create our first Observable. Go to MainActivity.java and define a new class field along with its initializer:

Observable<Integer> integerObservable = Observable.just(4, 8, 15, 16, 23, 42);  

We've declared a field integerObservable that holds an Observable parameterized by Integer. The Observable.just() method creates an Observable from the arguments passed in -- this is the simplest way to create an object that will emit those ints the ReactiveX way. Now integerObservable is ready to do its job, but it's waiting for an interested party to subscribe to it.

Don't hesitate to look at the documentation at any time; it's comprehensive and contains very helpful marble diagrams.

2. Define an Observer

Define a second field in your Activity class.

    Observer<Integer> integerObserver = new Observer<Integer>() {
        @Override
        public void onCompleted() {
            Log.d("PLAYGROUND", "onCompleted");
        }

        @Override
        public void onError(Throwable e) {
            Log.d("PLAYGROUND", "onError", e);
        }

        @Override
        public void onNext(Integer integer) {
            Log.d("PLAYGROUND", "onNext: " + integer);
        }
    };

The Observer consumes data events emitted by the Observable through its onNext method. An invocation of onCompleted tells us that no more events are coming -- the Observable has finished its work and we can clean up. Likewise, onError is called in case of an unrecoverable error -- the source cannot continue emitting events, because the error potentially compromises data and system integrity. These conventions are essential for keeping your system healthy: data streams are theoretically endless, so we need to know when these special events happen.
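The convention described above (any number of onNext calls, followed by at most one terminal onCompleted or onError) can be illustrated with a small guard that drops everything arriving after the first terminal event. This is a plain-Java sketch under that assumption, not code taken from the library; `TerminalEventSketch`, `Callbacks` and `Guarded` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class TerminalEventSketch {

    /** Local stand-in for the three-callback Observer shape used in this tutorial. */
    interface Callbacks<T> {
        void onNext(T item);
        void onCompleted();
        void onError(Throwable e);
    }

    /** Forwards events to a delegate, but ignores everything after the first terminal event. */
    static final class Guarded<T> implements Callbacks<T> {
        private final Callbacks<T> delegate;
        private boolean done;

        Guarded(Callbacks<T> delegate) {
            this.delegate = delegate;
        }

        @Override public void onNext(T item) {
            if (!done) delegate.onNext(item);
        }

        @Override public void onCompleted() {
            if (done) return;
            done = true;
            delegate.onCompleted();
        }

        @Override public void onError(Throwable e) {
            if (done) return;
            done = true;
            delegate.onError(e);
        }
    }

    /** Feeds a misbehaving sequence through the guard and returns what got through. */
    static List<String> demo() {
        final List<String> log = new ArrayList<>();
        Callbacks<Integer> guarded = new Guarded<>(new Callbacks<Integer>() {
            @Override public void onNext(Integer item) { log.add("onNext: " + item); }
            @Override public void onCompleted() { log.add("onCompleted"); }
            @Override public void onError(Throwable e) { log.add("onError: " + e.getMessage()); }
        });
        guarded.onNext(4);
        guarded.onError(new IllegalStateException("boom"));
        guarded.onNext(8);      // ignored: the stream has already failed
        guarded.onCompleted();  // ignored: only one terminal event is delivered
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Only the first item and the error reach the delegate. A well-behaved source should never need such a guard, but the sketch shows why consumers can treat onCompleted and onError as the point where cleanup is safe.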

3. Subscribe Observer to Observable

Up to this point we have a source of events, in the form of Integer objects, and a consumer that is able to process them. The Observable does not emit events until it is subscribed to, and the Observer has no source of data attached. Let's change that in the onCreate method of MainActivity:

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        integerObservable.subscribe(integerObserver);
    }

4. Run the code

Run the Android app and look at logcat; you should see every single event written out by the Observer's handler methods.

01-24 07:43:47.511 27439-27439/? D/PLAYGROUND: onNext: 4  
01-24 07:43:47.511 27439-27439/? D/PLAYGROUND: onNext: 8  
01-24 07:43:47.511 27439-27439/? D/PLAYGROUND: onNext: 15  
01-24 07:43:47.511 27439-27439/? D/PLAYGROUND: onNext: 16  
01-24 07:43:47.511 27439-27439/? D/PLAYGROUND: onNext: 23  
01-24 07:43:47.511 27439-27439/? D/PLAYGROUND: onNext: 42  
01-24 07:43:47.511 27439-27439/? D/PLAYGROUND: onCompleted  

5. Inline the Observer code

ReactiveX not only simplifies the logic workflow, it also lets you write less code than usual. This is how we can inline the Observer code that actually does the work:

        integerObservable.subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d("PLAYGROUND", "call: " + integer);
            }
        });

Notice the peculiar interface name, Action1. The number says how many parameters the call() method of that interface takes. It looks ugly, but it helps both us and the compiler. We're going to remedy this in the future.

Command-click (CTRL-click on Windows) on Action1 and go to the source. It's defined as follows:

package rx.functions;

/**
 * A one-argument action.
 */
public interface Action1<T> extends Action {  
    void call(T t);
}

This and similar interfaces exist because Java had no lambda expressions before Java 8. They come in handy in Android programming, and the naming is a neat convention to keep in mind. You'll see more of it in the next section.
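To show what such a single-method interface buys us once lambdas are available, the sketch below drives the same callback twice: once as an anonymous class and once as a lambda. It assumes a toolchain with Java 8 language support. `OneArgAction` is a local copy of the one-argument shape written for this example, not the rx.functions type itself, and `forEachValue` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

final class LambdaSketch {

    /** Local copy of the one-argument action shape; not the rx.functions type itself. */
    interface OneArgAction<T> {
        void call(T t);
    }

    /** Calls the action once per value, the way a subscribed callback would be driven. */
    static void forEachValue(int[] values, OneArgAction<Integer> action) {
        for (int value : values) {
            action.call(value);
        }
    }

    /** Java 7 style: the whole interface implementation is spelled out. */
    static List<String> withAnonymousClass(int[] values) {
        final List<String> out = new ArrayList<>();
        forEachValue(values, new OneArgAction<Integer>() {
            @Override
            public void call(Integer integer) {
                out.add("call: " + integer);
            }
        });
        return out;
    }

    /** Java 8 style: the compiler infers the interface and its single method. */
    static List<String> withLambda(int[] values) {
        List<String> out = new ArrayList<>();
        forEachValue(values, integer -> out.add("call: " + integer));
        return out;
    }

    public static void main(String[] args) {
        int[] values = {4, 8, 15};
        System.out.println(withAnonymousClass(values));
        System.out.println(withLambda(values));
    }
}
```

Both methods produce the same list. Because the interface declares exactly one abstract method, a lambda can stand in for the anonymous class without any change to the code that calls it.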

With the inline approach we supply only the code that handles new events. We won't know when the stream ends, which may be fine for some needs. The real problem is the missing error-handling routine: if the Observable signals an error, we end up with an OnErrorNotImplementedException. subscribe() also has an overload that takes a second Action1<Throwable> for errors; use the single-argument form only when you are sure the stream cannot fail.

Operators

The most important tools at your disposal when working with ReactiveX are Operators. They compose and transform streams of data, performing computations such as mapping or filtering in a functional way.

Let's do the classic example of calculating squares of numbers, the ReactiveX way. Add this code to onCreate:

        integerObservable.map(new Func1<Integer, Integer>() {
            @Override
            public Integer call(Integer integer) {
                return integer * integer;
            }
        }).subscribe(integerObserver);

As you can see, we reuse the existing Observable that emits integers, apply the map function to it and subscribe to the new Observable returned by this transformation. Func1<T, R> represents a function that takes a value of type T as its only argument and returns a result of type R.

A more complicated, yet powerful and concise, example:
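The key point above is that map does not modify the Observable it is called on; it returns a new one that wraps it. A rough plain-Java sketch of that idea follows. It assumes Java 8 and uses hypothetical names (`MapOperatorSketch`, `Source`, `just`, `map`, `collect`); it only models onNext and says nothing about how the library actually implements its operators.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

final class MapOperatorSketch {

    /** Hypothetical minimal source: pushes every item to whoever subscribes. */
    interface Source<T> {
        void subscribe(Consumer<? super T> onNext);
    }

    /** Source that emits the given ints, in order, on every subscription. */
    static Source<Integer> just(final int... items) {
        return onNext -> {
            for (int item : items) {
                onNext.accept(item);
            }
        };
    }

    /** Returns a NEW source; the upstream one is left untouched and can be reused. */
    static <T, R> Source<R> map(Source<T> upstream, Function<? super T, ? extends R> fn) {
        return onNext -> upstream.subscribe(item -> onNext.accept(fn.apply(item)));
    }

    /** Subscribes and gathers everything the source emits into a list. */
    static List<Integer> collect(Source<Integer> source) {
        List<Integer> out = new ArrayList<>();
        source.subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        Source<Integer> numbers = just(4, 8, 15, 16, 23, 42);
        Source<Integer> squares = map(numbers, n -> n * n);
        System.out.println(collect(squares));
        System.out.println(collect(numbers)); // the original source is unchanged
    }
}
```

Subscribing to `squares` triggers a subscription to `numbers` behind the scenes, with the function applied to each item on its way through. Subscribing to `numbers` directly still yields the original values.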

        integerObservable
                .map(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {
                        return Integer.toBinaryString(integer);
                    }
                })
                .filter(new Func1<String, Boolean>() {
                    @Override
                    public Boolean call(String s) {
                        return s.endsWith("1");
                    }
                })
                .map(new Func1<String, Integer>() {
                    @Override
                    public Integer call(String s) {
                        return Integer.parseInt(s, 2);
                    }
                })
                .subscribe(integerObserver);

Data types can change freely along the chain of transformations. Here we:

  • map Integer values to their binary representations as Strings
  • pass along only those binary Strings that end with 1 -- the returned Boolean says whether an item is kept (true) or filtered out (false)
  • map the String representations back to Integer, so the original Observer (without touching its code) receives only the values matching our criteria
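For readers who know Java 8 Streams, the same three steps can be written over an in-memory list, which makes it easy to check the result by hand. This is only an analogy: Streams are pull-based and run once over a finite collection, whereas an Observable pushes events to its subscribers. `BinaryFilterSketch` and its method name are made up for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class BinaryFilterSketch {

    /** Same three steps as the Rx chain above, applied to an in-memory list. */
    static List<Integer> valuesWhoseBinaryEndsWithOne(List<Integer> input) {
        return input.stream()
                .map(Integer::toBinaryString)       // Integer -> String, e.g. 15 -> "1111"
                .filter(s -> s.endsWith("1"))       // true = keep the item
                .map(s -> Integer.parseInt(s, 2))   // String -> Integer
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [15, 23]
        System.out.println(valuesWhoseBinaryEndsWithOne(Arrays.asList(4, 8, 15, 16, 23, 42)));
    }
}
```

A binary string ends with 1 exactly when the number is odd, so of the six inputs only 15 (1111) and 23 (10111) survive the filter.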

There are many other useful operators; these few are shown only to demonstrate the functional style that ReactiveX makes possible, even on platforms without built-in FP support.

Handling background work in Android

Every long-running piece of code in Android should run on a background thread, leaving the UI thread free to render the UI and process user actions such as touch events and input. ReactiveX supports scheduling jobs on different threads, which makes it suitable for every type of computation we might perform on Android.

1. Define custom Observable

We're going to create an example worker Observable by hand this time, to illustrate what's going on internally. Create a field in your Activity:

    Observable<Integer> computeNumbersObservable = Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {

            int i = 0;

            while (true) {

                if (subscriber.isUnsubscribed()) {
                    return; // nobody is listening anymore, stop working
                }

                try {
                    Thread.sleep(1000); // simulate slow work
                } catch (InterruptedException e) {
                    subscriber.onError(e); // report error
                    return; // onError is terminal: emit nothing after it
                }

                subscriber.onNext(i++); // emit data

                if (i == 10) {
                    break;
                }

            }

            subscriber.onCompleted(); // indicate stream completion
        }
    });

Notice that we're emitting events by hand, making sure to call onError and onCompleted when necessary. It's important to follow the conventions and be a good citizen in the RX world. As you can see, we're doing a fairly long-running computation, simulated by calling Thread.sleep().

2. Subscribe

Now subscribe to this Observable, as usual in the onCreate method:

computeNumbersObservable.subscribe(integerObserver);  

Run the app and witness total unresponsiveness until the computation finishes. It turns out we're performing the computation on the UI thread, which is unacceptable in real-world apps.

3. Declare Scheduler to use

Let's quickly fix this by moving the work to a background thread:

computeNumbersObservable.subscribeOn(Schedulers.computation()).subscribe(integerObserver);  

This is a very straightforward way to use the powerful concept of Schedulers supplied by the RxJava library. Take note of the different kinds available. One method call moves our job to a thread pool suited to a particular kind of work, without us managing threads by hand -- all you do is supply the business logic.
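For comparison, this is roughly the kind of manual thread handling that a single subscribeOn call spares us. The sketch uses only java.util.concurrent and is not how RxJava schedules work internally; `BackgroundWorkSketch` and `runOffCallerThread` are hypothetical names.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class BackgroundWorkSketch {

    /** Runs a slow job on a worker thread and returns the name of the thread that executed it. */
    static String runOffCallerThread() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Future<String> result = worker.submit(() -> {
                Thread.sleep(100); // simulated long-running computation
                return Thread.currentThread().getName();
            });
            // The caller could do other work here; get() is only used to collect the result.
            return result.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("background job failed", e);
        } finally {
            worker.shutdown(); // thread lifecycle is our responsibility in this version
        }
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("worker: " + runOffCallerThread());
    }
}
```

Even in this tiny form we have to create the executor, handle two checked exceptions and remember to shut the pool down. With a Scheduler, those concerns stay out of the business logic.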

4. Use AndroidSchedulers to update UI

The documentation for subscribe() method states:

subscribe does not operate by default on a particular Scheduler

Without a Scheduler, our callbacks ran on whichever thread called subscribe(), which so far has been the UI thread. To be on the safe side, however, let's always explicitly define schedulers for both subscribing and observing. RxAndroid provides a Scheduler specific to Android, so any work that must happen on the UI thread can simply be declared as such. The following code computes the data on a background thread but delivers the results on the UI thread, so we can safely update the UI from the Observer's callbacks:

        computeNumbersObservable
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(integerObserver);
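The split between "where the work runs" and "where the results arrive" can be pictured with two plain executors. In the sketch below a thread named "worker" computes values and hands each one to a thread named "ui", which stands in for the Android main thread. The names `ThreadHopSketch` and `run` are hypothetical, and a real app would never create its own UI thread; this is only a model of the hand-off.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ThreadHopSketch {

    /** Computes squares on "worker", delivers them on "ui", and logs "value: producer->consumer". */
    static List<String> run(int count) {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        // Stand-in for the Android main thread (assumption made for this sketch only).
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui"));
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch delivered = new CountDownLatch(count);
        try {
            worker.execute(() -> {
                for (int i = 0; i < count; i++) {
                    final int value = i * i; // the "computation" happens on the worker thread
                    final String producer = Thread.currentThread().getName();
                    ui.execute(() -> { // hand the result over to the "ui" thread
                        log.add(value + ": " + producer + "->" + Thread.currentThread().getName());
                        delivered.countDown();
                    });
                }
            });
            if (!delivered.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for delivery");
            }
            return log;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
            ui.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```

Every log entry shows the value being produced on one thread and consumed on the other, which is the effect the subscribeOn and observeOn pair declares in two lines.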

One last thing -- Subscriptions

A Subscription represents the link between an Observable and an Observer, and lets us unsubscribe once we no longer need the events. It is good practice to unsubscribe in Android's lifecycle callbacks (for example onDestroy), so that a finished Activity is not kept alive by a running stream. The following code shows how to obtain a Subscription object and, for simplicity, unsubscribe on a button click:

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        textView = (TextView) findViewById(R.id.textView);
        button = (Button) findViewById(R.id.button);

        final Subscription subscription = computeNumbersObservable
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(integerObserver);

        button.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                subscription.unsubscribe();
            }
        });

    }
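Unsubscribing only helps if the producing side pays attention to it. The plain-Java sketch below shows the cooperative part of that contract: the producer checks a cancellation flag before each emission and stops as soon as the consumer has lost interest. `UnsubscribeSketch`, `Handle`, `emit` and `takeUntilThree` are hypothetical names used for illustration, not library types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntConsumer;

final class UnsubscribeSketch {

    /** Hypothetical handle given to the consumer so it can cancel delivery. */
    static final class Handle {
        private final AtomicBoolean unsubscribed = new AtomicBoolean(false);

        void unsubscribe() {
            unsubscribed.set(true);
        }

        boolean isUnsubscribed() {
            return unsubscribed.get();
        }
    }

    /** Emits 0..limit-1, checking before every emission whether the consumer is still interested. */
    static void emit(int limit, Handle handle, IntConsumer onNext) {
        for (int i = 0; i < limit; i++) {
            if (handle.isUnsubscribed()) {
                return; // stop working as soon as nobody is listening
            }
            onNext.accept(i);
        }
    }

    /** Consumer that cancels itself once it has seen the value 3. */
    static List<Integer> takeUntilThree() {
        final Handle handle = new Handle();
        final List<Integer> seen = new ArrayList<>();
        emit(10, handle, value -> {
            seen.add(value);
            if (value == 3) {
                handle.unsubscribe();
            }
        });
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(takeUntilThree());
    }
}
```

The producer is asked for ten values but stops after the fourth, because the flag is checked before each emission. A hand-written Observable benefits from the same kind of check, so that an unsubscribed Observer does not keep a background job running.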

Summary

In this tutorial you've learned how to set up and use basic ReactiveX components on the Android platform, such as Observables, Observers, Subscriptions and Operators. You can now apply a functional programming approach to your apps and simplify and shorten your source code by shifting from an imperative to a reactive programming style.

Source code for this tutorial can be found on GitHub.

by Adam Jodłowski
