jsdw_ios/Pods/RxSwift/RxSwift/Observables/Amb.swift

158 lines
4.8 KiB
Swift

//
// Amb.swift
// RxSwift
//
// Created by Krunoslav Zaher on 6/14/15.
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
extension ObservableType {
/**
Propagates the observable sequence that reacts first.
- seealso: [amb operator on reactivex.io](http://reactivex.io/documentation/operators/amb.html)
- returns: An observable sequence that surfaces any of the given sequences, whichever reacted first.
*/
public static func amb<Sequence: Swift.Sequence>(_ sequence: Sequence) -> Observable<Element>
where Sequence.Element == Observable<Element> {
sequence.reduce(Observable<Sequence.Element.Element>.never()) { a, o in
a.amb(o.asObservable())
}
}
}
extension ObservableType {
/**
Propagates the observable sequence that reacts first.
- seealso: [amb operator on reactivex.io](http://reactivex.io/documentation/operators/amb.html)
- parameter right: Second observable sequence.
- returns: An observable sequence that surfaces either of the given sequences, whichever reacted first.
*/
public func amb<O2: ObservableType>
(_ right: O2)
-> Observable<Element> where O2.Element == Element {
Amb(left: self.asObservable(), right: right.asObservable())
}
}
private enum AmbState {
case neither
case left
case right
}
final private class AmbObserver<Observer: ObserverType>: ObserverType {
typealias Element = Observer.Element
typealias Parent = AmbSink<Observer>
typealias This = AmbObserver<Observer>
typealias Sink = (This, Event<Element>) -> Void
private let parent: Parent
fileprivate var sink: Sink
fileprivate var cancel: Disposable
init(parent: Parent, cancel: Disposable, sink: @escaping Sink) {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
self.parent = parent
self.sink = sink
self.cancel = cancel
}
func on(_ event: Event<Element>) {
self.sink(self, event)
if event.isStopEvent {
self.cancel.dispose()
}
}
deinit {
#if TRACE_RESOURCES
_ = Resources.decrementTotal()
#endif
}
}
final private class AmbSink<Observer: ObserverType>: Sink<Observer> {
typealias Element = Observer.Element
typealias Parent = Amb<Element>
typealias AmbObserverType = AmbObserver<Observer>
private let parent: Parent
private let lock = RecursiveLock()
// state
private var choice = AmbState.neither
init(parent: Parent, observer: Observer, cancel: Cancelable) {
self.parent = parent
super.init(observer: observer, cancel: cancel)
}
func run() -> Disposable {
let subscription1 = SingleAssignmentDisposable()
let subscription2 = SingleAssignmentDisposable()
let disposeAll = Disposables.create(subscription1, subscription2)
let forwardEvent = { (o: AmbObserverType, event: Event<Element>) -> Void in
self.forwardOn(event)
if event.isStopEvent {
self.dispose()
}
}
let decide = { (o: AmbObserverType, event: Event<Element>, me: AmbState, otherSubscription: Disposable) in
self.lock.performLocked {
if self.choice == .neither {
self.choice = me
o.sink = forwardEvent
o.cancel = disposeAll
otherSubscription.dispose()
}
if self.choice == me {
self.forwardOn(event)
if event.isStopEvent {
self.dispose()
}
}
}
}
let sink1 = AmbObserver(parent: self, cancel: subscription1) { o, e in
decide(o, e, .left, subscription2)
}
let sink2 = AmbObserver(parent: self, cancel: subscription1) { o, e in
decide(o, e, .right, subscription1)
}
subscription1.setDisposable(self.parent.left.subscribe(sink1))
subscription2.setDisposable(self.parent.right.subscribe(sink2))
return disposeAll
}
}
final private class Amb<Element>: Producer<Element> {
fileprivate let left: Observable<Element>
fileprivate let right: Observable<Element>
init(left: Observable<Element>, right: Observable<Element>) {
self.left = left
self.right = right
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AmbSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}