오늘은 좀 더 원론적인 이야기를 해보고자 합니다. 이전 포스트에서 연산자도 모두 Observable이라고 했습니다. 그런데 살펴보면 이 연산자들이 모두 공통적으로 상속받는 객체가 있는데, 이 객체의 이름은 Producer입니다. 오늘은 이 Producer의 정체에 대해서 알아보도록 하겠습니다.

Producer의 핵심을 시그니처만 남겨보도록 하겠습니다.

class Producer<Element> : Observable<Element> {
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element

    func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element 
}

우선 Producer는 Observable을 상속 받습니다. 이는 위에서 말했듯이 연산자들이 모두 Observable이고, 모든 연산자의 부모 클래스이기 때문에 당연하다 볼 수 있습니다.

이 중 run 메소드의 본체는 다음과 같습니다.

func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
    rxAbstractMethod()
}

즉 추상 메소드입니다. 이 run 연산은 producer를 상속하는 연산자 Observable들이 구현하게 됩니다. 반환 값에서 유추하면 두개의 Disposable을 튜플 형태로 반환하게 되는데, sink는 현재 연산자 Observable을 나타내고(이것의 정체는 다음 포스트에서 자세히 알아보도록 하겠습니다.) subscription운 자신의 상위 Observable들을 구독하고 나온 Disposable입니다.

그러면 실제 Producer 클래스가 정의하는 subscribe 메소드는 어떻게 생겼을까요?

  override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // 반환되는 disposable은 dispose 되면 자기가 가진(혹은 가지게 될)
            // 모든 참조를 해제할 의무가 있습니다.
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }

if문으로 분기되는 것은 현재의 스레드에서 돌게 만드는 스케쥴러 관련 코드고, 핵심적인 부분은 위아래가 동일한 괄호 안입니다.

  1. SinkDisposor를 만듭니다. 이를 disposer라고 하겠습니다.

  2. run 메소드를 실행시켜 Disposable 인스턴스를 얻습니다.

  3. disposer에 2번에서 얻은 disposable들을 세팅합니다.

  4. disposer를 반환합니다.

이 subscribe 메소드는 다양한 subscribe 메소드들 중에서도 가장 기본이 되는 메소드입니다. 여기서 반환되는 SinkDisposer는 또 무엇일까요? 이 역시 하나하나 살펴보겠습니다. 프로퍼티와 시그니처만 남긴 선언만 우선 보면 다음과 같습니다.

private final class SinkDisposer: Cancelable {
    private enum DisposeState: Int32 {
        case disposed = 1
        case sinkAndSubscriptionSet = 2
    }

    private let _state = AtomicInt(0)
    private var _sink: Disposable?
    private var _subscription: Disposable?

    var isDisposed: Bool {
        return isFlagSet(self._state, DisposeState.disposed.rawValue)
    }

    func setSinkAndSubscription(sink: Disposable, subscription: Disposable) 

    func dispose() 
}

내부적으로는 Disposable 두개를 가지고, 현재 상태는 나타내기 위한 원자적으로 동작하는 정수 값을 가집니다. setSinkAndSubscription 메소드는 다음과 같이 정의되어 있습니다.

func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
    self._sink = sink
    self._subscription = subscription

    // 새로운 상태를 설정하고, 이전 설정 값을 반환한다.
    let previousState = fetchOr(self._state, DisposeState.sinkAndSubscriptionSet.rawValue) 

    // 이전 구독이 dispose되지 않았는데 새로운 구독을 설정하면 런타임 에러를 발생시킨다.
    if (previousState & DisposeState.sinkAndSubscriptionSet.rawValue) != 0 { 
        rxFatalError("Sink and subscription were already set")
    }

    // 이미 dispose되있었다면, 새로운 구독도 dispose시키고 메모리에서 해제시킨다.
    if (previousState & DisposeState.disposed.rawValue) != 0 {
        sink.dispose()
        subscription.dispose()
        self._sink = nil
        self._subscription = nil
    }
}

dispose 메소드는 다음과 같습니다.

 func dispose() {
        let previousState = fetchOr(self._state, DisposeState.disposed.rawValue)
        
        // 이미 dispose되어 있었다면 아무것도 하지 않는다.
        if (previousState & DisposeState.disposed.rawValue) != 0 {
            return
        }

        // 구독이 있었던 상태였다면 이를 모두 해제한다.
        if (previousState & DisposeState.sinkAndSubscriptionSet.rawValue) != 0 {
            guard let sink = self._sink else {
                rxFatalError("Sink not set")
            }
            guard let subscription = self._subscription else {
                rxFatalError("Subscription not set")
            }

            sink.dispose()
            subscription.dispose()

            self._sink = nil
            self._subscription = nil
        }
    }

이를 통해 dispose() 호출 될 때, dispose된 Observable을 구독할 때 모두 정상적으로 해제가 일어남을 알 수 있습니다. 이것이 바로 disposable을 통해 우리가 메모리를 해제할 수 있는 비결입니다. 다음 포스트에서는 아직 알아보지 않은 Sink에 대해서 알아보도록 하겠습니다.