diff --git a/submodules/Postbox/Sources/MediaBox.swift b/submodules/Postbox/Sources/MediaBox.swift index ebbd34fe22..312e530b44 100644 --- a/submodules/Postbox/Sources/MediaBox.swift +++ b/submodules/Postbox/Sources/MediaBox.swift @@ -238,6 +238,17 @@ public final class MediaBox { return ResourceStorePaths(partial: "\(self.basePath)/\(fileNameForId(id))_partial", complete: "\(self.basePath)/\(fileNameForId(id))") } + public func fileSizeForId(_ id: MediaResourceId) -> Int64 { + let paths = self.storePathsForId(id) + if let size = fileSize(paths.complete, useTotalFileAllocatedSize: false) { + return size + } else if let size = fileSize(paths.partial, useTotalFileAllocatedSize: true) { + return size + } else { + return 0 + } + } + private func fileNamesForId(_ id: MediaResourceId) -> ResourceStorePaths { return ResourceStorePaths(partial: "\(fileNameForId(id))_partial", complete: "\(fileNameForId(id))") } diff --git a/submodules/Postbox/Sources/MessageHistoryTable.swift b/submodules/Postbox/Sources/MessageHistoryTable.swift index 8a15d94506..d83c62cd9b 100644 --- a/submodules/Postbox/Sources/MessageHistoryTable.swift +++ b/submodules/Postbox/Sources/MessageHistoryTable.swift @@ -3004,6 +3004,64 @@ final class MessageHistoryTable: Table { return (result, mediaRefs, count == 0 ? nil : lastIndex) } + func enumerateMediaMessages(lowerBound: MessageIndex?, upperBound: MessageIndex?, limit: Int) -> (messagesByMediaId: [MediaId: [MessageId]], mediaMap: [MediaId: Media], nextLowerBound: MessageIndex?) { + var messagesByMediaId: [MediaId: [MessageId]] = [:] + var mediaRefs: [MediaId: Media] = [:] + var lastIndex: MessageIndex? + var count = 0 + self.valueBox.range(self.table, start: self.key(lowerBound == nil ? MessageIndex.absoluteLowerBound() : lowerBound!), end: self.key(upperBound == nil ? 
MessageIndex.absoluteUpperBound() : upperBound!), values: { key, value in + count += 1 + + let entry = self.readIntermediateEntry(key, value: value) + lastIndex = entry.message.index + + let message = entry.message + + if let upperBound = upperBound, message.id.peerId != upperBound.id.peerId { + return true + } + + var parsedMedia: [Media] = [] + + let embeddedMediaData = message.embeddedMediaData.sharedBufferNoCopy() + if embeddedMediaData.length > 4 { + var embeddedMediaCount: Int32 = 0 + embeddedMediaData.read(&embeddedMediaCount, offset: 0, length: 4) + for _ in 0 ..< embeddedMediaCount { + var mediaLength: Int32 = 0 + embeddedMediaData.read(&mediaLength, offset: 0, length: 4) + if let media = PostboxDecoder(buffer: MemoryBuffer(memory: embeddedMediaData.memory + embeddedMediaData.offset, capacity: Int(mediaLength), length: Int(mediaLength), freeWhenDone: false)).decodeRootObject() as? Media { + parsedMedia.append(media) + } + embeddedMediaData.skip(Int(mediaLength)) + } + } + + for mediaId in message.referencedMedia { + if let media = self.messageMediaTable.get(mediaId, embedded: { _, _ in + return nil + })?.1 { + parsedMedia.append(media) + } + } + + for media in parsedMedia { + if let id = media.id { + mediaRefs[id] = media + if let current = messagesByMediaId[id] { + if !current.contains(message.id) { + messagesByMediaId[id]?.append(message.id) + } + } else { + messagesByMediaId[id] = [message.id] + } + } + } + return true + }, limit: limit) + return (messagesByMediaId, mediaRefs, count == 0 ? 
nil : lastIndex) + } + func fetch(peerId: PeerId, namespace: MessageId.Namespace, tag: MessageTags?, threadId: Int64?, from fromIndex: MessageIndex, includeFrom: Bool, to toIndex: MessageIndex, ignoreMessagesInTimestampRange: ClosedRange?, limit: Int) -> [IntermediateMessage] { precondition(fromIndex.id.peerId == toIndex.id.peerId) precondition(fromIndex.id.namespace == toIndex.id.namespace) diff --git a/submodules/Postbox/Sources/Postbox.swift b/submodules/Postbox/Sources/Postbox.swift index af228d1579..96b8cf988c 100644 --- a/submodules/Postbox/Sources/Postbox.swift +++ b/submodules/Postbox/Sources/Postbox.swift @@ -912,6 +912,15 @@ public final class Transaction { return count } + public func enumerateMediaMessages(lowerBound: MessageIndex?, upperBound: MessageIndex?, limit: Int) -> (messagesByMediaId: [MediaId: [MessageId]], mediaMap: [MediaId: Media], nextLowerBound: MessageIndex?) { + assert(!self.disposed) + if let postbox = self.postbox { + return postbox.messageHistoryTable.enumerateMediaMessages(lowerBound: lowerBound, upperBound: upperBound, limit: limit) + } else { + return ([:], [:], nil) + } + } + public func enumerateMedia(lowerBound: MessageIndex?, upperBound: MessageIndex?, limit: Int) -> ([PeerId: Set], [MediaId: Media], MessageIndex?) 
{ assert(!self.disposed) if let postbox = self.postbox { diff --git a/submodules/Postbox/Sources/StorageBox/StorageBox.swift b/submodules/Postbox/Sources/StorageBox/StorageBox.swift index 8b48307585..7dbbfbffdf 100644 --- a/submodules/Postbox/Sources/StorageBox/StorageBox.swift +++ b/submodules/Postbox/Sources/StorageBox/StorageBox.swift @@ -20,9 +20,19 @@ private func md5Hash(_ data: Data) -> HashId { public final class StorageBox { public final class Stats { - public fileprivate(set) var contentTypes: [UInt8: Int64] + public final class ContentTypeStats { + public fileprivate(set) var size: Int64 + public fileprivate(set) var messages: [MessageId: Int64] + + init(size: Int64, messages: [MessageId: Int64]) { + self.size = size + self.messages = messages + } + } - public init(contentTypes: [UInt8: Int64]) { + public fileprivate(set) var contentTypes: [UInt8: ContentTypeStats] + + public init(contentTypes: [UInt8: ContentTypeStats]) { self.contentTypes = contentTypes } } @@ -261,36 +271,73 @@ public final class StorageBox { self.valueBox.set(self.peerContentTypeStatsTable, key: key, value: MemoryBuffer(memory: &currentSize, capacity: 8, length: 8, freeWhenDone: false)) } - func add(reference: Reference, to id: Data, contentType: UInt8) { - self.valueBox.begin() - + func internalAdd(reference: Reference, to id: Data, contentType: UInt8, size: Int64?) { let hashId = md5Hash(id) let mainKey = ValueBoxKey(length: 16) mainKey.setData(0, value: hashId.data) var previousContentType: UInt8? 
- var size: Int64 = 0 + var previousSize: Int64 = 0 if let currentInfoValue = self.valueBox.get(self.hashIdToInfoTable, key: mainKey) { var info = ItemInfo(buffer: currentInfoValue) - if info.contentType != contentType { - previousContentType = info.contentType - } - size = info.size + previousContentType = info.contentType + previousSize = info.size info.contentType = contentType + if let size = size { + info.size = size + } self.valueBox.set(self.hashIdToInfoTable, key: mainKey, value: info.serialize()) } else { - self.valueBox.set(self.hashIdToInfoTable, key: mainKey, value: ItemInfo(id: id, contentType: contentType, size: 0).serialize()) + self.valueBox.set(self.hashIdToInfoTable, key: mainKey, value: ItemInfo(id: id, contentType: contentType, size: size ?? 0).serialize()) } - if let previousContentType = previousContentType, previousContentType != contentType { - if size != 0 { - self.internalAddSize(contentType: previousContentType, delta: -size) - self.internalAddSize(contentType: contentType, delta: size) + let updatedSize = size ?? 
previousSize + let deltaSize = updatedSize - previousSize + + if let previousContentType = previousContentType { + if previousContentType != contentType { + var referencingPeers = self.peerIdsReferencing(hashId: hashId) - for peerId in self.peerIdsReferencing(hashId: hashId) { - self.internalAddSize(peerId: peerId, contentType: previousContentType, delta: -size) + if previousSize != 0 { + self.internalAddSize(contentType: previousContentType, delta: -previousSize) + + for peerId in referencingPeers { + self.internalAddSize(peerId: peerId, contentType: previousContentType, delta: -previousSize) + } } + + if updatedSize != 0 { + self.internalAddSize(contentType: contentType, delta: updatedSize) + + if !referencingPeers.contains(reference.peerId) { + referencingPeers.insert(reference.peerId) + } + for peerId in referencingPeers { + self.internalAddSize(peerId: peerId, contentType: contentType, delta: updatedSize) + } + } + } else if deltaSize != 0 { + self.internalAddSize(contentType: contentType, delta: deltaSize) + + let referencingPeers = self.peerIdsReferencing(hashId: hashId) + + for peerId in referencingPeers { + self.internalAddSize(peerId: peerId, contentType: previousContentType, delta: deltaSize) + } + if !referencingPeers.contains(reference.peerId) { + self.internalAddSize(peerId: reference.peerId, contentType: previousContentType, delta: updatedSize) + } + } + } else if updatedSize != 0 { + self.internalAddSize(contentType: contentType, delta: updatedSize) + + var referencingPeers = self.peerIdsReferencing(hashId: hashId) + if !referencingPeers.contains(reference.peerId) { + referencingPeers.insert(reference.peerId) + } + for peerId in referencingPeers { + self.internalAddSize(peerId: peerId, contentType: contentType, delta: updatedSize) } } @@ -340,13 +387,21 @@ public final class StorageBox { self.valueBox.set(self.peerIdTable, key: peerIdKey, value: MemoryBuffer(memory: &peerIdCount, capacity: 4, length: 4, freeWhenDone: false)) } } + } + + func 
add(reference: Reference, to id: Data, contentType: UInt8, size: Int64?) { + self.valueBox.begin() - if let previousContentType = previousContentType, previousContentType != contentType { - if size != 0 { - for peerId in self.peerIdsReferencing(hashId: hashId) { - self.internalAddSize(peerId: peerId, contentType: contentType, delta: size) - } - } + self.internalAdd(reference: reference, to: id, contentType: contentType, size: size) + + self.valueBox.commit() + } + + func batchAdd(items: [(reference: Reference, id: Data, contentType: UInt8, size: Int64)]) { + self.valueBox.begin() + + for (reference, id, contentType, size) in items { + self.internalAdd(reference: reference, to: id, contentType: contentType, size: size) } self.valueBox.commit() @@ -663,7 +718,7 @@ public final class StorageBox { self.valueBox.scan(self.contentTypeStatsTable, values: { key, value in var size: Int64 = 0 value.read(&size, offset: 0, length: 8) - allStats.total.contentTypes[key.getUInt8(0)] = size + allStats.total.contentTypes[key.getUInt8(0)] = Stats.ContentTypeStats(size: size, messages: [:]) return true }) @@ -673,14 +728,51 @@ public final class StorageBox { value.read(&size, offset: 0, length: 8) let peerId = key.getInt64(0) - if peerId != 0 { - assert(true) - } - let contentType = key.getUInt8(0) + let contentType = key.getUInt8(8) if allStats.peers[PeerId(peerId)] == nil { allStats.peers[PeerId(peerId)] = StorageBox.Stats(contentTypes: [:]) } - allStats.peers[PeerId(peerId)]?.contentTypes[contentType] = size + allStats.peers[PeerId(peerId)]?.contentTypes[contentType] = Stats.ContentTypeStats(size: size, messages: [:]) + + return true + }) + + let idKey = ValueBoxKey(length: 16 + 8) + + let mainKey = ValueBoxKey(length: 16) + self.valueBox.scan(self.peerIdToIdTable, keys: { key in + let peerId = key.getInt64(0) + if peerId == 0 { + return true + } + + let hashId = key.getData(8, length: 16) + + mainKey.setData(0, value: hashId) + if let currentInfoValue = 
self.valueBox.get(self.hashIdToInfoTable, key: mainKey) { + let info = ItemInfo(buffer: currentInfoValue) + if info.size != 0 { + idKey.setData(0, value: hashId) + idKey.setInt64(16, value: peerId) + + let contentType = info.contentType + if contentType == 0 { + return true + } + + self.valueBox.range(self.idToReferenceTable, start: idKey, end: idKey.successor, keys: { subKey in + let messageNamespace: UInt8 = subKey.getUInt8(16 + 8) + let messageId = subKey.getInt32(16 + 8 + 1) + + if messageId != 0 { + allStats.total.contentTypes[contentType]?.messages[MessageId(peerId: PeerId(peerId), namespace: Int32(messageNamespace), id: messageId), default: 0] += info.size + allStats.peers[PeerId(peerId)]?.contentTypes[contentType]?.messages[MessageId(peerId: PeerId(peerId), namespace: Int32(messageNamespace), id: messageId), default: 0] += info.size + } + + return true + }, limit: 0) + } + } return true }) @@ -703,7 +795,7 @@ public final class StorageBox { public func add(reference: Reference, to id: Data, contentType: UInt8) { self.impl.with { impl in - impl.add(reference: reference, to: id, contentType: contentType) + impl.add(reference: reference, to: id, contentType: contentType, size: nil) } } @@ -721,6 +813,12 @@ public final class StorageBox { } } + public func batchAdd(items: [(reference: Reference, id: Data, contentType: UInt8, size: Int64)]) { + self.impl.with { impl in + impl.batchAdd(items: items) + } + } + public func remove(ids: [Data]) { self.impl.with { impl in impl.remove(ids: ids) diff --git a/submodules/TelegramCore/Sources/Network/FetchedMediaResource.swift b/submodules/TelegramCore/Sources/Network/FetchedMediaResource.swift index e08185c6e6..42259626ab 100644 --- a/submodules/TelegramCore/Sources/Network/FetchedMediaResource.swift +++ b/submodules/TelegramCore/Sources/Network/FetchedMediaResource.swift @@ -47,6 +47,7 @@ public extension MediaResourceStorageLocation { case let .message(message, _): if let id = message.id { self.init(peerId: id.peerId, 
messageId: id) + return } default: break @@ -92,12 +93,14 @@ public func fetchedMediaResource( break } + let location = MediaResourceStorageLocation(userLocation: userLocation, reference: reference) + if let ranges = ranges { let signals = ranges.map { (range, priority) -> Signal in return mediaBox.fetchedResourceData(reference.resource, in: range, priority: priority, parameters: MediaResourceFetchParameters( tag: TelegramMediaResourceFetchTag(statsCategory: statsCategory), info: TelegramCloudMediaResourceFetchInfo(reference: reference, preferBackgroundReferenceRevalidation: preferBackgroundReferenceRevalidation, continueInBackground: continueInBackground), - location: MediaResourceStorageLocation(userLocation: userLocation, reference: reference), + location: location, contentType: userContentType, isRandomAccessAllowed: isRandomAccessAllowed )) @@ -110,7 +113,7 @@ public func fetchedMediaResource( return mediaBox.fetchedResource(reference.resource, parameters: MediaResourceFetchParameters( tag: TelegramMediaResourceFetchTag(statsCategory: statsCategory), info: TelegramCloudMediaResourceFetchInfo(reference: reference, preferBackgroundReferenceRevalidation: preferBackgroundReferenceRevalidation, continueInBackground: continueInBackground), - location: MediaResourceStorageLocation(userLocation: userLocation, reference: reference), + location: location, contentType: userContentType, isRandomAccessAllowed: isRandomAccessAllowed ), implNext: reportResultStatus) diff --git a/submodules/TelegramCore/Sources/TelegramEngine/Resources/CollectCacheUsageStats.swift b/submodules/TelegramCore/Sources/TelegramEngine/Resources/CollectCacheUsageStats.swift index 5a7ed3b93e..789aee73be 100644 --- a/submodules/TelegramCore/Sources/TelegramEngine/Resources/CollectCacheUsageStats.swift +++ b/submodules/TelegramCore/Sources/TelegramEngine/Resources/CollectCacheUsageStats.swift @@ -63,11 +63,13 @@ public final class StorageUsageStats { case misc } - public struct CategoryData: Equatable 
{ + public struct CategoryData { public var size: Int64 + public var messages: [EngineMessage.Id: Int64] - public init(size: Int64) { + public init(size: Int64, messages: [EngineMessage.Id: Int64]) { self.size = size + self.messages = messages } } @@ -119,7 +121,7 @@ private extension StorageUsageStats { default: mappedCategory = .misc } - mappedCategories[mappedCategory] = StorageUsageStats.CategoryData(size: value) + mappedCategories[mappedCategory] = StorageUsageStats.CategoryData(size: value.size, messages: value.messages) } self.init(categories: mappedCategories) @@ -211,7 +213,10 @@ func _internal_collectStorageUsageStats(account: Account) -> Signal AllStorageUsageStats in let total = StorageUsageStats(allStats.total) if additionalStats != 0 { - total.categories[.misc, default: StorageUsageStats.CategoryData(size: 0)].size += additionalStats + if total.categories[.misc] == nil { + total.categories[.misc] = StorageUsageStats.CategoryData(size: 0, messages: [:]) + } + total.categories[.misc]?.size += additionalStats } var peers: [EnginePeer.Id: AllStorageUsageStats.PeerStats] = [:] @@ -222,8 +227,8 @@ func _internal_collectStorageUsageStats(account: Account) -> Signal Signal Signal<[EngineMessage.Id: Message], NoError> { + return account.postbox.transaction { transaction -> [EngineMessage.Id: Message] in + var result: [EngineMessage.Id: Message] = [:] + for (category, value) in stats.categories { + if !categories.contains(category) { + continue + } + for id in value.messages.keys { + if result[id] == nil { + if let message = transaction.getMessage(id) { + result[id] = message + } + } + } + } + + return result + } +} + +func _internal_reindexCacheInBackground(account: Account) -> Signal { + let queue = Queue(name: "ReindexCacheInBackground") + return Signal { subscriber in + let isCancelled = Atomic(value: false) + + func process(lowerBound: MessageIndex?) 
{ + if isCancelled.with({ $0 }) { + return + } + + let _ = (account.postbox.transaction { transaction -> (messagesByMediaId: [MediaId: [MessageId]], mediaMap: [MediaId: Media], nextLowerBound: MessageIndex?) in + return transaction.enumerateMediaMessages(lowerBound: lowerBound, upperBound: nil, limit: 1000) + } + |> deliverOn(queue)).start(next: { result in + Logger.shared.log("ReindexCacheInBackground", "process batch of \(result.mediaMap.count) media") + + var storageItems: [(reference: StorageBox.Reference, id: Data, contentType: UInt8, size: Int64)] = [] + + let mediaBox = account.postbox.mediaBox + + let processResource: ([MessageId], MediaResource, MediaResourceUserContentType) -> Void = { messageIds, resource, contentType in + let size = mediaBox.fileSizeForId(resource.id) + if size != 0 { + if let itemId = resource.id.stringRepresentation.data(using: .utf8) { + for messageId in messageIds { + storageItems.append((reference: StorageBox.Reference(peerId: messageId.peerId.toInt64(), messageNamespace: UInt8(clamping: messageId.namespace), messageId: messageId.id), id: itemId, contentType: contentType.rawValue, size: size)) + } + } + } + } + + for (_, media) in result.mediaMap { + guard let mediaId = media.id else { + continue + } + guard let mediaMessages = result.messagesByMediaId[mediaId] else { + continue + } + + if let image = media as? TelegramMediaImage { + for representation in image.representations { + processResource(mediaMessages, representation.resource, .image) + } + } else if let file = media as? TelegramMediaFile { + for representation in file.previewRepresentations { + processResource(mediaMessages, representation.resource, MediaResourceUserContentType(file: file)) + } + processResource(mediaMessages, file.resource, MediaResourceUserContentType(file: file)) + } else if let webpage = media as? 
TelegramMediaWebpage { + if case let .Loaded(content) = webpage.content { + if let image = content.image { + for representation in image.representations { + processResource(mediaMessages, representation.resource, .image) + } + } + if let file = content.file { + for representation in file.previewRepresentations { + processResource(mediaMessages, representation.resource, MediaResourceUserContentType(file: file)) + } + processResource(mediaMessages, file.resource, MediaResourceUserContentType(file: file)) + } + } + } else if let game = media as? TelegramMediaGame { + if let image = game.image { + for representation in image.representations { + processResource(mediaMessages, representation.resource, .image) + } + } + if let file = game.file { + for representation in file.previewRepresentations { + processResource(mediaMessages, representation.resource, MediaResourceUserContentType(file: file)) + } + processResource(mediaMessages, file.resource, MediaResourceUserContentType(file: file)) + } + } + } + + if !storageItems.isEmpty { + mediaBox.storageBox.batchAdd(items: storageItems) + } + + if let nextLowerBound = result.nextLowerBound { + process(lowerBound: nextLowerBound) + } else { + subscriber.putCompletion() + } + }) + } + + process(lowerBound: nil) + + return ActionDisposable { + let _ = isCancelled.swap(true) + } + } + |> runOn(queue) +} + func _internal_collectCacheUsageStats(account: Account, peerId: PeerId? = nil, additionalCachePaths: [String] = [], logFilesPath: String? 
= nil) -> Signal { return account.postbox.mediaBox.storageBox.all() |> mapToSignal { entries -> Signal in diff --git a/submodules/TelegramCore/Sources/TelegramEngine/Resources/TelegramEngineResources.swift b/submodules/TelegramCore/Sources/TelegramEngine/Resources/TelegramEngineResources.swift index 78f946e1f1..b443c2b1d5 100644 --- a/submodules/TelegramCore/Sources/TelegramEngine/Resources/TelegramEngineResources.swift +++ b/submodules/TelegramCore/Sources/TelegramEngine/Resources/TelegramEngineResources.swift @@ -21,7 +21,7 @@ public extension MediaResourceUserContentType { self = .video } } else { - self = .other + self = .file } } } @@ -227,9 +227,17 @@ public extension TelegramEngine { return _internal_collectStorageUsageStats(account: self.account) } + public func renderStorageUsageStatsMessages(stats: StorageUsageStats, categories: [StorageUsageStats.CategoryKey]) -> Signal<[EngineMessage.Id: Message], NoError> { + return _internal_renderStorageUsageStatsMessages(account: self.account, stats: stats, categories: categories) + } + public func clearCachedMediaResources(mediaResourceIds: Set) -> Signal { return _internal_clearCachedMediaResources(account: self.account, mediaResourceIds: mediaResourceIds) } + + public func reindexCacheInBackground() -> Signal { + return _internal_reindexCacheInBackground(account: self.account) + } public func data(id: EngineMediaResource.Id, attemptSynchronously: Bool = false) -> Signal { return self.account.postbox.mediaBox.resourceData(