mirror of
https://github.com/Swiftgram/Telegram-iOS.git
synced 2025-12-22 22:25:57 +00:00
Add 'submodules/SSignalKit/' from commit '359b2ee7c9f20f99f221f78e307369ef5ad0ece2'
git-subtree-dir: submodules/SSignalKit git-subtree-mainline:4459dc5b47git-subtree-split:359b2ee7c9
This commit is contained in:
248
submodules/SSignalKit/SwiftSignalKit/Signal_Meta.swift
Normal file
248
submodules/SSignalKit/SwiftSignalKit/Signal_Meta.swift
Normal file
@@ -0,0 +1,248 @@
|
||||
import Foundation
|
||||
|
||||
private final class SignalQueueState<T, E>: Disposable {
|
||||
var lock = pthread_mutex_t()
|
||||
var executingSignal = false
|
||||
var terminated = false
|
||||
|
||||
var disposable: Disposable = EmptyDisposable
|
||||
let currentDisposable = MetaDisposable()
|
||||
var subscriber: Subscriber<T, E>?
|
||||
|
||||
var queuedSignals: [Signal<T, E>] = []
|
||||
let queueMode: Bool
|
||||
let throttleMode: Bool
|
||||
|
||||
init(subscriber: Subscriber<T, E>, queueMode: Bool, throttleMode: Bool) {
|
||||
pthread_mutex_init(&self.lock, nil)
|
||||
|
||||
self.subscriber = subscriber
|
||||
self.queueMode = queueMode
|
||||
self.throttleMode = throttleMode
|
||||
}
|
||||
|
||||
deinit {
|
||||
pthread_mutex_destroy(&self.lock)
|
||||
}
|
||||
|
||||
func beginWithDisposable(_ disposable: Disposable) {
|
||||
self.disposable = disposable
|
||||
}
|
||||
|
||||
func enqueueSignal(_ signal: Signal<T, E>) {
|
||||
var startSignal = false
|
||||
pthread_mutex_lock(&self.lock)
|
||||
if self.queueMode && self.executingSignal {
|
||||
if self.throttleMode {
|
||||
self.queuedSignals.removeAll()
|
||||
}
|
||||
self.queuedSignals.append(signal)
|
||||
} else {
|
||||
self.executingSignal = true
|
||||
startSignal = true
|
||||
}
|
||||
pthread_mutex_unlock(&self.lock)
|
||||
|
||||
if startSignal {
|
||||
let disposable = signal.start(next: { next in
|
||||
assert(self.subscriber != nil)
|
||||
self.subscriber?.putNext(next)
|
||||
}, error: { error in
|
||||
assert(self.subscriber != nil)
|
||||
self.subscriber?.putError(error)
|
||||
}, completed: {
|
||||
self.headCompleted()
|
||||
})
|
||||
self.currentDisposable.set(disposable)
|
||||
}
|
||||
}
|
||||
|
||||
func headCompleted() {
|
||||
while true {
|
||||
let leftFunction = Atomic(value: false)
|
||||
|
||||
var nextSignal: Signal<T, E>! = nil
|
||||
|
||||
var terminated = false
|
||||
pthread_mutex_lock(&self.lock)
|
||||
self.executingSignal = false
|
||||
if self.queueMode {
|
||||
if self.queuedSignals.count != 0 {
|
||||
nextSignal = self.queuedSignals[0]
|
||||
self.queuedSignals.remove(at: 0)
|
||||
self.executingSignal = true
|
||||
} else {
|
||||
terminated = self.terminated
|
||||
}
|
||||
} else {
|
||||
terminated = self.terminated
|
||||
}
|
||||
pthread_mutex_unlock(&self.lock)
|
||||
|
||||
if terminated {
|
||||
self.subscriber?.putCompletion()
|
||||
} else if nextSignal != nil {
|
||||
let disposable = nextSignal.start(next: { next in
|
||||
assert(self.subscriber != nil)
|
||||
self.subscriber?.putNext(next)
|
||||
}, error: { error in
|
||||
assert(self.subscriber != nil)
|
||||
self.subscriber?.putError(error)
|
||||
}, completed: {
|
||||
if leftFunction.swap(true) == true {
|
||||
self.headCompleted()
|
||||
}
|
||||
})
|
||||
|
||||
currentDisposable.set(disposable)
|
||||
}
|
||||
|
||||
if leftFunction.swap(true) == false {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func beginCompletion() {
|
||||
var executingSignal = false
|
||||
pthread_mutex_lock(&self.lock)
|
||||
executingSignal = self.executingSignal
|
||||
self.terminated = true
|
||||
pthread_mutex_unlock(&self.lock)
|
||||
|
||||
if !executingSignal {
|
||||
self.subscriber?.putCompletion()
|
||||
}
|
||||
}
|
||||
|
||||
func dispose() {
|
||||
self.currentDisposable.dispose()
|
||||
self.disposable.dispose()
|
||||
}
|
||||
}
|
||||
|
||||
public func switchToLatest<T, E>(_ signal: Signal<Signal<T, E>, E>) -> Signal<T, E> {
|
||||
return Signal { subscriber in
|
||||
let state = SignalQueueState(subscriber: subscriber, queueMode: false, throttleMode: false)
|
||||
state.beginWithDisposable(signal.start(next: { next in
|
||||
state.enqueueSignal(next)
|
||||
}, error: { error in
|
||||
subscriber.putError(error)
|
||||
}, completed: {
|
||||
state.beginCompletion()
|
||||
}))
|
||||
return state
|
||||
}
|
||||
}
|
||||
|
||||
public func queue<T, E>(_ signal: Signal<Signal<T, E>, E>) -> Signal<T, E> {
|
||||
return Signal { subscriber in
|
||||
let state = SignalQueueState(subscriber: subscriber, queueMode: true, throttleMode: false)
|
||||
state.beginWithDisposable(signal.start(next: { next in
|
||||
state.enqueueSignal(next)
|
||||
}, error: { error in
|
||||
subscriber.putError(error)
|
||||
}, completed: {
|
||||
state.beginCompletion()
|
||||
}))
|
||||
return state
|
||||
}
|
||||
}
|
||||
|
||||
public func throttled<T, E>(_ signal: Signal<Signal<T, E>, E>) -> Signal<T, E> {
|
||||
return Signal { subscriber in
|
||||
let state = SignalQueueState(subscriber: subscriber, queueMode: true, throttleMode: true)
|
||||
state.beginWithDisposable(signal.start(next: { next in
|
||||
state.enqueueSignal(next)
|
||||
}, error: { error in
|
||||
subscriber.putError(error)
|
||||
}, completed: {
|
||||
state.beginCompletion()
|
||||
}))
|
||||
return state
|
||||
}
|
||||
}
|
||||
|
||||
public func mapToSignal<T, R, E>(_ f: @escaping(T) -> Signal<R, E>) -> (Signal<T, E>) -> Signal<R, E> {
|
||||
return { signal -> Signal<R, E> in
|
||||
return Signal<Signal<R, E>, E> { subscriber in
|
||||
return signal.start(next: { next in
|
||||
subscriber.putNext(f(next))
|
||||
}, error: { error in
|
||||
subscriber.putError(error)
|
||||
}, completed: {
|
||||
subscriber.putCompletion()
|
||||
})
|
||||
} |> switchToLatest
|
||||
}
|
||||
}
|
||||
|
||||
public func ignoreValues<T, E>(_ signal: Signal<T, E>) -> Signal<Never, E> {
|
||||
return Signal { subscriber in
|
||||
return signal.start(error: { error in
|
||||
subscriber.putError(error)
|
||||
}, completed: {
|
||||
subscriber.putCompletion()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
public func mapToSignalPromotingError<T, R, E>(_ f: @escaping(T) -> Signal<R, E>) -> (Signal<T, NoError>) -> Signal<R, E> {
|
||||
return { signal -> Signal<R, E> in
|
||||
return Signal<Signal<R, E>, E> { subscriber in
|
||||
return signal.start(next: { next in
|
||||
subscriber.putNext(f(next))
|
||||
}, completed: {
|
||||
subscriber.putCompletion()
|
||||
})
|
||||
} |> switchToLatest
|
||||
}
|
||||
}
|
||||
|
||||
public func mapToQueue<T, R, E>(_ f: @escaping(T) -> Signal<R, E>) -> (Signal<T, E>) -> Signal<R, E> {
|
||||
return { signal -> Signal<R, E> in
|
||||
return signal |> map { f($0) } |> queue
|
||||
}
|
||||
}
|
||||
|
||||
public func mapToThrottled<T, R, E>(_ f: @escaping(T) -> Signal<R, E>) -> (Signal<T, E>) -> Signal<R, E> {
|
||||
return { signal -> Signal<R, E> in
|
||||
return signal |> map { f($0) } |> throttled
|
||||
}
|
||||
}
|
||||
|
||||
public func then<T, E>(_ nextSignal: Signal<T, E>) -> (Signal<T, E>) -> Signal<T, E> {
|
||||
return { signal -> Signal<T, E> in
|
||||
return Signal<T, E> { subscriber in
|
||||
let disposable = DisposableSet()
|
||||
|
||||
disposable.add(signal.start(next: { next in
|
||||
subscriber.putNext(next)
|
||||
}, error: { error in
|
||||
subscriber.putError(error)
|
||||
}, completed: {
|
||||
disposable.add(nextSignal.start(next: { next in
|
||||
subscriber.putNext(next)
|
||||
}, error: { error in
|
||||
subscriber.putError(error)
|
||||
}, completed: {
|
||||
subscriber.putCompletion()
|
||||
}))
|
||||
}))
|
||||
|
||||
return disposable
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public func deferred<T, E>(_ generator: @escaping() -> Signal<T, E>) -> Signal<T, E> {
|
||||
return Signal { subscriber in
|
||||
return generator().start(next: { next in
|
||||
subscriber.putNext(next)
|
||||
}, error: { error in
|
||||
subscriber.putError(error)
|
||||
}, completed: {
|
||||
subscriber.putCompletion()
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user