This commit is contained in:
Ali 2023-09-19 19:05:22 +02:00
parent 08ebdf6f90
commit 75d9f03212
2 changed files with 22 additions and 9 deletions

View File

@ -24,7 +24,11 @@ public func |> <T, U>(value: T, function: ((T) -> U)) -> U {
}
private final class SubscriberDisposable<T, E>: Disposable, CustomStringConvertible {
#if DEBUG
private weak var subscriber: Subscriber<T, E>?
#else
private var subscriber: Subscriber<T, E>?
#endif
private var lock = pthread_mutex_t()
private var disposable: Disposable?

View File

@ -3,10 +3,11 @@ import Foundation
public func delay<T, E>(_ timeout: Double, queue: Queue) -> (_ signal: Signal<T, E>) -> Signal<T, E> {
return { signal in
return Signal<T, E> { subscriber in
let disposable = MetaDisposable()
let timerDisposable = MetaDisposable()
let runDisposable = MetaDisposable()
queue.async {
let timer = Timer(timeout: timeout, repeat: false, completion: {
disposable.set(signal.start(next: { next in
runDisposable.set(signal.start(next: { next in
subscriber.putNext(next)
}, error: { error in
subscriber.putError(error)
@ -15,7 +16,7 @@ public func delay<T, E>(_ timeout: Double, queue: Queue) -> (_ signal: Signal<T,
}))
}, queue: queue)
disposable.set(ActionDisposable {
timerDisposable.set(ActionDisposable {
queue.async {
timer.invalidate()
}
@ -23,7 +24,10 @@ public func delay<T, E>(_ timeout: Double, queue: Queue) -> (_ signal: Signal<T,
timer.start()
}
return disposable
return ActionDisposable {
timerDisposable.dispose()
runDisposable.dispose()
}
}
}
}
@ -31,14 +35,16 @@ public func delay<T, E>(_ timeout: Double, queue: Queue) -> (_ signal: Signal<T,
public func suspendAwareDelay<T, E>(_ timeout: Double, granularity: Double = 4.0, queue: Queue) -> (_ signal: Signal<T, E>) -> Signal<T, E> {
return { signal in
return Signal<T, E> { subscriber in
let disposable = MetaDisposable()
let timerDisposable = MetaDisposable()
let runDisposable = MetaDisposable()
queue.async {
let beginTimestamp = CFAbsoluteTimeGetCurrent()
let startFinalTimer: () -> Void = {
let finalTimeout = beginTimestamp + timeout - CFAbsoluteTimeGetCurrent()
let timer = Timer(timeout: max(0.0, finalTimeout), repeat: false, completion: {
disposable.set(signal.start(next: { next in
runDisposable.set(signal.start(next: { next in
subscriber.putNext(next)
}, error: { error in
subscriber.putError(error)
@ -46,7 +52,7 @@ public func suspendAwareDelay<T, E>(_ timeout: Double, granularity: Double = 4.0
subscriber.putCompletion()
}))
}, queue: queue)
disposable.set(ActionDisposable {
timerDisposable.set(ActionDisposable {
queue.async {
timer.invalidate()
}
@ -72,14 +78,17 @@ public func suspendAwareDelay<T, E>(_ timeout: Double, granularity: Double = 4.0
}
}
disposable.set(ActionDisposable {
timerDisposable.set(ActionDisposable {
invalidateImpl?()
})
timer.start()
}
}
return disposable
return ActionDisposable {
timerDisposable.dispose()
runDisposable.dispose()
}
}
}
}