이번 포스트에서는 Producer에서 등장했던 Sink에 대해서 알아보도록 하겠습니다. Sink라는 이름은 ‘이벤트 스트림(stream)이 흘러 들어가는 곳’이라는 의미로 추정이 됩니다. 연산자 정의는 내부에 각자 Sink 클래스를 상속받아서 구현하고 있습니다. Sink 클래스를 살펴보겠습니다.(디버깅 관련 코드는 모두 생략했습니다.)

연산자 클래스는 Observable로써의 역할과 Observer로써의 면모를 모두 가지고 있습니다. Observable로써의 면모를 보여주는 게 Producer라면, Observer로써의 면모를 보여주는 것이 바로 Sink 객체 입니다.

class Sink<Observer: ObserverType> : Disposable {
    fileprivate let _observer: Observer // 해당 Sink를 통해 데이터를 제공받는 객체
    fileprivate let _cancel: Cancelable // Sink를 Dispose하기 위한 Disposable
    private let _disposed = AtomicInt(0)

    init(observer: Observer, cancel: Cancelable) {
        self._observer = observer
        self._cancel = cancel
    }

    final func forwardOn(_ event: Event<Observer.Element>) { // Sink에 이벤트를 전달하기 위한 메소드
        if isFlagSet(self._disposed, 1) { // dispose된 상태라면 이벤트를 처리하지 않는다.
            return
        }
        self._observer.on(event) // 이벤트를 Observer로 흘려 보낸다.
    }

    final func forwarder() -> SinkForward<Observer> { // Sink를 Observer 형태로 변환해준다.
        return SinkForward(forward: self)
    }

    final var disposed: Bool {
        return isFlagSet(self._disposed, 1)
    }

    func dispose() {
        fetchOr(self._disposed, 1)
        self._cancel.dispose()
    }
}

보다시피, sink 자체는 Observer가 아닙니다. 하지만 각 연산자들은 ObserverType 프로토콜을 채택해서 이벤트를 받습니다. 이를 이해하기 위해서, 가장 간단한 Sink 구현체중 하나인 filter를 살펴보도록 하겠습니다.

final private class FilterSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
    typealias Predicate = (Element) throws -> Bool
    typealias Element = Observer.Element
    
    private let _predicate: Predicate
    
    init(predicate: @escaping Predicate, observer: Observer, cancel: Cancelable) {
        self._predicate = predicate
        super.init(observer: observer, cancel: cancel)
    }
    
    func on(_ event: Event<Element>) { // ObserverType의 필수 메소드
        switch event {
        case .next(let value):
            do {
                let satisfies = try self._predicate(value) // 이벤트를 흘려 보낼 지 결정한다.
                if satisfies {
                    self.forwardOn(.next(value)) // Sink로 이벤트를 흘려보낸다.
                }
            }
            catch let e {
                self.forwardOn(.error(e))
                self.dispose()
            }
        case .completed, .error:
            self.forwardOn(event) // completed, error는 그냥 흘려 보낸다.
            self.dispose()
        }
    }
}

각 Sink 구현체는 상위 Observable을 subscribe하는 Observer가 되어, 이벤트를 각자 처리해서 자신이 프로퍼티로 가지고 있는 Observer에게 흘려보냅니다. 이 과정을 통해서 아래로 계속 흘려보내다가, 사용자가 subscribe를 직접 호출해서 Disposable을 반환 받을 때까지 가게 되는 것입니다. 다른 연산자도 복잡할지언정, 기본적인 원리는 동일합니다.

다음으로 filter 연산자의 run함수를 살펴보도록 하겠습니다.(run 함수를 왜 살펴보는지 모른다면, 이전 포스트를 보고 오시길 바랍니다!)

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = FilterSink(predicate: self._predicate, observer: observer, cancel: cancel) 
        let subscription = self._source.subscribe(sink) // sink가 자기 위의 Observable들을 구독하여 이벤트를 받아서 처리 후,run의 인자로 받은 observer로 흘려보낸다.
        return (sink: sink, subscription: subscription) // SinkDisposer에 세팅될 Disposable들
    }

결론적으로, Sink는 연산자의 실제 처리를 담당하는 가장 중추적인 클래스라고 할 수 있습니다.

마지막으로, 위에서 Sink를 Observer로 바꿔주는 SinkForward 클래스를 확인해보겠습니다.

final class SinkForward<Observer: ObserverType>: ObserverType {
    typealias Element = Observer.Element 

    private let _forward: Sink<Observer>

    init(forward: Sink<Observer>) {
        self._forward = forward
    }

    final func on(_ event: Event<Element>) { // 별다른 처리 없이, 데이터를 observer로 흘려 보낸다.
        switch event {
        case .next:
            self._forward._observer.on(event) 
        case .error, .completed:
            self._forward._observer.on(event)
            self._forward._cancel.dispose()
        }
    }
}

이로써, 연산자가 이벤트를 처리하고 흘려보내는 비밀들을 모두 살펴보았습니다.