Download manager improvements

This commit is contained in:
Ali
2022-02-18 20:12:38 +04:00
parent 629174d261
commit ac8e9a0b9c
50 changed files with 942 additions and 253 deletions

View File

@@ -91,6 +91,7 @@ private final class FetchManagerStatusContext {
var subscribers = Bag<(MediaResourceStatus) -> Void>()
var hasEntry = false
var isPaused = false
var isEmpty: Bool {
return !self.hasEntry && self.subscribers.isEmpty
@@ -98,14 +99,18 @@ private final class FetchManagerStatusContext {
var combinedStatus: MediaResourceStatus? {
if let originalStatus = self.originalStatus {
if originalStatus == .Remote && self.hasEntry {
return .Fetching(isActive: false, progress: 0.0)
if case let .Remote(progress) = originalStatus, self.hasEntry {
if self.isPaused {
return .Paused(progress: progress)
} else {
return .Fetching(isActive: false, progress: progress)
}
} else if self.hasEntry {
return originalStatus
} else if case .Local = originalStatus {
return originalStatus
} else {
return .Remote
return .Remote(progress: 0.0)
}
} else {
return nil
@@ -251,48 +256,19 @@ private final class FetchManagerCategoryContext {
}
}
if (previousPriorityKey != nil) != (updatedPriorityKey != nil) {
if let statusContext = self.statusContexts[id] {
let previousStatus = statusContext.combinedStatus
statusContext.hasEntry = self.entries[id] != nil
if let combinedStatus = statusContext.combinedStatus, combinedStatus != previousStatus {
for f in statusContext.subscribers.copyItems() {
f(combinedStatus)
}
if let statusContext = self.statusContexts[id] {
let previousStatus = statusContext.combinedStatus
if let entry = self.entries[id] {
statusContext.hasEntry = true
statusContext.isPaused = entry.isPaused
} else {
statusContext.hasEntry = false
statusContext.isPaused = false
}
if let combinedStatus = statusContext.combinedStatus, combinedStatus != previousStatus {
for f in statusContext.subscribers.copyItems() {
f(combinedStatus)
}
/*var hasForegroundPriorityKey = false
if let updatedPriorityKey = updatedPriorityKey, let topReference = updatedPriorityKey.topReference {
switch topReference {
case .userInitiated:
hasForegroundPriorityKey = true
default:
hasForegroundPriorityKey = false
}
}
if hasForegroundPriorityKey {
if !statusContext.hasEntry {
let previousStatus = statusContext.combinedStatus
statusContext.hasEntry = true
if let combinedStatus = statusContext.combinedStatus, combinedStatus != previousStatus {
for f in statusContext.subscribers.copyItems() {
f(combinedStatus)
}
}
} else {
assertionFailure()
}
} else {
if statusContext.hasEntry {
let previousStatus = statusContext.combinedStatus
statusContext.hasEntry = false
if let combinedStatus = statusContext.combinedStatus, combinedStatus != previousStatus {
for f in statusContext.subscribers.copyItems() {
f(combinedStatus)
}
}
}
}*/
}
}
@@ -303,6 +279,9 @@ private final class FetchManagerCategoryContext {
if self.topEntryIdAndPriority == nil && !self.entries.isEmpty {
var topEntryIdAndPriority: (FetchManagerLocationEntryId, FetchManagerPriorityKey)?
for (id, entry) in self.entries {
if entry.isPaused {
continue
}
if let entryPriorityKey = entry.priorityKey {
if let (_, topKey) = topEntryIdAndPriority {
if entryPriorityKey < topKey {
@@ -409,6 +388,15 @@ private final class FetchManagerCategoryContext {
}
}
func getEntryId(resourceId: String) -> FetchManagerLocationEntryId? {
for (id, _) in self.entries {
if id.resourceId.stringRepresentation == resourceId {
return id
}
}
return nil
}
func cancelEntry(_ entryId: FetchManagerLocationEntryId, isCompleted: Bool) {
var id: FetchManagerLocationEntryId = entryId
if self.entries[id] == nil {
@@ -434,6 +422,7 @@ private final class FetchManagerCategoryContext {
}
let previousStatus = statusContext.combinedStatus
statusContext.hasEntry = false
statusContext.isPaused = false
if let combinedStatus = statusContext.combinedStatus, combinedStatus != previousStatus {
for f in statusContext.subscribers.copyItems() {
f(combinedStatus)
@@ -464,6 +453,87 @@ private final class FetchManagerCategoryContext {
self.activeEntriesUpdated()
}
func toggleEntryPaused(_ entryId: FetchManagerLocationEntryId, isPaused: Bool) -> Bool {
var id: FetchManagerLocationEntryId = entryId
if self.entries[id] == nil {
for (key, _) in self.entries {
if key.resourceId == entryId.resourceId {
id = key
break
}
}
}
if let entry = self.entries[id] {
if entry.isPaused == isPaused {
return entry.isPaused
}
entry.isPaused = !entry.isPaused
if entry.isPaused {
if self.topEntryIdAndPriority?.0 == id {
self.topEntryIdAndPriority = nil
}
if let activeContext = self.activeContexts[id] {
activeContext.disposable?.dispose()
activeContext.disposable = nil
self.activeContexts.removeValue(forKey: id)
}
}
if let statusContext = self.statusContexts[id] {
let previousStatus = statusContext.combinedStatus
statusContext.hasEntry = true
statusContext.isPaused = entry.isPaused
if let combinedStatus = statusContext.combinedStatus, combinedStatus != previousStatus {
for f in statusContext.subscribers.copyItems() {
f(combinedStatus)
}
}
}
let _ = self.maybeFindAndActivateNewTopEntry()
self.activeEntriesUpdated()
return entry.isPaused
} else {
return false
}
}
func raiseEntryPriority(_ entryId: FetchManagerLocationEntryId) {
var id: FetchManagerLocationEntryId = entryId
if self.entries[id] == nil {
for (key, _) in self.entries {
if key.resourceId == entryId.resourceId {
id = key
break
}
}
}
if self.entries[id] == nil {
return
}
var topUserInitiatedPriority: Int32 = 0
for (_, entry) in self.entries {
for index in entry.userInitiatedPriorityIndices {
topUserInitiatedPriority = min(topUserInitiatedPriority, index)
}
}
self.withEntry(id: id, takeNew: nil, { entry in
if entry.userInitiatedPriorityIndices.last == topUserInitiatedPriority {
return
}
entry.userInitiatedPriorityIndices.removeAll()
entry.userInitiatedPriorityIndices.append(topUserInitiatedPriority - 1)
})
}
func withFetchStatusContext(_ id: FetchManagerLocationEntryId, _ f: (FetchManagerStatusContext) -> Void) {
let statusContext: FetchManagerStatusContext
if let current = self.statusContexts[id] {
@@ -471,8 +541,9 @@ private final class FetchManagerCategoryContext {
} else {
statusContext = FetchManagerStatusContext()
self.statusContexts[id] = statusContext
if self.entries[id] != nil {
if let entry = self.entries[id] {
statusContext.hasEntry = true
statusContext.isPaused = entry.isPaused
}
}
@@ -494,6 +565,7 @@ public struct FetchManagerEntrySummary: Equatable {
public let mediaReference: AnyMediaReference?
public let resourceReference: MediaResourceReference
public let priority: FetchManagerPriorityKey
public let isPaused: Bool
}
private extension FetchManagerEntrySummary {
@@ -505,6 +577,7 @@ private extension FetchManagerEntrySummary {
self.mediaReference = entry.mediaReference
self.resourceReference = entry.resourceReference
self.priority = priority
self.isPaused = entry.isPaused
}
}
@@ -610,7 +683,9 @@ public final class FetchManagerImpl: FetchManager {
}
strongSelf.hasUserInitiatedEntriesValue.set(hasActiveUserInitiatedEntries)
strongSelf.entriesSummaryValue.set(activeEntries.sorted(by: { $0.priority < $1.priority }))
let sortedEntries = activeEntries.sorted(by: { $0.priority < $1.priority })
strongSelf.entriesSummaryValue.set(sortedEntries)
}
})
self.categoryContexts[key] = context
@@ -629,6 +704,7 @@ public final class FetchManagerImpl: FetchManager {
if let strongSelf = self {
var assignedEpisode: Int32?
var assignedUserInitiatedIndex: Int32?
let _ = assignedUserInitiatedIndex
var assignedReferenceIndex: Int?
var assignedRangeIndex: Int?
@@ -639,6 +715,8 @@ public final class FetchManagerImpl: FetchManager {
if userInitiated {
entry.userInitiated = true
}
let wasPaused = entry.isPaused
entry.isPaused = false
if let peerType = storeToDownloadsPeerType {
entry.storeToDownloadsPeerType = peerType
}
@@ -647,9 +725,10 @@ public final class FetchManagerImpl: FetchManager {
entry.elevatedPriorityReferenceCount += 1
}
assignedRangeIndex = entry.ranges.add(ranges)
if userInitiated {
if userInitiated && !wasPaused {
let userInitiatedIndex = strongSelf.takeNextUserInitiatedIndex()
assignedUserInitiatedIndex = userInitiatedIndex
entry.userInitiatedPriorityIndices.removeAll()
entry.userInitiatedPriorityIndices.append(userInitiatedIndex)
entry.userInitiatedPriorityIndices.sort()
}
@@ -679,13 +758,13 @@ public final class FetchManagerImpl: FetchManager {
entry.elevatedPriorityReferenceCount -= 1
assert(entry.elevatedPriorityReferenceCount >= 0)
}
if let userInitiatedIndex = assignedUserInitiatedIndex {
/*if let userInitiatedIndex = assignedUserInitiatedIndex {
if let index = entry.userInitiatedPriorityIndices.firstIndex(of: userInitiatedIndex) {
entry.userInitiatedPriorityIndices.remove(at: index)
} else {
assertionFailure()
}
}
}*/
}
})
})
@@ -708,6 +787,48 @@ public final class FetchManagerImpl: FetchManager {
}
}
public func cancelInteractiveFetches(resourceId: String) {
self.queue.async {
var removeCategories: [FetchManagerCategory] = []
for (category, categoryContext) in self.categoryContexts {
if let id = categoryContext.getEntryId(resourceId: resourceId) {
categoryContext.cancelEntry(id, isCompleted: false)
if categoryContext.isEmpty {
removeCategories.append(category)
}
}
}
for category in removeCategories {
self.categoryContexts.removeValue(forKey: category)
}
self.postbox.mediaBox.cancelInteractiveResourceFetch(resourceId: MediaResourceId(resourceId))
}
}
public func toggleInteractiveFetchPaused(resourceId: String, isPaused: Bool) {
self.queue.async {
for (_, categoryContext) in self.categoryContexts {
if let id = categoryContext.getEntryId(resourceId: resourceId) {
if categoryContext.toggleEntryPaused(id, isPaused: isPaused) {
self.postbox.mediaBox.cancelInteractiveResourceFetch(resourceId: MediaResourceId(resourceId))
}
}
}
}
}
public func raisePriority(resourceId: String) {
self.queue.async {
for (_, categoryContext) in self.categoryContexts {
if let id = categoryContext.getEntryId(resourceId: resourceId) {
categoryContext.raiseEntryPriority(id)
}
}
}
}
public func fetchStatus(category: FetchManagerCategory, location: FetchManagerLocation, locationKey: FetchManagerLocationKey, resource: MediaResource) -> Signal<MediaResourceStatus, NoError> {
let queue = self.queue
return Signal { [weak self] subscriber in