Programowanie reaktywne
Programowanie reaktywne
(reactive programming
) jest paradygmatem programowania asynchronicznego zorientowanym na przepływ danych w postaci strumieni i propagacje zmian w formie zdarzeń. Strumieniem danych może być prawie wszystko, np. zmiana wartości obiektu, zdarzenia interfejsu użytkownika, zapytania sieciowe, operacje na danych czy błędy, a każde zadanie może być wykonywane na własnym wątku. Sposób działania opiera się na wzorcu Obserwator
, tzn. strumienie mogą emitować wartości i być obserwowane. Realizacją programowania reaktywnego dla Android
jest moduł RxAndroid
rozszerzający bibliotekę RxJava
(implementacja ReactiveX
dla Java VM
) o harmonogramy (Schedulers
) wspierające wielowątkowość w środowisku Android. RxJava
upraszcza zarządzanie asynchronicznymi i współbieżnymi operacjami, ułatwia komunikacje zadań w tle z wątkiem głównym, pozwala na wczesne wykrycie błędów i zmniejsza zapotrzebowanie na zmienne stanu.
Wzorzec
Zadaniem wzorca Obserwator
jest realizacja mechanizmu komunikacji między obiektami zależnymi poprzez powiadamianie obiektów subskrybentów o zmianie stanu obiektu obserwowanego czy statusie operacji. Obiekt może być obserwowany (Observable
) przez wielu obserwatorów (Observer
) i sam może być obserwatorem innych obiektów. Obiekt obserwowany emituje zdarzenia, które trafiają do obserwatorów, a następnie są przez nich przetwarzane.
Obserwator
Obiekt obserwatora dokonuje subskrybcji obiektu obserwowanego za pomocą metody subscribe
wywołanej na obiekcie obserwowanym oraz implementuje metody onSubscribe
, onNext
, onError
, onComplete
interfejsu Observer
w celu podjęcia działania w odpowiedzi na stan i emisje obiektu obserwowanego.
private Observer<String> createObserver() {
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
//Observer subscribe to Observable
}
@Override
public void onNext(String s) {
//Observer received some data stream
}
@Override
public void onError(Throwable e) {
//Observer received emitted error
}
@Override
public void onComplete() {
//Observer completed receiving data from Observable
}
};
return observer;
}
Obserwowany
Obiekt obserwowany w RxJava
jest strumieniem danych wykonującym operacje i emitującym dane. Podstawowym typem obiektu obserwowanego jest Observable
- emituje dowolną ilość elementów i kończy się sukcesem lub zdarzeniem błędu. Wyróżnia się również także: Flowable
- rozszerza Observable
o wsparcie dla backpressure
(zarządzanie czasem emitowania i konsumowania zdarzenia), Single
- emituje jedno zdarzenie lub błąd, Maybe
- może emitować jedno zdarzenie lub błąd, Completable
- nie emituje zdarzenia, kończy się sukcesem lub błędem. Obiekt obserwowany może być tworzony za pomocą kilku operatorów jak np.: create
, just
, defer
, from
, interval
, range
, repeat
, timer
, które różnią się sposobem definicji, czasem, częstotliwością i zwracanym typem danych emisji.
private Observable<String> createObservableJust() {
//up to 10 items
Observable<String> observable = Observable.just("Catan", "Splendor", "7 Wonders", "Codenames", "Taboo");
return observable;
}
private Observable<String> createObservableFrom() {
//create Observable by one of: fromArray, fromCallable, fromFuture, fromIterable, fromPublisher
Observable<String> observable = Observable.fromArray("Catan", "Splendor", "7 Wonders", "Codenames", "Taboo");
return observable;
}
private Observable<String> createObservable() {
final List<String> data = prepareData();
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
//emit all data one by one
for(String item : data) {
if(!emitter.isDisposed()) {
emitter.onNext(item);
}
}
//emit completed event
if(!emitter.isDisposed()) {
emitter.onComplete();
}
}
});
return observable;
}
private List<String> prepareData() {
List<String> data = new ArrayList<>();
data.add("Catan");
data.add("Splendor");
data.add("7 Wonders");
data.add("Codenames");
data.add("Taboo");
return data;
}
Harmonogramy
Harmonogramy decydują o tym na jakim wątku obiekt obserwowany będzie wykonywał zadanie oraz na jakim wątku obiekt obserwatora będzie odbierał i przetwarzał wyemitowane dane. Najpopularniejszymi harmonogrami są: AndroidSchedulers.mainThread
- odpowiada za główny wątek aplikacji (wprowadzony w module RxAndroid
), Schedulers.io
- wykorzystywany w nie wymagających operacjach i Schedulers.computation
- przeznaczony do przetwarzania intensywnych zadań. Wyróżnia się także Schedulers.newThread
, Schedulers.single
, Schedulers.immediate
, Schedulers.trampoline
, Schedulers.from
, które różnią się synchronicznością, kolejnością, czasem rozpoczęcia i ilością przetwarzanych zadań. Przypisanie harmonogramu dokonywane jest na obiekcie obserwowanym za pomocą metod subscribeOn
i observeOn
.
private void startWorkAndSubscribe() {
//create Observable and Observer
Observable<String> observable = createObservable();
Observer<String> observer = createObserver();
//subscribe on choosen schedulers
observable
.observeOn(Schedulers.io()) //emit data on background thread
.subscribeOn(AndroidSchedulers.mainThread()) //receive data on UI thread
.subscribe(observer); //subscribe observer to observable
}
Operatory
Operatory pozwalają na manipulacje i modyfikacje emitowanych danych za pomocą zadań transformacji, filtrowania, łączenia, agregacji czy tworzenia. RxJava dostarcza szeroki zbiór operatorów podzielonych na kategorie w zależności od rodzaju operacji, a ich łączenie umożliwia uzyskanie dowolnego złożonego strumienia danych. Poza operatorami odpowiedzialnymi za tworzenie obiektów obserwowanych (create
, just
, from
itp) do często wykorzystywanych należą m.in. filter
, map
, skip
, take
, concat
.
private void startWorkAndSubscribeWithOperators() {
Observable<String> observable = createObservable();
Observer<Object> observer = createObserver();
observable
.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.filter(new Predicate<String>() { //emit only some elements
@Override
public boolean test(String s) throws Exception {
return s.toLowerCase().startsWith("c");
}
})
.map(new Function<String, Object>() { //modify data
@Override
public Object apply(String s) {
return s.toUpperCase();
}
})
.subscribe(observer);
}
Uchwyt
W celu uniknięcia wycieków pamięci związanych z niepożądaną dłużej subskrypcją obserwatora należy przypisać referencje Disposable
z poziomu obserwatora oraz wypisać go z subskrypcji.
public class SingleObserverActivity extends AppCompatActivity {
private Disposable disposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_single_observer);
Observable<String> observable = createObservableWithDispose();
Observer<String> observer = createObserver();
observable
.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
}
@Override
protected void onDestroy() {
super.onDestroy();
disposable.dispose(); //don't send event anymore
}
private Observer<String> createObserverWithDispose() {
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
//assign subscription to dispoable
disposable = d;
}
@Override
public void onNext(String s) { }
@Override
public void onError(Throwable e) { }
@Override
public void onComplete() { }
};
return observer;
}
private Observable<String> createObservableJust() {
Observable<String> observable = Observable.just("Catan", "Splendor", "7 Wonders", "Codenames", "Taboo");
return observable;
}
}
W przypadku wielu obserwatorów manualne niszczenie subskrypcji może być żmudne i podatne błędy dlatego w takiej sytuacji warto użyć CompositeDisposable
, który utrzymuje listę subskrypcji obiektów DisposableObserver
w puli i potrafi zlikwidować je wszystkie naraz.
public class MultipleObserversActivity extends AppCompatActivity {
private CompositeDisposable compositeDisposable = new CompositeDisposable();
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_multiple_observers);
Observable<String> observable = createObservable();
DisposableObserver<String> observerGames = createDisposableObserver();
DisposableObserver<String> observerFilteredGames = createDisposableObserver();
compositeDisposable.add(observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(observerGames));
compositeDisposable.add(observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.filter(new Predicate<String>() {
@Override
public boolean test(String s) throws Exception {
return s.toLowerCase().startsWith("c");
}
})
.subscribeWith(observerFilteredGames));
}
@Override
protected void onDestroy() {
super.onDestroy();
compositeDisposable.clear(); //clear all disposable
}
private Observable<String> createObservableJust() {
Observable<String> observable = Observable.just("Catan", "Splendor", "7 Wonders", "Codenames", "Taboo");
return observable;
}
private DisposableObserver<String> createDisposableObserver() {
DisposableObserver<String> observer = new DisposableObserver<String>() {
@Override
public void onNext(String s) { }
@Override
public void onError(Throwable e) { }
@Override
public void onComplete() { }
};
return observer;
}
}
RxBinding
Przechwycenie zdarzeń interfejsu użytkownika takich jak np. kliknięcie na przycisk, wpisanie tekstu itp. w programowaniu imperatywnym wymagają metod zwrotnych (callback
). W RxJava
zdarzenie te mogą być emitowane za pomocą strumieni danych przy użyciu RxBinding
dzięki czemu komunikacja kontrolek widoku z innymi warstwami aplikacji staje się uproszczona. W tym celu wystarczy na kontrolce widoku wywołać metodę oczekiwanej akcji co w efekcie zwróci obiekt typu Observable
.
public class RxBindingActivity extends AppCompatActivity {
private Button button, buttonBinding;
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_rxbinding);
button = findViewById(R.id.button);
buttonBinding = findViewById(R.id.buttonBinding);
//provide click actions
setButtonClickByListener();
setButtonClickByRxBinding();
}
//standard callback way
private void setButtonClickByListener() {
button.setOnClickListener(v -> {
//perform job, for reactive programming emmit stream for Observer
Observable.just(true).subscribe(aVoid -> {
//action form Observer
});
});
}
//RxBinding way
private void setButtonClickByRxBinding() {
RxView.clicks(buttonBinding).subscribe(aVoid -> {
//action form Observer
});
}
//provide RxBinding for other widgets
}