はじめに
この記事について
この記事はRxJavaの基本的な5つの要素の学習ログです。
- Observable
- Operators
- Single
- Subject
- Scheduler
学習のモチベーション
- RxJavaに触れずにAndroidアプリ開発をやってきましたが、業務でRxJavaのコードを読んだり書いたりする場面が出てきたので、 RxJavaを一度学習しておきたい。
- リアクティブプログラミングを知る。
記事中のコードについて
RxKotlinを用いたコードを載せています。
implementation("io.reactivex.rxjava3:rxkotlin:3.0.1")
ReativeXとRxJava
ReactiveXは非同期でイベント駆動型のプログラムを構成するライブラリ
RxJavaを学び始めると、まずReativeXの存在を知ります。
ReativeXについてReactiveX - Introでは以下のように説明されています。
ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.
ここで以下の2点がわかりました。
- ReactiveXにはObservable(観測可能な)な処理の流れがあること
- ReactiveXは非同期でイベント駆動型のプログラムを構成するライブラリであること
ちなみに「event-based programs」の部分を「event-driven programming(イベント駆動プログラミング)」と解釈しました。
イベント駆動プログラミングについて深掘りはしませんが、普段のアプリ開発でやっているsetOnClickListener
などがまさにイベント駆動プログラミングであるとわかりました。
また、リアクティブプログラミングについても触れられています。
これについても深掘りはしませんが、「データのストリームがあり、そこにデータが放出されたのを契機に処理したい内容を宣言的に記述するプログラミング」というように解釈しました。
RxJavaはReactiveXをJavaで実装したもの
ReactiveXは様々な言語で実装されていて、JavaScript、Swift、Scalaなどの言語で実装されています。
その中にJavaで実装されたものがあり、それがRxJavaと呼ばれています。
ちなみに、他の言語についてはReactiveX - Languagesで確認できます。
(言語が異なってもReactiveXの考え方は同じなので、別の言語で実装することになってもReactiveXの知識を活かすことができる)
Observable
Observableは最も基本的なクラス
Observableはデータをストリームに放出します。
簡単な例として、以下のコードで1~3の数字を放出するObservableを作っています。
いずれも放出する値は同じですが、Observableの作り方は何パターンかあります。
// Observable.just(...)を使う場合 val observable: Observable<Int> = Observable.just(1,2,3) // Observable.create(...)を使う場合 val observable: Observable<Int> = Observable.create { emitter -> emitter.onNext(1) emitter.onNext(2) emitter.onNext(3) emitter.onComplete() } // RxKotlinの拡張関数Iterable<T>.toObservable()を使う場合 val observable: Observable<Int> = listOf(1, 2, 3).toObservable()
EmitterのonNextでデータを放出する
Observable.create(...)を使う場合、emitter.onNext(...)
という呼び出しでデータを放出しています。
emitter
はEmitter
のインタフェースを実装していて、以下の3つのメソッドを持っています。
(正確にはObservableEmitter
を実装していて他のメソッドもありますが、基本的な呼び出しである3つのメソッドを挙げます。)
- onNext → Observableがデータを放出するためのメソッド
- onComplete → データの放出が終わったら呼び出すメソッド
- onError → エラーがあった時に呼び出すメソッド
ちなみに、Observable.create { emitter ->
というラムダ式で書けるのはSAM変換によるものです。
Observable#subscribeで放出されたデータを受け取る
以下のコードではデータが放出された時のコールバックだけ追加していますが、エラー時のコールバックやExceptionを指定するオーバーロードされたsubscribe
メソッドもあります。
observable.subscribe { value ->
println(value)
}
Operator
OperatorはObservableが放出したデータに操作を加える
Operatorをコードで追加する場合は、Observableに続けて追記していきます。
以下のコードではfilter(...)
オペレータを使って、Observableに放出されたデータを偶数かどうかでフィルタしています。
val observable: Observable<Int> = listOf(1, 2, 3).toObservable() observable .filter { value -> value % 2 == 0 // 偶数のみ通す } .subscribe { value -> println(value) // 偶数のみ出力 }
Operatorは続けて追記することができます。
Operatorはfilter(...)
のいくつもあり初めから覚えようとするのは覚えるのは困難ですが、ReactiveX - Operatorsでどんなオペレータがあるのか見ることができます。
Single
Singleは値を一つだけ放出する
Observableと似たものにSingleがあります。
Observableは一連の値を放出しますが、Singleは常に1つだけ値を放出するかエラーを返すかします。
以下のコードでは1
を放出するSingleを作成しています。
val single: Single<Int> = Single.just(1) val single: Single<Int> = Single.create { emitter -> emitter.onSuccess(1) }
SingleEmitterのonSuccessでデータを放出する
データを放出する場合はonSuccess
、エラーが起きた場合はonError
を呼び出します。
onError
が呼び出されると、subscribeも終了します。
Single#subscribeで放出されたデータを受け取る
以下のコードではデータが放出された時のコールバックを追加しています。
またmap(...)
オペレータを使っています。
single .map { value -> value * 10 // 値を10倍する } .subscribe { value -> println(value) }
Subject
SubjectをObservableとしてもObserverとしても使う
SubjectはObservableとObserverを継承しています。
そのため以下のコードのように、Subjectをsubscribeすることもできるし、observerとして設定することもできます。
以下のコードを実行した場合、observable
が放出した1~3の値が、observable.subscribe(subject)
と書いたことによりsubject
に伝播して、subject.subscribe {...}
で出力されます。
val subject = PublishSubject.create<Int>() subject.subscribe { value -> println(value) } val observable: Observable<Int> = listOf(1, 2, 3).toObservable() observable .subscribe(subject)
出力結果
1 2 3
Subjectの実装クラスを使う
Subject自体はabstractクラスです。
そのためSubjectを継承したクラスを作り、具体的な処理を記述する必要があります。
RxJavaではいくつかのSubjectを継承したクラスがあり、ユースケースに合わせて使用することができます。
それらのSubjectの説明は以下の記事が参考になりました。
Scheduler
observeOn(...)でオペレータの処理をするスレッドを切り替える
以下のコードはobserveOn(Schedulers.computation())
でオペレータの処理をするスレッドを切り替えています。
observeOn(...)
より下に記述したオペレータの処理をするスレッドを指定することになるので、filter(...)
とsubscribe(...)
のスレッドが切り替わっています。
subscribeOn(...)でObservableが動作するスレッドを指定する
以下のコードはsubscribeOn(Schedulers.io())
でObservableが動作するスレッドを指定しています。
適切なスレッドを選択する
以下のコードではスレッドをよく考えずに指定していますが、本来は用途を考えて適切なスレッドを選択する必要があります。
どのスレッドが適切なのかは以下の記事などを参考にしてください。
val observable: Observable<Int> = Observable.create { emitter -> println(Thread.currentThread().name) // RxCachedThreadScheduler - Schedulers.io()のスレッド emitter.onNext(1) emitter.onNext(2) emitter.onNext(3) emitter.onComplete() } observable .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .filter { value -> println(Thread.currentThread().name) // RxComputationThreadPool - Schedulers.computation()のスレッド value % 2 == 0 } .subscribe { value -> println(Thread.currentThread().name) // RxComputationThreadPool - Schedulers.computation()のスレッド println(value) }
さいごに
RxJavaの基本的な5つの要素を学習しましたが、実務で扱うには足りない知識がいくつもあります。
FlowableやCompletable、AndroidでRxJavaで扱うのに便利なRxAndroidなどたくさんありますが、引き続き勉強していこうと思います。
読んだドキュメント
- Home · ReactiveX/RxJava Wiki
- Getting Started · ReactiveX/RxJava Wiki
- How To Use RxJava · ReactiveX/RxJava Wiki
- Reactive Streams · ReactiveX/RxJava Wiki
- Observable · ReactiveX/RxJava Wiki
- Scheduler · ReactiveX/RxJava Wiki
- Subject · ReactiveX/RxJava Wiki
- Error Handling · ReactiveX/RxJava Wiki
- Plugins · ReactiveX/RxJava Wiki
- Backpressure (2.0) · ReactiveX/RxJava Wiki)
- ReactiveX - intro
- ReactiveX - Observable
- ReactiveX - Operators
- ReactiveX - Single
- ReactiveX - Subject
- ReactiveX - Scheduler