9 minute read

★Rx의 4가지 개념 : Observable, Observer, Subscriber, Subject

  • 발행자와 수신자 분류
    1. 발행자 : Observable, Single, Maybe, Subject, Completable
    2. 수신자 : Subscriber, Observer, Consumer(Observer를 함수단위로 쪼개놓은 함수형 인터페이스). 이 개념들은 혼용된다.
  • Observable
    • Observable : Observer패턴을 구현한다. 객체의 상태 변화를 관찰하는 옵저버객체의 목록을 갖고 있다가 상태 변화가 발생하면 전달한다. onNext(), onComplete(), onError() 등 3종류의 알림이 있다. Observable 클래스는 직접 생성자를 사용하여 인스턴스를 만들지 않고, 팩토리 함수를 사용한다. create(), just(), from(), fromArray(), fromIterable(), fromCallable(), fromFuture(), fromPublisher(), interval(), range(), defer()…
      • just() : 인자로 넣은 이미 존재하는 단일 데이터를 발행한다. 최대 10개까지 인자로 넣을 수 있다.
      • subscribe() : Observable은 팩토리 함수로 데이터의 흐름을 정의한 후 subscribe()를 호출해야만 비로소 데이터 발행을 실행한다. 즉 데이터 발행 시점을 조절할 수 있다.
        • 인자의 갯수에 따라 onNext, onComplete, onError중 어떤 알림을 받을 지 정한다.
        • Disposable 인터페이스 객체를 리턴한다. 이 객체는 Observable이 더 이상 데이터를 발행하지 않도록 구독을 해지하는데 사용한다(dispose()).
        • 모든 종류의 Observable은 내부적으로 subscribeActual()을 공통적으로 호출하게 된다.
        • subsribeActiaul()은 인터페이스의 함수이고, 각 Source Observable의 종류별로 다르게 구현되어 있다.
      • 실행 알고리즘을 명시하지 않고 무엇을 실행할 것인지 만을 명시한다는 점에서 RxJava는 선언형 프로그래밍을 지향한다고 한다.
      • create() : just와 다르게 직접 onNext()를 호출해 줘야 한다. 마지막에는 onComplete()를 호출 create()의 인자에는 ObservableEmitter를 넣는다.
      • 람다 표현식의 장점 : 람다를 사용하지 않으면 subscribe(Consumer)함수의 원형을 알아야 하고, Consumer클래스의 메소드도 매번 입력해줘야 하며 T의 타입을 일일이 맞추는 것도 까다롭다. 람다 표현식은 메소드의 원형을 지키지 않아도 되어 가독성이 높고, 개발자가 모두 기억할 필요도 없다.
      • fromArray() : 배열에 있는 데이터를 발행, 기본자료형 배열은 박싱클래스의 배열로 변환 후 사용.
      • fromIterable() : Iterator 인터페이스의 데이터를 발행.
      • fromCallable() : callable 인터페이스의 call메소드 실행결과를 발행. emitter가 따로 없고 마지막에 반환하는 값이 반환값이 된다. API호출 등에 유용하게 사용할 수 있고, dispose()도 자동으로 됨.
      • fromFuture() : Future객체의 실행 결과를 발행. (Executor 인터페이스를 구현한 클래스에 Callable객체를 인자로 넣어 Future객체를 반환하게 함. get()을 호출하면 Callable 객체에서 구현한 결과가 나올 때 까지 블로킹 됨.)
      • fromPublisher() : Publisher가 발행하는 데이터를 발행.
    • Single : 딱 하나의 데이터를 발행하기 위함. API통신에 주로 사용. 데이터가 발행되면 onSuccess()호출. null을 발행하지 않음. onNext()와 onComplete()가 onSuccess()로 통합. Single.fromObservable(), Observable.single() 등으로 얻을 수 있음. 데이터를 발행한 후 알아서 dispose됨.

    • Maybe : Single인데 데이터가 있을수도, 없을 수(Nullable하다는 말)도 있음. 값이 있으면 onSuccess(), null이면 onComplete(). Single 클래스에 onComplete()가 추가. 데이터를 발행한 후 알아서 dispose됨.

    • Completable : 이벤트를 실행만 하고 결과는 필요없을 때 사용. 이벤트가 끝나면 onComplete()호출.

    • Cold Observable : 옵저버가 옵저빙할 준비가 되면 발행을 시작함. 옵저버는 처음부터 옵저빙이 보장 됨. 쉽게말해 subscribe()를 호출하지 않으면 데이터발행을 시작하지 않음. 레트로핏이나 create로 Observable을 만들었을 경우 등 대부분의 경우에 사용. 원타임 쿼리(API나 DB 등). subscribe()의 타이밍이 중요하지 않음. 옵저버 관점에서 subscribe()는 notify의 의미

    • Hot Observable : 아이템이 생성되면 바로 발행. 옵저버는 시퀀스의 중간쯤 부터 옵저빙. 구독자 존재여부에 관계없이 데이터를 발행. 키보드, 마우스, 시스템이벤트, 센서 데이터, 주식 가격 등이 있음. Subject나 ConnectableObservable등. 언제 subscribe()하는지에 따라 받는 데이터가 다름. 옵저버 관점에서 subscribe()는 observers.add() 의미
  • Subject : Observable이면서 구독자. Cold를 Hot으로 바꿔준다. 새로운 아이템을 발행하거나 옵저빙한 아이템을 바이패스 할 수도 있음. 1. PublishSubject : 일반적인 Subject. subscribe()이후의 아이템을 받는다. 아이템의 발행 타이밍은 중요하지 않으며 발행했을 때의 상황만 고려하면 됨. create()로 생성. 자주 사용함. 2. BehaviorSubject : subscribe()하면 가장 최근에 관찰된 아이템과 그 이후의 아이템을 받는다. PublishSubject와 비슷한데 최근값 하나를 먼저 받고 시작함. State의 관리에 많이 써야 함. createDefault()로 초기값과 함께 생성. 자주 사용함. 3. ReplaySubject : 모든 아이템을 버퍼에 저장해 두고 옵저버에게 발행함 4. AsyncSubject : 발행이 완료되었을 때 마지막 값만을 옵저버에게 발행. create()로 생성. onComplete()되었을 때의 마지막 값만 발행함. 잘 안씀.

  • ConnectableObservable : Cold Observable을 Hot으로 변환한다.
    • publish() : Observable.publish()로 생성. subscribe()호출 이후 connect()를 호출해야 구독자들에게 데이터를 발행하기 시작한다. connect()이후 subscribe()를 호출한 구독자는 이후부터 발행된 데이터를 전달받음. connect()이후는 Hot Observable이 된다.
    • refCount() : publish()이후에 호출한다. connect()하지 않아도 바로 발행 시작한다. 옵저버 수가 0이 되면 다시 처음부터 데이터를 발행한다.
    • share() : publish().refCount()와 정확히 똑같다. 그냥 네이밍만 구독자 관점에서 한 것.
  • Thread관리 Schedulers : 세부작업 관리자 개념. 어떤 동작을 실행할 스레드를 지정할 수 있게 해줌.
    • subscribeOn() : subscribe()를 호출해 구독하는 스레드를 지정. 처음 지정한 스레드가 고정됨.
    • observeOn() : 데이터 흐름이 처리될 때 동작이 일어날 스레드를 지정. 체이닝 단계에 따라 변경 가능.
    • 예를 들어 Observable.create {…}에서 일어나는 일은 subscribeOn()에 지정된 스레드에서, subscribe{…}에서 일어나는 일은 observeOn()에 지정된 스레드에서 실행된다.

    • 종류(Schedulers)
      1. newThread() : 새 스레드
      2. single() : 싱글 스레드하나를 생성하여 돌려씀.
      3. computation() : 입출력없는 계산용. 스레드풀의 스레드 갯수는 프로세서 갯수와 동일.
      4. io() : IO. 필요할 때마다 스레드를 계속 생성.
      5. trampoline() : 트램펄린. 새 스레드 생성없이 현재 스레드에 이벤트 큐를 생성. 주로 테스트할 때 많이 사용
      6. executer를 변환하여 사용 : Schedulers.from(…). 기존의 Executer와의 호환용.
  • RxJava의 함수형 인터페이스
    1. Predicate : test(t: T) -> Boolean : t값을 입력받아 Boolean을 반환.
    2. Consumer : accept(t: T) : t값을 받아 처리. 반환 없음
    3. Function<T, R> : apply(t: T) -> R : t값을 받아 R타입 결과를 반환, BiFunction<T, U, R>은 입력 타입 2개인 Function
  • RxJava의 연산자(Operator)들 : Kotlin이 가진 Collectio Operator의 아이디어를 그대로 채용함.

    • 생성 연산자 : just(), fromXXX(), create(), range(), timer(), intervalRange(), defer(), repeat()
      • interval(Long, TimeUnit) : Long, TimeUnit간격으로 정수를 발행
      • timer(Long, TimeUnit) : 일정 시간 후 데이터를 딱 한번 발행
      • range(start, count) : start부터 count개 만큼 Int를 발행
      • intervalRange(start, count, initialDelay, period, unit)
        initialDelay후 start부터 count개를 period 간격으로 발행
      • defer(Callable) : Callable에서 반환된 Observable을 발
      • repeat(count) : count회 반복발행

      • interval 등 시간관련 연산자는 앱이 활성상태일 때만 정상 동작한다. 아마 도즈모드에서의 CPU가 유휴상태이기 때문일 것이다. 따라서 타이머 등의 기능을 구현할 때 앱이 유휴 -> 활성상태로 전환되는 경우 시간의 보정이 필요하다.
    • 변환 연산자
      • map(Function<T, R>) -> Observable
        T를 R로 변환하여 발행
      • flatMap(Function<T, Observable>) -> Observable
        T를 R로 변환 후 R로부터 만들어진 Observable을 발행, 일대다 관계가 가능해짐. 이 Observable간에, 즉 데이터간 인터리빙이 가능하다. 1,2,3 -> (1,1,2,1,3,2,3,2,3)
      • concatMap()
        flatMap과 같으나 인터리빙이 불가능하다. 즉 순서가 뒤바뀔 수 없고 약간 느리다. 1,2,3 -> (1,1,1,2,2,2,3,3,3)
      • switchMap()
        concatMap()처럼 순서를 보장하지만 인터리빙이 생기려하면 기존의 작업을 취소한다. 가장 최근 값만을 사용하고싶을 때 사용할 수 있음. 1,2,3 -> (1,2,3,3,3)
      • groupBy()
        groupBy()에서는 각 데이터요소에 대한 키 값을 반환한다. 이후 키를 기준으로 데이터를 구분하여 GroupedObservable을 발행한다. key값을 참조할 수 있어 filter등에 이용할 수 있다. 1,2,3,1,2,3 -> (1,1), (2,2), (3,3)
      • scan()
        reduce()와 비슷하나 중간 결과를 계속 반환한다. scan()과 reduce()는 필요에 따라 대체 가능한 듯.
    • 필터 연산자
      • filter(Predicate) -> Observable
        T를 이용해 Boolean을 반환하며 true일 때만 해당 값을 발행.
      • first(default), last(default), take(N), takeLast(N), skip(N), skipLast(N)
        이름값 함.
    • 결합 연산자 : 복수의 Observable을 결합
      • zip(ObservableSource, ObservableSource, BiFunction<T1,T2,R>)
        -> Observable
        두 Observable의 결과 하나씩을 결합해 R타입의 Observable을 이룬다. 기존 2가지의 Observable이 모두 발행되면 결과 Observable의 데이터는 그 것을 기반으로 발행된다. 발행속도에 관계없이 무조건 차례대로 같은 회차에 발행된 것들끼리 소비하여 발행해 감. Api작업시 유용하게 사용할 수 있음. 동시에 수행한다.
      • combineLatest(ObservableSource, ObservableSource, BiFunction<T1,T2,R>)
        -> Observable
        zip과 비슷하나 둘 중 하나만 발행되면 각 Observable의 최신 데이터만을 이용해 결과 데이터를 발행한다.
      • merge(ObservableSource, ObservableSource...) -> Observable
        여러 플로우를 단지 합치는 역할. 합성이 안된다. Completable의 merge는 zip과 같다.
      • concat(ObservableSource, ObservableSource) -> Observable
        입력 Observable순서대로 데이터를 발행함. 반드시 onComplete()를 호출해야 다음 Observable로 넘어가며 이는 메모리 누수 위험을 암시함.
    • 조건 연산자
      • amb(Interable<Observable>) -> Observable
        여러 Observable중 가장 먼저 데이터를 발행하는 Observable만 채택.
      • takeUntil(Observable) -> Observable
        인자로 받은 Observable이 데이터를 발행할 때 까지만 현재 Observable이 데이터를 발행
      • skipUntil(Observable) -> Observable
        인자로 받은 Observable이 데이터를 발행하기 시작하면 현재 Observable이 데이터를 발행
      • all(Predicate) -> Single
        Observable의 모든 데이터를 Predicate에서 판정하여 Boolean을 발행
    • 기타
      • reduce(BiFunction<T,T,T>) -> Maybe
        입력값A와 변환값B를 이용해 반환값 C를 도출 후 C를 다시 입력함. 입력값들을 모두 반영할 때 사용, 반환값은 Maybe
      • delay(Long, TimeUnit) : 일정 시간후 데이터 발행
      • timeInterval() -> Observable<Time>
        데이터 발행 소요시간과 데이터를 함께 발행
  • 메모리 누수 : Acivity가 사라져도 Observer가 Context를 참조하고 있으면 메모리가 해제되지 않는다. 따라서 수동으로 Observable이 가진 Observer을 제거해 연결을 끊어버려야 한다. 그 메소드가 Disposable.dispose()임. Observer자체가 Disposable의 구현체이며 Disposable은 subscribe(Consumer)나 subscribeWith(Observer)의 반환값이다. CompositeDisposable에 add()하여 한번에 dispose()할 수 있다. +=오퍼레이터로도 추가할 수 있음. Single, Completable의 경우 onComplete(), onError()의 경우 자동 dispose()된다.

    • dispose()필요 케이스 : Observer가 View에 관한 동작을 포함할 때. Fragment와 같이 View가 사라질 가능성이 있을 때. Observable이나 Flowable은 직접 dispose()해 줘야 한다.
  • 디버깅
    • Hooking 함수들
      • doOnNext() : 어떤 데이터를 발행하기 전 호출됨.
      • doOnComplete() : Observable이 Complete()되기 전 호출됨.
      • doOnError() : onError()가 호출되기 전 호출됨.
      • doOnEach() : 위 세가지를 한꺼번에 처리.
      • doOnSubscribe() : subscribe할 때 호출 됨
      • doOnDispose() : dispose할 때 호출 됨.
      • doOnLifecycle() : 위 두가지를 한거번에 처리.
      • doOnTerminate() : onError(), onComplete()를 한꺼번에 처리.
      • doOnFinally() : onError(), onDispose(), onComplete()를 한꺼번에 처리
    • 예외 처리 : RxJava에서는 try~catch를 이용하여 에러처리를 할 수 없다. onError(Exception())으로 에러 객체를 전달하더라도 내부의 Consumer의 accept메소드에 인자로 전달하고있고, 메소드의 구현은 subscribe()의 함수의 두번째 파라메터에서 하게 되어있다. 따라서 onError()를 구현하지 않았다면 onErrorNotImplementedException만 뜨게 된다.
      • 한편 onError()는 발생 순간 데이터 플로우가 바로 중단되고, 모든 구독자가 모든 에러가능성을 짐작하기 어렵다. 그래서 Observable에서 선처리를 하여 구독자가 에러발생을 알 수 있도록 하는 것이 좋다.
      • onErrorReturn(Function<Throwable, T>)
        에러가 발생했을 때 다른 데이터로 대체하여 발행
      • onErrorReturnItem(T)
        onErrorReturn()과 같음, 에러객체를 전달하지 않고 사용
      • onErrorResumeNext(Observable)
        에러가 발생했을 때 인자로받은 Observable로 플로우를 대체, 로깅이나 추가 작업이 필요한 경우 사용.
      • retry(BiPredicate<Int, Throwable>) -> Observable
        onError()이벤트가 발생하면 subscribe()를 재호출함. 기본 무조건, 무한정 재시도 하는데, 재시도 횟수(Int값)나 재시도 조건(BiPredicate의 결과값)을 설정할 수 있다.
      • retryUntil(BooleanSupplier) -> Observable
        retry와 비슷. 다만 재시도를 중단할 조건이 충족되면 재시도를 중단한다. BooleanSupplier는 인자없이 Boolean을 반환하는 함수형 인터페이스이다. true를 반환하면 재시도를 중단.
      • retryWhen(Function<Observable, Observable<?>) -> Observable
        Function의 반환인 Observable가 값을 발행할 때 마다 재시도. ex) 무조건 1초 간격 3회 재시도 .retryWhen { attempt -> Observable.timer(1, TimeUnit.SECONDS).repeat(3) }
      • onError대신에 Observer를 상속받아 onError를 디폴트로 만들어 두면 활용하면 편하게 사용할 수 있음. 강사님 꿀팁. View제어 등의 경우는 오버라이드 할 수 있음
    • RxJavaPlugin : RxJava 전체에 어떠한 설정을 할 수 있는 클래스. 따라서 내부적으로 모든 케이스에 등장한다.
      • setXXSchedulerHandler…
      • setErrorHandler(Consumer) : 에러가 발생했을 때 Observer에 onError가 구현되지 않았다면 이 메서드의 Consumer가 작동한다. 커스터마이징을 여기도 하고 각 케이스에서도 하는 것이 좋다. 또한 onError에서 Error가 발생할 때도 이 메서드로 온다.
      • onAssembly…
    • 흐름 제어 : Observable의 발행 속도와, Observer의 처리 속도의 차이가 발생했을 때 제어해야 한다.
      • sample(Long, TimeUnit) : 일정 시간 간격동안 발행된 데이터 중 가장 최근의 데이터만 발행. 샘플링임
      • buffer(Int) -> Observable<List>
        일정 시간 간격동안 발행된 데이터를 모아 List형태로 발행.
      • throtleFirst/Last(Long, TimeUnit) : 지정된 시간동안 데이터를 발행하지 않도록 막음. debounce로 버튼이벤트 제어하는 것 보다 자연스럽게 할 수 있음.
            //  ViewBinding을 사용하지 않고 버튼 이벤트를 Observable로 사용
            class ClickObservable {
                fun makeObservable(view: View): Observable<Boolean> {
                    return Observable.create<Boolean> { emitter ->
                        view.setOnClickListener {
                            emitter.onNext(true)
                        }
                    }
                }
            }
            
    * window(Long, TimeUnit) -> Observable<T>
        : groupBy처럼 데이터를 그룹화한 Observable을 발행. buffer를 Observable로 바꾼거라고 보면 됨. 시간, 갯수 등을 지정할 수 있음
    * debounce(Long, TimeUnit) : 데이터입력 후 일정시간 동안 새 데이터가 들어오지 않았을 때만 발행. 
        버튼 이벤트 같은데에 쓸 수 있음. Single로는 사용할 수 없음. 지속적으로 구독해야하기 때문.
        debounce - switchMap은 검색기능에서 국룰임.
  • BackPressure 이슈 : 발행에 비해 소비가 너무 느려서 정상적으로 실행이 되지 않거나 폭발하는 현상.
    • debounce(), throtleFirst(), buffer(), window()등의 함수로 소비 정책을 세워 대응할 수 있음.
  • Flowable : Backpressure 이슈를 위해 별도 분리한 특수 Observable Observable과 사용법은 동일하며 상호간의 변환도 쉽게 가능. BackPressureStrategy를 선택할 수 있다.

    • Observable 선택 기준
      1. 초당 1000개 미만의 데이터 흐름, OOM위험 없는 경우
      2. GUI 프로그래밍시
      3. 데이터 흐름이 본질적으로 동기방식이지만 플랫폼이 자바 Stream API나 그에 준하는 기능을 제공하지 않을 때. Observable은 보통 Flowable보다 성능 오버헤드가 낮다.
    • Flowable 선택 기준
      1. 10000개 이상의 데이터 흐름.
      2. 디스크에서 파일을 읽어들일 경우.
      3. JDBC를 활용해 데이터를 쿼리하는 경우.
      4. 네트워크 IO를 실행하는 경우.
      5. 다수의 블로킹 방식을 사용하거나 pull-based방식의 데이터 소스가 미래에는 논 블로킹 방식의 리액티브 API나 드라이버를 제공할 수도 있는 경우.
    • 디스크IO, JDBC쿼리, 네트워크IO 등은 Cold Observable에 해당하므로 결과데이터를 모두 한번에 가져온다. 이 경우에도 업스트림에서 발행하는 속도와 다운스트림에서 처리하는 속도가 크기 않다면 Observable을 활용해도 되며 차이가 나더라도 sample(), throttle(), debounce()같은 제어 함수를 사용하는 것이 좋다. 그럼에도 해결하기 어려울 때 Flowable클래스로 전환한다.

    • BackPressure 이슈 대응 함수
      • onBackpressureBuffer(capacity, onOverFlow, overflowStrategy)
        배압 이슈 발생 시 별도의 버퍼에 저장. 기본 128개의 버퍼를 가짐
        1. capacity : 버퍼의 갯수
        2. onOverFlow : 버퍼가 넘쳤을 때의 Action
        3. overflowStrategy : 버퍼가 가득찼을 때 추가로 실행하는 전략
          • ERROR : MissingBackpressureException을 던지고 데이터 발행 중단
          • DROP_LATEST : 버퍼에 쌓여 있는 최근 값 제거
          • DROP_OLDEST : 버퍼에 쌓여 있는 오래된 값 제거
      • onBackpressureDrop() : 배압 이슈 발생 시 데이터를 무시. 한 아이템을 구독하는 동안 발행되는 데이터는 다 버린다.
      • onBackpressureLatest() : 쌓이는 데이터를 무시하면서 최신 데이터만 유지
  • 꿀팁 : Operator를 모르겠다? 그러면 커맨드클릭해서 내용 보면 그림있는 문서의 링크가 있음. 어렵지 않게 참고 가능.

  • EventBus : 전역적으로 이벤트를 전달할수 있게 해주는 개념.

  • RxBus : EventBus를 RxSubject로 구현하는 개념. 어떤 작업이 종료되었음을 알리는데 주로 사용. 남발하면 큰일난다. 비동기 통신 종료알림, 서비스 전제를 어긋내는 전역 Exception, DataObserver(특정 데이터를 추적)등에서 사용.

  • 데코레이터 패턴

Categories:

Updated: