250x250
반응형
04-28 11:37
Today
Total
«   2024/04   »
1 2 3 4 5 6
7 8 9 10 11 12 13
14 15 16 17 18 19 20
21 22 23 24 25 26 27
28 29 30
Notice
Recent Posts
Recent Comments
Link
Archives
관리 메뉴

Bill Kim's Life...

[RxSwift] Subjects( + Relay) : Subject 와 Relay 의 기본 개념 및 특징을 살펴보자 본문

CS(컴퓨터 과학)/RxSwift

[RxSwift] Subjects( + Relay) : Subject 와 Relay 의 기본 개념 및 특징을 살펴보자

billnjoyce 2020. 9. 21. 12:44
728x90
반응형
RxSwift의 Subject 및 Relay의 기본 개념 및 특징에 대해서 살펴봅니다.

 

 

#. 개발 환경

  • Xcode 11.x 이상
  • Swift 5.x 이상
  • RxSwift 5.x 이상

 

 


 

 

Subject란?

 

우리가 앱 개발을 하다보면 실시간으로 Observable에 값을 추가하고 Subscriber에게 이벤트나 값을 방출할 수 있는 것이 필요합니다.
이때 Observable이자 Observer인 것을 Subject라고 하며 Subject를 통하여 실시간으로 값들 추가하고 구독할 수 있는 것을 구현할 수 있습니다.

 

Subject = Observable + Observer

 

Subject를 사용하면 Cold Observable을 Hot Observable로 변환할 수 있다.

Cold Observable와 Hot Observable은 본 강의 마지막 부분에서 설명하도록 하겠습니다.

 

 

우선 RxSwift에서 제공되는 Subject의 종류를 살펴보면 아래와 같은 것들이 있습니다.

 

  • PublishSubject
  • BehaviorSubject
  • ReplaySubject
  • Variables : 현재는 Derprecated 됨, BehaviorRelay를 대체로 사용하여야 함

그렇다면 각 Subject에 대해서 하나씩 살펴보도록 하겠습니다.

 

 

 

 


 

 

PublishSubject

 

  • default값(초기값)이 없습니다 
  • 다른 옵져버가 subscribe한 순간 아무것도 주지 않지만 데이터가 발생하면 모두 방출합니다.
  • .completed, .error 이벤트를 통해 Subject가 종료될 때까지 지속됩니다.
  • 종료되었을 때 존재하는 구독자 뿐만 아니라 이후에 구독한 subscriber에게도 종료 이벤트를 알려줍니다.

 

 

아래의 코드는 실제 PublishSubject의 구현 코드 부분입니다.

public final class PublishSubject<Element>
    : Observable<Element>
    , SubjectType
    , Cancelable
    , ObserverType
    , SynchronizedUnsubscribeType {
    public typealias SubjectObserverType = PublishSubject<Element>

    typealias Observers = AnyObserver<Element>.s
    typealias DisposeKey = Observers.KeyType
    
    /// Indicates whether the subject has any observers
    public var hasObservers: Bool {
        self._lock.lock()
        let count = self._observers.count > 0
        self._lock.unlock()
        return count
    }
    
    private let _lock = RecursiveLock()
    
    // state
    private var _isDisposed = false
    private var _observers = Observers()
    private var _stopped = false
    private var _stoppedEvent = nil as Event<Element>?

    #if DEBUG
        private let _synchronizationTracker = SynchronizationTracker()
    #endif

    /// Indicates whether the subject has been isDisposed.
    public var isDisposed: Bool {
        return self._isDisposed
    }
    
    /// Creates a subject.
    public override init() {
        super.init()
        #if TRACE_RESOURCES
            _ = Resources.incrementTotal()
        #endif
    }
    
    /// Notifies all subscribed observers about next event.
    ///
    /// - parameter event: Event to send to the observers.
    public func on(_ event: Event<Element>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        dispatch(self._synchronized_on(event), event)
    }

    func _synchronized_on(_ event: Event<Element>) -> Observers {
        self._lock.lock(); defer { self._lock.unlock() }
        switch event {
        case .next:
            if self._isDisposed || self._stopped {
                return Observers()
            }
            
            return self._observers
        case .completed, .error:
            if self._stoppedEvent == nil {
                self._stoppedEvent = event
                self._stopped = true
                let observers = self._observers
                self._observers.removeAll()
                return observers
            }

            return Observers()
        }
    }
    
    /**
    Subscribes an observer to the subject.
    
    - parameter observer: Observer to subscribe to the subject.
    - returns: Disposable object that can be used to unsubscribe the observer from the subject.
    */
    public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        self._lock.lock()
        let subscription = self._synchronized_subscribe(observer)
        self._lock.unlock()
        return subscription
    }

    func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if let stoppedEvent = self._stoppedEvent {
            observer.on(stoppedEvent)
            return Disposables.create()
        }
        
        if self._isDisposed {
            observer.on(.error(RxError.disposed(object: self)))
            return Disposables.create()
        }
        
        let key = self._observers.insert(observer.on)
        return SubscriptionDisposable(owner: self, key: key)
    }

    func synchronizedUnsubscribe(_ disposeKey: DisposeKey) {
        self._lock.lock()
        self._synchronized_unsubscribe(disposeKey)
        self._lock.unlock()
    }

    func _synchronized_unsubscribe(_ disposeKey: DisposeKey) {
        _ = self._observers.removeKey(disposeKey)
    }
    
    /// Returns observer interface for subject.
    public func asObserver() -> PublishSubject<Element> {
        return self
    }
    
    /// Unsubscribe all observers and release resources.
    public func dispose() {
        self._lock.lock()
        self._synchronized_dispose()
        self._lock.unlock()
    }

    final func _synchronized_dispose() {
        self._isDisposed = true
        self._observers.removeAll()
        self._stoppedEvent = nil
    }

    #if TRACE_RESOURCES
        deinit {
            _ = Resources.decrementTotal()
        }
    #endif
}

 

실제 사용 예제를 살펴보면 아래와 같습니다.

let subject = PublishSubject<Int>()

let subjectOne = subject
    .subscribe(onNext: { (num) in
        print("subjectOne :",num)
    })

subject.onNext(1)
subject.onNext(2)

let subjectTwo = subject
    .subscribe(onNext: { (num) in
        print("subjectTwo :", num)
    })

subject.onNext(3)
subject.onNext(4)
subject.onNext(5)

// subjectOne : 1
// subjectOne : 2
// subjectOne : 3
// subjectTwo : 3
// subjectOne : 4
// subjectTwo : 4
// subjectOne : 5
// subjectTwo : 5

 

 

 


 

 

 

BehaviorSubject

 

  • PublishSubject와 다르게 default(초기값) 을 넣어 주어야 합니다.
  • 값이 없을때 (초기값만 있을 때) subscribe하면 초기값을 주고, 값이 있을 때는가장 최신의 .next 이벤트를 전달합니다.
  • 중간에 error가 나면 subscribe하고 있는 모든 옵져버들한테 에러가 갑니다.
  • 주로 항상 최신 데이터로 채워놓아야 하는 경우에 사용(현재 상태 및 유저 프로필 정보 등)

 

 

마찬가지로 BehaviorSubject의 소스를 살펴보면 아래와 같습니다.

public final class BehaviorSubject<Element>
    : Observable<Element>
    , SubjectType
    , ObserverType
    , SynchronizedUnsubscribeType
    , Cancelable {
    public typealias SubjectObserverType = BehaviorSubject<Element>

    typealias Observers = AnyObserver<Element>.s
    typealias DisposeKey = Observers.KeyType
    
    /// Indicates whether the subject has any observers
    public var hasObservers: Bool {
        self._lock.lock()
        let value = self._observers.count > 0
        self._lock.unlock()
        return value
    }
    
    let _lock = RecursiveLock()
    
    // state
    private var _isDisposed = false
    private var _element: Element
    private var _observers = Observers()
    private var _stoppedEvent: Event<Element>?

    #if DEBUG
        private let _synchronizationTracker = SynchronizationTracker()
    #endif

    /// Indicates whether the subject has been disposed.
    public var isDisposed: Bool {
        return self._isDisposed
    }
 
    /// Initializes a new instance of the subject that caches its last value and starts with the specified value.
    ///
    /// - parameter value: Initial value sent to observers when no other value has been received by the subject yet.
    public init(value: Element) {
        self._element = value

        #if TRACE_RESOURCES
            _ = Resources.incrementTotal()
        #endif
    }
    
    /// Gets the current value or throws an error.
    ///
    /// - returns: Latest value.
    public func value() throws -> Element {
        self._lock.lock(); defer { self._lock.unlock() } // {
            if self._isDisposed {
                throw RxError.disposed(object: self)
            }
            
            if let error = self._stoppedEvent?.error {
                // intentionally throw exception
                throw error
            }
            else {
                return self._element
            }
        //}
    }
    
    /// Notifies all subscribed observers about next event.
    ///
    /// - parameter event: Event to send to the observers.
    public func on(_ event: Event<Element>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        dispatch(self._synchronized_on(event), event)
    }

    func _synchronized_on(_ event: Event<Element>) -> Observers {
        self._lock.lock(); defer { self._lock.unlock() }
        if self._stoppedEvent != nil || self._isDisposed {
            return Observers()
        }
        
        switch event {
        case .next(let element):
            self._element = element
        case .error, .completed:
            self._stoppedEvent = event
        }
        
        return self._observers
    }
    
    /// Subscribes an observer to the subject.
    ///
    /// - parameter observer: Observer to subscribe to the subject.
    /// - returns: Disposable object that can be used to unsubscribe the observer from the subject.
    public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        self._lock.lock()
        let subscription = self._synchronized_subscribe(observer)
        self._lock.unlock()
        return subscription
    }

    func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if self._isDisposed {
            observer.on(.error(RxError.disposed(object: self)))
            return Disposables.create()
        }
        
        if let stoppedEvent = self._stoppedEvent {
            observer.on(stoppedEvent)
            return Disposables.create()
        }
        
        let key = self._observers.insert(observer.on)
        observer.on(.next(self._element))
    
        return SubscriptionDisposable(owner: self, key: key)
    }

    func synchronizedUnsubscribe(_ disposeKey: DisposeKey) {
        self._lock.lock()
        self._synchronized_unsubscribe(disposeKey)
        self._lock.unlock()
    }

    func _synchronized_unsubscribe(_ disposeKey: DisposeKey) {
        if self._isDisposed {
            return
        }

        _ = self._observers.removeKey(disposeKey)
    }

    /// Returns observer interface for subject.
    public func asObserver() -> BehaviorSubject<Element> {
        return self
    }

    /// Unsubscribe all observers and release resources.
    public func dispose() {
        self._lock.lock()
        self._isDisposed = true
        self._observers.removeAll()
        self._stoppedEvent = nil
        self._lock.unlock()
    }

    #if TRACE_RESOURCES
        deinit {
        _ = Resources.decrementTotal()
        }
    #endif
}

 

예제로 살펴보면 아래와 같습니다.

let subject = PublishSubject<Int>()

let subjectOne = subject
    .subscribe(onNext: { (num) in
        print("subjectOne :",num)
    }, onError: { (error) in
        print("subjectOne Erorr: ", error)
    }, onCompleted: {
        print("subjectOne onCompleted")
    })

subject.onNext(1)
subject.onNext(2)

subject.onCompleted() // 추가

let subjectTwo = subject
    .subscribe(onNext: { (num) in
        print("subjectTwo :",num)
    }, onError: { (error) in
        print("subjectTwo Erorr: ", error)
    }, onCompleted: {
        print("subjectTwo onCompleted")
    })

subject.onNext(3)
subject.onNext(4)
subject.onNext(5)

// subjectOne : 1
// subjectOne : 2
// subjectOne onCompleted
// subjectTwo onCompleted

 

 

 


 

 

 

ReplaySubject

 

  • default값(초기값)이 없습니다 
  • ReplaySubject는 생성시 선택한 특정 크기만큼 일시적으로 캐시하거나 버퍼를 저장해서 최신 요소를 모두 방출합니다.
  • 다른 옵져버가 subscribe하면 여태까지 발생했던것 다 방출합니다. 
  • 최근의 일정 수의 데이터를 보여줄 경우 등에 사용하면 유용합니다.(최근 검색어 등)

 

 

RelaySubject의 소스는 아래와 같습니다.

public class ReplaySubject<Element>
    : Observable<Element>
    , SubjectType
    , ObserverType
    , Disposable {
    public typealias SubjectObserverType = ReplaySubject<Element>

    typealias Observers = AnyObserver<Element>.s
    typealias DisposeKey = Observers.KeyType

    /// Indicates whether the subject has any observers
    public var hasObservers: Bool {
        self._lock.lock()
        let value = self._observers.count > 0
        self._lock.unlock()
        return value
    }
    
    fileprivate let _lock = RecursiveLock()
    
    // state
    fileprivate var _isDisposed = false
    fileprivate var _isStopped = false
    fileprivate var _stoppedEvent = nil as Event<Element>? {
        didSet {
            self._isStopped = self._stoppedEvent != nil
        }
    }
    fileprivate var _observers = Observers()

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

    func unsubscribe(_ key: DisposeKey) {
        rxAbstractMethod()
    }

    final var isStopped: Bool {
        return self._isStopped
    }
    
    /// Notifies all subscribed observers about next event.
    ///
    /// - parameter event: Event to send to the observers.
    public func on(_ event: Event<Element>) {
        rxAbstractMethod()
    }
    
    /// Returns observer interface for subject.
    public func asObserver() -> SubjectObserverType {
        return self
    }
    
    /// Unsubscribe all observers and release resources.
    public func dispose() {
    }

    /// Creates new instance of `ReplaySubject` that replays at most `bufferSize` last elements of sequence.
    ///
    /// - parameter bufferSize: Maximal number of elements to replay to observer after subscription.
    /// - returns: New instance of replay subject.
    public static func create(bufferSize: Int) -> ReplaySubject<Element> {
        if bufferSize == 1 {
            return ReplayOne()
        }
        else {
            return ReplayMany(bufferSize: bufferSize)
        }
    }

    /// Creates a new instance of `ReplaySubject` that buffers all the elements of a sequence.
    /// To avoid filling up memory, developer needs to make sure that the use case will only ever store a 'reasonable'
    /// number of elements.
    public static func createUnbounded() -> ReplaySubject<Element> {
        return ReplayAll()
    }

    #if TRACE_RESOURCES
        override init() {
            _ = Resources.incrementTotal()
        }

        deinit {
            _ = Resources.decrementTotal()
        }
    #endif
}

private class ReplayBufferBase<Element>
    : ReplaySubject<Element>
    , SynchronizedUnsubscribeType {
    
    func trim() {
        rxAbstractMethod()
    }
    
    func addValueToBuffer(_ value: Element) {
        rxAbstractMethod()
    }
    
    func replayBuffer<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        rxAbstractMethod()
    }
    
    override func on(_ event: Event<Element>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        dispatch(self._synchronized_on(event), event)
    }

    func _synchronized_on(_ event: Event<Element>) -> Observers {
        self._lock.lock(); defer { self._lock.unlock() }
        if self._isDisposed {
            return Observers()
        }
        
        if self._isStopped {
            return Observers()
        }
        
        switch event {
        case .next(let element):
            self.addValueToBuffer(element)
            self.trim()
            return self._observers
        case .error, .completed:
            self._stoppedEvent = event
            self.trim()
            let observers = self._observers
            self._observers.removeAll()
            return observers
        }
    }
    
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        self._lock.lock()
        let subscription = self._synchronized_subscribe(observer)
        self._lock.unlock()
        return subscription
    }

    func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if self._isDisposed {
            observer.on(.error(RxError.disposed(object: self)))
            return Disposables.create()
        }
     
        let anyObserver = observer.asObserver()
        
        self.replayBuffer(anyObserver)
        if let stoppedEvent = self._stoppedEvent {
            observer.on(stoppedEvent)
            return Disposables.create()
        }
        else {
            let key = self._observers.insert(observer.on)
            return SubscriptionDisposable(owner: self, key: key)
        }
    }

    func synchronizedUnsubscribe(_ disposeKey: DisposeKey) {
        self._lock.lock()
        self._synchronized_unsubscribe(disposeKey)
        self._lock.unlock()
    }

    func _synchronized_unsubscribe(_ disposeKey: DisposeKey) {
        if self._isDisposed {
            return
        }
        
        _ = self._observers.removeKey(disposeKey)
    }
    
    override func dispose() {
        super.dispose()

        self.synchronizedDispose()
    }

    func synchronizedDispose() {
        self._lock.lock()
        self._synchronized_dispose()
        self._lock.unlock()
    }

    func _synchronized_dispose() {
        self._isDisposed = true
        self._observers.removeAll()
    }
}

private final class ReplayOne<Element> : ReplayBufferBase<Element> {
    private var _value: Element?
    
    override init() {
        super.init()
    }
    
    override func trim() {
        
    }
    
    override func addValueToBuffer(_ value: Element) {
        self._value = value
    }

    override func replayBuffer<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        if let value = self._value {
            observer.on(.next(value))
        }
    }

    override func _synchronized_dispose() {
        super._synchronized_dispose()
        self._value = nil
    }
}

private class ReplayManyBase<Element>: ReplayBufferBase<Element> {
    fileprivate var _queue: Queue<Element>
    
    init(queueSize: Int) {
        self._queue = Queue(capacity: queueSize + 1)
    }
    
    override func addValueToBuffer(_ value: Element) {
        self._queue.enqueue(value)
    }

    override func replayBuffer<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        for item in self._queue {
            observer.on(.next(item))
        }
    }

    override func _synchronized_dispose() {
        super._synchronized_dispose()
        self._queue = Queue(capacity: 0)
    }
}

private final class ReplayMany<Element> : ReplayManyBase<Element> {
    private let _bufferSize: Int
    
    init(bufferSize: Int) {
        self._bufferSize = bufferSize
        
        super.init(queueSize: bufferSize)
    }
    
    override func trim() {
        while self._queue.count > self._bufferSize {
            _ = self._queue.dequeue()
        }
    }
}

private final class ReplayAll<Element> : ReplayManyBase<Element> {
    init() {
        super.init(queueSize: 0)
    }
    
    override func trim() {
        
    }
}

 

이번에도 역시 실제 예제 소스를 살펴보겠습니다.

 

let subject = ReplaySubject<Int>.create(bufferSize: 3)

subject.onNext(1)
subject.onNext(2)
subject.onNext(3)
subject.onNext(4)
subject.onNext(5)
subject.onNext(6)

let subjectOne = subject.subscribe(onNext: { (value) in
    print(value)
})

subject.onNext(7)
subject.onNext(8)
subject.onNext(9)

// 4
// 5
// 6
// 7
// 8
// 9

 

 

 


 

 

 

Relay

 

앞서서 우리는 Subject에 대해서 종류 및 개념에 대해서 살펴보았습니다.

Subject는 ReSwift 내의 클래스로 제공되는 기능인 반면 RxCocoa의 클래스로서 제공되는 Relay라는 객체가 있습니다.

 

Relay 객체의 큰 특징을 살펴보면 Subject는 .completed, .error의 이벤트가 발생하면 subscribe가 종료되는 반면 Relay 객체는 .completed, .error를 발생하지 않고 Dispose되기 전까지 계속 작동하기 때문에 UI 등에서 사용하기 적절합니다.

 

Relay는 Subject와 다르게 onNext(_:)가 아닌 accept(_:)를 통해 새로운 이벤트를 전달합니다.

또한 subscribe 하고 싶을 때는 asObservable을 사용합니다.

 

이러한 Relay에는 아래와 같이 두 가지 형태의 객체를 포함하고 있습니다.

 

  • PublishRelay : PublishSubject의 Wrapper 클래스
  • BehaviorRelay : BehaviorSubject의 Wrapper 클래스로서 variable 대체 클래스로도 사용 가능

 

그렇다면 PublishRelay와 BehaviorRelay에 대해서도 한번 살펴보겠습니다.

 

 

 

 


 

 

 

PublishRelay

 

PublishRelay는 PublishSubject의 Wrapper 클래스입니다.
PublishSubject의 특성처럼 구독 이후의 발생하는 이벤트들을 모두 방출합니다.

 

실제 PublishSubject의 소스를 살펴보면 아래와 같이 PublishSubject를 멤버로 가지고 있는 것을 볼 수 있습니다. 또한 이벤트 종류로는 

accept만 존재하면 해당 메소드에서 onNext 이벤트를 방출됨을 알 수 있습니다.

 

public final class PublishRelay<Element>: ObservableType {
    private let _subject: PublishSubject<Element>
    
    // Accepts `event` and emits it to subscribers
    public func accept(_ event: Element) {
        self._subject.onNext(event)
    }
    
    /// Initializes with internal empty subject.
    public init() {
        self._subject = PublishSubject()
    }

    /// Subscribes observer
    public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        return self._subject.subscribe(observer)
    }
    
    /// - returns: Canonical interface for push style sequence
    public func asObservable() -> Observable<Element> {
        return self._subject.asObservable()
    }
}

 

실제 사용 예제는 아래와 같습니다.

let publishRelay = PublishRelay<Int>()
publishRelay.subscribe { print($0) }
publishRelay.accept(5) 

// next(5)

 

 

 


 

 

 

BehaviorRelay

 

BehaviorSubject를 wrapping 해서 가지고 있는 객체로서 .value를 사용해 현재의 값을 꺼낼 수 있습니다.(읽기 전용)

value를 변경하기 위해서 .accept()를 사용합니다

 

BehaviorRelay의 구현 소스를 보면 아래와 같습니다.

public final class BehaviorRelay<Element>: ObservableType {
    private let _subject: BehaviorSubject<Element>

    /// Accepts `event` and emits it to subscribers
    public func accept(_ event: Element) {
        self._subject.onNext(event)
    }

    /// Current value of behavior subject
    public var value: Element {
        // this try! is ok because subject can't error out or be disposed
        return try! self._subject.value()
    }

    /// Initializes behavior relay with initial value.
    public init(value: Element) {
        self._subject = BehaviorSubject(value: value)
    }

    /// Subscribes observer
    public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        return self._subject.subscribe(observer)
    }

    /// - returns: Canonical interface for push style sequence
    public func asObservable() -> Observable<Element> {
        return self._subject.asObservable()
    }
}

 

BehaviorRelay 사용 예제 소스는 아래와 같습니다.

let behaviorRelay = BehaviorRelay(value: 5)
behaviorRelay.accept(6)
behaviorRelay.subscribe { print($0) } 
behaviorRelay.accept(4) 
print(behaviorRelay.value) // 4 (읽기전용)

// next(6)
// next(4)
// 4

 

 

 


 

 

 

Hot Observable vs Cold Observable

 

본 강의 처음부분에서 잠깐 언급하였덧 Hot Observable과 Cold Observable에 대해서 간략하게 살펴보겠습니다.

 

Observable은 두가지 종류로 구분될 수 있습니다. Hot Observable과 Cold Observable이 그것입니다. 이 두가지는 같은 Observable이지만 그 특성은 다릅니다.

 

서로 다른 특성을 한번 살펴보면 아래와 같습니다.

 

 

Hot Observable

 

  • 생성과 동시에 이벤트를 방출합니다.
  • subscribe 되는 시점과 상관없이 옵저버들에게 이벤트를 중간부터 방출합니다.
  • Hot Observable은 구독 여부에 상관 없이 이벤트를 발생시키기 때문에 일단 동작하기 시작하면 리소스를 사용하게 됩니다. 또한 이 동작을 시작하는 시점을 조절할 수 있는 메소드를 제공합니다. 
  • Hot Observable은 여러 Observer가 하나의 Observable을 공유할 수 있습니다.
  • RxSwift에서는 다른 말로 Connectable Observable 이라고 부릅니다.
  • 또한 Cold Observable을 Hot Observable로 바꿔주는 연산자들도 제공합니다.

 

Cold Observable

 

  • 옵저버가 subscribe 되는 시점부터 이벤트를 생성하고 방출합니다.
  • 즉 이벤트는 구독을 시작하면 발생하고 시퀀스(Sequence)가 처음부터 관찰을 할 수 있는 형태입니다.
  • Cold Observable은 구독과 동시에 무조건 동작하며, 그 이전에는 연산 자원을 소모하지 않습니다.
  • Cold Observable은 Observer마다 별도의 Observable 인스턴스를 가지게 됩니다. 
  • Hot Observable을 Cold Observable과 유사하게 취급할 수 있도록 ReplaySubject 같은 것도 제공합니다.
  • 예를 들면 실시간 스트리밍 방송이 아닌 지난 방송 VOD 처럼 구독자가 구독들 시작하면 해당 영상을 처음부터 끝까지 볼 수 있는 것을 말합니다.

 

 

 


 

 

 

이상으로 RxSwift의 Subject 및 Relay의 기본 개념 및 특징에 대해서 살펴보았습니다.

 

 

 

감사합니다.

 

 

 

 


[참고 자료(References)]

 

[1] RxSwift: ReactiveX for Swift : github.com/ReactiveX/RxSwiftgithub.com/ReactiveX/RxSwift/blob/master/Documentation/GettingStarted.md

[2] [RxSwift] Subject와 Relay - (3) : jinshine.github.io/2019/01/05/RxSwift/3.Subject란/

[3] RxSwift #2 — Observable 과 Subject (+ Relay) 알아보기 : medium.com/@ggaa96/rxswift-2-observable-subject-relay-8fcd9b01913d

[4] [RxSwift] Subject / Relay / Driver / Variable : eunjin3786.tistory.com/37

[5] [RxSwift/RxCocoa] Subject는 알겠는데, Relay는 뭐지? : rhammer.tistory.com/354

[6] RxSwift의 Subjects : usinuniverse.bitbucket.io/blog/subjects.html

[7] Rxswift기초 - Hot vs Cold Observable : jcsoohwancho.github.io/2019-10-20-RxSwift기초-Hot-vs-Cold-Observable/

[8] RxSwift, Hot & Cold Observable : brunch.co.kr/@tilltue/18

 

728x90
반응형
Comments