jsdw_ios/QuickLocation/RxSwift/Action.swift

138 lines
4.7 KiB
Swift

import Foundation
import RxSwift
import RxCocoa
/// Typealias for compatibility with UIButton's rx.action property.
public typealias CocoaAction = Action<Void, Void>
/// Typealias for actions with work factory returns `Completable`.
public typealias CompletableAction<Input> = Action<Input, Never>
/// Possible errors from invoking execute()
public enum ActionError: Error {
case notEnabled
case underlyingError(Error)
}
/**
Represents a value that accepts a workFactory which takes some Observable<Input> as its input
and produces an Observable<Element> as its output.
When this excuted via execute() or inputs subject, it passes its parameter to this closure and subscribes to the work.
*/
public final class Action<Input, Element> {
public typealias WorkFactory = (Input) -> Observable<Element>
public let _enabledIf: Observable<Bool>
public let workFactory: WorkFactory
/// Inputs that triggers execution of action.
/// This subject also includes inputs as aguments of execute().
/// All inputs are always appear in this subject even if the action is not enabled.
/// Thus, inputs count equals elements count + errors count.
public let inputs = InputSubject<Input>()
/// Errors aggrevated from invocations of execute().
/// Delivered on whatever scheduler they were sent from.
public let errors: Observable<ActionError>
/// Whether or not we're currently executing.
/// Delivered on whatever scheduler they were sent from.
public let elements: Observable<Element>
/// Whether or not we're currently executing.
public let executing: Observable<Bool>
/// Observables returned by the workFactory.
/// Useful for sending results back from work being completed
/// e.g. response from a network call.
public let executionObservables: Observable<Observable<Element>>
/// Whether or not we're enabled. Note that this is a *computed* sequence
/// property based on enabledIf initializer and if we're currently executing.
/// Always observed on MainScheduler.
public let enabled: Observable<Bool>
private let disposeBag = DisposeBag()
public convenience init<O: ObservableConvertibleType>(
enabledIf: Observable<Bool> = Observable.just(true),
workFactory: @escaping (Input) -> O
) where O.Element == Element {
self.init(enabledIf: enabledIf) {
workFactory($0).asObservable()
}
}
public init(
enabledIf: Observable<Bool> = Observable.just(true),
workFactory: @escaping WorkFactory) {
self._enabledIf = enabledIf
self.workFactory = workFactory
let enabledSubject = BehaviorSubject<Bool>(value: false)
enabled = enabledSubject.asObservable()
let errorsSubject = PublishSubject<ActionError>()
errors = errorsSubject.asObservable()
executionObservables = inputs
.withLatestFrom(enabled) { input, enabled in (input, enabled) }
.flatMap { input, enabled -> Observable<Observable<Element>> in
if enabled {
return Observable.of(workFactory(input)
.do(onError: { errorsSubject.onNext(.underlyingError($0)) })
.share(replay: 1, scope: .forever))
} else {
errorsSubject.onNext(.notEnabled)
return Observable.empty()
}
}
.share()
elements = executionObservables
.flatMap { $0.catch { _ in Observable.empty() } }
executing = executionObservables.flatMap {
execution -> Observable<Bool> in
let execution = execution
.flatMap { _ in Observable<Bool>.empty() }
.catch { _ in Observable.empty() }
return Observable.concat([Observable.just(true),
execution,
Observable.just(false)])
}
.startWith(false)
.share(replay: 1, scope: .forever)
Observable
.combineLatest(executing, enabledIf) { !$0 && $1 }
.bind(to: enabledSubject)
.disposed(by: disposeBag)
}
@discardableResult
public func execute(_ value: Input) -> Observable<Element> {
defer {
inputs.onNext(value)
}
let subject = ReplaySubject<Element>.createUnbounded()
let work = executionObservables
.map { $0.catch { throw ActionError.underlyingError($0) } }
let error = errors
.map { Observable<Element>.error($0) }
work.amb(error)
.take(1)
.flatMap { $0 }
.subscribe(subject)
.disposed(by: disposeBag)
return subject.asObservable()
}
}