import Foundation private final class SignalQueueState : Disposable { var lock: OSSpinLock = 0 var executingSignal = false var terminated = false var disposable: Disposable = EmptyDisposable let currentDisposable = MetaDisposable() let subscriber: Subscriber var queuedSignals: [Signal] = [] let queueMode: Bool init(subscriber: Subscriber, queueMode: Bool) { self.subscriber = subscriber self.queueMode = queueMode } func beginWithDisposable(disposable: Disposable) { self.disposable = disposable } func enqueueSignal(signal: Signal) { var startSignal = false OSSpinLockLock(&self.lock) if self.queueMode && self.executingSignal { self.queuedSignals.append(signal) } else { self.executingSignal = true startSignal = true } OSSpinLockUnlock(&self.lock) if startSignal { let disposable = signal.start(next: { next in self.subscriber.putNext(next) }, error: { error in self.subscriber.putError(error) }, completed: { self.headCompleted() }) self.currentDisposable.set(disposable) } } func headCompleted() { var nextSignal: Signal! = nil var terminated = false OSSpinLockLock(&self.lock) self.executingSignal = false if self.queueMode { if self.queuedSignals.count != 0 { nextSignal = self.queuedSignals[0] self.queuedSignals.removeAtIndex(0) self.executingSignal = true } else { terminated = self.terminated } } else { terminated = self.terminated } OSSpinLockUnlock(&self.lock) if terminated { self.subscriber.putCompletion() } else if nextSignal != nil { let disposable = nextSignal.start(next: { next in self.subscriber.putNext(next) }, error: { error in self.subscriber.putError(error) }, completed: { self.headCompleted() }) } } func beginCompletion() { var executingSignal = false OSSpinLockLock(&self.lock) executingSignal = self.executingSignal self.terminated = true OSSpinLockUnlock(&self.lock) if !executingSignal { self.subscriber.putCompletion() } } func dispose() { self.currentDisposable.dispose() self.disposable.dispose() } } public func switchToLatest(signal: Signal, E>) -> Signal { return Signal { subscriber in let state = SignalQueueState(subscriber: subscriber, queueMode: false) state.beginWithDisposable(signal.start(next: { next in state.enqueueSignal(next) }, error: { error in subscriber.putError(error) }, completed: { state.beginCompletion() })) return state } } public func queue(signal: Signal, E>) -> Signal { return Signal { subscriber in let state = SignalQueueState(subscriber: subscriber, queueMode: true) state.beginWithDisposable(signal.start(next: { next in state.enqueueSignal(next) }, error: { error in subscriber.putError(error) }, completed: { state.beginCompletion() })) return state } } public func mapToSignal(f: T -> Signal)(signal: Signal) -> Signal { return signal |> map { f($0) } |> switchToLatest } public func mapToQueue(f: T -> Signal)(signal: Signal) -> Signal { return signal |> map { f($0) } |> queue } public func then(nextSignal: Signal)(signal: Signal) -> Signal { return Signal { subscriber in let disposable = DisposableSet() disposable.add(signal.start(next: { next in subscriber.putNext(next) }, error: { error in subscriber.putError(error) }, completed: { disposable.add(nextSignal.start(next: { next in subscriber.putNext(next) }, error: { error in subscriber.putError(error) }, completed: { subscriber.putCompletion() })) })) return disposable } }