no message

This commit is contained in:
Peter
2016-06-19 15:19:00 +03:00
parent 586d105aad
commit bb9062c684
27 changed files with 653 additions and 640 deletions

View File

@@ -67,7 +67,7 @@
D0085B531B282BEE00EAF753 /* ThreadPool.swift in Sources */ = {isa = PBXBuildFile; fileRef = D0085B3F1B282BEE00EAF753 /* ThreadPool.swift */; };
D0085B541B282BEE00EAF753 /* Timer.swift in Sources */ = {isa = PBXBuildFile; fileRef = D0085B401B282BEE00EAF753 /* Timer.swift */; };
D0085B551B282BEE00EAF753 /* Queue.swift in Sources */ = {isa = PBXBuildFile; fileRef = D0085B411B282BEE00EAF753 /* Queue.swift */; };
D0085B561B282BEE00EAF753 /* Pipe.swift in Sources */ = {isa = PBXBuildFile; fileRef = D0085B421B282BEE00EAF753 /* Pipe.swift */; };
D0085B561B282BEE00EAF753 /* ValuePipe.swift in Sources */ = {isa = PBXBuildFile; fileRef = D0085B421B282BEE00EAF753 /* ValuePipe.swift */; };
D0085B571B282BEE00EAF753 /* Bag.swift in Sources */ = {isa = PBXBuildFile; fileRef = D0085B431B282BEE00EAF753 /* Bag.swift */; };
D0085B581B282BEE00EAF753 /* Signal_Take.swift in Sources */ = {isa = PBXBuildFile; fileRef = D0085B441B282BEE00EAF753 /* Signal_Take.swift */; };
D0085B591B282BEE00EAF753 /* Signal_Catch.swift in Sources */ = {isa = PBXBuildFile; fileRef = D0085B451B282BEE00EAF753 /* Signal_Catch.swift */; };
@@ -180,7 +180,7 @@
D0085B3F1B282BEE00EAF753 /* ThreadPool.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ThreadPool.swift; sourceTree = "<group>"; };
D0085B401B282BEE00EAF753 /* Timer.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Timer.swift; sourceTree = "<group>"; };
D0085B411B282BEE00EAF753 /* Queue.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Queue.swift; sourceTree = "<group>"; };
D0085B421B282BEE00EAF753 /* Pipe.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Pipe.swift; sourceTree = "<group>"; };
D0085B421B282BEE00EAF753 /* ValuePipe.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ValuePipe.swift; sourceTree = "<group>"; };
D0085B431B282BEE00EAF753 /* Bag.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Bag.swift; sourceTree = "<group>"; };
D0085B441B282BEE00EAF753 /* Signal_Take.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Signal_Take.swift; sourceTree = "<group>"; };
D0085B451B282BEE00EAF753 /* Signal_Catch.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Signal_Catch.swift; sourceTree = "<group>"; };
@@ -259,7 +259,7 @@
D0085B3F1B282BEE00EAF753 /* ThreadPool.swift */,
D0085B401B282BEE00EAF753 /* Timer.swift */,
D0085B411B282BEE00EAF753 /* Queue.swift */,
D0085B421B282BEE00EAF753 /* Pipe.swift */,
D0085B421B282BEE00EAF753 /* ValuePipe.swift */,
D0085B431B282BEE00EAF753 /* Bag.swift */,
D0085B441B282BEE00EAF753 /* Signal_Take.swift */,
D0085B451B282BEE00EAF753 /* Signal_Catch.swift */,
@@ -553,7 +553,7 @@
attributes = {
LastSwiftMigration = 0700;
LastSwiftUpdateCheck = 0700;
LastUpgradeCheck = 0710;
LastUpgradeCheck = 0800;
ORGANIZATIONNAME = Telegram;
TargetAttributes = {
D0085B211B282B9800EAF753 = {
@@ -627,7 +627,7 @@
buildActionMask = 2147483647;
files = (
D0085B521B282BEE00EAF753 /* Signal_Dispatch.swift in Sources */,
D0085B561B282BEE00EAF753 /* Pipe.swift in Sources */,
D0085B561B282BEE00EAF753 /* ValuePipe.swift in Sources */,
D0085B551B282BEE00EAF753 /* Queue.swift in Sources */,
D0085B591B282BEE00EAF753 /* Signal_Catch.swift in Sources */,
D0085B601B282BEE00EAF753 /* Disposable.swift in Sources */,
@@ -807,6 +807,7 @@
LD_RUNPATH_SEARCH_PATHS = "$(inherited) @executable_path/Frameworks @loader_path/Frameworks";
PRODUCT_BUNDLE_IDENTIFIER = "org.telegram.$(PRODUCT_NAME:rfc1034identifier)";
PRODUCT_NAME = "$(TARGET_NAME)";
SWIFT_OPTIMIZATION_LEVEL = "-Owholemodule";
};
name = Release;
};
@@ -835,6 +836,7 @@
ENABLE_TESTABILITY = YES;
GCC_C_LANGUAGE_STANDARD = gnu99;
GCC_DYNAMIC_NO_PIC = NO;
GCC_NO_COMMON_BLOCKS = YES;
GCC_OPTIMIZATION_LEVEL = 0;
GCC_PREPROCESSOR_DEFINITIONS = (
"DEBUG=1",
@@ -881,6 +883,7 @@
ENABLE_NS_ASSERTIONS = NO;
ENABLE_STRICT_OBJC_MSGSEND = YES;
GCC_C_LANGUAGE_STANDARD = gnu99;
GCC_NO_COMMON_BLOCKS = YES;
GCC_WARN_64_TO_32_BIT_CONVERSION = YES;
GCC_WARN_ABOUT_RETURN_TYPE = YES_ERROR;
GCC_WARN_UNDECLARED_SELECTOR = YES;
@@ -1004,6 +1007,7 @@
ENABLE_NS_ASSERTIONS = NO;
ENABLE_STRICT_OBJC_MSGSEND = YES;
GCC_C_LANGUAGE_STANDARD = gnu99;
GCC_NO_COMMON_BLOCKS = YES;
GCC_WARN_64_TO_32_BIT_CONVERSION = YES;
GCC_WARN_ABOUT_RETURN_TYPE = YES_ERROR;
GCC_WARN_UNDECLARED_SELECTOR = YES;

View File

@@ -8,7 +8,7 @@ public final class Atomic<T> {
self.value = value
}
public func with<R>(f: T -> R) -> R {
public func with<R>(_ f: (T) -> R) -> R {
OSSpinLockLock(&self.lock)
let result = f(self.value)
OSSpinLockUnlock(&self.lock)
@@ -16,7 +16,7 @@ public final class Atomic<T> {
return result
}
public func modify(f: T -> T) -> T {
public func modify(_ f: (T) -> T) -> T {
OSSpinLockLock(&self.lock)
let result = f(self.value)
self.value = result
@@ -25,7 +25,7 @@ public final class Atomic<T> {
return result
}
public func swap(value: T) -> T {
public func swap(_ value: T) -> T {
OSSpinLockLock(&self.lock)
let previous = self.value
self.value = value

View File

@@ -9,7 +9,7 @@ public final class Bag<T> {
public init() {
}
public func add(item: T) -> Index {
public func add(_ item: T) -> Index {
let key = self.nextIndex
self.nextIndex += 1
self.items.append(item)
@@ -18,7 +18,7 @@ public final class Bag<T> {
return key
}
public func get(index: Index) -> T? {
public func get(_ index: Index) -> T? {
var i = 0
for key in self.itemKeys {
if key == index {
@@ -29,12 +29,12 @@ public final class Bag<T> {
return nil
}
public func remove(index: Index) {
public func remove(_ index: Index) {
var i = 0
for key in self.itemKeys {
if key == index {
self.items.removeAtIndex(i)
self.itemKeys.removeAtIndex(i)
self.items.remove(at: i)
self.itemKeys.remove(at: i)
break
}
i += 1

View File

@@ -35,7 +35,7 @@ public final class MetaDisposable : Disposable {
public init() {
}
public func set(disposable: Disposable?) {
public func set(_ disposable: Disposable?) {
var previousDisposable: Disposable! = nil
var disposeImmediately = false
@@ -89,7 +89,7 @@ public final class DisposableSet : Disposable {
}
public func add(disposable: Disposable) {
public func add(_ disposable: Disposable) {
var disposeImmediately = false
OSSpinLockLock(&self.lock)

View File

@@ -11,7 +11,7 @@ public final class Lock {
pthread_mutex_destroy(&self.mutex)
}
public func locked(@noescape f: () -> ()) {
public func locked(_ f: @noescape() -> ()) {
pthread_mutex_lock(&self.mutex)
f()
pthread_mutex_unlock(&self.mutex)

View File

@@ -2,7 +2,7 @@ import Foundation
private final class MulticastInstance<T> {
let disposable: Disposable
var subscribers = Bag<T -> Void>()
var subscribers = Bag<(T) -> Void>()
var lock = Lock()
init(disposable: Disposable) {
@@ -31,7 +31,7 @@ public final class Multicast<T> {
}
}
var index: Bag<T -> Void>.Index!
var index: Bag<(T) -> Void>.Index!
instance.lock.locked {
index = instance.subscribers.add({ next in
subscriber.putNext(next)
@@ -40,7 +40,7 @@ public final class Multicast<T> {
if let beginDisposable = beginDisposable {
beginDisposable.set(signal.start(next: { next in
var subscribers: [T -> Void]!
var subscribers: [(T) -> Void]!
instance.lock.locked {
subscribers = instance.subscribers.copyItems()
}
@@ -49,11 +49,11 @@ public final class Multicast<T> {
}
}, error: { _ in
self.lock.locked {
self.instances.removeValueForKey(key)
let _ = self.instances.removeValue(forKey: key)
}
}, completed: {
self.lock.locked {
self.instances.removeValueForKey(key)
self.instances.removeValue(forKey: key)
}
}))
}
@@ -69,7 +69,7 @@ public final class Multicast<T> {
if remove {
self.lock.locked {
self.instances.removeValueForKey(key)
let _ = self.instances.removeValue(forKey: key)
}
}
}
@@ -78,7 +78,7 @@ public final class Multicast<T> {
}
public final class MulticastPromise<T> {
public let subscribers = Bag<T -> Void>()
public let subscribers = Bag<(T) -> Void>()
public let lock = Lock()
public var value: T?

View File

@@ -4,7 +4,7 @@ public class Promise<T> {
private var value: T?
private var lock: OSSpinLock = 0
private let disposable = MetaDisposable()
private let subscribers = Bag<T -> Void>()
private let subscribers = Bag<(T) -> Void>()
public init(_ value: T) {
self.value = value
@@ -17,7 +17,7 @@ public class Promise<T> {
self.disposable.dispose()
}
public func set(signal: Signal<T, NoError>) {
public func set(_ signal: Signal<T, NoError>) {
OSSpinLockLock(&self.lock)
self.value = nil
OSSpinLockUnlock(&self.lock)

View File

@@ -1,16 +1,15 @@
import Foundation
private let _QueueSpecificKey = NSObject()
private let QueueSpecificKey: UnsafePointer<Void> = UnsafePointer<Void>(Unmanaged<AnyObject>.passUnretained(_QueueSpecificKey).toOpaque())
private let QueueSpecificKey = DispatchSpecificKey<NSObject>()
private let globalMainQueue = Queue(queue: dispatch_get_main_queue(), specialIsMainQueue: true)
private let globalMainQueue = Queue(queue: DispatchQueue.main, specialIsMainQueue: true)
public final class Queue {
private let nativeQueue: dispatch_queue_t
private var specific: UnsafeMutablePointer<Void>
private let nativeQueue: DispatchQueue
private var specific = NSObject()
private let specialIsMainQueue: Bool
public var queue: dispatch_queue_t {
public var queue: DispatchQueue {
get {
return self.nativeQueue
}
@@ -21,102 +20,63 @@ public final class Queue {
}
public class func concurrentDefaultQueue() -> Queue {
return Queue(queue: dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), specialIsMainQueue: false)
return Queue(queue: DispatchQueue.global(attributes: [.qosDefault]), specialIsMainQueue: false)
}
public class func concurrentBackgroundQueue() -> Queue {
return Queue(queue: dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0), specialIsMainQueue: false)
return Queue(queue: DispatchQueue.global(attributes: [.qosBackground]), specialIsMainQueue: false)
}
public init(queue: dispatch_queue_t) {
public init(queue: DispatchQueue) {
self.nativeQueue = queue
self.specific = nil
self.specialIsMainQueue = false
}
private init(queue: dispatch_queue_t, specialIsMainQueue: Bool) {
private init(queue: DispatchQueue, specialIsMainQueue: Bool) {
self.nativeQueue = queue
self.specific = nil
self.specialIsMainQueue = specialIsMainQueue
}
public init(name: String? = nil) {
if let name = name {
self.nativeQueue = dispatch_queue_create(name, DISPATCH_QUEUE_SERIAL)
} else {
self.nativeQueue = dispatch_queue_create(nil, DISPATCH_QUEUE_SERIAL)
}
self.specific = nil
self.nativeQueue = DispatchQueue(label: name ?? "", attributes: [.serial], target: nil)
self.specialIsMainQueue = false
self.specific = UnsafeMutablePointer<Void>(Unmanaged<Queue>.passUnretained(self).toOpaque())
dispatch_queue_set_specific(self.nativeQueue, QueueSpecificKey, self.specific, nil)
dispatch_set_target_queue(self.nativeQueue, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0))
self.nativeQueue.setSpecific(key: QueueSpecificKey, value: self.specific)
}
func isCurrent() -> Bool {
if self.specific != nil && dispatch_get_specific(QueueSpecificKey) == self.specific {
if DispatchQueue.getSpecific(key: QueueSpecificKey) === self.specific {
return true
} else if self.specialIsMainQueue && NSThread.isMainThread() {
} else if self.specialIsMainQueue && Thread.isMainThread() {
return true
} else {
return false
}
}
public func async(f: Void -> Void) {
if self.specific != nil && dispatch_get_specific(QueueSpecificKey) == self.specific {
f()
} else if self.specialIsMainQueue && NSThread.isMainThread() {
public func async(_ f: (Void) -> Void) {
if self.isCurrent() {
f()
} else {
dispatch_async(self.nativeQueue, f)
self.nativeQueue.async(execute: f)
}
}
public func sync(f: Void -> Void) {
if self.specific != nil && dispatch_get_specific(QueueSpecificKey) == self.specific {
f()
} else if self.specialIsMainQueue && NSThread.isMainThread() {
public func sync(_ f: (Void) -> Void) {
if self.isCurrent() {
f()
} else {
dispatch_sync(self.nativeQueue, f)
self.nativeQueue.sync(execute: f)
}
}
public func dispatch(f: Void -> Void) {
if self.specific != nil && dispatch_get_specific(QueueSpecificKey) == self.specific {
f()
} else if self.specialIsMainQueue && NSThread.isMainThread() {
f()
} else {
dispatch_async(self.nativeQueue, f)
}
public func justDispatch(_ f: (Void) -> Void) {
self.nativeQueue.async(execute: f)
}
public func justDispatch(f: Void -> Void) {
dispatch_async(self.nativeQueue, f)
}
public func dispatchWithHighQoS(f: () -> Void) {
let block = dispatch_block_create_with_qos_class(DISPATCH_BLOCK_ENFORCE_QOS_CLASS, QOS_CLASS_USER_INTERACTIVE, 0, {
f()
})
dispatch_async(self.nativeQueue, block)
}
public func dispatchTiming(f: Void -> Void, _ file: String = #file, _ line: Int = #line) {
self.justDispatch {
let startTime = CFAbsoluteTimeGetCurrent()
f()
let delta = CFAbsoluteTimeGetCurrent() - startTime
if delta > 0.002 {
print("dispatchTiming \(delta * 1000.0) ms \(file):\(line)")
}
}
}
public func after(delay: Double, _ f: Void -> Void) {
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, Int64(delay * Double(NSEC_PER_SEC))), self.queue, f)
public func after(_ delay: Double, _ f: (Void) -> Void) {
let time: DispatchTime = DispatchTime.now() + Double(Int64(delay * Double(NSEC_PER_SEC)))
self.nativeQueue.after(when: time, execute: f)
}
}

View File

@@ -10,7 +10,7 @@ public func identity<A>(a: A) -> A {
infix operator |> { associativity left precedence 95 }
public func |> <T, U>(value: T, function: (T -> U)) -> U {
public func |> <T, U>(value: T, function: ((T) -> U)) -> U {
return function(value)
}
@@ -30,20 +30,20 @@ private final class SubscriberDisposable<T, E> : Disposable {
}
public struct Signal<T, E> {
private let generator: Subscriber<T, E> -> Disposable
private let generator: (Subscriber<T, E>) -> Disposable
public init(_ generator: Subscriber<T, E> -> Disposable) {
public init(_ generator: (Subscriber<T, E>) -> Disposable) {
self.generator = generator
}
public func start(next next: (T -> Void)! = nil, error: (E -> Void)! = nil, completed: (() -> Void)! = nil) -> Disposable {
public func start(next: ((T) -> Void)! = nil, error: ((E) -> Void)! = nil, completed: (() -> Void)! = nil) -> Disposable {
let subscriber = Subscriber<T, E>(next: next, error: error, completed: completed)
let disposable = self.generator(subscriber)
subscriber.assignDisposable(disposable)
return SubscriberDisposable(subscriber: subscriber, disposable: disposable)
}
public static func single(value: T) -> Signal<T, E> {
public static func single(_ value: T) -> Signal<T, E> {
return Signal<T, E> { subscriber in
subscriber.putNext(value)
subscriber.putCompletion()
@@ -60,7 +60,7 @@ public struct Signal<T, E> {
}
}
public static func fail(error: E) -> Signal<T, E> {
public static func fail(_ error: E) -> Signal<T, E> {
return Signal<T, E> { subscriber in
subscriber.putError(error)

View File

@@ -1,36 +1,38 @@
import Foundation
public func `catch`<T, E, R>(f: E -> Signal<T, R>)(signal: Signal<T, E>) -> Signal<T, R> {
return Signal<T, R> { subscriber in
let disposable = DisposableSet()
disposable.add(signal.start(next: { next in
subscriber.putNext(next)
}, error: { error in
let anotherSignal = f(error)
public func `catch`<T, E, R>(_ f: (E) -> Signal<T, R>) -> (Signal<T, E>) -> Signal<T, R> {
return { signal in
return Signal<T, R> { subscriber in
let disposable = DisposableSet()
disposable.add(anotherSignal.start(next: { next in
disposable.add(signal.start(next: { next in
subscriber.putNext(next)
}, error: { error in
subscriber.putError(error)
let anotherSignal = f(error)
disposable.add(anotherSignal.start(next: { next in
subscriber.putNext(next)
}, error: { error in
subscriber.putError(error)
}, completed: {
subscriber.putCompletion()
}))
}, completed: {
subscriber.putCompletion()
}))
}, completed: {
subscriber.putCompletion()
}))
return disposable
return disposable
}
}
}
private func recursiveFunction(f: (Void -> Void) -> Void) -> (Void -> Void) {
private func recursiveFunction(_ f: ((Void) -> Void) -> Void) -> ((Void) -> Void) {
return {
f(recursiveFunction(f))
}
}
public func restart<T, E>(signal: Signal<T, E>) -> Signal<T, E> {
public func restart<T, E>(_ signal: Signal<T, E>) -> Signal<T, E> {
return Signal { subscriber in
let shouldRestart = Atomic(value: true)
let currentDisposable = MetaDisposable()
@@ -55,76 +57,80 @@ public func restart<T, E>(signal: Signal<T, E>) -> Signal<T, E> {
return ActionDisposable {
currentDisposable.dispose()
shouldRestart.swap(false)
let _ = shouldRestart.swap(false)
}
}
}
public func recurse<T, E>(latestValue: T?)(signal: Signal<T, E>) -> Signal<T, E> {
return Signal { subscriber in
let shouldRestart = Atomic(value: true)
let currentDisposable = MetaDisposable()
let start = recursiveFunction { recurse in
let currentShouldRestart = shouldRestart.with { value in
return value
public func recurse<T, E>(_ latestValue: T?) -> (Signal<T, E>) -> Signal<T, E> {
return { signal in
return Signal { subscriber in
let shouldRestart = Atomic(value: true)
let currentDisposable = MetaDisposable()
let start = recursiveFunction { recurse in
let currentShouldRestart = shouldRestart.with { value in
return value
}
if currentShouldRestart {
let disposable = signal.start(next: { next in
subscriber.putNext(next)
}, error: { error in
subscriber.putError(error)
}, completed: {
recurse()
})
currentDisposable.set(disposable)
}
}
if currentShouldRestart {
let disposable = signal.start(next: { next in
subscriber.putNext(next)
start()
return ActionDisposable {
currentDisposable.dispose()
let _ = shouldRestart.swap(false)
}
}
}
}
public func retry<T, E>(_ delayIncrement: Double, maxDelay: Double, onQueue queue: Queue) -> (signal: Signal<T, E>) -> Signal<T, NoError> {
return { signal in
return Signal { subscriber in
let shouldRetry = Atomic(value: true)
let currentDelay = Atomic(value: 0.0)
let currentDisposable = MetaDisposable()
let start = recursiveFunction { recurse in
let currentShouldRetry = shouldRetry.with { value in
return value
}
if currentShouldRetry {
let disposable = signal.start(next: { next in
subscriber.putNext(next)
}, error: { error in
subscriber.putError(error)
let delay = currentDelay.modify { value in
return min(maxDelay, value + delayIncrement)
}
let time: DispatchTime = DispatchTime.now() + Double(Int64(delay * Double(NSEC_PER_SEC)))
queue.queue.after(when: time, execute: {
recurse()
})
}, completed: {
recurse()
})
currentDisposable.set(disposable)
let _ = shouldRetry.swap(false)
subscriber.putCompletion()
})
currentDisposable.set(disposable)
}
}
start()
return ActionDisposable {
currentDisposable.dispose()
let _ = shouldRetry.swap(false)
}
}
start()
return ActionDisposable {
currentDisposable.dispose()
shouldRestart.swap(false)
}
}
}
public func retry<T, E>(delayIncrement: Double, maxDelay: Double, onQueue queue: Queue)(signal: Signal<T, E>) -> Signal<T, NoError> {
return Signal { subscriber in
let shouldRetry = Atomic(value: true)
let currentDelay = Atomic(value: 0.0)
let currentDisposable = MetaDisposable()
let start = recursiveFunction { recurse in
let currentShouldRetry = shouldRetry.with { value in
return value
}
if currentShouldRetry {
let disposable = signal.start(next: { next in
subscriber.putNext(next)
}, error: { error in
let delay = currentDelay.modify { value in
return min(maxDelay, value + delayIncrement)
}
let delayTime = dispatch_time(DISPATCH_TIME_NOW, Int64(delay * Double(NSEC_PER_SEC)))
dispatch_after(delayTime, queue.queue) {
recurse()
}
}, completed: {
shouldRetry.swap(false)
subscriber.putCompletion()
})
currentDisposable.set(disposable)
}
}
start()
return ActionDisposable {
currentDisposable.dispose()
shouldRetry.swap(false)
}
}
}

View File

@@ -6,7 +6,7 @@ private struct SignalCombineState {
let error: Bool
}
private func combineLatestAny<E, R>(signals: [Signal<Any, E>], combine: [Any] -> R, initialValues: [Int : Any]) -> Signal<R, E> {
private func combineLatestAny<E, R>(_ signals: [Signal<Any, E>], combine: ([Any]) -> R, initialValues: [Int : Any]) -> Signal<R, E> {
return Signal { subscriber in
let state = Atomic(value: SignalCombineState(values: initialValues, completed: Set(), error: false))
let disposable = DisposableSet()
@@ -36,7 +36,7 @@ private func combineLatestAny<E, R>(signals: [Signal<Any, E>], combine: [Any] ->
}
}, error: { error in
var emitError = false
state.modify { current in
let _ = state.modify { current in
if !current.error {
emitError = true
return SignalCombineState(values: current.values, completed: current.completed, error: true)
@@ -49,7 +49,7 @@ private func combineLatestAny<E, R>(signals: [Signal<Any, E>], combine: [Any] ->
}
}, completed: {
var emitCompleted = false
state.modify { current in
let _ = state.modify { current in
if !current.completed.contains(index) {
var completed = current.completed
completed.insert(index)
@@ -70,7 +70,7 @@ private func combineLatestAny<E, R>(signals: [Signal<Any, E>], combine: [Any] ->
}
}
private func signalOfAny<T, E>(signal: Signal<T, E>) -> Signal<Any, E> {
private func signalOfAny<T, E>(_ signal: Signal<T, E>) -> Signal<Any, E> {
return Signal { subscriber in
return signal.start(next: { next in
subscriber.putNext(next)
@@ -82,31 +82,31 @@ private func signalOfAny<T, E>(signal: Signal<T, E>) -> Signal<Any, E> {
}
}
public func combineLatest<T1, T2, E>(s1: Signal<T1, E>, _ s2: Signal<T2, E>) -> Signal<(T1, T2), E> {
public func combineLatest<T1, T2, E>(_ s1: Signal<T1, E>, _ s2: Signal<T2, E>) -> Signal<(T1, T2), E> {
return combineLatestAny([signalOfAny(s1), signalOfAny(s2)], combine: { values in
return (values[0] as! T1, values[1] as! T2)
}, initialValues: [:])
}
public func combineLatest<T1, T2, E>(s1: Signal<T1, E>, _ v1: T1, _ s2: Signal<T2, E>, _ v2: T2) -> Signal<(T1, T2), E> {
public func combineLatest<T1, T2, E>(_ s1: Signal<T1, E>, _ v1: T1, _ s2: Signal<T2, E>, _ v2: T2) -> Signal<(T1, T2), E> {
return combineLatestAny([signalOfAny(s1), signalOfAny(s2)], combine: { values in
return (values[0] as! T1, values[1] as! T2)
}, initialValues: [0: v1, 1: v2])
}
public func combineLatest<T1, T2, T3, E>(s1: Signal<T1, E>, _ s2: Signal<T2, E>, _ s3: Signal<T3, E>) -> Signal<(T1, T2, T3), E> {
public func combineLatest<T1, T2, T3, E>(_ s1: Signal<T1, E>, _ s2: Signal<T2, E>, _ s3: Signal<T3, E>) -> Signal<(T1, T2, T3), E> {
return combineLatestAny([signalOfAny(s1), signalOfAny(s2), signalOfAny(s3)], combine: { values in
return (values[0] as! T1, values[1] as! T2, values[2] as! T3)
}, initialValues: [:])
}
public func combineLatest<T1, T2, T3, T4, E>(s1: Signal<T1, E>, _ s2: Signal<T2, E>, _ s3: Signal<T3, E>, s4: Signal<T4, E>) -> Signal<(T1, T2, T3, T4), E> {
public func combineLatest<T1, T2, T3, T4, E>(_ s1: Signal<T1, E>, _ s2: Signal<T2, E>, _ s3: Signal<T3, E>, s4: Signal<T4, E>) -> Signal<(T1, T2, T3, T4), E> {
return combineLatestAny([signalOfAny(s1), signalOfAny(s2), signalOfAny(s3), signalOfAny(s4)], combine: { values in
return (values[0] as! T1, values[1] as! T2, values[2] as! T3, values[3] as! T4)
}, initialValues: [:])
}
public func combineLatest<T, E>(signals: [Signal<T, E>]) -> Signal<[T], E> {
public func combineLatest<T, E>(_ signals: [Signal<T, E>]) -> Signal<[T], E> {
if signals.count == 0 {
return single([T](), E.self)
}

View File

@@ -1,72 +1,103 @@
import Foundation
public func deliverOn<T, E>(queue: Queue)(signal: Signal<T, E>) -> Signal<T, E> {
return Signal { subscriber in
return signal.start(next: { next in
queue.dispatch {
subscriber.putNext(next)
}
}, error: { error in
queue.dispatch {
subscriber.putError(error)
}
}, completed: {
queue.dispatch {
subscriber.putCompletion()
}
})
}
}
public func deliverOnMainQueue<T, E>(signal: Signal<T, E>) -> Signal<T, E> {
return signal |> deliverOn(Queue.mainQueue())
}
public func deliverOn<T, E>(threadPool: ThreadPool)(signal: Signal<T, E>) -> Signal<T, E> {
return Signal { subscriber in
let queue = threadPool.nextQueue()
return signal.start(next: { next in
queue.addTask(ThreadPoolTask { state in
if !state.cancelled {
public func deliverOn<T, E>(_ queue: Queue) -> (Signal<T, E>) -> Signal<T, E> {
return { signal in
return Signal { subscriber in
return signal.start(next: { next in
queue.async {
subscriber.putNext(next)
}
})
}, error: { error in
queue.addTask(ThreadPoolTask { state in
if !state.cancelled {
}, error: { error in
queue.async {
subscriber.putError(error)
}
})
}, completed: {
queue.addTask(ThreadPoolTask { state in
if !state.cancelled {
}, completed: {
queue.async {
subscriber.putCompletion()
}
})
})
}
}
}
public func runOn<T, E>(queue: Queue)(signal: Signal<T, E>) -> Signal<T, E> {
return Signal { subscriber in
if queue.isCurrent() {
public func deliverOnMainQueue<T, E>(_ signal: Signal<T, E>) -> Signal<T, E> {
return signal |> deliverOn(Queue.mainQueue())
}
public func deliverOn<T, E>(_ threadPool: ThreadPool) -> (Signal<T, E>) -> Signal<T, E> {
return { signal in
return Signal { subscriber in
let queue = threadPool.nextQueue()
return signal.start(next: { next in
subscriber.putNext(next)
queue.addTask(ThreadPoolTask { state in
if !state.cancelled {
subscriber.putNext(next)
}
})
}, error: { error in
subscriber.putError(error)
queue.addTask(ThreadPoolTask { state in
if !state.cancelled {
subscriber.putError(error)
}
})
}, completed: {
subscriber.putCompletion()
queue.addTask(ThreadPoolTask { state in
if !state.cancelled {
subscriber.putCompletion()
}
})
})
} else {
var cancelled = false
}
}
}
public func runOn<T, E>(_ queue: Queue) -> (Signal<T, E>) -> Signal<T, E> {
return { signal in
return Signal { subscriber in
if queue.isCurrent() {
return signal.start(next: { next in
subscriber.putNext(next)
}, error: { error in
subscriber.putError(error)
}, completed: {
subscriber.putCompletion()
})
} else {
var cancelled = false
let disposable = MetaDisposable()
disposable.set(ActionDisposable {
cancelled = true
})
queue.async {
if cancelled {
return
}
disposable.set(signal.start(next: { next in
subscriber.putNext(next)
}, error: { error in
subscriber.putError(error)
}, completed: {
subscriber.putCompletion()
}))
}
return disposable
}
}
}
}
public func runOn<T, E>(_ threadPool: ThreadPool) -> (Signal<T, E>) -> Signal<T, E> {
return { signal in
return Signal { subscriber in
let cancelled = false
let disposable = MetaDisposable()
disposable.set(ActionDisposable {
cancelled = true
})
queue.dispatch {
if cancelled {
let task = ThreadPoolTask { state in
if cancelled || state.cancelled {
return
}
@@ -79,51 +110,13 @@ public func runOn<T, E>(queue: Queue)(signal: Signal<T, E>) -> Signal<T, E> {
}))
}
disposable.set(ActionDisposable {
task.cancel()
})
threadPool.addTask(task)
return disposable
}
}
}
public func runOn<T, E>(threadPool: ThreadPool)(signal: Signal<T, E>) -> Signal<T, E> {
return Signal { subscriber in
let cancelled = false
let disposable = MetaDisposable()
let task = ThreadPoolTask { state in
if cancelled || state.cancelled {
return
}
disposable.set(signal.start(next: { next in
subscriber.putNext(next)
}, error: { error in
subscriber.putError(error)
}, completed: {
subscriber.putCompletion()
}))
}
disposable.set(ActionDisposable {
task.cancel()
})
threadPool.addTask(task)
return disposable
}
}
public func bufferOn<T, E>(queue: Queue, timeout: Double)(signal: Signal<T, E>) -> Signal<[T], E> {
return Signal { subscriber in
let timer = Timer(timeout: timeout, `repeat`: false, completion: {
}, queue: queue)
return signal.start(next: { next in
}, error: { error in
subscriber.putError(error)
}, completed: {
})
}
}

View File

@@ -1,40 +1,46 @@
import Foundation
public func map<T, E, R>(f: T -> R)(signal: Signal<T, E>) -> Signal<R, E> {
return Signal<R, E> { subscriber in
return signal.start(next: { next in
subscriber.putNext(f(next))
}, error: { error in
subscriber.putError(error)
}, completed: {
subscriber.putCompletion()
})
public func map<T, E, R>(_ f: (T) -> R) -> (Signal<T, E>) -> Signal<R, E> {
return { signal in
return Signal<R, E> { subscriber in
return signal.start(next: { next in
subscriber.putNext(f(next))
}, error: { error in
subscriber.putError(error)
}, completed: {
subscriber.putCompletion()
})
}
}
}
public func filter<T, E>(f: T -> Bool)(signal: Signal<T, E>) -> Signal<T, E> {
return Signal<T, E> { subscriber in
return signal.start(next: { next in
if f(next) {
public func filter<T, E>(_ f: (T) -> Bool) -> (Signal<T, E>) -> Signal<T, E> {
return { signal in
return Signal<T, E> { subscriber in
return signal.start(next: { next in
if f(next) {
subscriber.putNext(next)
}
}, error: { error in
subscriber.putError(error)
}, completed: {
subscriber.putCompletion()
})
}
}
}
public func mapError<T, E, R>(_ f: (E) -> R) -> (Signal<T, E>) -> Signal<T, R> {
return { signal in
return Signal<T, R> { subscriber in
return signal.start(next: { next in
subscriber.putNext(next)
}
}, error: { error in
subscriber.putError(error)
}, completed: {
subscriber.putCompletion()
})
}
}
public func mapError<T, E, R>(f: E -> R)(signal: Signal<T, E>) -> Signal<T, R> {
return Signal<T, R> { subscriber in
return signal.start(next: { next in
subscriber.putNext(next)
}, error: { error in
subscriber.putError(f(error))
}, completed: {
subscriber.putCompletion()
})
}, error: { error in
subscriber.putError(f(error))
}, completed: {
subscriber.putCompletion()
})
}
}
}
@@ -42,7 +48,7 @@ private class DistinctUntilChangedContext<T> {
var value: T?
}
public func distinctUntilChanged<T: Equatable, E>(signal: Signal<T, E>) -> Signal<T, E> {
public func distinctUntilChanged<T: Equatable, E>(_ signal: Signal<T, E>) -> Signal<T, E> {
return Signal { subscriber in
let context = Atomic(value: DistinctUntilChangedContext<T>())

View File

@@ -19,11 +19,11 @@ private final class SignalQueueState<T, E> : Disposable {
self.throttleMode = throttleMode
}
func beginWithDisposable(disposable: Disposable) {
func beginWithDisposable(_ disposable: Disposable) {
self.disposable = disposable
}
func enqueueSignal(signal: Signal<T, E>) {
func enqueueSignal(_ signal: Signal<T, E>) {
var startSignal = false
OSSpinLockLock(&self.lock)
if self.queueMode && self.executingSignal {
@@ -61,7 +61,7 @@ private final class SignalQueueState<T, E> : Disposable {
if self.queueMode {
if self.queuedSignals.count != 0 {
nextSignal = self.queuedSignals[0]
self.queuedSignals.removeAtIndex(0)
self.queuedSignals.remove(at: 0)
self.executingSignal = true
} else {
terminated = self.terminated
@@ -111,7 +111,7 @@ private final class SignalQueueState<T, E> : Disposable {
}
}
public func switchToLatest<T, E>(signal: Signal<Signal<T, E>, E>) -> Signal<T, E> {
public func switchToLatest<T, E>(_ signal: Signal<Signal<T, E>, E>) -> Signal<T, E> {
return Signal { subscriber in
let state = SignalQueueState(subscriber: subscriber, queueMode: false, throttleMode: false)
state.beginWithDisposable(signal.start(next: { next in
@@ -125,7 +125,7 @@ public func switchToLatest<T, E>(signal: Signal<Signal<T, E>, E>) -> Signal<T, E
}
}
public func queue<T, E>(signal: Signal<Signal<T, E>, E>) -> Signal<T, E> {
public func queue<T, E>(_ signal: Signal<Signal<T, E>, E>) -> Signal<T, E> {
return Signal { subscriber in
let state = SignalQueueState(subscriber: subscriber, queueMode: true, throttleMode: false)
state.beginWithDisposable(signal.start(next: { next in
@@ -139,7 +139,7 @@ public func queue<T, E>(signal: Signal<Signal<T, E>, E>) -> Signal<T, E> {
}
}
public func throttled<T, E>(signal: Signal<Signal<T, E>, E>) -> Signal<T, E> {
public func throttled<T, E>(_ signal: Signal<Signal<T, E>, E>) -> Signal<T, E> {
return Signal { subscriber in
let state = SignalQueueState(subscriber: subscriber, queueMode: true, throttleMode: true)
state.beginWithDisposable(signal.start(next: { next in
@@ -153,7 +153,7 @@ public func throttled<T, E>(signal: Signal<Signal<T, E>, E>) -> Signal<T, E> {
}
}
public func mapToSignal<T, R, E>(f: T -> Signal<R, E>) -> (signal: Signal<T, E>) -> Signal<R, E> {
public func mapToSignal<T, R, E>(_ f: (T) -> Signal<R, E>) -> (Signal<T, E>) -> Signal<R, E> {
return { signal -> Signal<R, E> in
return Signal<Signal<R, E>, E> { subscriber in
return signal.start(next: { next in
@@ -167,7 +167,7 @@ public func mapToSignal<T, R, E>(f: T -> Signal<R, E>) -> (signal: Signal<T, E>)
}
}
public func mapToSignalPromotingError<T, R, E>(f: T -> Signal<R, E>) -> (signal: Signal<T, NoError>) -> Signal<R, E> {
public func mapToSignalPromotingError<T, R, E>(_ f: (T) -> Signal<R, E>) -> (Signal<T, NoError>) -> Signal<R, E> {
return { signal -> Signal<R, E> in
return Signal<Signal<R, E>, E> { subscriber in
return signal.start(next: { next in
@@ -179,19 +179,19 @@ public func mapToSignalPromotingError<T, R, E>(f: T -> Signal<R, E>) -> (signal:
}
}
public func mapToQueue<T, R, E>(f: T -> Signal<R, E>) -> (signal: Signal<T, E>) -> Signal<R, E> {
public func mapToQueue<T, R, E>(_ f: (T) -> Signal<R, E>) -> (Signal<T, E>) -> Signal<R, E> {
return { signal -> Signal<R, E> in
return signal |> map { f($0) } |> queue
}
}
public func mapToThrottled<T, R, E>(f: T -> Signal<R, E>) -> (signal: Signal<T, E>) -> Signal<R, E> {
public func mapToThrottled<T, R, E>(_ f: (T) -> Signal<R, E>) -> (Signal<T, E>) -> Signal<R, E> {
return { signal -> Signal<R, E> in
return signal |> map { f($0) } |> throttled
}
}
public func then<T, E>(nextSignal: Signal<T, E>) -> (signal: Signal<T, E>) -> Signal<T, E> {
public func then<T, E>(_ nextSignal: Signal<T, E>) -> (Signal<T, E>) -> Signal<T, E> {
return { signal -> Signal<T, E> in
return Signal<T, E> { subscriber in
let disposable = DisposableSet()
@@ -215,7 +215,7 @@ public func then<T, E>(nextSignal: Signal<T, E>) -> (signal: Signal<T, E>) -> Si
}
}
public func `defer`<T, E>(generator: () -> Signal<T, E>) -> Signal<T, E> {
public func deferred<T, E>(_ generator: () -> Signal<T, E>) -> Signal<T, E> {
return Signal { subscriber in
return generator().start(next: { next in
subscriber.putNext(next)

View File

@@ -1,35 +1,39 @@
import Foundation
public func reduceLeft<T, E>(value: T, f: (T, T) -> T)(signal: Signal<T, E>) -> Signal<T, E> {
return Signal<T, E> { subscriber in
var currentValue = value
return signal.start(next: { next in
currentValue = f(currentValue, next)
}, error: { error in
subscriber.putError(error)
}, completed: {
subscriber.putNext(currentValue)
subscriber.putCompletion()
})
}
}
public func reduceLeft<T, E>(value: T, f: (T, T, T -> Void) -> T)(signal: Signal<T, E>) -> Signal<T, E> {
return Signal<T, E> { subscriber in
var currentValue = value
let emit: T -> Void = { next in
subscriber.putNext(next)
}
return signal.start(next: { next in
currentValue = f(currentValue, next, emit)
public func reduceLeft<T, E>(value: T, f: (T, T) -> T) -> (signal: Signal<T, E>) -> Signal<T, E> {
return { signal in
return Signal<T, E> { subscriber in
var currentValue = value
return signal.start(next: { next in
currentValue = f(currentValue, next)
}, error: { error in
subscriber.putError(error)
}, completed: {
subscriber.putNext(currentValue)
subscriber.putCompletion()
})
})
}
}
}
public func reduceLeft<T, E>(value: T, f: (T, T, (T) -> Void) -> T) -> (signal: Signal<T, E>) -> Signal<T, E> {
return { signal in
return Signal<T, E> { subscriber in
var currentValue = value
let emit: (T) -> Void = { next in
subscriber.putNext(next)
}
return signal.start(next: { next in
currentValue = f(currentValue, next, emit)
}, error: { error in
subscriber.putError(error)
}, completed: {
subscriber.putNext(currentValue)
subscriber.putCompletion()
})
}
}
}
@@ -57,11 +61,11 @@ private final class ReduceQueueState<T, E> : Disposable {
self.value = value
}
func beginWithDisposable(disposable: Disposable) {
func beginWithDisposable(_ disposable: Disposable) {
self.disposable = disposable
}
func enqueueNext(next: T) {
func enqueueNext(_ next: T) {
var startSignal = false
var currentValue: T
OSSpinLockLock(&self.lock)
@@ -92,7 +96,7 @@ private final class ReduceQueueState<T, E> : Disposable {
}
}
func updateValue(value: T) {
func updateValue(_ value: T) {
OSSpinLockLock(&self.lock)
self.value = value
OSSpinLockUnlock(&self.lock)
@@ -110,7 +114,7 @@ private final class ReduceQueueState<T, E> : Disposable {
self.executingSignal = false
if self.queuedValues.count != 0 {
nextSignal = self.generator(self.value, self.queuedValues[0])
self.queuedValues.removeAtIndex(0)
self.queuedValues.remove(at: 0)
self.executingSignal = true
} else {
currentValue = self.value
@@ -168,16 +172,18 @@ private final class ReduceQueueState<T, E> : Disposable {
}
}
public func reduceLeft<T, E>(value: T, generator: (T, T) -> Signal<(T, Passthrough<T>), E>)(signal: Signal<T, E>) -> Signal<T, E> {
return Signal { subscriber in
let state = ReduceQueueState(subscriber: subscriber, value: value, generator: generator)
state.beginWithDisposable(signal.start(next: { next in
state.enqueueNext(next)
}, error: { error in
subscriber.putError(error)
}, completed: {
state.beginCompletion()
}))
return state
public func reduceLeft<T, E>(_ value: T, generator: (T, T) -> Signal<(T, Passthrough<T>), E>) -> (signal: Signal<T, E>) -> Signal<T, E> {
return { signal in
return Signal { subscriber in
let state = ReduceQueueState(subscriber: subscriber, value: value, generator: generator)
state.beginWithDisposable(signal.start(next: { next in
state.enqueueNext(next)
}, error: { error in
subscriber.putError(error)
}, completed: {
state.beginCompletion()
}))
return state
}
}
}

View File

@@ -1,89 +1,101 @@
import Foundation
public func beforeNext<T, E, R>(f: T -> R)(signal: Signal<T, E>) -> Signal<T, E> {
return Signal<T, E> { subscriber in
return signal.start(next: { next in
let _ = f(next)
subscriber.putNext(next)
}, error: { error in
subscriber.putError(error)
}, completed: {
subscriber.putCompletion()
})
public func beforeNext<T, E, R>(_ f: (T) -> R) -> (Signal<T, E>) -> Signal<T, E> {
return { signal in
return Signal<T, E> { subscriber in
return signal.start(next: { next in
let _ = f(next)
subscriber.putNext(next)
}, error: { error in
subscriber.putError(error)
}, completed: {
subscriber.putCompletion()
})
}
}
}
public func afterNext<T, E, R>(f: T -> R)(signal: Signal<T, E>) -> Signal<T, E> {
return Signal<T, E> { subscriber in
return signal.start(next: { next in
subscriber.putNext(next)
let _ = f(next)
}, error: { error in
subscriber.putError(error)
}, completed: {
subscriber.putCompletion()
})
public func afterNext<T, E, R>(_ f: (T) -> R) -> (Signal<T, E>) -> Signal<T, E> {
return { signal in
return Signal<T, E> { subscriber in
return signal.start(next: { next in
subscriber.putNext(next)
let _ = f(next)
}, error: { error in
subscriber.putError(error)
}, completed: {
subscriber.putCompletion()
})
}
}
}
public func beforeStarted<T, E>(f: () -> Void)(signal: Signal<T, E>) -> Signal<T, E> {
return Signal<T, E> { subscriber in
f()
return signal.start(next: { next in
subscriber.putNext(next)
}, error: { error in
subscriber.putError(error)
}, completed: {
subscriber.putCompletion()
})
}
}
public func beforeCompleted<T, E>(f: () -> Void)(signal: Signal<T, E>) -> Signal<T, E> {
return Signal<T, E> { subscriber in
return signal.start(next: { next in
subscriber.putNext(next)
}, error: { error in
subscriber.putError(error)
}, completed: {
public func beforeStarted<T, E>(_ f: () -> Void) -> (Signal<T, E>) -> Signal<T, E> {
return { signal in
return Signal<T, E> { subscriber in
f()
subscriber.putCompletion()
})
return signal.start(next: { next in
subscriber.putNext(next)
}, error: { error in
subscriber.putError(error)
}, completed: {
subscriber.putCompletion()
})
}
}
}
public func afterCompleted<T, E>(f: () -> Void)(signal: Signal<T, E>) -> Signal<T, E> {
return Signal<T, E> { subscriber in
return signal.start(next: { next in
subscriber.putNext(next)
}, error: { error in
subscriber.putError(error)
}, completed: {
subscriber.putCompletion()
f()
})
public func beforeCompleted<T, E>(_ f: () -> Void) -> (Signal<T, E>) -> Signal<T, E> {
return { signal in
return Signal<T, E> { subscriber in
return signal.start(next: { next in
subscriber.putNext(next)
}, error: { error in
subscriber.putError(error)
}, completed: {
f()
subscriber.putCompletion()
})
}
}
}
public func afterDisposed<T, E, R>(f: Void -> R)(signal: Signal<T, E>) -> Signal<T, E> {
return Signal<T, E> { subscriber in
let disposable = DisposableSet()
disposable.add(signal.start(next: { next in
subscriber.putNext(next)
}, error: { error in
subscriber.putError(error)
}, completed: {
subscriber.putCompletion()
}))
disposable.add(ActionDisposable {
let _ = f()
})
return disposable
public func afterCompleted<T, E>(_ f: () -> Void) -> (Signal<T, E>) -> Signal<T, E> {
return { signal in
return Signal<T, E> { subscriber in
return signal.start(next: { next in
subscriber.putNext(next)
}, error: { error in
subscriber.putError(error)
}, completed: {
subscriber.putCompletion()
f()
})
}
}
}
public func withState<T, E, S>(signal: Signal<T, E>, _ initialState: () -> S, next: (T, S) -> Void = { _ in }, error: (E, S) -> Void = { _ in }, completed: (S) -> Void = { _ in }, disposed: (S) -> Void = { _ in }) -> Signal<T, E> {
public func afterDisposed<T, E, R>(_ f: (Void) -> R) -> (Signal<T, E>) -> Signal<T, E> {
return { signal in
return Signal<T, E> { subscriber in
let disposable = DisposableSet()
disposable.add(signal.start(next: { next in
subscriber.putNext(next)
}, error: { error in
subscriber.putError(error)
}, completed: {
subscriber.putCompletion()
}))
disposable.add(ActionDisposable {
let _ = f()
})
return disposable
}
}
}
public func withState<T, E, S>(_ signal: Signal<T, E>, _ initialState: () -> S, next: (T, S) -> Void = { _ in }, error: (E, S) -> Void = { _ in }, completed: (S) -> Void = { _ in }, disposed: (S) -> Void = { _ in }) -> Signal<T, E> {
return Signal { subscriber in
let state = initialState()
let disposable = signal.start(next: { vNext in

View File

@@ -1,6 +1,6 @@
import Foundation
public func single<T, E>(value: T, _ errorType: E.Type) -> Signal<T, E> {
public func single<T, E>(_ value: T, _ errorType: E.Type) -> Signal<T, E> {
return Signal<T, E> { subscriber in
subscriber.putNext(value)
subscriber.putCompletion()
@@ -9,7 +9,7 @@ public func single<T, E>(value: T, _ errorType: E.Type) -> Signal<T, E> {
}
}
public func fail<T, E>(valueType: T.Type, _ error: E) -> Signal<T, E> {
public func fail<T, E>(_ valueType: T.Type, _ error: E) -> Signal<T, E> {
return Signal<T, E> { subscriber in
subscriber.putError(error)
@@ -17,7 +17,7 @@ public func fail<T, E>(valueType: T.Type, _ error: E) -> Signal<T, E> {
}
}
public func complete<T, E>(valueType: T.Type, _ error: E.Type) -> Signal<T, E> {
public func complete<T, E>(_ valueType: T.Type, _ error: E.Type) -> Signal<T, E> {
return Signal<T, E> { subscriber in
subscriber.putCompletion()
@@ -25,7 +25,7 @@ public func complete<T, E>(valueType: T.Type, _ error: E.Type) -> Signal<T, E> {
}
}
public func never<T, E>(valueType: T.Type, _ error: E.Type) -> Signal<T, E> {
public func never<T, E>(_ valueType: T.Type, _ error: E.Type) -> Signal<T, E> {
return Signal { _ in
return EmptyDisposable
}

View File

@@ -1,28 +1,30 @@
import Foundation
public func take<T, E>(count: Int)(signal: Signal<T, E>) -> Signal<T, E> {
return Signal { subscriber in
let counter = Atomic(value: 0)
return signal.start(next: { next in
var passthrough = false
var complete = false
counter.modify { value in
let updatedCount = value + 1
passthrough = updatedCount <= count
complete = updatedCount == count
return updatedCount
}
if passthrough {
subscriber.putNext(next)
}
if complete {
public func take<T, E>(_ count: Int) -> (Signal<T, E>) -> Signal<T, E> {
return { signal in
return Signal { subscriber in
let counter = Atomic(value: 0)
return signal.start(next: { next in
var passthrough = false
var complete = false
let _ = counter.modify { value in
let updatedCount = value + 1
passthrough = updatedCount <= count
complete = updatedCount == count
return updatedCount
}
if passthrough {
subscriber.putNext(next)
}
if complete {
subscriber.putCompletion()
}
}, error: { error in
subscriber.putError(error)
}, completed: {
subscriber.putCompletion()
}
}, error: { error in
subscriber.putError(error)
}, completed: {
subscriber.putCompletion()
})
})
}
}
}
@@ -30,7 +32,7 @@ public func last<T, E>(signal: Signal<T, E>) -> Signal<T?, E> {
return Signal { subscriber in
let value = Atomic<T?>(value: nil)
return signal.start(next: { next in
value.swap(next)
let _ = value.swap(next)
}, error: { error in
subscriber.putError(error)
}, completed: { completed in

View File

@@ -1,56 +1,60 @@
import Foundation
public func delay<T, E>(timeout: NSTimeInterval, queue: Queue)(signal: Signal<T, E>) -> Signal<T, E> {
return Signal<T, E> { subscriber in
let disposable = MetaDisposable()
let timer = Timer(timeout: timeout, `repeat`: false, completion: {
disposable.set(signal.start(next: { next in
subscriber.putNext(next)
}, error: { error in
subscriber.putError(error)
}, completed: {
subscriber.putCompletion()
}))
}, queue: queue)
disposable.set(ActionDisposable {
timer.invalidate()
})
timer.start()
return disposable
public func delay<T, E>(_ timeout: Double, queue: Queue) -> (signal: Signal<T, E>) -> Signal<T, E> {
return { signal in
return Signal<T, E> { subscriber in
let disposable = MetaDisposable()
let timer = Timer(timeout: timeout, repeat: false, completion: {
disposable.set(signal.start(next: { next in
subscriber.putNext(next)
}, error: { error in
subscriber.putError(error)
}, completed: {
subscriber.putCompletion()
}))
}, queue: queue)
disposable.set(ActionDisposable {
timer.invalidate()
})
timer.start()
return disposable
}
}
}
public func timeout<T, E>(timeout: NSTimeInterval, queue: Queue, alternate: Signal<T, E>)(signal: Signal<T, E>) -> Signal<T, E> {
return Signal<T, E> { subscriber in
let disposable = MetaDisposable()
let timer = Timer(timeout: timeout, `repeat`: false, completion: {
disposable.set(alternate.start(next: { next in
public func timeout<T, E>(_ timeout: Double, queue: Queue, alternate: Signal<T, E>) -> (Signal<T, E>) -> Signal<T, E> {
return { signal in
return Signal<T, E> { subscriber in
let disposable = MetaDisposable()
let timer = Timer(timeout: timeout, repeat: false, completion: {
disposable.set(alternate.start(next: { next in
subscriber.putNext(next)
}, error: { error in
subscriber.putError(error)
}, completed: {
subscriber.putCompletion()
}))
}, queue: queue)
disposable.set(signal.start(next: { next in
timer.invalidate()
subscriber.putNext(next)
}, error: { error in
timer.invalidate()
subscriber.putError(error)
}, completed: {
timer.invalidate()
subscriber.putCompletion()
}))
}, queue: queue)
disposable.set(signal.start(next: { next in
timer.invalidate()
subscriber.putNext(next)
}, error: { error in
timer.invalidate()
subscriber.putError(error)
}, completed: {
timer.invalidate()
subscriber.putCompletion()
}))
timer.start()
let disposableSet = DisposableSet()
disposableSet.add(ActionDisposable {
timer.invalidate()
})
disposableSet.add(disposable)
return disposableSet
timer.start()
let disposableSet = DisposableSet()
disposableSet.add(ActionDisposable {
timer.invalidate()
})
disposableSet.add(disposable)
return disposableSet
}
}
}

View File

@@ -1,21 +1,21 @@
import Foundation
public final class Subscriber<T, E> {
private var next: (T -> Void)!
private var error: (E -> Void)!
private var next: ((T) -> Void)!
private var error: ((E) -> Void)!
private var completed: (() -> Void)!
private var lock: OSSpinLock = 0
private var terminated = false
internal var disposable: Disposable!
public init(next: (T -> Void)! = nil, error: (E -> Void)! = nil, completed: (() -> Void)! = nil) {
public init(next: ((T) -> Void)! = nil, error: ((E) -> Void)! = nil, completed: (() -> Void)! = nil) {
self.next = next
self.error = error
self.completed = completed
}
internal func assignDisposable(disposable: Disposable) {
internal func assignDisposable(_ disposable: Disposable) {
if self.terminated {
disposable.dispose()
} else {
@@ -34,8 +34,8 @@ public final class Subscriber<T, E> {
OSSpinLockUnlock(&self.lock)
}
public func putNext(next: T) {
var action: (T -> Void)! = nil
public func putNext(_ next: T) {
var action: ((T) -> Void)! = nil
OSSpinLockLock(&self.lock)
if !self.terminated {
action = self.next
@@ -47,9 +47,9 @@ public final class Subscriber<T, E> {
}
}
public func putError(error: E) {
public func putError(_ error: E) {
var shouldDispose = false
var action: (E -> Void)! = nil
var action: ((E) -> Void)! = nil
OSSpinLockLock(&self.lock)
if !self.terminated {
@@ -67,7 +67,7 @@ public final class Subscriber<T, E> {
}
if shouldDispose && self.disposable != nil {
let disposable = self.disposable
let disposable = self.disposable!
disposable.dispose()
self.disposable = nil
}
@@ -93,7 +93,7 @@ public final class Subscriber<T, E> {
}
if shouldDispose && self.disposable != nil {
let disposable = self.disposable
let disposable = self.disposable!
disposable.dispose()
self.disposable = nil
}

View File

@@ -6,13 +6,13 @@ public final class ThreadPoolTaskState {
public final class ThreadPoolTask {
private let state = ThreadPoolTaskState()
private let action: ThreadPoolTaskState -> ()
private let action: (ThreadPoolTaskState) -> ()
public init(_ action: ThreadPoolTaskState -> ()) {
public init(_ action: (ThreadPoolTaskState) -> ()) {
self.action = action
}
internal func execute() {
func execute() {
if !state.cancelled {
self.action(self.state)
}
@@ -31,7 +31,7 @@ public final class ThreadPoolQueue : Equatable {
self.threadPool = threadPool
}
public func addTask(task: ThreadPoolTask) {
public func addTask(_ task: ThreadPoolTask) {
if let threadPool = self.threadPool {
threadPool.workOnQueue(self, action: {
self.tasks.append(task)
@@ -42,7 +42,7 @@ public final class ThreadPoolQueue : Equatable {
private func popFirstTask() -> ThreadPoolTask? {
if self.tasks.count != 0 {
let task = self.tasks[0];
self.tasks.removeAtIndex(0)
self.tasks.remove(at: 0)
return task
} else {
return nil
@@ -59,13 +59,13 @@ public func ==(lhs: ThreadPoolQueue, rhs: ThreadPoolQueue) -> Bool {
}
@objc public final class ThreadPool: NSObject {
private var threads: [NSThread] = []
private var threads: [Thread] = []
private var queues: [ThreadPoolQueue] = []
private var takenQueues: [ThreadPoolQueue] = []
private var mutex: pthread_mutex_t
private var condition: pthread_cond_t
@objc class func threadEntryPoint(threadPool: ThreadPool) {
@objc class func threadEntryPoint(_ threadPool: ThreadPool) {
var queue: ThreadPoolQueue!
while (true) {
@@ -74,8 +74,8 @@ public func ==(lhs: ThreadPoolQueue, rhs: ThreadPoolQueue) -> Bool {
pthread_mutex_lock(&threadPool.mutex);
if queue != nil {
if let index = threadPool.takenQueues.indexOf(queue) {
threadPool.takenQueues.removeAtIndex(index)
if let index = threadPool.takenQueues.index(of: queue) {
threadPool.takenQueues.remove(at: index)
}
if queue.hasTasks() {
@@ -97,8 +97,8 @@ public func ==(lhs: ThreadPoolQueue, rhs: ThreadPoolQueue) -> Bool {
task = queue.popFirstTask()
threadPool.takenQueues.append(queue)
if let index = threadPool.queues.indexOf(queue) {
threadPool.queues.removeAtIndex(index)
if let index = threadPool.queues.index(of: queue) {
threadPool.queues.remove(at: index)
}
break
@@ -125,7 +125,7 @@ public func ==(lhs: ThreadPoolQueue, rhs: ThreadPoolQueue) -> Bool {
super.init()
for _ in 0 ..< threadCount {
let thread = NSThread(target: ThreadPool.self, selector: Selector("threadEntryPoint:"), object: self)
let thread = Thread(target: ThreadPool.self, selector: #selector(ThreadPool.threadEntryPoint(_:)), object: self)
thread.threadPriority = threadPriority
self.threads.append(thread)
thread.start()
@@ -137,12 +137,12 @@ public func ==(lhs: ThreadPoolQueue, rhs: ThreadPoolQueue) -> Bool {
pthread_cond_destroy(&self.condition)
}
public func addTask(task: ThreadPoolTask) {
public func addTask(_ task: ThreadPoolTask) {
let tempQueue = self.nextQueue()
tempQueue.addTask(task)
}
private func workOnQueue(queue: ThreadPoolQueue, action: () -> ()) {
private func workOnQueue(_ queue: ThreadPoolQueue, action: () -> ()) {
pthread_mutex_lock(&self.mutex)
action()
if !self.queues.contains(queue) && !self.takenQueues.contains(queue) {
@@ -157,7 +157,7 @@ public func ==(lhs: ThreadPoolQueue, rhs: ThreadPoolQueue) -> Bool {
}
public func isCurrentThreadInPool() -> Bool {
let currentThread = NSThread.currentThread()
let currentThread = Thread.current()
for thread in self.threads {
if currentThread.isEqual(thread) {
return true
@@ -165,4 +165,4 @@ public func ==(lhs: ThreadPoolQueue, rhs: ThreadPoolQueue) -> Bool {
}
return false
}
}
}

View File

@@ -1,13 +1,13 @@
import Foundation
public final class Timer {
private var timer: dispatch_source_t!
private var timeout: NSTimeInterval
private var timer: DispatchSourceTimer?
private var timeout: Double
private var `repeat`: Bool
private var completion: Void -> Void
private var completion: (Void) -> Void
private var queue: Queue
public init(timeout: NSTimeInterval, `repeat`: Bool, completion: Void -> Void, queue: Queue) {
public init(timeout: Double, `repeat`: Bool, completion: (Void) -> Void, queue: Queue) {
self.timeout = timeout
self.`repeat` = `repeat`
self.completion = completion
@@ -19,9 +19,8 @@ public final class Timer {
}
public func start() {
self.timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, self.queue.queue)
dispatch_source_set_timer(self.timer, dispatch_time(DISPATCH_TIME_NOW, Int64(self.timeout * NSTimeInterval(NSEC_PER_SEC))), self.`repeat` ? UInt64(self.timeout * NSTimeInterval(NSEC_PER_SEC)) : DISPATCH_TIME_FOREVER, 0);
dispatch_source_set_event_handler(self.timer, { [weak self] in
let timer = DispatchSource.timer(queue: self.queue.queue)
timer.setEventHandler(handler: { [weak self] in
if let strongSelf = self {
strongSelf.completion()
if !strongSelf.`repeat` {
@@ -29,12 +28,22 @@ public final class Timer {
}
}
})
dispatch_resume(self.timer)
self.timer = timer
if self.`repeat` {
let time: DispatchTime = DispatchTime.now() + self.timeout
timer.scheduleRepeating(deadline: time, interval: self.timeout)
} else {
let time: DispatchTime = DispatchTime.now() + self.timeout
timer.scheduleOneshot(deadline: time)
}
timer.resume()
}
public func invalidate() {
if self.timer != nil {
dispatch_source_cancel(self.timer)
if let timer = self.timer {
timer.cancel()
self.timer = nil
}
}

View File

@@ -1,10 +1,9 @@
import Foundation
public final class Pipe<T> {
let subscribers = Atomic(value: Bag<T -> Void>())
public final class ValuePipe<T> {
private let subscribers = Atomic(value: Bag<(T) -> Void>())
public init() {
}
public func signal() -> Signal<T, Void> {
@@ -29,8 +28,8 @@ public final class Pipe<T> {
}
}
public func putNext(next: T) {
let items = self.subscribers.with { value -> [T -> Void] in
public func putNext(_ next: T) {
let items = self.subscribers.with { value -> [(T) -> Void] in
return value.copyItems()
}
for f in items {

View File

@@ -8,7 +8,7 @@ internal class DeallocatingObject : CustomStringConvertible {
}
deinit {
self.deallocated.memory = true
self.deallocated.pointee = true
}
var description: String {

View File

@@ -86,7 +86,7 @@ class PerformanceTests: XCTestCase {
}
func testMeasureLock() {
measureBlock {
measure {
for _ in 0 ..< 1000000 {
let disposable = DisposableLock(action: {})
disposable.dispose()
@@ -95,7 +95,7 @@ class PerformanceTests: XCTestCase {
}
func testMeasureSpinlock() {
measureBlock {
measure {
for _ in 0 ..< 1000000 {
let disposable = DisposableSpinLock(action: {})
disposable.dispose()
@@ -104,11 +104,11 @@ class PerformanceTests: XCTestCase {
}
func testMeasureAtomic() {
measureBlock {
measure {
for _ in 0 ..< 1000000 {
let disposable = DisposableAtomic(action: {})
disposable.dispose()
}
}
}
}
}

View File

@@ -17,7 +17,7 @@ class SwiftSignalKitTests: XCTestCase {
if true {
var object: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated)
let disposable = ActionDisposable(action: { [object] () -> Void in
object.debugDescription
let _ = object.debugDescription
disposed = true
})
object = nil
@@ -33,9 +33,9 @@ class SwiftSignalKitTests: XCTestCase {
var deallocated = false
var disposed = false
if true {
var object: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated)
let disposable = ActionDisposable(action: { [object] () -> Void in
object.debugDescription
let object: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated)
let _ = ActionDisposable(action: { [object] () -> Void in
let _ = object.debugDescription
disposed = true
})
}
@@ -47,9 +47,9 @@ class SwiftSignalKitTests: XCTestCase {
var deallocated = false
var disposed = false
if true {
var object: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated)
let object: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated)
let disposable = ActionDisposable(action: { [object] () -> Void in
object.debugDescription
let _ = object.debugDescription
disposed = true
})
@@ -67,15 +67,15 @@ class SwiftSignalKitTests: XCTestCase {
var deallocated2 = false
var disposed2 = false
if true {
var object1: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated1)
let object1: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated1)
let actionDisposable1 = ActionDisposable(action: { [object1] () -> Void in
object1.debugDescription
let _ = object1.debugDescription
disposed1 = true
})
var object2: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated2)
let object2: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated2)
let actionDisposable2 = ActionDisposable(action: { [object2] () -> Void in
object2.debugDescription
let _ = object2.debugDescription
disposed2 = true
})
@@ -94,9 +94,9 @@ class SwiftSignalKitTests: XCTestCase {
var deallocated = false
var disposed = false
if true {
var object: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated)
let object: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated)
let disposable = ActionDisposable(action: { [object] () -> Void in
object.debugDescription
let _ = object.debugDescription
disposed = true
})
@@ -111,9 +111,9 @@ class SwiftSignalKitTests: XCTestCase {
var deallocated = false
var disposed = false
if true {
var object: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated)
let object: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated)
let disposable = ActionDisposable(action: { [object] () -> Void in
object.debugDescription
let _ = object.debugDescription
disposed = true
})
@@ -131,15 +131,15 @@ class SwiftSignalKitTests: XCTestCase {
var deallocated2 = false
var disposed2 = false
if true {
var object1: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated1)
let object1: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated1)
let actionDisposable1 = ActionDisposable(action: { [object1] () -> Void in
object1.debugDescription
let _ = object1.debugDescription
disposed1 = true
})
var object2: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated2)
let object2: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated2)
let actionDisposable2 = ActionDisposable(action: { [object2] () -> Void in
object2.debugDescription
let _ = object2.debugDescription
disposed2 = true
})
@@ -158,9 +158,9 @@ class SwiftSignalKitTests: XCTestCase {
var deallocated = false
var disposed = false
if true {
var object: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated)
let object: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated)
let disposable = ActionDisposable(action: { [object] () -> Void in
object.debugDescription
let _ = object.debugDescription
disposed = true
})
@@ -177,15 +177,15 @@ class SwiftSignalKitTests: XCTestCase {
var deallocated2 = false
var disposed2 = false
if true {
var object1: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated1)
let object1: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated1)
let actionDisposable1 = ActionDisposable(action: { [object1] () -> Void in
object1.debugDescription
let _ = object1.debugDescription
disposed1 = true
})
var object2: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated2)
let object2: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated2)
let actionDisposable2 = ActionDisposable(action: { [object2] () -> Void in
object2.debugDescription
let _ = object2.debugDescription
disposed2 = true
})
@@ -205,15 +205,15 @@ class SwiftSignalKitTests: XCTestCase {
var deallocated2 = false
var disposed2 = false
if true {
var object1: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated1)
let object1: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated1)
let actionDisposable1 = ActionDisposable(action: { [object1] () -> Void in
object1.debugDescription
let _ = object1.debugDescription
disposed1 = true
})
var object2: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated2)
let object2: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated2)
let actionDisposable2 = ActionDisposable(action: { [object2] () -> Void in
object2.debugDescription
let _ = object2.debugDescription
disposed2 = true
})
@@ -234,15 +234,15 @@ class SwiftSignalKitTests: XCTestCase {
var deallocated2 = false
var disposed2 = false
if true {
var object1: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated1)
let object1: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated1)
let actionDisposable1 = ActionDisposable(action: { [object1] () -> Void in
object1.debugDescription
let _ = object1.debugDescription
disposed1 = true
})
var object2: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated2)
let object2: DeallocatingObject? = DeallocatingObject(deallocated: &deallocated2)
let actionDisposable2 = ActionDisposable(action: { [object2] () -> Void in
object2.debugDescription
let _ = object2.debugDescription
disposed2 = true
})

View File

@@ -2,7 +2,7 @@ import UIKit
import XCTest
import SwiftSignalKit
func singleSignalInt(value: Signal<Int, Void>) -> Signal<Signal<Int, Void>, Void> {
func singleSignalInt(_ value: Signal<Int, Void>) -> Signal<Signal<Int, Void>, Void> {
return Signal { subscriber in
subscriber.putNext(value)
subscriber.putCompletion()
@@ -30,14 +30,14 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
let signal = Signal<Int, Void> { [object] subscriber in
subscriber.putNext(1)
return ActionDisposable {
object?.description
let _ = object?.description
disposed = true
}
}
let disposable = signal.start(next: { [object] next in
generated = true
object?.description
let _ = object?.description
})
object = nil
@@ -65,17 +65,17 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
subscriber.putCompletion()
return ActionDisposable {
object?.description
let _ = object?.description
disposed = true
}
}
let disposable = signal.start(next: { [object] next in
generated = true
object?.description
let _ = object?.description
}, completed: { [object]
completed = true
object?.description
let _ = object?.description
})
object = nil
@@ -105,21 +105,21 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
subscriber.putNext(1)
return ActionDisposable {
object?.description
let _ = object?.description
disposed = true
}
}
let disposable = signal.start(next: { [object] next in
generated = true
object?.description
let _ = object?.description
}, error: { [object] _ in
error = true
object?.description
let _ = object?.description
},
completed: { [object]
completed = true
object?.description
let _ = object?.description
})
object = nil
@@ -147,7 +147,7 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
subscriber.putNext(1)
return ActionDisposable {
object?.description
let _ = object?.description
disposed = true
}
}
@@ -155,7 +155,7 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
let disposable = signal.start(next: { [object] next in
generated = next == 2
object?.description
let _ = object?.description
})
object = nil
@@ -185,7 +185,7 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
}
var result = 0
catchSignal.start(next: { next in
let _ = catchSignal.start(next: { next in
result += next
})
@@ -195,14 +195,14 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
func testSubscriberDisposal() {
var disposed = false
var generated = false
var queue = dispatch_queue_create(nil, DISPATCH_QUEUE_SERIAL);
let queue = DispatchQueue(label: "")
if true {
let signal = Signal<Int, Void> { subscriber in
dispatch_async(queue, {
queue.async {
usleep(200)
subscriber.putNext(1)
})
}
return ActionDisposable {
disposed = true
}
@@ -213,7 +213,7 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
})
disposable.dispose()
dispatch_barrier_sync(queue, {})
queue.sync(flags: [.barrier], execute: {})
XCTAssertTrue(disposed, "disposed != true")
XCTAssertFalse(generated, "generated != false")
@@ -245,7 +245,7 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
}
})
signal.start(next: { next in
let _ = signal.start(next: { next in
result += next
})
@@ -271,7 +271,7 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
let signal = combineLatest(s1, s2)
var completed = false
signal.start(next: { next in
let _ = signal.start(next: { next in
XCTAssert(next.0 == 1 && next.1 == 2, "next != (1, 2)")
return
}, completed: {
@@ -300,7 +300,7 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
let signal = combineLatest(s1, s2, s3)
var completed = false
signal.start(next: { next in
let _ = signal.start(next: { next in
XCTAssert(next.0 == 1 && next.1 == 2 && next.2 == 3, "next != (1, 2, 3)")
return
}, completed: {
@@ -315,19 +315,19 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
let s3 = complete(Int.self, Void.self)
var singleEmitted = false
s1.start(next: { next in
let _ = s1.start(next: { next in
singleEmitted = next == 1
})
XCTAssert(singleEmitted == true, "singleEmitted != true")
var errorEmitted = false
s2.start(error: { error in
let _ = s2.start(error: { error in
errorEmitted = true
})
XCTAssert(errorEmitted == true, "errorEmitted != true")
var completedEmitted = false
s3.start(completed: {
let _ = s3.start(completed: {
completedEmitted = true
})
XCTAssert(completedEmitted == true, "errorEmitted != true")
@@ -345,15 +345,15 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
var deallocatedThree = false
if true {
var objectOne: DeallocatingObject? = DeallocatingObject(deallocated: &deallocatedOne)
var objectTwo: DeallocatingObject? = DeallocatingObject(deallocated: &deallocatedTwo)
var objectThree: DeallocatingObject? = DeallocatingObject(deallocated: &deallocatedThree)
let objectOne: DeallocatingObject? = DeallocatingObject(deallocated: &deallocatedOne)
let objectTwo: DeallocatingObject? = DeallocatingObject(deallocated: &deallocatedTwo)
let objectThree: DeallocatingObject? = DeallocatingObject(deallocated: &deallocatedThree)
let one = Signal<Int, Void> { subscriber in
subscriber.putNext(1)
subscriber.putCompletion()
return ActionDisposable { [objectOne] in
objectOne?.description
let _ = objectOne?.description
disposedOne = true
}
}
@@ -362,7 +362,7 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
subscriber.putNext(2)
subscriber.putCompletion()
return ActionDisposable { [objectTwo] in
objectTwo?.description
let _ = objectTwo?.description
disposedTwo = true
}
}
@@ -371,14 +371,14 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
subscriber.putNext(3)
subscriber.putCompletion()
return ActionDisposable { [objectThree] in
objectThree?.description
let _ = objectThree?.description
disposedThree = true
}
}
let signal = singleSignalInt(one) |> then(singleSignalInt(two)) |> then(singleSignalInt(three)) |> switchToLatest
signal.start(next: { next in
let _ = signal.start(next: { next in
result.append(next)
}, completed: {
completedAll = true
@@ -405,7 +405,7 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
let signal = singleSignalInt(one) |> switchToLatest
signal.start(error: { error in
let _ = signal.start(error: { error in
errorGenerated = true
})
@@ -413,7 +413,7 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
}
func testQueue() {
let q = dispatch_queue_create(nil, DISPATCH_QUEUE_SERIAL)
let q = DispatchQueue(label: "")
var disposedOne = false
var disposedTwo = false
@@ -422,30 +422,30 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
var result: [Int] = []
let one = Signal<Int, Void> { subscriber in
dispatch_async(q, {
q.async {
subscriber.putNext(1)
subscriber.putCompletion()
})
}
return ActionDisposable {
disposedOne = true
}
}
let two = Signal<Int, Void> { subscriber in
dispatch_async(q, {
q.async {
subscriber.putNext(2)
subscriber.putCompletion()
})
}
return ActionDisposable {
disposedTwo = true
}
}
let three = Signal<Int, Void> { subscriber in
dispatch_async(q, {
q.async {
subscriber.putNext(3)
subscriber.putCompletion()
})
}
return ActionDisposable {
disposedThree = true
}
@@ -453,7 +453,7 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
let signal = singleSignalInt(one) |> then(singleSignalInt(two)) |> then(singleSignalInt(three)) |> queue
signal.start(next: { next in
let _ = signal.start(next: { next in
print("next: \(next)")
result.append(next)
}, completed: {
@@ -470,7 +470,7 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
}
func testQueueInterrupted() {
let q = dispatch_queue_create(nil, DISPATCH_QUEUE_SERIAL)
let q = DispatchQueue(label: "")
var disposedOne = false
var disposedTwo = false
@@ -480,20 +480,20 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
var result: [Int] = []
let one = Signal<Int, Void> { subscriber in
dispatch_async(q, {
q.async {
subscriber.putNext(1)
subscriber.putCompletion()
})
}
return ActionDisposable {
disposedOne = true
}
}
let two = Signal<Int, Void> { subscriber in
dispatch_async(q, {
q.async {
subscriber.putNext(2)
subscriber.putError(Void())
})
}
return ActionDisposable {
disposedTwo = true
}
@@ -501,10 +501,10 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
let three = Signal<Int, Void> { subscriber in
startedThird = true
dispatch_async(q, {
q.async {
subscriber.putNext(3)
subscriber.putCompletion()
})
}
return ActionDisposable {
disposedThree = true
}
@@ -512,7 +512,7 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
let signal = singleSignalInt(one) |> then(singleSignalInt(two)) |> then(singleSignalInt(three)) |> queue
signal.start(next: { next in
let _ = signal.start(next: { next in
result.append(next)
}, completed: {
completedAll = true
@@ -529,7 +529,7 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
}
func testQueueDisposed() {
let q = dispatch_queue_create(nil, DISPATCH_QUEUE_SERIAL)
let q = DispatchQueue(label: "")
var disposedOne = false
var disposedTwo = false
@@ -542,13 +542,13 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
let one = Signal<Int, Void> { subscriber in
startedFirst = true
var cancelled = false
dispatch_async(q, {
q.async {
if !cancelled {
usleep(100 * 1000)
subscriber.putNext(1)
subscriber.putCompletion()
}
})
}
return ActionDisposable {
cancelled = true
disposedOne = true
@@ -558,13 +558,13 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
let two = Signal<Int, Void> { subscriber in
startedSecond = true
var cancelled = false
dispatch_async(q, {
q.async {
if !cancelled {
usleep(100 * 1000)
subscriber.putNext(2)
subscriber.putError(Void())
}
})
}
return ActionDisposable {
cancelled = true
disposedTwo = true
@@ -574,13 +574,13 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
let three = Signal<Int, Void> { subscriber in
startedThird = true
var cancelled = false
dispatch_async(q, {
q.async {
if !cancelled {
usleep(100 * 1000)
subscriber.putNext(3)
subscriber.putCompletion()
}
})
}
return ActionDisposable {
cancelled = true
disposedThree = true
@@ -605,18 +605,18 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
}
func testRestart() {
let q = dispatch_queue_create(nil, DISPATCH_QUEUE_CONCURRENT)
let q = DispatchQueue(label: "", attributes: [.concurrent])
let signal = Signal<Int, Void> { subscriber in
dispatch_async(q, {
q.async {
subscriber.putNext(1)
subscriber.putCompletion()
})
}
return EmptyDisposable
}
var result = 0
(signal |> restart |> take(3)).start(next: { next in
let _ = (signal |> restart |> take(3)).start(next: { next in
result += next
})
@@ -626,7 +626,7 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
}
func testPipe() {
let pipe = Pipe<Int>()
let pipe = ValuePipe<Int>()
var result1 = 0
let disposable1 = pipe.signal().start(next: { next in
@@ -674,7 +674,7 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
return complete(Void.self, NoError.self) |> deliverOn(q)
}
queued.start()
let _ = queued.start()
}
func testReduceSignal() {
@@ -698,7 +698,7 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
})
var values: [Int] = []
reduced.start(next: { next in
let _ = reduced.start(next: { next in
values.append(next)
})
@@ -718,4 +718,16 @@ class SwiftSignalKitFunctionsTests: XCTestCase {
XCTAssert(values[i] == value, "at \(i): \(values[i]) != \(value)")
}
}
func testMainQueueReentrant() {
let q = Queue.mainQueue()
var a = 1
q.async {
usleep(150 * 1000)
a = 2
}
XCTAssert(a == 2)
}
}