今日たまたま同僚のAndroidエンジニアと話をしていて、
「RxSwiftにcompose
とかlift
あればいいのにね」という話があったので、試しにcompose
っぽいものを作ってみました。
もしも既に全く同じようなのがRxSwiftにあったらごめんなさい。チョットジシンナイデス。
その前にcomposeって何よ
さっくり説明すると、
Observable
のオペレーターで、引数で受け取るTransformer
を自身に適応してObservable
を返すTransformer
はObservable
を受け取って、それを加工してObservable
を返すもの
のようです。さっくりですみません。
詳しくはこちらでしっかりとした解説があります。
flatMap
と似てそうですが、
flatMap
が元のObservable
から送信されたイベント(値)を元に新たなObservable
を返却するのに対し、
compose
は元のObservable
自体を受け取って、それにTransformer
を適応して新たなObservable
を返却するので、
元のObservableを直接触れるか 、 Observableのイベント(値)を触れるか が違いになってきそうです。
このあたりの説明に関してはこちらが参考になります。
実装してみる
ということで実装してみます。
compose
オペレーターはObservableType
のextensionで実装し、それとは別にTransformer
はstruct
で実装します。
struct ComposeTransformer<T, R> {
let transformer: (Observable<T>) -> Observable<R>
init(transformer: @escaping (Observable<T>) -> Observable<R>) {
self.transformer = transformer
}
func call(_ observable: Observable<T>) -> Observable<R> {
return transformer(observable)
}
}
extension ObservableType {
func compose<T>(_ transformer: ComposeTransformer<E, T>) -> Observable<T> {
return transformer.call(self.asObservable())
}
}
便宜上TransformerはComposeTransformer
として定義しました。
使ってみる
例えば、 Observable
に対してsubscribeOnでBackgroundSchedulerを、observeOnでMainSchedulerを適応する applySchedulers(_:)
という関数を考えてみます。
func applySchedulers<T>(_ observable: Observable<T>) -> Observable<T> {
return observable
.subscribeOn(SerialDispatchQueueScheduler(qos: .default))
.observeOn(MainScheduler.instance)
}
これを使って、1~5までの数字を足し合わせる処理をbackgroundで実行し、その後mainで結果を出力するストリームを組んでみます
applySchedulers(
Observable
.just([1, 2, 3, 4, 5])
.map{ $0.reduce(0, +) }
)
.subscribe({ event in
print(event)
})
書き方が良いとは言えないですね。
ここで、先ほど定義したcompose
を使ってみます。
Observable.just([1, 2, 3, 4, 5])
.map{ $0.reduce(0, +) }
.compose(ComposeTransformer<Int, Int> { observable in
return observable
.subscribeOn(SerialDispatchQueueScheduler(qos: .default))
.observeOn(MainScheduler.instance)
}).subscribe({ event in
print(event)
})
型推論がしっかり効く状態なら、<Int, Int>
の部分は省略しても大丈夫です。この例だとObservable<Int>
にsubscribeOnとobserveOnを付与してObservable
.compose(ComposeTransformer { observable in
return observable
.subscribeOn(SerialDispatchQueueScheduler(qos: .default))
.observeOn(MainScheduler.instance)
})
もちろん、以下のようなものをextensionで加えるのもありですが、compose
とTransformer
を使ったほうが任意の処理を繋げられるので良さそうです。
extension ObservableType {
func applySchedulers() -> Observable<E> {
return self.asObservable()
.subscribeOn(SerialDispatchQueueScheduler(qos: .default))
.observeOn(MainScheduler.instance)
}
}
さいごに
flatMap
≒ compose
が最初なかなか飲み込めなかったのですが、ようやく理解できてきた感じでした。
いつものようにGistにまとめてあります。
余談
実はTransformerはobjectではなくて単なるclosureで良い気もしてきましたが、どっちが良いのでしょう…。
extension ObservableType {
func compose<T>(_ transformer: (Observable<E>) -> Observable<T>) -> Observable<T> {
return transformer(self.asObservable())
}
}
Observable.just([1,2,3,4,5])
.map{ $0.reduce(0, +) }
.compose({ observable in
return observable
.subscribeOn(SerialDispatchQueueScheduler(qos: .default))
.observeOn(MainScheduler.instance)
}).subscribe({ event in
print(event)
})
一度何かの変数に入れて再利用を考えると、前者の方が良さそうな気はします。
余談でした。