스케줄러
RxJava에서는 스케줄러(Scheduler)를 통해 멀티 스레드와 같은 비동기 작업을 돕습니다. RxJava에서는 Schedulers 클래스에서 제공하는 정적 팩토리 메서드를 통해 스케줄러를 설정할 수 있습니다.
val ioScheduler : Scheduler = Schedulers.io()
val computationScheduler : Scheduler = Schedulers.computation()
val trampolineScheduler : Scheduler = Schedulers.trampoline()
val newScheduler : Scheduler = Schedulers.newThread()
//for Android
val mainScheduler : Scheduler = Schedulers.mainThread()
스케줄러의 종류
IO 스케줄러
- 네트워크, 데이터베이스, 파일 시스템 환경 등의 블로킹 이슈가 발생하는 곳에서 비동기적인 작업을 위해 사용될 수 있습니다.
newThread 스케줄러
- 매번 새로운 스케줄러(스레드)를 생성합니다.
Computation 스케줄러
- 단순 반복적인 작업, 콜백 처리 그리고 다른 계산적인 작업에 사용됩니다. 블로킹 이슈가 발생하는 곳에서 사용하는 것을 추천하지 않습니다.
Trampoline 스케줄러
- 새로운 스레드를 생성하지 않고 현재 스레드에 무한한 크기의 큐를 생성하는 스케줄러 입니다. 모든 작업을 순차적으로 실행하는 것을 보장 합니다.
subscribeOn
과 observeOn
RxJava에서는 subscribeOn
과 observeOn
연산자를 통해 스케줄러를 이용하며, 이를통해 간단히 멀티 스레딩을 구현할 수 있습니다.
기본적으로 Observer가 선언되고 구독되는 스레드에서 동작 합니다.(안드로이드의 경우 메인스레드(UI 스레드))
@Test
fun test() {
val src: Observable<Int> = Observable.create { emitter ->
(0..2).forEach {
val threadName = Thread.currentThread().name
println("#Subs on $threadName : $it")
emitter.onNext(it)
Thread.sleep(100)
}
emitter.onComplete()
}
src.subscribe {s ->
val threadName = Thread.currentThread().name
println("#Obsv on $threadName : $s")
}
}
/**
* 실행 결과
* #Subs on main : 0
* #Obsv on main : 0
* #Subs on main : 1
* #Obsv on main : 1
* #Subs on main : 2
* #Obsv on main : 2
*/
스레드 이름을 출력해 보면 모두 메인 스레드에서 동작하는 것을 확인할 수 있습니다.
subscribeOn
연산자
subscribeOn
연산자를 사용하여 스레드를 지정 한 경우는 아래와 같습니다.
@Test
fun test() {
val src: Observable<Int> = Observable.create { emitter ->
(0..2).forEach {
val threadName = Thread.currentThread().name
println("#Subs on $threadName : $it")
emitter.onNext(it)
Thread.sleep(100)
}
emitter.onComplete()
}
src.subscribeOn(Schedulers.io())
.subscribe { s ->
val threadName = Thread.currentThread().name
println("#Obsv on $threadName : $s")
}
Thread.sleep(500)
}
/**
* 실행 결과
* #Subs on RxCachedThreadScheduler-1 : 0
* #Obsv on RxCachedThreadScheduler-1 : 0
* #Subs on RxCachedThreadScheduler-1 : 1
* #Obsv on RxCachedThreadScheduler-1 : 1
* #Subs on RxCachedThreadScheduler-1 : 2
* #Obsv on RxCachedThreadScheduler-1 : 2
*/
subscribeOn
연산자는 Observable 소스에 어떤 스케줄러를 사용하여 아이템을 발행할지 알려 줍니다.
subscribeOn
연산자만 있고 ObserveOn
연산자가 없다면 해당 스케줄러는 아이템 발행 및 구독 까지 작용합니다.
observeOn
연산자
아래는 observeOn
연산자를 사용한 예시 입니다.
@Test
fun test() {
val src: Observable<Int> = Observable.create { emitter ->
(0..2).forEach {
val threadName = Thread.currentThread().name
println("#Subs on $threadName : $it")
emitter.onNext(it)
Thread.sleep(100)
}
emitter.onComplete()
}
src.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe { s ->
val threadName = Thread.currentThread().name
println("#Obsv on $threadName : $s")
}
Thread.sleep(500)
}
/**
* 실행 결과
* #Subs on RxCachedThreadScheduler-1 : 0
* #Obsv on RxComputationThreadPool-1 : 0
* #Subs on RxCachedThreadScheduler-1 : 1
* #Obsv on RxComputationThreadPool-1 : 1
* #Subs on RxCachedThreadScheduler-1 : 2
* #Obsv on RxComputationThreadPool-1 : 2
*/
observeOn
연산자를 이용하면 Observable에서 발행된 아이템을 가로채어 해당 스케줄러로 아이템을 구독합니다.
아이템 발행시(subscribeOn
)와 구독 시(observeOn
)에 스레드의 이름이 다른것을 확인할 수 있습니다.
interval, timer, replay, buffer
등의 연산자는 computation 스케줄러로 고정되어 다른 스케줄러를 지정하더라도 무시 됩니다.
참고자료