今日たまたま同僚のAndroidエンジニアと話をしていて、
「RxSwiftにcomposeとかliftあればいいのにね」という話があったので、試しにcomposeっぽいものを作ってみました。
もしも既に全く同じようなのがRxSwiftにあったらごめんなさい。チョットジシンナイデス。
その前にcomposeって何よ
さっくり説明すると、
Observableのオペレーターで、引数で受け取るTransformerを自身に適応してObservableを返すTransformerはObservableを受け取って、それを加工してObservableを返すもの
のようです。さっくりですみません。
詳しくはこちらでしっかりとした解説があります。
flatMapと似てそうですが、
flatMapが元のObservableから送信されたイベント(値)を元に新たなObservableを返却するのに対し、
composeは元のObservable自体を受け取って、それにTransformerを適応して新たなObservableを返却するので、
元のObservableを直接触れるか 、 Observableのイベント(値)を触れるか が違いになってきそうです。
このあたりの説明に関してはこちらが参考になります。
実装してみる
ということで実装してみます。
composeオペレーターはObservableTypeのextensionで実装し、それとは別にTransformerはstructで実装します。
struct ComposeTransformer<T, R> {
let transformer: (Observable<T>) -> Observable<R>
init(transformer: @escaping (Observable<T>) -> Observable<R>) {
self.transformer = transformer
}
func call(_ observable: Observable<T>) -> Observable<R> {
return transformer(observable)
}
}
extension ObservableType {
func compose<T>(_ transformer: ComposeTransformer<E, T>) -> Observable<T> {
return transformer.call(self.asObservable())
}
}
便宜上TransformerはComposeTransformerとして定義しました。
使ってみる
例えば、 Observableに対してsubscribeOnでBackgroundSchedulerを、observeOnでMainSchedulerを適応する applySchedulers(_:) という関数を考えてみます。
func applySchedulers<T>(_ observable: Observable<T>) -> Observable<T> {
return observable
.subscribeOn(SerialDispatchQueueScheduler(qos: .default))
.observeOn(MainScheduler.instance)
}
これを使って、1~5までの数字を足し合わせる処理をbackgroundで実行し、その後mainで結果を出力するストリームを組んでみます
applySchedulers(
Observable
.just([1, 2, 3, 4, 5])
.map{ $0.reduce(0, +) }
)
.subscribe({ event in
print(event)
})
書き方が良いとは言えないですね。
ここで、先ほど定義したcomposeを使ってみます。
Observable.just([1, 2, 3, 4, 5])
.map{ $0.reduce(0, +) }
.compose(ComposeTransformer<Int, Int> { observable in
return observable
.subscribeOn(SerialDispatchQueueScheduler(qos: .default))
.observeOn(MainScheduler.instance)
}).subscribe({ event in
print(event)
})
型推論がしっかり効く状態なら、<Int, Int>の部分は省略しても大丈夫です。この例だとObservable<Int>にsubscribeOnとobserveOnを付与してObservable
.compose(ComposeTransformer { observable in
return observable
.subscribeOn(SerialDispatchQueueScheduler(qos: .default))
.observeOn(MainScheduler.instance)
})
もちろん、以下のようなものをextensionで加えるのもありですが、composeとTransformerを使ったほうが任意の処理を繋げられるので良さそうです。
extension ObservableType {
func applySchedulers() -> Observable<E> {
return self.asObservable()
.subscribeOn(SerialDispatchQueueScheduler(qos: .default))
.observeOn(MainScheduler.instance)
}
}
さいごに
flatMap ≒ composeが最初なかなか飲み込めなかったのですが、ようやく理解できてきた感じでした。
いつものようにGistにまとめてあります。
余談
実はTransformerはobjectではなくて単なるclosureで良い気もしてきましたが、どっちが良いのでしょう…。
extension ObservableType {
func compose<T>(_ transformer: (Observable<E>) -> Observable<T>) -> Observable<T> {
return transformer(self.asObservable())
}
}
Observable.just([1,2,3,4,5])
.map{ $0.reduce(0, +) }
.compose({ observable in
return observable
.subscribeOn(SerialDispatchQueueScheduler(qos: .default))
.observeOn(MainScheduler.instance)
}).subscribe({ event in
print(event)
})
一度何かの変数に入れて再利用を考えると、前者の方が良さそうな気はします。
余談でした。