More memory leak fixes

This commit is contained in:
Ali 2023-09-21 01:14:05 +02:00
parent 6e62eb56b1
commit 823754a081
5 changed files with 113 additions and 38 deletions

View File

@ -1080,7 +1080,7 @@ public final class MediaBox {
|> map(Optional.init)
}
|> deliverOn(self.dataQueue)
context.disposable.set(signal.start(next: { [weak self, weak context] next in
context.disposable.set(signal.startStrict(next: { [weak self, weak context] next in
guard let strongSelf = self else {
return
}
@ -1263,7 +1263,7 @@ public final class MediaBox {
let cacheStorageBox = self.cacheStorageBox
let signal = fetch()
|> deliverOn(self.dataQueue)
context.disposable.set(signal.start(next: { [weak self, weak context] next in
context.disposable.set(signal.startStrict(next: { [weak self, weak context] next in
guard let strongSelf = self else {
return
}
@ -1414,7 +1414,7 @@ public final class MediaBox {
func processStale(nextId: Data?) {
let _ = (storageBox.enumerateItems(startingWith: nextId, limit: 1000)
|> deliverOn(processQueue)).start(next: { ids, realNextId in
|> deliverOn(processQueue)).startStandalone(next: { ids, realNextId in
var staleIds: [Data] = []
for id in ids {
@ -1506,7 +1506,7 @@ public final class MediaBox {
func processStale(nextId: Data?) {
let _ = (storageBox.enumerateItems(startingWith: nextId, limit: 1000)
|> deliverOn(processQueue)).start(next: { ids, realNextId in
|> deliverOn(processQueue)).startStandalone(next: { ids, realNextId in
var staleIds: [Data] = []
for id in ids {

View File

@ -336,7 +336,7 @@ final class MediaBoxFileContextV2Impl: MediaBoxFileContext {
self.hasPerformedAnyFetch = true
let queue = self.queue
disposable.set(fetchImpl(pendingFetch.ranges.get()).start(next: { [weak self] result in
disposable.set(fetchImpl(pendingFetch.ranges.get()).startStrict(next: { [weak self] result in
queue.async {
guard let `self` = self else {
return

View File

@ -70,22 +70,22 @@ public final class Signal<T, E> {
public func start(next: ((T) -> Void)! = nil, error: ((E) -> Void)! = nil, completed: (() -> Void)! = nil) -> Disposable {
let subscriber = Subscriber<T, E>(next: next, error: error, completed: completed)
let disposable = self.generator(subscriber)
subscriber.assignDisposable(disposable)
return SubscriberDisposable(subscriber: subscriber, disposable: disposable)
let wrappedDisposable = subscriber.assignDisposable(disposable)
return SubscriberDisposable(subscriber: subscriber, disposable: wrappedDisposable)
}
public func startStandalone(next: ((T) -> Void)! = nil, error: ((E) -> Void)! = nil, completed: (() -> Void)! = nil) -> Disposable {
let subscriber = Subscriber<T, E>(next: next, error: error, completed: completed)
let disposable = self.generator(subscriber)
subscriber.assignDisposable(disposable)
return SubscriberDisposable(subscriber: subscriber, disposable: disposable)
let wrappedDisposable = subscriber.assignDisposable(disposable)
return SubscriberDisposable(subscriber: subscriber, disposable: wrappedDisposable)
}
public func startStrict(next: ((T) -> Void)! = nil, error: ((E) -> Void)! = nil, completed: (() -> Void)! = nil, file: String = #file, line: Int = #line) -> Disposable {
let subscriber = Subscriber<T, E>(next: next, error: error, completed: completed)
let disposable = self.generator(subscriber)
subscriber.assignDisposable(disposable)
return SubscriberDisposable(subscriber: subscriber, disposable: disposable).strict(file: file, line: line)
let wrappedDisposable = subscriber.assignDisposable(disposable)
return SubscriberDisposable(subscriber: subscriber, disposable: wrappedDisposable).strict(file: file, line: line)
}
public static func single(_ value: T) -> Signal<T, E> {

View File

@ -1,21 +1,54 @@
import Foundation
#if DEBUG
// Signals keep themselves in memory until terminated (dispose, putError, putCompletion)
private final class LiveSubscribers {
var dict: [ObjectIdentifier: AnyObject] = [:]
final class WrappedSubscriberDisposable: Disposable {
private var lock = pthread_mutex_t()
private var disposable: Disposable?
init(_ disposable: Disposable) {
self.disposable = disposable
pthread_mutex_init(&self.lock, nil)
}
deinit {
pthread_mutex_destroy(&self.lock)
}
func dispose() {
var disposableValue: Disposable?
pthread_mutex_lock(&self.lock)
disposableValue = self.disposable
self.disposable = nil
pthread_mutex_unlock(&self.lock)
disposableValue?.dispose()
}
func markTerminated() {
var disposableValue: Disposable?
pthread_mutex_lock(&self.lock)
disposableValue = self.disposable
self.disposable = nil
pthread_mutex_unlock(&self.lock)
if let disposableValue {
withExtendedLifetime(disposableValue, {
})
}
}
}
private let liveSubscribers = Atomic<LiveSubscribers>(value: LiveSubscribers())
#endif
public final class Subscriber<T, E>: CustomStringConvertible {
private var next: ((T) -> Void)!
private var error: ((E) -> Void)!
private var completed: (() -> Void)!
private var keepAliveObjects: [AnyObject]?
private var lock = pthread_mutex_t()
private var terminated = false
internal var disposable: Disposable?
private weak var wrappedDisposable: WrappedSubscriberDisposable?
public init(next: ((T) -> Void)! = nil, error: ((E) -> Void)! = nil, completed: (() -> Void)! = nil) {
self.next = next
@ -30,11 +63,15 @@ public final class Subscriber<T, E>: CustomStringConvertible {
deinit {
var freeDisposable: Disposable?
var keepAliveObjects: [AnyObject]?
pthread_mutex_lock(&self.lock)
if let disposable = self.disposable {
freeDisposable = disposable
self.disposable = nil
}
keepAliveObjects = self.keepAliveObjects
self.keepAliveObjects = nil
pthread_mutex_unlock(&self.lock)
if let freeDisposableValue = freeDisposable {
withExtendedLifetime(freeDisposableValue, {
@ -42,32 +79,41 @@ public final class Subscriber<T, E>: CustomStringConvertible {
freeDisposable = nil
}
if let keepAliveObjects {
withExtendedLifetime(keepAliveObjects, {
})
}
pthread_mutex_destroy(&self.lock)
}
internal func assignDisposable(_ disposable: Disposable) {
#if DEBUG
liveSubscribers.with { impl in
//let _ = impl.dict[ObjectIdentifier(self)] = self
}
#endif
internal func assignDisposable(_ disposable: Disposable) -> Disposable {
var updatedWrappedDisposable: WrappedSubscriberDisposable?
var dispose = false
pthread_mutex_lock(&self.lock)
if self.terminated {
dispose = true
} else {
self.disposable = disposable
updatedWrappedDisposable = WrappedSubscriberDisposable(disposable)
self.wrappedDisposable = updatedWrappedDisposable
}
pthread_mutex_unlock(&self.lock)
if dispose {
disposable.dispose()
}
if let updatedWrappedDisposable {
return updatedWrappedDisposable
} else {
return EmptyDisposable
}
}
internal func markTerminatedWithoutDisposal() {
var freeDisposable: Disposable?
var keepAliveObjects: [AnyObject]?
pthread_mutex_lock(&self.lock)
if !self.terminated {
@ -81,6 +127,10 @@ public final class Subscriber<T, E>: CustomStringConvertible {
self.disposable = nil
}
}
keepAliveObjects = self.keepAliveObjects
self.keepAliveObjects = nil
pthread_mutex_unlock(&self.lock)
if let freeDisposableValue = freeDisposable {
@ -89,11 +139,14 @@ public final class Subscriber<T, E>: CustomStringConvertible {
freeDisposable = nil
}
#if DEBUG
liveSubscribers.with { impl in
let _ = impl.dict.removeValue(forKey: ObjectIdentifier(self))
if let wrappedDisposable = self.wrappedDisposable {
wrappedDisposable.markTerminated()
}
if let keepAliveObjects {
withExtendedLifetime(keepAliveObjects, {
})
}
#endif
}
public func putNext(_ next: T) {
@ -113,6 +166,7 @@ public final class Subscriber<T, E>: CustomStringConvertible {
var action: ((E) -> Void)! = nil
var disposeDisposable: Disposable?
var keepAliveObjects: [AnyObject]?
pthread_mutex_lock(&self.lock)
if !self.terminated {
@ -124,6 +178,8 @@ public final class Subscriber<T, E>: CustomStringConvertible {
disposeDisposable = self.disposable
self.disposable = nil
}
keepAliveObjects = self.keepAliveObjects
self.keepAliveObjects = nil
pthread_mutex_unlock(&self.lock)
if action != nil {
@ -134,17 +190,21 @@ public final class Subscriber<T, E>: CustomStringConvertible {
disposeDisposable.dispose()
}
#if DEBUG
liveSubscribers.with { impl in
let _ = impl.dict.removeValue(forKey: ObjectIdentifier(self))
if let wrappedDisposable = self.wrappedDisposable {
wrappedDisposable.markTerminated()
}
if let keepAliveObjects {
withExtendedLifetime(keepAliveObjects, {
})
}
#endif
}
public func putCompletion() {
var action: (() -> Void)! = nil
var disposeDisposable: Disposable? = nil
var keepAliveObjects: [AnyObject]?
var next: ((T) -> Void)?
var error: ((E) -> Void)?
@ -164,6 +224,8 @@ public final class Subscriber<T, E>: CustomStringConvertible {
disposeDisposable = self.disposable
self.disposable = nil
}
keepAliveObjects = self.keepAliveObjects
self.keepAliveObjects = nil
pthread_mutex_unlock(&self.lock)
if let next = next {
@ -184,10 +246,22 @@ public final class Subscriber<T, E>: CustomStringConvertible {
disposeDisposable.dispose()
}
#if DEBUG
liveSubscribers.with { impl in
let _ = impl.dict.removeValue(forKey: ObjectIdentifier(self))
if let wrappedDisposable = self.wrappedDisposable {
wrappedDisposable.markTerminated()
}
#endif
if let keepAliveObjects {
withExtendedLifetime(keepAliveObjects, {
})
}
}
public func keepAlive(_ object: AnyObject) {
pthread_mutex_lock(&self.lock)
if self.keepAliveObjects == nil {
self.keepAliveObjects = []
}
self.keepAliveObjects?.append(object)
pthread_mutex_unlock(&self.lock)
}
}

View File

@ -56,9 +56,10 @@ public func fetchCachedResourceRepresentation(account: Account, resource: MediaR
break
}
})
subscriber.keepAlive(videoSource)
return ActionDisposable {
// keep the reference
let _ = videoSource.takeFrame(at: 0.0)
disposable.dispose()
}
} else {