Tags :

Date :

CombineLatest

let a = PublishSubject<Int>()
let b = PublishSubject<String>()
Observable.combineLatest(a, b)

// 결과
a: 1          (b가 아직 없음  출력 없음)
b: "A"         (1, "A") 출력 시작

a: 2           (2, "A")
b: "B"         (2, "B")
  • 개념: 여려 Observable 중 하나라도 새로운 값이 나오면 각각의 최신값을 묶어서 방출한다.
  • 조건: 시작하려면 모든 Observable이 최소 1번은 방출해야한다.

CombineLatest 예시

// 로그인 버튼 활성화
Observable.combineLatest(idText, passwordText)
    .map { !$0.isEmpty && !$1.isEmpty }
  • 입력값이 바뀔 때마다 다시 계산해야 할 때 사용한다

withLatestFrom

let a = PublishSubject<Int>()
let b = PublishSubject<String>()
a.withLatestFrom(b)

// 결과
b: "A"
b: "B"        (아무  없음)

a: 1           "B"
a: 2           "B"
  • 개념: a가 emit할 때만 b의 최신값을 가져와서 방출한다
  • 조건:
    • b는 최소 1번은 값이 있어야 한다.
    • a가 emit할 때만 실행된다.
    • b가 emit해도 아무 일 없다.

withLatestFrom 예시

buttonTap
    .withLatestFrom(textFieldText)
  • 버튼을 눌렀을 때만 텍스트 값 필요할 때 적합하다

에러 처리

  • 개념: Rx에서 에러가 발생하면 스트림은 종료된다.
  • 에러를 어떻게 다루는지에 따라 여러 방법이 있다.

에러 예시1

// catch + empty(에러 무시)
input.reload
    .withUnretained(self)
    .flatMapLatest { owner, _ in
        owner.groupUsecase.loadAndFetchGroup()
            .catch { _ in .empty() }
    }
    .disposed(by: disposeBag)
  • 에러 발생하더라도 아무 값도 방출하지 않는다
  • 스트림 유지된다
  • UI는 에러를 모른다
  • 실패해도 조용히 무시할 경우 또는 재시도 트리거가 있을 경우 사용한다
  • 디버깅이 어렵다

에러 예시2

input.reload
    .withUnretained(self)
    .flatMapLatest { owner, _ in
        owner.groupUsecase.loadAndFetchGroup()
            .catchAndReturn(HCGroup.empty)
            // 위아래 같음
            .catch { _ in
              .just(HCGroup.empty)
          }
          .disposed(by: disposeBag)
    }
  • 에러발생시 기본값을 방출한다
  • 스트림 유지된다
  • UI는 계속 동작한다

에러 예시3

// vm
let errorRelay = PublishRelay<Error>()

input.reload
    .withUnretained(self)
    .flatMapLatest { owner, _ in
        owner.groupUsecase.loadAndFetchGroup()
            .do(onError: { owner.errorRelay.accept($0) })
            .catchAndReturn(HCGroup.empty)
    }
    .disposed(by: disposeBag)

// vc
viewModel.errorRelay
    .emit(onNext: { error in
        // Alert 또는 Toast 표시
    })
    .disposed(by: disposeBag)
  • 데이터 스트림은 유지
  • 에러는 따로 UI로 전달
  • Alert, Toast 표시 가능

에러 예시4

input.reload
    .flatMapLatest { [weak self] _ in
        self?.groupUsecase.loadAndFetchGroup()
            .asDriver(onErrorDriveWith: .empty())
        ?? .empty()
    }
    .disposed(by: disposeBag)
  • UI 바인딩이면 아예 Driver로 변환하는 것이 안전하다.
  • Main Thread 보장
  • error 방출하지 않음
  • share된다(중복 subscribe 방지)

materialize

// Event<Element> 타입 종류
.next(Element)
.error(Error)
.completed

// 이 스트림은 HCGroup이 아니라 Event<HCGroup>을 방출한다.
input.reload
    .withUnretained(self)
    .flatMapLatest { owner, _ in
        owner.groupUsecase.loadAndFetchGroup()
            .materialize()
    }
    .share()
    .disposed(by: disposeBag)

let success = result
    .compactMap { $0.element }

let error = result
    .compactMap { $0.error }
  • 에러와 완료 이벤트를 값(Event)으로 바꿔서 방출하는 연산자.
  • 원래 Rx는 .next, .error, .completed 3가지 이벤트가 있으나 .error 발생 시 스트림이 종료된다.
    materialize를 쓰면 에러를 종료시키지 않고 값처럼 다룰 수 있다.
  • 장점
    • 스트림이 종료되지 않고 성공과 실패를 동시에 처리 가능하다
    • 에러와 데이터를 분리 가능하다
    • MVVM에서 매우 깔끔한 구조 가능

예시

let result = input.reload
    .flatMapLatest { [weak self] _ in
        self?.groupUsecase.loadAndFetchGroup()
            .materialize()
        ?? .empty()
    }
    .share()

result
    .compactMap { $0.element }
    .bind(to: postsRelay)
    .disposed(by: disposeBag)

result
    .compactMap { $0.error }
    .bind(to: errorRelay)
    .disposed(by: disposeBag)

share

// replay: 0 → 이전 값 저장 안 함
// whileConnected → 구독자가 있을 때만 유지
share() = .share(replay: 0, scope: .whileConnected)

// 최근 값 1개 저장
// 늦게 구독해도 마지막 값 받음
.share(replay: 1)

let request = api.loadData()
request.subscribe()
request.subscribe()
// 네트워크 요청이 2번 실행됨

let request = api.loadData()
    .share()
request.subscribe()
request.subscribe()
// 네트워크 요청 1번만 실행되고 두 구독자가 공유

  • 개념: 하나의 Observable 실행 결과를 여러 subscriber가 공유하도록 만드는 연산자
  • 설명: Rx는 기본적으로 Cold Observable이기 때문에 두번 subscribe하면 네트워크 요청이 2번 실행된다.
  • share(replay:)는 구독자가 1명 이상 존재하는 동안 처음 실행된 결과를 재사용한다
  • 모든 구독자가 사라지면 다음 구독 시 다시 실행된다.

share 예시1

let request = api.load()
    .share(replay: 1)

request.subscribe()  // A
request.subscribe()  // B
request.subscribe()  // C
// 네트워크 1번 요청된다.
// 그 결과를 A, B, C가 같이 나누어 쓴다.

모두 dispose된  다시 구독
let disposable1 = request.subscribe()
disposable1.dispose()   // 구독자 0명 됨
request.subscribe()
// 네트워크 1번 다시 실행된다

map

Observable.of(1, 2, 3)
    .map { $0 * 10 }

Observable.of("1", "2", "3")
  .map { Int($0) }
  • 들어온 값을 다른 값으로 변환하는 연산자

compactMap

Observable.of("1", "2", "three", "4")
    .compactMap { Int($0) }
// 1, 2, 4
  • swift의 compactMap과 동일
  • nil 제거하고 값만 망출

예시

let result: Observable<Event<String>> = buttonTap
    .flatMapLatest { _ in
        apiCall()               // Observable<String>
            .materialize()      // → Observable<Event<String>>
    }
    .share()

func apiCall() -> Observable<String> {
    let success = Bool.random()

    if success {
        return Observable.just("Success Data")
    } else {
        return Observable.error(NSError(domain: "Network", code: -1))
    }
}

// 성공 값만 가져오기
result
    .compactMap { $0.element }

// 에러만 가져오기
result
    .compactMap { $0.error }

flatMap

Observable.of(1, 2, 3)
    .flatMap { value in
        Observable.just(value * 10)
    }
// 10 20 30

buttonTap
    .flatMap { _ in
        api.loadData()   // 네트워크 요청
    }
// 버튼 3번 누르면 네트워크 요청 3개 동시 실행됨 -> 의도에 따라 flatMapLatest, flatMapFirst 사용가능
  • 개념: 각 이벤트를 새로운 Observable로 바꾼뒤 모두 동시에 merge해서 방출
  • 들어온 이벤트마다 새로운 비동기 작업을 만들고 그 결과를 전부 흘려보낸다.

flatMapFirst

A 시작
B 발생  무시
C 발생  무시
A 완료
  • 개념: 작업이 진행 중이면 새로운 이벤트는 무시
  • 중복 요청 방지, 버튼 연타 방지, 한번만 처리하는 작업
  • 장점: 동시 실행 방지
  • 단점: 후속 이벤트가 사라짐

flatMapLatest

A 시작
B 발생  A 취소
C 발생  B 취소
 C만 실행됨
  • 개념: 새로운 이벤트가 오면 이전 작업을 취소하고 최신 작업만 유지한다
  • 검색 자동완성, 텍스트 입력 API, 최신 결과만 의미 있는 경우
  • 장점: 불필요한 네트워크 낭비 방지
  • 단점: 중간 작업 취소됨

concatMap

A 시작
B 대기
C 대기

A 완료
B 실행
B 완료
C 실행
  • 개념: 이벤트를 순서대로 큐에 쌓아 처리
  • 삭제, 서버 상태 변경, 순서가 중요한 요청
  • 장점: 안정적, 작업 누락 없음, 서버 상태 불일치 방지
  • 단점: 느릴 수 있음