Storage management improvements

This commit is contained in:
Ali
2022-12-30 20:03:35 +04:00
parent 3566377d68
commit bf5382c9b4
16 changed files with 383 additions and 15 deletions

View File

@@ -158,6 +158,9 @@ public final class StorageBox {
let contentTypeStatsTable: ValueBoxTable
let metadataTable: ValueBoxTable
let totalSizeSubscribers = Bag<(Int64) -> Void>()
private var totalSize: Int64 = 0
init(queue: Queue, logger: StorageBox.Logger, basePath: String) {
self.queue = queue
self.logger = logger
@@ -183,6 +186,8 @@ public final class StorageBox {
self.metadataTable = ValueBoxTable(id: 21, keyType: .binary, compactValuesOnCreation: true)
self.performUpdatesIfNeeded()
self.updateTotalSize()
}
private func performUpdatesIfNeeded() {
@@ -230,6 +235,35 @@ public final class StorageBox {
})
}
func updateTotalSize() {
self.valueBox.begin()
var totalSize: Int64 = 0
self.valueBox.scan(self.contentTypeStatsTable, values: { key, value in
var size: Int64 = 0
value.read(&size, offset: 0, length: 8)
totalSize += size
return true
})
self.valueBox.commit()
if self.totalSize != totalSize {
self.totalSize = totalSize
for f in self.totalSizeSubscribers.copyItems() {
f(totalSize)
}
}
}
func incrementalUpdateTotalSize() {
for f in self.totalSizeSubscribers.copyItems() {
f(totalSize)
}
}
func reset() {
self.valueBox.begin()
@@ -241,6 +275,8 @@ public final class StorageBox {
self.valueBox.removeAllFromTable(self.metadataTable)
self.valueBox.commit()
self.updateTotalSize()
}
private func internalAddSize(contentType: UInt8, delta: Int64) {
@@ -260,6 +296,8 @@ public final class StorageBox {
}
self.valueBox.set(self.contentTypeStatsTable, key: key, value: MemoryBuffer(memory: &currentSize, capacity: 8, length: 8, freeWhenDone: false))
self.totalSize += delta
}
private func internalAddSize(peerId: Int64, contentType: UInt8, delta: Int64) {
@@ -390,6 +428,8 @@ public final class StorageBox {
}
self.valueBox.commit()
self.incrementalUpdateTotalSize()
}
private func peerIdsReferencing(hashId: HashId) -> Set<Int64> {
@@ -435,6 +475,8 @@ public final class StorageBox {
}
self.valueBox.commit()
self.incrementalUpdateTotalSize()
}
func addEmptyReferencesIfNotReferenced(ids: [(id: Data, size: Int64)], contentType: UInt8) -> Int {
@@ -508,6 +550,8 @@ public final class StorageBox {
}
self.valueBox.commit()
self.incrementalUpdateTotalSize()
}
func allPeerIds() -> [PeerId] {
@@ -615,6 +659,22 @@ public final class StorageBox {
return (ids, nextId)
}
func subscribeTotalSize(next: @escaping (Int64) -> Void) -> Disposable {
let index = self.totalSizeSubscribers.add(next)
next(self.totalSize)
let queue = self.queue
return ActionDisposable { [weak self] in
queue.async {
guard let self else {
return
}
self.totalSizeSubscribers.remove(index)
}
}
}
func all() -> [Entry] {
var result: [Entry] = []
@@ -825,6 +885,8 @@ public final class StorageBox {
self.valueBox.commit()
self.incrementalUpdateTotalSize()
return Array(resultIds)
}
@@ -854,6 +916,8 @@ public final class StorageBox {
self.valueBox.commit()
self.incrementalUpdateTotalSize()
return Array(resultIds)
}
}
@@ -973,4 +1037,12 @@ public final class StorageBox {
return EmptyDisposable
}
}
public func totalSize() -> Signal<Int64, NoError> {
return self.impl.signalWith { impl, subscriber in
return impl.subscribeTotalSize(next: { value in
subscriber.putNext(value)
})
}
}
}