【RxJava学習ログ】①基本的な5つの要素

はじめに

この記事について

この記事はRxJavaの基本的な5つの要素の学習ログです。

  • Observable
  • Operators
  • Single
  • Subject
  • Scheduler

学習のモチベーション

  • RxJavaに触れずにAndroidアプリ開発をやってきましたが、業務でRxJavaのコードを読んだり書いたりする場面が出てきたので、 RxJavaを一度学習しておきたい。
  • リアクティブプログラミングを知る。

記事中のコードについて

RxKotlinを用いたコードを載せています。

implementation("io.reactivex.rxjava3:rxkotlin:3.0.1")

ReativeXとRxJava

ReactiveXは非同期でイベント駆動型のプログラムを構成するライブラリ

RxJavaを学び始めると、まずReativeXの存在を知ります。
ReativeXについてReactiveX - Introでは以下のように説明されています。

ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.

ここで以下の2点がわかりました。

  • ReactiveXにはObservable(観測可能な)な処理の流れがあること
  • ReactiveXは非同期でイベント駆動型のプログラムを構成するライブラリであること

ちなみに「event-based programs」の部分を「event-driven programming(イベント駆動プログラミング)」と解釈しました。
イベント駆動プログラミングについて深掘りはしませんが、普段のアプリ開発でやっているsetOnClickListenerなどがまさにイベント駆動プログラミングであるとわかりました。

また、リアクティブプログラミングについても触れられています。
これについても深掘りはしませんが、「データのストリームがあり、そこにデータが放出されたのを契機に処理したい内容を宣言的に記述するプログラミング」というように解釈しました。

RxJavaはReactiveXをJavaで実装したもの

ReactiveXは様々な言語で実装されていて、JavaScript、Swift、Scalaなどの言語で実装されています。
その中にJavaで実装されたものがあり、それがRxJavaと呼ばれています。
ちなみに、他の言語についてはReactiveX - Languagesで確認できます。
(言語が異なってもReactiveXの考え方は同じなので、別の言語で実装することになってもReactiveXの知識を活かすことができる)

Observable

参考:ReactiveX - Observable

Observableは最も基本的なクラス

Observableはデータをストリームに放出します。
簡単な例として、以下のコードで1~3の数字を放出するObservableを作っています。
いずれも放出する値は同じですが、Observableの作り方は何パターンかあります。

// Observable.just(...)を使う場合
val observable: Observable<Int> = Observable.just(1,2,3)

// Observable.create(...)を使う場合
val observable: Observable<Int> = Observable.create { emitter ->
    emitter.onNext(1)
    emitter.onNext(2)
    emitter.onNext(3)
    emitter.onComplete()
}

// RxKotlinの拡張関数Iterable<T>.toObservable()を使う場合
val observable: Observable<Int> = listOf(1, 2, 3).toObservable()

EmitterのonNextでデータを放出する

Observable.create(...)を使う場合、emitter.onNext(...)という呼び出しでデータを放出しています。
emitterEmitterのインタフェースを実装していて、以下の3つのメソッドを持っています。
(正確にはObservableEmitterを実装していて他のメソッドもありますが、基本的な呼び出しである3つのメソッドを挙げます。)

  • onNext → Observableがデータを放出するためのメソッド
  • onComplete → データの放出が終わったら呼び出すメソッド
  • onError → エラーがあった時に呼び出すメソッド

ちなみに、Observable.create { emitter ->というラムダ式で書けるのはSAM変換によるものです。

Observable#subscribeで放出されたデータを受け取る

以下のコードではデータが放出された時のコールバックだけ追加していますが、エラー時のコールバックやExceptionを指定するオーバーロードされたsubscribeメソッドもあります。

observable.subscribe { value ->
    println(value)
}

Operator

参考:ReactiveX - Operators

OperatorはObservableが放出したデータに操作を加える

Operatorをコードで追加する場合は、Observableに続けて追記していきます。
以下のコードではfilter(...)オペレータを使って、Observableに放出されたデータを偶数かどうかでフィルタしています。

val observable: Observable<Int> = listOf(1, 2, 3).toObservable()

observable
    .filter { value ->
        value % 2 == 0 // 偶数のみ通す
    }
    .subscribe { value ->
        println(value) // 偶数のみ出力
    }

Operatorは続けて追記することができます。
Operatorはfilter(...)のいくつもあり初めから覚えようとするのは覚えるのは困難ですが、ReactiveX - Operatorsでどんなオペレータがあるのか見ることができます。

f:id:go_takahana:20211123173757p:plain

Single

参考:ReactiveX - Single

Singleは値を一つだけ放出する

Observableと似たものにSingleがあります。
Observableは一連の値を放出しますが、Singleは常に1つだけ値を放出するかエラーを返すかします。
以下のコードでは1を放出するSingleを作成しています。

val single: Single<Int> = Single.just(1)
val single: Single<Int> = Single.create { emitter ->
    emitter.onSuccess(1)
}

SingleEmitterのonSuccessでデータを放出する

データを放出する場合はonSuccess、エラーが起きた場合はonErrorを呼び出します。
onErrorが呼び出されると、subscribeも終了します。

Single#subscribeで放出されたデータを受け取る

以下のコードではデータが放出された時のコールバックを追加しています。
またmap(...)オペレータを使っています。

single
    .map { 
        value -> value * 10 // 値を10倍する
    }
    .subscribe { value ->
        println(value)
    }

Subject

参考:ReactiveX - Subject

SubjectをObservableとしてもObserverとしても使う

SubjectはObservableとObserverを継承しています。
そのため以下のコードのように、Subjectをsubscribeすることもできるし、observerとして設定することもできます。
以下のコードを実行した場合、observableが放出した1~3の値が、observable.subscribe(subject)と書いたことによりsubjectに伝播して、subject.subscribe {...}で出力されます。

val subject = PublishSubject.create<Int>()

subject.subscribe { value ->
    println(value)
}

val observable: Observable<Int> = listOf(1, 2, 3).toObservable()

observable
    .subscribe(subject)

出力結果

1
2
3

Subjectの実装クラスを使う

Subject自体はabstractクラスです。
そのためSubjectを継承したクラスを作り、具体的な処理を記述する必要があります。
RxJavaではいくつかのSubjectを継承したクラスがあり、ユースケースに合わせて使用することができます。

それらのSubjectの説明は以下の記事が参考になりました。

Rxで知っておくと便利なSubjectたち

Scheduler

参考:ReactiveX - Scheduler

observeOn(...)でオペレータの処理をするスレッドを切り替える

以下のコードはobserveOn(Schedulers.computation())でオペレータの処理をするスレッドを切り替えています。
observeOn(...)より下に記述したオペレータの処理をするスレッドを指定することになるので、filter(...)subscribe(...)のスレッドが切り替わっています。

subscribeOn(...)でObservableが動作するスレッドを指定する

以下のコードはsubscribeOn(Schedulers.io())でObservableが動作するスレッドを指定しています。

適切なスレッドを選択する

以下のコードではスレッドをよく考えずに指定していますが、本来は用途を考えて適切なスレッドを選択する必要があります。
どのスレッドが適切なのかは以下の記事などを参考にしてください。

非同期や並列処理にも役立つRxJavaの使い方

val observable: Observable<Int> = Observable.create { emitter ->
    println(Thread.currentThread().name) // RxCachedThreadScheduler - Schedulers.io()のスレッド 
    emitter.onNext(1)
    emitter.onNext(2)
    emitter.onNext(3)
    emitter.onComplete()
}
observable
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation())
    .filter { value ->
        println(Thread.currentThread().name) // RxComputationThreadPool - Schedulers.computation()のスレッド
        value % 2 == 0
    }
    .subscribe { value ->
        println(Thread.currentThread().name) // RxComputationThreadPool - Schedulers.computation()のスレッド
        println(value)
    }

さいごに

RxJavaの基本的な5つの要素を学習しましたが、実務で扱うには足りない知識がいくつもあります。
FlowableやCompletable、AndroidでRxJavaで扱うのに便利なRxAndroidなどたくさんありますが、引き続き勉強していこうと思います。

読んだドキュメント