以前こちらのQittaの記事でtakeしてもcompletedさせない!っていうのを書きましたが、
改めてこちらで詳しく書こうかなと思います
よくハマる罠
takeオペレーターを使うと指定した回数分までイベントが流れるように制限を加えることができます。
ただ、ここでうっかりハマってしまうのが、指定回数分イベントが流れた後に、completedが送信されて完了してしまうということです。
Observable.of(1,2,3,4,5).take(2)
.subscribe { print($0) }
.disposed(by: disposeBag)
// next(1)
// next(2)
// completed
単体で使う分にはあまり影響がないのですが、他のストリームと組み合わせたりしているときに、このcompletedが流れるとつられてcompletedになってしまってイベントが流れない…なんてことになったりします。
「takeして指定回数分流れたらcompletedを流す」という通常使用でしたらそのままで良いのですが、「takeして指定回数分流れたらそのストリームは以降何も流れないようにストップさせる」という形で使うのであれば、次のようにします
take,concat,neverを組み合わせる
Rxにはneverという、以降completedにならなくなるオペレーターがあります。これをtakeと組み合わせることで、指定回数イベントを流した後にtakeから流れてくるcompletedをせき止めることができます。
組み合わせる時には、concatオペレーターを使います。 concatとmergeの違いはこちらの記事でも触れています。
extension ObservableType {
public func takeNoCompleted(_ count: Int) -> Observable<E> {
return .concat(take(count), .never())
// 略さず書くとこんな感じ
// return Observable.concat(self.take(count), Observable.never())
}
}
実際に使ってみると次のようになります。
Observable.of(1,2,3,4,5).takeNoCompleted(2)
.subscribe { print($0) }
.disposed(by: disposeBag)
// next(1)
// next(2)
// completedは流れない!
singleオペレーターとasSingle()の場合だと?
特にtake(1)で1回だけに制限したい場合に、他の代替案としてsingleオペレーターを使うことも考えられますが、2回目以降はエラーが流れてくるので注意が必要です。
Observable.of(1,2,3,4,5).single()
.subscribe { print($0) }
.disposed(by: disposeBag)
// next(1)
// error(Sequence contains more than one element.)
もしsingleで同じことを実現するには、エラーを次のように握りつぶしてあげれば良さそうです。
Observable.of(1,2,3,4,5).single()
.catchError { _ in Observable.never() }
.subscribe { print($0) }
.disposed(by: disposeBag)
// next(1)
asSingle()でSingle Unitに変換した場合も同様にエラーが流れてきます。
Observable.of(1,2,3,4,5).asSingle()
.subscribe { print($0) }
.disposed(by: disposeBag)
// error(Sequence contains more than one element.)
こちらの場合は内部的にonNextが1回きりだったか見ているため、そうでなければ値すら流れず、「Sequence contains more than one element.」のエラーが送信されます。
まとめ
takeにハマったら.debug()を使ってcompletedが流れていないか確認してみましょう。