RxJava

Biblioteki

  |   12 min czytania
(RxJava 2.2, RxAndroid 2.1)

Programowanie reaktywne

Programowanie reaktywne (reactive programming) jest paradygmatem programowania asynchronicznego zorientowanym na przepływ danych w postaci strumieni i propagacje zmian w formie zdarzeń. Strumieniem danych może być prawie wszystko, np. zmiana wartości obiektu, zdarzenia interfejsu użytkownika, zapytania sieciowe, operacje na danych czy błędy, a każde zadanie może być wykonywane na własnym wątku. Sposób działania opiera się na wzorcu Obserwator, tzn. strumienie mogą emitować wartości i być obserwowane. Realizacją programowania reaktywnego dla Android jest moduł RxAndroid rozszerzający bibliotekę RxJava (implementacja ReactiveX dla Java VM) o harmonogramy (Schedulers) wspierające wielowątkowość w środowisku Android. RxJava upraszcza zarządzanie asynchronicznymi i współbieżnymi operacjami, ułatwia komunikacje zadań w tle z wątkiem głównym, pozwala na wczesne wykrycie błędów i zmniejsza zapotrzebowanie na zmienne stanu.

Wzorzec

Zadaniem wzorca Obserwator jest realizacja mechanizmu komunikacji między obiektami zależnymi poprzez powiadamianie obiektów subskrybentów o zmianie stanu obiektu obserwowanego czy statusie operacji. Obiekt może być obserwowany (Observable) przez wielu obserwatorów (Observer) i sam może być obserwatorem innych obiektów. Obiekt obserwowany emituje zdarzenia, które trafiają do obserwatorów, a następnie są przez nich przetwarzane.

Obserwator

Obiekt obserwatora dokonuje subskrybcji obiektu obserwowanego za pomocą metody subscribe wywołanej na obiekcie obserwowanym oraz implementuje metody onSubscribe, onNext, onError, onComplete interfejsu Observer w celu podjęcia działania w odpowiedzi na stan i emisje obiektu obserwowanego.

private Observer<String> createObserver() {
    Observer<String> observer = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            //Observer subscribe to Observable
        }

        @Override
        public void onNext(String s) {
            //Observer received some data stream
        }

        @Override
        public void onError(Throwable e) {
            //Observer received emitted error
        }

        @Override
        public void onComplete() {
            //Observer completed receiving data from Observable
        }
    };
    return observer;
}

Obserwowany

Obiekt obserwowany w RxJava jest strumieniem danych wykonującym operacje i emitującym dane. Podstawowym typem obiektu obserwowanego jest Observable - emituje dowolną ilość elementów i kończy się sukcesem lub zdarzeniem błędu. Wyróżnia się również także: Flowable - rozszerza Observable o wsparcie dla backpressure (zarządzanie czasem emitowania i konsumowania zdarzenia), Single - emituje jedno zdarzenie lub błąd, Maybe - może emitować jedno zdarzenie lub błąd, Completable - nie emituje zdarzenia, kończy się sukcesem lub błędem. Obiekt obserwowany może być tworzony za pomocą kilku operatorów jak np.: create, just, defer, from, interval, range, repeat, timer, które różnią się sposobem definicji, czasem, częstotliwością i zwracanym typem danych emisji.

private Observable<String> createObservableJust() {
    //up to 10 items
    Observable<String> observable = Observable.just("Catan", "Splendor", "7 Wonders", "Codenames", "Taboo");
    return observable;
}

private Observable<String> createObservableFrom() {
    //create Observable by one of: fromArray, fromCallable, fromFuture, fromIterable, fromPublisher
    Observable<String> observable = Observable.fromArray("Catan", "Splendor", "7 Wonders", "Codenames", "Taboo");
    return observable;
}

private Observable<String> createObservable() {
    final List<String> data = prepareData();
    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            //emit all data one by one
            for(String item : data) {
                if(!emitter.isDisposed()) {
                    emitter.onNext(item);
                }
            }
            //emit completed event
            if(!emitter.isDisposed()) {
                emitter.onComplete();
            }
        }
    });
    return observable;
}

private List<String> prepareData() {
    List<String> data = new ArrayList<>();
    data.add("Catan");
    data.add("Splendor");
    data.add("7 Wonders");
    data.add("Codenames");
    data.add("Taboo");
    return data;
}

Harmonogramy

Harmonogramy decydują o tym na jakim wątku obiekt obserwowany będzie wykonywał zadanie oraz na jakim wątku obiekt obserwatora będzie odbierał i przetwarzał wyemitowane dane. Najpopularniejszymi harmonogrami są: AndroidSchedulers.mainThread - odpowiada za główny wątek aplikacji (wprowadzony w module RxAndroid), Schedulers.io - wykorzystywany w nie wymagających operacjach i Schedulers.computation - przeznaczony do przetwarzania intensywnych zadań. Wyróżnia się także Schedulers.newThread, Schedulers.single, Schedulers.immediate, Schedulers.trampoline, Schedulers.from, które różnią się synchronicznością, kolejnością, czasem rozpoczęcia i ilością przetwarzanych zadań. Przypisanie harmonogramu dokonywane jest na obiekcie obserwowanym za pomocą metod subscribeOn i observeOn.

private void startWorkAndSubscribe() {
    //create Observable and Observer
    Observable<String> observable = createObservable();
    Observer<String> observer = createObserver();

    //subscribe on choosen schedulers
    observable
        .observeOn(Schedulers.io()) //emit data on background thread
        .subscribeOn(AndroidSchedulers.mainThread()) //receive data on UI thread
        .subscribe(observer); //subscribe observer to observable
}

Operatory

Operatory pozwalają na manipulacje i modyfikacje emitowanych danych za pomocą zadań transformacji, filtrowania, łączenia, agregacji czy tworzenia. RxJava dostarcza szeroki zbiór operatorów podzielonych na kategorie w zależności od rodzaju operacji, a ich łączenie umożliwia uzyskanie dowolnego złożonego strumienia danych. Poza operatorami odpowiedzialnymi za tworzenie obiektów obserwowanych (create, just, from itp) do często wykorzystywanych należą m.in. filter, map, skip, take, concat.

private void startWorkAndSubscribeWithOperators() {
    Observable<String> observable = createObservable();
    Observer<Object> observer = createObserver();

    observable
        .observeOn(Schedulers.io())
        .subscribeOn(AndroidSchedulers.mainThread())
        .filter(new Predicate<String>() { //emit only some elements
            @Override
            public boolean test(String s) throws Exception {
                return s.toLowerCase().startsWith("c");
            }
        })
        .map(new Function<String, Object>() { //modify data
            @Override
            public Object apply(String s) {
                return s.toUpperCase();
            }
        })
        .subscribe(observer);
}

Uchwyt

W celu uniknięcia wycieków pamięci związanych z niepożądaną dłużej subskrypcją obserwatora należy przypisać referencje Disposable z poziomu obserwatora oraz wypisać go z subskrypcji.

public class SingleObserverActivity extends AppCompatActivity {
    
    private Disposable disposable;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_single_observer);

        Observable<String> observable = createObservableWithDispose();
        Observer<String> observer = createObserver();

        observable
            .observeOn(Schedulers.io())
            .subscribeOn(AndroidSchedulers.mainThread())
            .subscribe(observer);
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposable.dispose(); //don't send event anymore
    }


    private Observer<String> createObserverWithDispose() {
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                //assign subscription to dispoable
                disposable = d;
            }
            @Override
            public void onNext(String s) { }
            @Override
            public void onError(Throwable e) { }
            @Override
            public void onComplete() { }
        };
        return observer;
    }
	
    private Observable<String> createObservableJust() {
        Observable<String> observable = Observable.just("Catan", "Splendor", "7 Wonders", "Codenames", "Taboo");
        return observable;
    }
}

W przypadku wielu obserwatorów manualne niszczenie subskrypcji może być żmudne i podatne błędy dlatego w takiej sytuacji warto użyć CompositeDisposable, który utrzymuje listę subskrypcji obiektów DisposableObserver w puli i potrafi zlikwidować je wszystkie naraz.

public class MultipleObserversActivity extends AppCompatActivity {

    private CompositeDisposable compositeDisposable = new CompositeDisposable();

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_multiple_observers);
		
        Observable<String> observable = createObservable();
        DisposableObserver<String> observerGames = createDisposableObserver();
        DisposableObserver<String> observerFilteredGames = createDisposableObserver();

        compositeDisposable.add(observable
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(observerGames));

        compositeDisposable.add(observable
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .filter(new Predicate<String>() {
                @Override
                public boolean test(String s) throws Exception {
                    return s.toLowerCase().startsWith("c");
                }
            })
            .subscribeWith(observerFilteredGames));
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        compositeDisposable.clear(); //clear all disposable
    }
	
    private Observable<String> createObservableJust() {
        Observable<String> observable = Observable.just("Catan", "Splendor", "7 Wonders", "Codenames", "Taboo");
        return observable;
    }

    private DisposableObserver<String> createDisposableObserver() {
        DisposableObserver<String> observer = new DisposableObserver<String>() {
            @Override
            public void onNext(String s) { }
            @Override
            public void onError(Throwable e) { }
            @Override
            public void onComplete() { }
        };
        return observer;
    }	
}

RxBinding

Przechwycenie zdarzeń interfejsu użytkownika takich jak np. kliknięcie na przycisk, wpisanie tekstu itp. w programowaniu imperatywnym wymagają metod zwrotnych (callback). W RxJava zdarzenie te mogą być emitowane za pomocą strumieni danych przy użyciu RxBinding dzięki czemu komunikacja kontrolek widoku z innymi warstwami aplikacji staje się uproszczona. W tym celu wystarczy na kontrolce widoku wywołać metodę oczekiwanej akcji co w efekcie zwróci obiekt typu Observable.

public class RxBindingActivity extends AppCompatActivity {

    private Button button, buttonBinding;

    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_rxbinding);
        
        button = findViewById(R.id.button);
        buttonBinding = findViewById(R.id.buttonBinding);
        
        //provide click actions
        setButtonClickByListener();
        setButtonClickByRxBinding();
    }

    //standard callback way
    private void setButtonClickByListener() {
        button.setOnClickListener(v -> {
            //perform job, for reactive programming emmit stream for Observer
            Observable.just(true).subscribe(aVoid -> {
                //action form Observer
            });
        });
    }

    //RxBinding way
    private void setButtonClickByRxBinding() {
        RxView.clicks(buttonBinding).subscribe(aVoid -> {
            //action form Observer
        });
    }
    
    //provide RxBinding for other widgets
}