import Foundation

/// Returns a signal that forwards every event of `signal` through `queue`.
///
/// Each `next`, `error` and `completed` event is wrapped in its own
/// `queue.dispatch` call before being handed to the subscriber.
///
/// - Parameter queue: The queue whose `dispatch` is used for delivery.
/// - Parameter signal: The upstream signal to subscribe to.
/// - Returns: A signal whose disposable is the one returned by `signal.start`.
public func deliverOn(queue: Queue)(signal: Signal) -> Signal {
    return Signal { subscriber in
        return signal.start(next: { next in
            queue.dispatch {
                subscriber.putNext(next)
            }
        }, error: { error in
            queue.dispatch {
                subscriber.putError(error)
            }
        }, completed: {
            queue.dispatch {
                subscriber.putCompletion()
            }
        })
    }
}

/// Convenience wrapper: pipes `signal` through `deliverOn(Queue.mainQueue())`.
///
/// - Parameter signal: The upstream signal to subscribe to.
/// - Returns: The result of applying `deliverOn(Queue.mainQueue())` to `signal`.
public func deliverOnMainQueue(signal: Signal) -> Signal {
    return signal |> deliverOn(Queue.mainQueue())
}

/// Returns a signal that forwards every event of `signal` as a task on a
/// queue taken from `threadPool`.
///
/// One queue is obtained via `threadPool.nextQueue()` per subscription and is
/// reused for all events of that subscription (presumably so events keep their
/// relative order; confirm against `ThreadPool`). Every event is wrapped in a
/// `ThreadPoolTask` that drops the event when `state.cancelled` is true at the
/// time the task runs.
///
/// - Parameter threadPool: The pool that supplies the delivery queue.
/// - Parameter signal: The upstream signal to subscribe to.
/// - Returns: A signal whose disposable is the one returned by `signal.start`.
public func deliverOn(threadPool: ThreadPool)(signal: Signal) -> Signal {
    return Signal { subscriber in
        let queue = threadPool.nextQueue()
        return signal.start(next: { next in
            queue.addTask(ThreadPoolTask { state in
                if !state.cancelled {
                    subscriber.putNext(next)
                }
            })
        }, error: { error in
            queue.addTask(ThreadPoolTask { state in
                if !state.cancelled {
                    subscriber.putError(error)
                }
            })
        }, completed: {
            queue.addTask(ThreadPoolTask { state in
                if !state.cancelled {
                    subscriber.putCompletion()
                }
            })
        })
    }
}

/// Returns a signal that starts `signal` from inside a task submitted to
/// `threadPool`, instead of starting it on the subscribing caller.
///
/// The returned `MetaDisposable` is first given an action that calls
/// `task.cancel()`; once the task runs and starts `signal`, the same
/// `MetaDisposable` is given the disposable returned by `signal.start`.
///
/// - Parameter threadPool: The pool the subscription task is added to.
/// - Parameter signal: The upstream signal to start from within the task.
/// - Returns: A signal whose events are forwarded unchanged to the subscriber.
public func runOn(threadPool: ThreadPool)(signal: Signal) -> Signal {
    return Signal { subscriber in
        let disposable = MetaDisposable()

        let task = ThreadPoolTask { state in
            // Skip the subscription when the task was cancelled before it ran.
            // NOTE(review): `state.cancelled` is presumably set by
            // `task.cancel()`; confirm against ThreadPoolTask.
            if state.cancelled {
                return
            }

            disposable.set(signal.start(next: { next in
                subscriber.putNext(next)
            }, error: { error in
                subscriber.putError(error)
            }, completed: {
                subscriber.putCompletion()
            }))
        }

        // Installed before the task is queued, so disposing early cancels it.
        disposable.set(ActionDisposable {
            task.cancel()
        })

        threadPool.addTask(task)

        return disposable
    }
}