no message

This commit is contained in:
Peter
2018-09-05 00:30:59 +03:00
parent d7b5f5c3cc
commit d41418ffe3
11 changed files with 413 additions and 179 deletions

View File

@@ -9,7 +9,7 @@ import Foundation
import MtProtoKitDynamic
#endif
private enum AccountStateManagerOperation {
private enum AccountStateManagerOperationContent {
case pollDifference(AccountFinalStateEvents)
case collectUpdateGroups([UpdateGroup], Double)
case processUpdateGroups([UpdateGroup])
@@ -19,6 +19,20 @@ private enum AccountStateManagerOperation {
case replayAsynchronouslyBuiltFinalState(AccountFinalState, () -> Void)
}
private final class AccountStateManagerOperation {
var isRunning: Bool = false
let content: AccountStateManagerOperationContent
init(content: AccountStateManagerOperationContent) {
self.content = content
}
}
private enum AccountStateManagerAddOperationPosition {
case first
case last
}
#if os(macOS)
private typealias SignalKitTimer = SwiftSignalKitMac.Timer
#else
@@ -137,22 +151,17 @@ public final class AccountStateManager {
func addUpdateGroups(_ groups: [UpdateGroup]) {
self.queue.async {
if let last = self.operations.last {
switch last {
switch last.content {
case .pollDifference, .processUpdateGroups, .custom, .pollCompletion, .processEvents, .replayAsynchronouslyBuiltFinalState:
self.operations.append(.collectUpdateGroups(groups, 0.0))
self.addOperation(.collectUpdateGroups(groups, 0.0), position: .last)
case let .collectUpdateGroups(currentGroups, timeout):
if timeout.isEqual(to: 0.0) {
self.operations[self.operations.count - 1] = .collectUpdateGroups(currentGroups + groups, timeout)
} else {
self.operations[self.operations.count - 1] = .processUpdateGroups(currentGroups + groups)
if self.operations.count == 1 {
self.startFirstOperation()
}
}
let operation = AccountStateManagerOperation(content: .collectUpdateGroups(currentGroups + groups, timeout))
operation.isRunning = last.isRunning
self.operations[self.operations.count - 1] = operation
self.startFirstOperation()
}
} else {
self.operations.append(.collectUpdateGroups(groups, 0.0))
self.startFirstOperation()
self.addOperation(.collectUpdateGroups(groups, 0.0), position: .last)
}
}
}
@@ -160,14 +169,10 @@ public final class AccountStateManager {
func addReplayAsynchronouslyBuiltFinalState(_ finalState: AccountFinalState) -> Signal<Bool, NoError> {
return Signal { subscriber in
self.queue.async {
let begin = self.operations.isEmpty
self.operations.append(.replayAsynchronouslyBuiltFinalState(finalState, {
self.addOperation(.replayAsynchronouslyBuiltFinalState(finalState, {
subscriber.putNext(true)
subscriber.putCompletion()
}))
if begin {
self.startFirstOperation()
}
}), position: .last)
}
return EmptyDisposable
}
@@ -199,60 +204,78 @@ public final class AccountStateManager {
})
}
self.addOperation(.custom(self.getNextId(), signal))
self.addOperation(.custom(self.getNextId(), signal), position: .last)
return disposable
} |> runOn(self.queue)
}
private func replaceOperations(with operation: AccountStateManagerOperation) {
var collectedProcessUpdateGroups: [(AccountStateManagerOperation, Bool)] = []
private func replaceOperations(with content: AccountStateManagerOperationContent) {
var collectedProcessUpdateGroups: [AccountStateManagerOperationContent] = []
var collectedMessageIds: [MessageId] = []
var collectedPollCompletionSubscribers: [(Int32, ([MessageId]) -> Void)] = []
var collectedReplayAsynchronouslyBuiltFinalState: [(AccountFinalState, () -> Void)] = []
var processEvents: [(Int32, AccountFinalStateEvents)] = []
var replacedOperations: [AccountStateManagerOperation] = []
for i in 0 ..< self.operations.count {
switch self.operations[i] {
case .processUpdateGroups:
collectedProcessUpdateGroups.append((self.operations[i], i == 0))
case let .pollCompletion(_, messageIds, subscribers):
collectedMessageIds.append(contentsOf: messageIds)
collectedPollCompletionSubscribers.append(contentsOf: subscribers)
case let .replayAsynchronouslyBuiltFinalState(finalState, completion):
collectedReplayAsynchronouslyBuiltFinalState.append((finalState, completion))
case let .processEvents(operationId, events):
processEvents.append((operationId, events))
default:
break
if self.operations[i].isRunning {
replacedOperations.append(self.operations[i])
} else {
switch self.operations[i].content {
case .processUpdateGroups:
collectedProcessUpdateGroups.append(self.operations[i].content)
case let .pollCompletion(_, messageIds, subscribers):
collectedMessageIds.append(contentsOf: messageIds)
collectedPollCompletionSubscribers.append(contentsOf: subscribers)
case let .replayAsynchronouslyBuiltFinalState(finalState, completion):
collectedReplayAsynchronouslyBuiltFinalState.append((finalState, completion))
case let .processEvents(operationId, events):
processEvents.append((operationId, events))
default:
break
}
}
}
self.operations.removeAll()
replacedOperations.append(contentsOf: collectedProcessUpdateGroups.map { AccountStateManagerOperation(content: $0) })
self.operations.append(contentsOf: collectedProcessUpdateGroups.map { $0.0 })
self.operations.append(operation)
replacedOperations.append(AccountStateManagerOperation(content: content))
if !collectedPollCompletionSubscribers.isEmpty || !collectedMessageIds.isEmpty {
self.operations.append(.pollCompletion(self.getNextId(), collectedMessageIds, collectedPollCompletionSubscribers))
replacedOperations.append(AccountStateManagerOperation(content: .pollCompletion(self.getNextId(), collectedMessageIds, collectedPollCompletionSubscribers)))
}
for (finalState, completion) in collectedReplayAsynchronouslyBuiltFinalState {
self.operations.append(.replayAsynchronouslyBuiltFinalState(finalState, completion))
replacedOperations.append(AccountStateManagerOperation(content: .replayAsynchronouslyBuiltFinalState(finalState, completion)))
}
for (operationId, events) in processEvents {
self.operations.append(.processEvents(operationId, events))
replacedOperations.append(AccountStateManagerOperation(content: .processEvents(operationId, events)))
}
self.operations.removeAll()
self.operations.append(contentsOf: replacedOperations)
}
private func addOperation(_ operation: AccountStateManagerOperation) {
private func addOperation(_ content: AccountStateManagerOperationContent, position: AccountStateManagerAddOperationPosition) {
self.queue.async {
let begin = self.operations.isEmpty
self.operations.append(operation)
if begin {
self.startFirstOperation()
let operation = AccountStateManagerOperation(content: content)
switch position {
case .first:
if self.operations.isEmpty || !self.operations[0].isRunning {
self.operations.insert(operation, at: 0)
self.startFirstOperation()
} else {
self.operations.insert(operation, at: 1)
}
case .last:
let begin = self.operations.isEmpty
self.operations.append(operation)
if begin {
self.startFirstOperation()
}
}
}
}
@@ -261,7 +284,11 @@ public final class AccountStateManager {
guard let operation = self.operations.first else {
return
}
switch operation {
guard !operation.isRunning else {
return
}
operation.isRunning = true
switch operation.content {
case let .pollDifference(currentEvents):
self.operationTimer?.invalidate()
self.currentIsUpdatingValue = true
@@ -270,71 +297,73 @@ public final class AccountStateManager {
let accountPeerId = account.peerId
let auxiliaryMethods = self.auxiliaryMethods
let signal = account.postbox.stateView()
|> mapToSignal { view -> Signal<AuthorizedAccountState, NoError> in
if let state = view.state as? AuthorizedAccountState {
return .single(state)
} else {
return .complete()
}
|> mapToSignal { view -> Signal<AuthorizedAccountState, NoError> in
if let state = view.state as? AuthorizedAccountState {
return .single(state)
} else {
return .complete()
}
|> take(1)
|> mapToSignal { state -> Signal<(Api.updates.Difference?, AccountReplayedFinalState?), NoError> in
if let authorizedState = state.state {
let request = account.network.request(Api.functions.updates.getDifference(flags: 1 << 0, pts: authorizedState.pts, ptsTotalLimit: 1000, date: authorizedState.date, qts: authorizedState.qts))
|> retryRequest
return request |> mapToSignal { difference -> Signal<(Api.updates.Difference?, AccountReplayedFinalState?), NoError> in
switch difference {
case .differenceTooLong:
return accountStateReset(postbox: account.postbox, network: account.network) |> mapToSignal { _ -> Signal<(Api.updates.Difference?, AccountReplayedFinalState?), NoError> in
return .complete()
} |> then(.single((nil, nil)))
default:
return initialStateWithDifference(account, difference: difference)
|> mapToSignal { state -> Signal<(Api.updates.Difference?, AccountReplayedFinalState?), NoError> in
if state.initialState.state != authorizedState {
Logger.shared.log("State", "pollDifference initial state \(authorizedState) != current state \(state.initialState.state)")
return .single((nil, nil))
} else {
return finalStateWithDifference(account: account, state: state, difference: difference)
|> mapToSignal { finalState -> Signal<(Api.updates.Difference?, AccountReplayedFinalState?), NoError> in
if !finalState.state.preCachedResources.isEmpty {
for (resource, data) in finalState.state.preCachedResources {
account.postbox.mediaBox.storeResourceData(resource.id, data: data)
}
}
return account.postbox.transaction { transaction -> (Api.updates.Difference?, AccountReplayedFinalState?) in
if let replayedState = replayFinalState(accountPeerId: accountPeerId, mediaBox: mediaBox, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState) {
return (difference, replayedState)
} else {
return (nil, nil)
}
}
}
|> take(1)
|> mapToSignal { state -> Signal<(Api.updates.Difference?, AccountReplayedFinalState?), NoError> in
if let authorizedState = state.state {
let request = account.network.request(Api.functions.updates.getDifference(flags: 1 << 0, pts: authorizedState.pts, ptsTotalLimit: 1000, date: authorizedState.date, qts: authorizedState.qts))
|> retryRequest
return request
|> mapToSignal { difference -> Signal<(Api.updates.Difference?, AccountReplayedFinalState?), NoError> in
switch difference {
case .differenceTooLong:
return accountStateReset(postbox: account.postbox, network: account.network) |> mapToSignal { _ -> Signal<(Api.updates.Difference?, AccountReplayedFinalState?), NoError> in
return .complete()
} |> then(.single((nil, nil)))
default:
return initialStateWithDifference(account, difference: difference)
|> mapToSignal { state -> Signal<(Api.updates.Difference?, AccountReplayedFinalState?), NoError> in
if state.initialState.state != authorizedState {
Logger.shared.log("State", "pollDifference initial state \(authorizedState) != current state \(state.initialState.state)")
return .single((nil, nil))
} else {
return finalStateWithDifference(account: account, state: state, difference: difference)
|> mapToSignal { finalState -> Signal<(Api.updates.Difference?, AccountReplayedFinalState?), NoError> in
if !finalState.state.preCachedResources.isEmpty {
for (resource, data) in finalState.state.preCachedResources {
account.postbox.mediaBox.storeResourceData(resource.id, data: data)
}
}
return account.postbox.transaction { transaction -> (Api.updates.Difference?, AccountReplayedFinalState?) in
if let replayedState = replayFinalState(accountPeerId: accountPeerId, mediaBox: mediaBox, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState) {
return (difference, replayedState)
} else {
return (nil, nil)
}
}
}
}
}
}
} else {
let appliedState = account.network.request(Api.functions.updates.getState())
|> retryRequest
|> mapToSignal { state in
return account.postbox.transaction { transaction -> (Api.updates.Difference?, AccountReplayedFinalState?) in
if let currentState = transaction.getState() as? AuthorizedAccountState {
switch state {
case let .state(pts, qts, date, seq, _):
transaction.setState(currentState.changedState(AuthorizedAccountState.State(pts: pts, qts: qts, date: date, seq: seq)))
}
}
return (nil, nil)
}
}
return appliedState
}
} else {
let appliedState = account.network.request(Api.functions.updates.getState())
|> retryRequest
|> mapToSignal { state in
return account.postbox.transaction { transaction -> (Api.updates.Difference?, AccountReplayedFinalState?) in
if let currentState = transaction.getState() as? AuthorizedAccountState {
switch state {
case let .state(pts, qts, date, seq, _):
transaction.setState(currentState.changedState(AuthorizedAccountState.State(pts: pts, qts: qts, date: date, seq: seq)))
}
}
return (nil, nil)
}
}
return appliedState
}
|> deliverOn(self.queue)
}
|> deliverOn(self.queue)
let _ = signal.start(next: { [weak self] difference, finalState in
if let strongSelf = self {
if case .pollDifference = strongSelf.operations.removeFirst() {
if case .pollDifference = strongSelf.operations.removeFirst().content {
let events: AccountFinalStateEvents
if let finalState = finalState {
events = currentEvents.union(with: AccountFinalStateEvents(state: finalState))
@@ -344,7 +373,7 @@ public final class AccountStateManager {
if let difference = difference {
switch difference {
case .differenceSlice:
strongSelf.operations.insert(.pollDifference(events), at: 0)
strongSelf.addOperation(.pollDifference(events), position: .first)
default:
if !events.isEmpty {
strongSelf.insertProcessEvents(events)
@@ -370,9 +399,10 @@ public final class AccountStateManager {
self.operationTimer?.invalidate()
let operationTimer = SignalKitTimer(timeout: timeout, repeat: false, completion: { [weak self] in
if let strongSelf = self {
if let firstOperation = strongSelf.operations.first, case let .collectUpdateGroups(groups, _) = firstOperation {
let firstOperation = strongSelf.operations.removeFirst()
if case let .collectUpdateGroups(groups, _) = firstOperation.content {
if timeout.isEqual(to: 0.0) {
strongSelf.operations[0] = .processUpdateGroups(groups)
strongSelf.addOperation(.processUpdateGroups(groups), position: .first)
} else {
Logger.shared.log("AccountStateManager", "timeout while waiting for updates")
strongSelf.replaceOperations(with: .pollDifference(AccountFinalStateEvents()))
@@ -393,32 +423,32 @@ public final class AccountStateManager {
let mediaBox = account.postbox.mediaBox
let queue = self.queue
let signal = initialStateWithUpdateGroups(account, groups: groups)
|> mapToSignal { state -> Signal<(AccountReplayedFinalState?, AccountFinalState), NoError> in
return finalStateWithUpdateGroups(account, state: state, groups: groups)
|> mapToSignal { finalState in
if !finalState.state.preCachedResources.isEmpty {
for (resource, data) in finalState.state.preCachedResources {
account.postbox.mediaBox.storeResourceData(resource.id, data: data)
}
}
return account.postbox.transaction { transaction -> AccountReplayedFinalState? in
return replayFinalState(accountPeerId: accountPeerId, mediaBox: mediaBox, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState)
}
|> map({ ($0, finalState) })
|> deliverOn(queue)
|> mapToSignal { state -> Signal<(AccountReplayedFinalState?, AccountFinalState), NoError> in
return finalStateWithUpdateGroups(account, state: state, groups: groups)
|> mapToSignal { finalState in
if !finalState.state.preCachedResources.isEmpty {
for (resource, data) in finalState.state.preCachedResources {
account.postbox.mediaBox.storeResourceData(resource.id, data: data)
}
}
return account.postbox.transaction { transaction -> AccountReplayedFinalState? in
return replayFinalState(accountPeerId: accountPeerId, mediaBox: mediaBox, transaction: transaction, auxiliaryMethods: auxiliaryMethods, finalState: finalState)
}
|> map({ ($0, finalState) })
|> deliverOn(queue)
}
}
let _ = signal.start(next: { [weak self] replayedState, finalState in
if let strongSelf = self {
if case let .processUpdateGroups(groups) = strongSelf.operations.removeFirst() {
if case let .processUpdateGroups(groups) = strongSelf.operations.removeFirst().content {
if let replayedState = replayedState, !finalState.shouldPoll {
let events = AccountFinalStateEvents(state: replayedState)
if !events.isEmpty {
strongSelf.insertProcessEvents(events)
}
if finalState.incomplete {
strongSelf.operations.append(.collectUpdateGroups(groups, 2.0))
strongSelf.addOperation(.collectUpdateGroups(groups, 2.0), position: .last)
}
} else {
strongSelf.replaceOperations(with: .pollDifference(AccountFinalStateEvents()))
@@ -436,8 +466,8 @@ public final class AccountStateManager {
self.operationTimer?.invalidate()
let completed: () -> Void = { [weak self] in
if let strongSelf = self {
if let topOperation = strongSelf.operations.first, case .custom(operationId, _) = topOperation {
strongSelf.operations.removeFirst()
let topOperation = strongSelf.operations.removeFirst()
if case .custom(operationId, _) = topOperation.content {
strongSelf.startFirstOperation()
} else {
assertionFailure()
@@ -453,7 +483,8 @@ public final class AccountStateManager {
self.operationTimer?.invalidate()
let completed: () -> Void = { [weak self] in
if let strongSelf = self {
if let topOperation = strongSelf.operations.first, case .processEvents(operationId, _) = topOperation {
let topOperation = strongSelf.operations.removeFirst()
if case .processEvents(operationId, _) = topOperation.content {
if !events.updatedTypingActivities.isEmpty {
strongSelf.peerInputActivityManager.transaction { manager in
for (chatPeerId, peerActivities) in events.updatedTypingActivities {
@@ -475,14 +506,18 @@ public final class AccountStateManager {
strongSelf.account.callSessionManager.updateSession(call)
}
}
strongSelf.operations.removeFirst()
if !events.isContactUpdates.isEmpty {
strongSelf.account.contactSyncManager.addIsContactUpdates(events.isContactUpdates)
}
var pollCount = 0
for i in 0 ..< strongSelf.operations.count {
if case let .pollCompletion(pollId, messageIds, subscribers) = strongSelf.operations[i] {
if case let .pollCompletion(pollId, messageIds, subscribers) = strongSelf.operations[i].content {
pollCount += 1
var updatedMessageIds = messageIds
updatedMessageIds.append(contentsOf: events.addedIncomingMessageIds)
strongSelf.operations[i] = .pollCompletion(pollId, updatedMessageIds, subscribers)
let operation = AccountStateManagerOperation(content: .pollCompletion(pollId, updatedMessageIds, subscribers))
operation.isRunning = strongSelf.operations[i].isRunning
strongSelf.operations[i] = operation
}
}
assert(pollCount <= 1)
@@ -508,7 +543,8 @@ public final class AccountStateManager {
return messages
}
let _ = (signal |> deliverOn(self.queue)).start(next: { [weak self] messages in
let _ = (signal
|> deliverOn(self.queue)).start(next: { [weak self] messages in
if let strongSelf = self {
strongSelf.notificationMessagesPipe.putNext(messages)
}
@@ -529,13 +565,13 @@ public final class AccountStateManager {
} else {
self.operationTimer?.invalidate()
let signal = self.account.network.request(Api.functions.help.test())
|> deliverOn(self.queue)
|> deliverOn(self.queue)
let completed: () -> Void = { [weak self] in
if let strongSelf = self {
if let topOperation = strongSelf.operations.first, case let .pollCompletion(topPollId, messageIds, subscribers) = topOperation {
let topOperation = strongSelf.operations.removeFirst()
if case let .pollCompletion(topPollId, messageIds, subscribers) = topOperation.content {
assert(topPollId == pollId)
strongSelf.operations.removeFirst()
if strongSelf.operations.isEmpty {
for (_, f) in subscribers {
f(messageIds)
@@ -573,7 +609,7 @@ public final class AccountStateManager {
let _ = signal.start(next: { [weak self] replayedState, finalState in
if let strongSelf = self {
if case .replayAsynchronouslyBuiltFinalState = strongSelf.operations.removeFirst() {
if case .replayAsynchronouslyBuiltFinalState = strongSelf.operations.removeFirst().content {
if let replayedState = replayedState {
let events = AccountFinalStateEvents(state: replayedState)
if !events.isEmpty {
@@ -596,21 +632,30 @@ public final class AccountStateManager {
private func insertProcessEvents(_ events: AccountFinalStateEvents) {
if !events.isEmpty {
var index = 0
if !self.operations.isEmpty {
while case .processEvents = self.operations[index] {
index += 1
let operation = AccountStateManagerOperation(content: .processEvents(self.getNextId(), events))
var inserted = false
for i in 0 ..< self.operations.count {
if self.operations[i].isRunning {
continue
}
if case .processEvents = self.operations[i].content {
continue
}
self.operations.insert(operation, at: i)
inserted = true
break
}
if !inserted {
self.operations.append(operation)
}
self.operations.insert(.processEvents(self.getNextId(), events), at: 0)
}
}
private func postponePollCompletionOperation(messageIds: [MessageId], subscribers: [(Int32, ([MessageId]) -> Void)]) {
self.operations.append(.pollCompletion(self.getNextId(), messageIds, subscribers))
self.addOperation(.pollCompletion(self.getNextId(), messageIds, subscribers), position: .last)
for i in 0 ..< self.operations.count {
if case .pollCompletion = self.operations[i] {
if case .pollCompletion = self.operations[i].content {
if i != self.operations.count - 1 {
assertionFailure()
}
@@ -624,31 +669,32 @@ public final class AccountStateManager {
let updatedId: Int32 = self.getNextId()
for i in 0 ..< self.operations.count {
if case let .pollCompletion(pollId, messageIds, subscribers) = self.operations[i] {
if case let .pollCompletion(pollId, messageIds, subscribers) = self.operations[i].content {
var subscribers = subscribers
subscribers.append((updatedId, f))
self.operations[i] = .pollCompletion(pollId, messageIds, subscribers)
let operation = AccountStateManagerOperation(content: .pollCompletion(pollId, messageIds, subscribers))
operation.isRunning = self.operations[i].isRunning
self.operations[i] = operation
return updatedId
}
}
let beginFirst = self.operations.isEmpty
self.operations.append(.pollCompletion(self.getNextId(), [], [(updatedId, f)]))
if beginFirst {
self.startFirstOperation()
}
self.addOperation(.pollCompletion(self.getNextId(), [], [(updatedId, f)]), position: .last)
return updatedId
}
private func removePollCompletion(_ id: Int32) {
for i in 0 ..< self.operations.count {
if case let .pollCompletion(pollId, messages, subscribers) = self.operations[i] {
if case let .pollCompletion(pollId, messages, subscribers) = self.operations[i].content {
for j in 0 ..< subscribers.count {
if subscribers[j].0 == id {
var subscribers = subscribers
subscribers.remove(at: j)
self.operations[i] = .pollCompletion(pollId, messages, subscribers)
let operation = AccountStateManagerOperation(content: .pollCompletion(pollId, messages, subscribers))
operation.isRunning = self.operations[i].isRunning
self.operations[i] = operation
break
}
}