티스토리 뷰

반응형
SMALL

스케쥴러는 프로그램의 세부 일정을 주관하는 관리자라고 생각하면 좋습니다.

그 동안 RxJava를 사용하여 구현해온 코드들은 모두 main Thread에서 동작하는 코드입니다.

그런데 실무에서는 요구 사항에 따라 비동기로 동작할 수 있도록 스레드를 지정할 수 있어야 합니다.

(메인 스레드로 고정된 것을 바꾸어야 함)

 

스케쥴러는 스레드를 지정할 수 있게 해줍니다.

새로운 스레드를 생성하거나 기존의 Executors를 활용하는 것을 넘어서 새로운 방식으로 동작이 이루어 집니다.

스레드를 만들면서 경쟁 조건이나 synchronized 키워드를 사용했다면 스케쥴러는 이와 같은 것들을 신경 쓸 필요가 없어집니다.

 

flip() 함수의 마블 다이어그램

flip()의 기능 보다는 시간 표시줄에 더 집중해서 살필 필요가 있습니다. 위 마블 다이어그램을 코드로 표현하면 아래와 같습니다.

String[] objs = {"1-S", "2-T", "3-P"};
Observable<String> source = Observable.fromArray(objs)
    .doOnNext(data -> Log.v("Original data = " + data)
    .subscribeOn(Schedulers.newThread())
    .observeOn(Schedulers.newThread())
    .map(Shape::flip);

source.subscribe(Log::i);
CommonUtils.sleep(500);

objs에는 도형의 모양도 담겨 있습니다. Shape.flip()을 통해 도형을 뒤집고 있습니다.ㄹ

살펴볼 코드는 subscribeOn과 observeOn 코드입니다. 위 함수의 인자 값으로 무엇이 들어가고 있는지 살펴보면 Schedulers.newThread()가 들어가 있는 것을 살펴볼 수 있습니다.

이는 새로운 스레드를 생성한다는 의미입니다. doOnNext함수는 onNext가 실행될 때 원래의 값을 보여주는 함수로써 onNext가 호출될 때마다 실행되는 함수입니다.

 

subscribeOn은 구독을 할 때 어떤 스레드를 사용할 지 정하는 함수이며 observeOn은 Observable이 데이터를 발행할 때 어떤 스레드를 사용할 지 정하는 함수입니다.

 

위 코드에서 observeOn 함수를 제거한다면 어떻게 될까요? 데이터를 발행할 때에도 subscribeOn과 동일한 스레드를 사용하게 됩니다.

 

지금까지의 내용을 정리하자면 다음과 같습니다.

1. 스케쥴러는 RxJava 코드를 어느 스레드에서 실행할 지정할 수 있다.
2. subscribeOn() 함수와 observeOn() 함수를 모두 지정하면 Observable에서 데이터 흐름이 발생하는 스레드와 처리된 결과를
    구독자에게 발행하는 스레드를 분리시킬 수 있다.
3. subscribeOn() 함수만 호출하면 Observable의 모든 흐름이 동일한 스레드에서 실행된다.
4. 스케쥴러를 별도로 지정하지 않으면 현재 스레드(main Thread)에서 동작을 실행한다.

 

스케줄러의 종류(RxJava 2.0 기준)

RxJava 2.0에서는 뉴 스레드 스케줄러, 싱글 스레드 스케줄러, 계산 스케줄러, IO 스케줄러, 트램펄린 스케줄러, 메인 스레드 스케줄러의 다양한 스케줄러를 제공하고 있습니다.

 

뉴 스레드 스케줄러 (New Thread Scheduler)

새로운 스레드를 만들고 싶을 때 사용하는 스케줄러입니다. 사용법은 Schedulers.newThread()를 호출해주면 됩니다.

RxJava에서 스케줄러는 subscribeOn과 observeOn에 나눠서 적용할 수 있는데 두 함수의 개념을 완벽하게 이해하지 못한다면 오히려 더 못 사용할 수 있으니 이번에는 subscribeOn 함수에만 스케줄러를 사용하도록 하겠습니다.

String[] orgs = {"1", "3", "5"};
Observable.fromArray(orgs)
    .doOnNext(data -> Log.v("Original data : " + data))
    .map(data -> "<<" + data + ">>")
    .subscribeOn(Schedulers.newThread())
    .subscribe(Log::i);
CommonUtils.sleep(500);

Observable.fromArray(orgs)
    .doOnNext(data -> Log.v("Original data : " + data))
    .map(data -> "##" + data + "##")
    .subscribeOn(Schedulers.newThread())
    .subscribe(Log::i);
CommonUtils.sleep(500);

new Thread를 하였기 때문에 각 Observable은 각각의 스레드에서 작업을 수행하게 됩니다.

만약에 각 Observable의 마지막 부분에 CommonUtils.sleep(500);이 없다면 어떻게 될까요? 아래처럼 말이죠.

String[] orgs = {"1", "3", "5"};
Observable.fromArray(orgs)
    .doOnNext(data -> Log.v("Original data : " + data))
    .map(data -> "<<" + data + ">>")
    .subscribeOn(Schedulers.newThread())
    .subscribe(Log::i);

Observable.fromArray(orgs)
    .doOnNext(data -> Log.v("Original data : " + data))
    .map(data -> "##" + data + "##")
    .subscribeOn(Schedulers.newThread())
    .subscribe(Log::i);

계산 스케줄러

RxJava에서 추천하는 스케줄러는 크게 3가지인데 계산 스케줄러, 트램펄린 스케줄러, IO 스케줄러입니다. 

계산 스케줄러는 수차례 등장했던 친구들입니다. 

interval() 함수의 원형이 어떨까요?

@SchedulerSupport(SchedulerSupport.COMPUTATION)
public static Observable<Long> interval(long period, TimeUnit unit)

위와 같은 interval 함수는 계산 스케줄러에서 동작합니다. 만약 다른 스케줄러를 사용하고 싶다면 아래와 같이 사용 가능합니다.

@SchedulerSupport(SchedulerSupport.CUSTOM)
public static Observable<Long> interval(long period, TimeUnit unit,
                                        Scheduler scheduler)

 

계산 스케줄러를 사용하는 방법은 아래 예시 코드와 같이 사용할 수 있습니다.

String[] orgs = {"1", "3", "5"};
Observable<String> source = Observable.fromArray(orgs)
    .zipWith(Observable.interval(100L, TimeUnit.MILLIS?ECONDS), (a, b) -> a);
    
// Subscription #1
source.map(item -> "<<" + item + ">>")
    .subscribeOn(Schedulers.computation())
    .subscribe(Log::i);
    
// Subscription #2
source.map(item -> "##" + item + "##")
    .subscribeOn(Schedulers.computation())
    .subscribe(Log::i);
    
CommonUtils.sleep(1000);

interval 함수가 있기 때문에 두 구독자가 번갈아면서 출력이 이루어집니다.

하지만 두 구독자가 구독을 거의 동시에 하고 있기 때문에 스레드를 같은 스레드에 할당해줄 수도 있습니다.

이 경우에는 출력이 모두 같은 스레드를 나타내게 됩니다.

 

IO 스케줄러

IO 스케줄러는 계산 스케줄러와는 다르게 네트워크상의 요청을 처리하거나 각종 입/출력 작업을 수행하기 위한 스케줄러입니다.

계산 스케줄러와 다른 점은 기본으로 생성되는 스레드 개수가 다르다는 것입니다.

계산 스케줄러는 CPU의 개수 만큼 스레드를 생성허지만 IO 스케줄러는 필요할 때마다 스레드를 계속 생성합니다.

입 출력 작업은 비동기이지만 결과를 얻기까지는 대기 시간이 깁니다.

 

계산 스케줄러 용도 : 일반적인 계산 작업

IO 스케줄러 용도 : 네트워크상의 요청, 파일 입출력, DB 쿼리 등

String root = "c:\\";
File[] files = new File(root).listFiles();
Observable<String> source = Observable.fromArray(files)
    .filter(f -> !f.isDirectory())
    .map(f -> f.getAbsolutePath())
    .subscribeOn(Schedulers.io());
    
    source.subscribe(Log::i);
    CommonUtils.sleep(500);

source 변수는 File[] 배열에 있는 파일의 절대 경로를 발행하는 Observable입니다. IO 스케줄러에서 실행됩니다.

출력 값은 c:\\에 있는 파일 목록이 나오게 됩니다.

 

트램펄린 스케줄러

트램펄린 스케줄러는 새로운 스레드를 생성하지 않고 현재 스레드에 무한한 크기의 대기 큐를 생성하는 스케줄러입니다.

트램펄린 스케줄러를 사용하는 예제는 다음과 같습니다.

String[] orgs = {"1", "3", "5"};
Observable<String> source = Observable.fromArray(orgs);

// Subscription #1
source.subscribeOn(Schedulers.trampoline())
    .map(data -> "<<" + data + ">>"
    .subscribe(Log::i);
    
// Subscription #2
source.subscribeOn(Schedulers.trampoline())
    .map(data -> "##" + data + "##"
    .subscribe(Log::i);
    
CommonUtils.sleep(500);

위 코드를 실행하면 새로운 스레드를 생성하지 않고 main Thread에서 모든 작업을 실행합니다.

큐에 작업을 넣고 실행하기 때문에 작업의 순서가 뒤바뀌는 일은 발생하지 않습니다.

 

싱글 스레드 스케줄러

싱글 스레드 스케줄러는 RxJava 내부에서 단일 스레드를 별도로 생성하여 구독작업을 처리합니다.

단, 생성된 스레드는 여러 번 구독 요청이 오더라도 공통으로 사용합니다.

Schedulers.single()로 사용할 수 있는데 리액티브 프로그래밍을 비동기 목적으로 보통 사용하고 비동기를 지향하기 때문에 싱글 스레드 스케줄러는 사용할 일은 거의 없습니다.

 

Executor 변환 스케줄러

Java에서는 java.util.curretn에서 제공하는 Executor(실행자)를 변환하여 스케줄러를 생성할 수 있습니다.

하지만 Executor와 Scheduler의 동작 방식은 다릅니다. (비추천)

기존에 사용하던 Executor를 재사용할 때에만 사용합니다.

final int THREAD_NUM = 10;

String[] data = {"1", "3", "5"};
Observable<String> source = Observable.fromArray(data);
Executor executor = Executors.newFixedThreadPool(THREAD_NUM);

source.subscribeOn(Schedulers.from(executor))
    .subscribe(Log::i);
source.subscribeOn(Schedulers.from(executor))
    .subscribe(Log::i);

CommonUtils.sleep(500);

위 코드에서 executor는 고정된 THREAD_NUM(10) 개의 스레드 풀을 생성합니다. 그리고 첫 번째 Observable과 두 번째 Observable에 subscribeOn() 함수를 호출하여 Executor 변환 스케줄러를 지정하였습니다. (Schedulers.from(executor))

만약에 executor를 생성할 때 singleThreadExecutor로 생성했다면 single()과 같이 1개의 스레드에서 실행이 될 것입니다.

 

 

이번 포스트에서는 스케줄러의 종류에 대해 살펴보았습니다.

다음 포스트에서는 스케줄러를 활용하여 콜백 지옥 벗어나기라는 주제로 스케줄러를 마무리 짓도록 하겠습니다.

 

 

 

 

 

 

 

 

 

 

 

 

 

 

반응형
LIST

'프로그래밍언어 > RxJava' 카테고리의 다른 글

리액티브 연산자 - (1) 변환 연산자  (0) 2019.08.23
반응형
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/05   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함