mirror of
https://github.com/Swiftgram/Telegram-iOS.git
synced 2025-10-09 11:23:48 +00:00
Fix read state management
This commit is contained in:
parent
7049801b19
commit
1ed7926d64
@ -78,7 +78,7 @@ public enum PeerReadState: Equatable, CustomStringConvertible {
|
||||
}
|
||||
|
||||
public struct CombinedPeerReadState: Equatable {
|
||||
let states: [(MessageId.Namespace, PeerReadState)]
|
||||
public let states: [(MessageId.Namespace, PeerReadState)]
|
||||
|
||||
public init(states: [(MessageId.Namespace, PeerReadState)]) {
|
||||
self.states = states
|
||||
|
@ -2476,7 +2476,33 @@ func replayFinalState(accountManager: AccountManager, postbox: Postbox, accountP
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var ignore = false
|
||||
if let currentReadState = transaction.getCombinedPeerReadState(peerId) {
|
||||
loop: for (currentNamespace, currentState) in currentReadState.states {
|
||||
if namespace == currentNamespace {
|
||||
switch currentState {
|
||||
case let .idBased(localMaxIncomingReadId, _, _, localCount, localMarkedUnread):
|
||||
if count != 0 || markedUnreadValue {
|
||||
if localMaxIncomingReadId > maxIncomingReadId {
|
||||
transaction.setNeedsIncomingReadStateSynchronization(peerId)
|
||||
|
||||
transaction.resetIncomingReadStates([peerId: [namespace: .idBased(maxIncomingReadId: localMaxIncomingReadId, maxOutgoingReadId: maxOutgoingReadId, maxKnownId: maxKnownId, count: localCount, markedUnread: localMarkedUnread)]])
|
||||
|
||||
Logger.shared.log("State", "not applying incoming read state for \(peerId): \(localMaxIncomingReadId) > \(maxIncomingReadId)")
|
||||
ignore = true
|
||||
}
|
||||
}
|
||||
default:
|
||||
break
|
||||
}
|
||||
break loop
|
||||
}
|
||||
}
|
||||
}
|
||||
if !ignore {
|
||||
transaction.resetIncomingReadStates([peerId: [namespace: .idBased(maxIncomingReadId: maxIncomingReadId, maxOutgoingReadId: maxOutgoingReadId, maxKnownId: maxKnownId, count: count, markedUnread: markedUnreadValue)]])
|
||||
}
|
||||
case let .ResetIncomingReadState(groupId, peerId, namespace, maxIncomingReadId, count, pts):
|
||||
var ptsMatchesState = false
|
||||
if peerId.namespace == Namespaces.Peer.CloudUser || peerId.namespace == Namespaces.Peer.CloudGroup {
|
||||
|
@ -61,24 +61,30 @@ private final class SynchronizePeerReadStatesContextImpl {
|
||||
var maybeOperation: PeerReadStateSynchronizationOperation?
|
||||
if let operation = self.currentState[peerId] {
|
||||
maybeOperation = operation
|
||||
Logger.shared.log("SynchronizePeerReadStates", "\(peerId): take new operation \(operation)")
|
||||
} else if let operation = self.pendingOperations[peerId] {
|
||||
maybeOperation = operation
|
||||
self.pendingOperations.removeValue(forKey: peerId)
|
||||
Logger.shared.log("SynchronizePeerReadStates", "\(peerId): retrieve pending operation \(operation)")
|
||||
}
|
||||
|
||||
if let operation = maybeOperation {
|
||||
if let current = self.activeOperations[peerId] {
|
||||
if current.operation != operation {
|
||||
Logger.shared.log("SynchronizePeerReadStates", "\(peerId): store pending operation \(operation) (active is \(current.operation))")
|
||||
self.pendingOperations[peerId] = operation
|
||||
} else {
|
||||
Logger.shared.log("SynchronizePeerReadStates", "\(peerId): do nothing, no change in \(operation)")
|
||||
}
|
||||
} else {
|
||||
Logger.shared.log("SynchronizePeerReadStates", "\(peerId): begin operation \(operation)")
|
||||
let operationDisposable = MetaDisposable()
|
||||
let activeOperation = Operation(
|
||||
operation: operation,
|
||||
disposable: operationDisposable
|
||||
)
|
||||
self.activeOperations[peerId] = activeOperation
|
||||
let signal: Signal<Never, NoError>
|
||||
let signal: Signal<Never, PeerReadStateValidationError>
|
||||
switch operation {
|
||||
case .Validate:
|
||||
signal = synchronizePeerReadState(network: self.network, postbox: self.postbox, stateManager: self.stateManager, peerId: peerId, push: false, validate: true)
|
||||
@ -88,12 +94,24 @@ private final class SynchronizePeerReadStatesContextImpl {
|
||||
|> ignoreValues
|
||||
}
|
||||
operationDisposable.set((signal
|
||||
|> deliverOn(self.queue)).start(completed: { [weak self, weak activeOperation] in
|
||||
|> deliverOn(self.queue)).start(error: { [weak self, weak activeOperation] _ in
|
||||
guard let strongSelf = self else {
|
||||
return
|
||||
}
|
||||
if let activeOperation = activeOperation {
|
||||
if let current = strongSelf.activeOperations[peerId], current === activeOperation {
|
||||
Logger.shared.log("SynchronizePeerReadStates", "\(peerId): operation retry \(operation)")
|
||||
strongSelf.activeOperations.removeValue(forKey: peerId)
|
||||
strongSelf.update()
|
||||
}
|
||||
}
|
||||
}, completed: { [weak self, weak activeOperation] in
|
||||
guard let strongSelf = self else {
|
||||
return
|
||||
}
|
||||
if let activeOperation = activeOperation {
|
||||
if let current = strongSelf.activeOperations[peerId], current === activeOperation {
|
||||
Logger.shared.log("SynchronizePeerReadStates", "\(peerId): operation completed \(operation)")
|
||||
strongSelf.activeOperations.removeValue(forKey: peerId)
|
||||
strongSelf.update()
|
||||
}
|
||||
|
@ -5,44 +5,41 @@ import SwiftSignalKit
|
||||
|
||||
import SyncCore
|
||||
|
||||
private enum VerifyReadStateError {
|
||||
case Abort
|
||||
case Retry
|
||||
}
|
||||
|
||||
private enum PeerReadStateMarker: Equatable {
|
||||
case Global(Int32)
|
||||
case Channel(Int32)
|
||||
}
|
||||
|
||||
private func inputPeer(postbox: Postbox, peerId: PeerId) -> Signal<Api.InputPeer, VerifyReadStateError> {
|
||||
private func inputPeer(postbox: Postbox, peerId: PeerId) -> Signal<Api.InputPeer, PeerReadStateValidationError> {
|
||||
return postbox.loadedPeerWithId(peerId)
|
||||
|> mapToSignalPromotingError { peer -> Signal<Api.InputPeer, VerifyReadStateError> in
|
||||
|> mapToSignalPromotingError { peer -> Signal<Api.InputPeer, PeerReadStateValidationError> in
|
||||
if let inputPeer = apiInputPeer(peer) {
|
||||
return .single(inputPeer)
|
||||
} else {
|
||||
return .fail(.Abort)
|
||||
return .fail(.retry)
|
||||
}
|
||||
} |> take(1)
|
||||
}
|
||||
|> take(1)
|
||||
}
|
||||
|
||||
private func inputSecretChat(postbox: Postbox, peerId: PeerId) -> Signal<Api.InputEncryptedChat, VerifyReadStateError> {
|
||||
private func inputSecretChat(postbox: Postbox, peerId: PeerId) -> Signal<Api.InputEncryptedChat, PeerReadStateValidationError> {
|
||||
return postbox.loadedPeerWithId(peerId)
|
||||
|> mapToSignalPromotingError { peer -> Signal<Api.InputEncryptedChat, VerifyReadStateError> in
|
||||
|> mapToSignalPromotingError { peer -> Signal<Api.InputEncryptedChat, PeerReadStateValidationError> in
|
||||
if let inputPeer = apiInputSecretChat(peer) {
|
||||
return .single(inputPeer)
|
||||
} else {
|
||||
return .fail(.Abort)
|
||||
return .fail(.retry)
|
||||
}
|
||||
} |> take(1)
|
||||
}
|
||||
|> take(1)
|
||||
}
|
||||
|
||||
private func dialogTopMessage(network: Network, postbox: Postbox, peerId: PeerId) -> Signal<(Int32, Int32), VerifyReadStateError> {
|
||||
private func dialogTopMessage(network: Network, postbox: Postbox, peerId: PeerId) -> Signal<(Int32, Int32), PeerReadStateValidationError> {
|
||||
return inputPeer(postbox: postbox, peerId: peerId)
|
||||
|> mapToSignal { inputPeer -> Signal<(Int32, Int32), VerifyReadStateError> in
|
||||
|> mapToSignal { inputPeer -> Signal<(Int32, Int32), PeerReadStateValidationError> in
|
||||
return network.request(Api.functions.messages.getHistory(peer: inputPeer, offsetId: Int32.max, offsetDate: Int32.max, addOffset: 0, limit: 1, maxId: Int32.max, minId: 1, hash: 0))
|
||||
|> retryRequest
|
||||
|> mapToSignalPromotingError { result -> Signal<(Int32, Int32), VerifyReadStateError> in
|
||||
|> mapToSignalPromotingError { result -> Signal<(Int32, Int32), PeerReadStateValidationError> in
|
||||
let apiMessages: [Api.Message]
|
||||
switch result {
|
||||
case let .channelMessages(_, _, _, messages, _, _):
|
||||
@ -57,7 +54,7 @@ private func dialogTopMessage(network: Network, postbox: Postbox, peerId: PeerId
|
||||
if let message = apiMessages.first, let timestamp = message.timestamp {
|
||||
return .single((message.rawId, timestamp))
|
||||
} else {
|
||||
return .fail(.Abort)
|
||||
return .fail(.retry)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -97,14 +94,14 @@ func fetchPeerCloudReadState(network: Network, postbox: Postbox, peerId: PeerId,
|
||||
}
|
||||
}
|
||||
|
||||
private func dialogReadState(network: Network, postbox: Postbox, peerId: PeerId) -> Signal<(PeerReadState, PeerReadStateMarker), VerifyReadStateError> {
|
||||
private func dialogReadState(network: Network, postbox: Postbox, peerId: PeerId) -> Signal<(PeerReadState, PeerReadStateMarker), PeerReadStateValidationError> {
|
||||
return dialogTopMessage(network: network, postbox: postbox, peerId: peerId)
|
||||
|> mapToSignal { topMessage -> Signal<(PeerReadState, PeerReadStateMarker), VerifyReadStateError> in
|
||||
|> mapToSignal { topMessage -> Signal<(PeerReadState, PeerReadStateMarker), PeerReadStateValidationError> in
|
||||
return inputPeer(postbox: postbox, peerId: peerId)
|
||||
|> mapToSignal { inputPeer -> Signal<(PeerReadState, PeerReadStateMarker), VerifyReadStateError> in
|
||||
|> mapToSignal { inputPeer -> Signal<(PeerReadState, PeerReadStateMarker), PeerReadStateValidationError> in
|
||||
return network.request(Api.functions.messages.getPeerDialogs(peers: [.inputDialogPeer(peer: inputPeer)]))
|
||||
|> retryRequest
|
||||
|> mapToSignalPromotingError { result -> Signal<(PeerReadState, PeerReadStateMarker), VerifyReadStateError> in
|
||||
|> mapToSignalPromotingError { result -> Signal<(PeerReadState, PeerReadStateMarker), PeerReadStateValidationError> in
|
||||
switch result {
|
||||
case let .peerDialogs(dialogs, _, _, _, state):
|
||||
if let dialog = dialogs.filter({ $0.peerId == peerId }).first {
|
||||
@ -126,7 +123,7 @@ private func dialogReadState(network: Network, postbox: Postbox, peerId: PeerId)
|
||||
}
|
||||
case .dialogFolder:
|
||||
assertionFailure()
|
||||
return .fail(.Abort)
|
||||
return .fail(.retry)
|
||||
}
|
||||
|
||||
let marker: PeerReadStateMarker
|
||||
@ -144,7 +141,7 @@ private func dialogReadState(network: Network, postbox: Postbox, peerId: PeerId)
|
||||
|
||||
return .single((.idBased(maxIncomingReadId: apiReadInboxMaxId, maxOutgoingReadId: apiReadOutboxMaxId, maxKnownId: apiTopMessage, count: apiUnreadCount, markedUnread: apiMarkedUnread), marker))
|
||||
} else {
|
||||
return .fail(.Abort)
|
||||
return .fail(.retry)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -152,25 +149,6 @@ private func dialogReadState(network: Network, postbox: Postbox, peerId: PeerId)
|
||||
}
|
||||
}
|
||||
|
||||
private func ==(lhs: PeerReadStateMarker, rhs: PeerReadStateMarker) -> Bool {
|
||||
switch lhs {
|
||||
case let .Global(lhsPts):
|
||||
switch rhs {
|
||||
case let .Global(rhsPts) where lhsPts == rhsPts:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
case let .Channel(lhsPts):
|
||||
switch rhs {
|
||||
case let .Channel(rhsPts) where lhsPts == rhsPts:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func localReadStateMarker(transaction: Transaction, peerId: PeerId) -> PeerReadStateMarker? {
|
||||
if peerId.namespace == Namespaces.Peer.CloudChannel {
|
||||
if let state = transaction.getPeerChatState(peerId) as? ChannelState {
|
||||
@ -187,35 +165,53 @@ private func localReadStateMarker(transaction: Transaction, peerId: PeerId) -> P
|
||||
}
|
||||
}
|
||||
|
||||
private func localReadStateMarker(network: Network, postbox: Postbox, peerId: PeerId) -> Signal<PeerReadStateMarker, VerifyReadStateError> {
|
||||
private func localReadStateMarker(network: Network, postbox: Postbox, peerId: PeerId) -> Signal<PeerReadStateMarker, PeerReadStateValidationError> {
|
||||
return postbox.transaction { transaction -> PeerReadStateMarker? in
|
||||
return localReadStateMarker(transaction: transaction, peerId: peerId)
|
||||
} |> mapToSignalPromotingError { marker -> Signal<PeerReadStateMarker, VerifyReadStateError> in
|
||||
}
|
||||
|> mapToSignalPromotingError { marker -> Signal<PeerReadStateMarker, PeerReadStateValidationError> in
|
||||
if let marker = marker {
|
||||
return .single(marker)
|
||||
} else {
|
||||
return .fail(.Abort)
|
||||
return .fail(.retry)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func validatePeerReadState(network: Network, postbox: Postbox, stateManager: AccountStateManager, peerId: PeerId) -> Signal<Void, NoError> {
|
||||
let readStateWithInitialState = localReadStateMarker(network: network, postbox: postbox, peerId: peerId)
|
||||
|> mapToSignal { marker -> Signal<(PeerReadState, PeerReadStateMarker, PeerReadStateMarker), VerifyReadStateError> in
|
||||
return dialogReadState(network: network, postbox: postbox, peerId: peerId)
|
||||
|> map { ($0.0, marker, $0.1) }
|
||||
enum PeerReadStateValidationError {
|
||||
case retry
|
||||
}
|
||||
|
||||
let maybeAppliedReadState = readStateWithInitialState |> mapToSignal { (readState, initialMarker, finalMarker) -> Signal<Void, VerifyReadStateError> in
|
||||
return stateManager.addCustomOperation(postbox.transaction { transaction -> VerifyReadStateError? in
|
||||
if initialMarker == finalMarker {
|
||||
private func validatePeerReadState(network: Network, postbox: Postbox, stateManager: AccountStateManager, peerId: PeerId) -> Signal<Never, PeerReadStateValidationError> {
|
||||
let readStateWithInitialState = dialogReadState(network: network, postbox: postbox, peerId: peerId)
|
||||
|> map { ($0.0, $0.1) }
|
||||
|
||||
let maybeAppliedReadState = readStateWithInitialState
|
||||
|> mapToSignal { (readState, finalMarker) -> Signal<Never, PeerReadStateValidationError> in
|
||||
return stateManager.addCustomOperation(postbox.transaction { transaction -> PeerReadStateValidationError? in
|
||||
if let currentReadState = transaction.getCombinedPeerReadState(peerId) {
|
||||
loop: for (namespace, currentState) in currentReadState.states {
|
||||
if namespace == Namespaces.Message.Cloud {
|
||||
switch currentState {
|
||||
case let .idBased(localMaxIncomingReadId, _, _, _, _):
|
||||
if case let .idBased(updatedMaxIncomingReadId, _, _, updatedCount, updatedMarkedUnread) = readState {
|
||||
if updatedCount != 0 || updatedMarkedUnread {
|
||||
if localMaxIncomingReadId > updatedMaxIncomingReadId {
|
||||
return .retry
|
||||
}
|
||||
}
|
||||
}
|
||||
default:
|
||||
break
|
||||
}
|
||||
break loop
|
||||
}
|
||||
}
|
||||
}
|
||||
transaction.resetIncomingReadStates([peerId: [Namespaces.Message.Cloud: readState]])
|
||||
return nil
|
||||
} else {
|
||||
return .Retry
|
||||
}
|
||||
}
|
||||
|> mapToSignalPromotingError { error -> Signal<Void, VerifyReadStateError> in
|
||||
|> mapToSignalPromotingError { error -> Signal<Never, PeerReadStateValidationError> in
|
||||
if let error = error {
|
||||
return .fail(error)
|
||||
} else {
|
||||
@ -225,39 +221,30 @@ private func validatePeerReadState(network: Network, postbox: Postbox, stateMana
|
||||
}
|
||||
|
||||
return maybeAppliedReadState
|
||||
|> `catch` { error -> Signal<Void, VerifyReadStateError> in
|
||||
switch error {
|
||||
case .Abort:
|
||||
return .complete()
|
||||
case .Retry:
|
||||
return .fail(error)
|
||||
}
|
||||
}
|
||||
|> retry(0.1, maxDelay: 5.0, onQueue: Queue.concurrentDefaultQueue())
|
||||
}
|
||||
|
||||
private func pushPeerReadState(network: Network, postbox: Postbox, stateManager: AccountStateManager, peerId: PeerId, readState: PeerReadState) -> Signal<PeerReadState, VerifyReadStateError> {
|
||||
private func pushPeerReadState(network: Network, postbox: Postbox, stateManager: AccountStateManager, peerId: PeerId, readState: PeerReadState) -> Signal<PeerReadState, PeerReadStateValidationError> {
|
||||
if !GlobalTelegramCoreConfiguration.readMessages {
|
||||
return .single(readState)
|
||||
}
|
||||
|
||||
if peerId.namespace == Namespaces.Peer.SecretChat {
|
||||
return inputSecretChat(postbox: postbox, peerId: peerId)
|
||||
|> mapToSignal { inputPeer -> Signal<PeerReadState, VerifyReadStateError> in
|
||||
|> mapToSignal { inputPeer -> Signal<PeerReadState, PeerReadStateValidationError> in
|
||||
switch readState {
|
||||
case .idBased:
|
||||
return .single(readState)
|
||||
case let .indexBased(maxIncomingReadIndex, _, _, _):
|
||||
return network.request(Api.functions.messages.readEncryptedHistory(peer: inputPeer, maxDate: maxIncomingReadIndex.timestamp))
|
||||
|> retryRequest
|
||||
|> mapToSignalPromotingError { _ -> Signal<PeerReadState, VerifyReadStateError> in
|
||||
|> mapToSignalPromotingError { _ -> Signal<PeerReadState, PeerReadStateValidationError> in
|
||||
return .single(readState)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return inputPeer(postbox: postbox, peerId: peerId)
|
||||
|> mapToSignal { inputPeer -> Signal<PeerReadState, VerifyReadStateError> in
|
||||
|> mapToSignal { inputPeer -> Signal<PeerReadState, PeerReadStateValidationError> in
|
||||
switch inputPeer {
|
||||
case let .inputPeerChannel(channelId, accessHash):
|
||||
switch readState {
|
||||
@ -280,15 +267,16 @@ private func pushPeerReadState(network: Network, postbox: Postbox, stateManager:
|
||||
})
|
||||
}
|
||||
return pushSignal
|
||||
|> mapError { _ -> VerifyReadStateError in return VerifyReadStateError.Retry }
|
||||
|> mapToSignal { _ -> Signal<PeerReadState, VerifyReadStateError> in
|
||||
|> mapError { _ -> PeerReadStateValidationError in
|
||||
return .retry
|
||||
}
|
||||
|> mapToSignal { _ -> Signal<PeerReadState, PeerReadStateValidationError> in
|
||||
return .complete()
|
||||
}
|
||||
|> then(Signal<PeerReadState, VerifyReadStateError>.single(readState))
|
||||
|> then(Signal<PeerReadState, PeerReadStateValidationError>.single(readState))
|
||||
case .indexBased:
|
||||
return .single(readState)
|
||||
}
|
||||
|
||||
default:
|
||||
switch readState {
|
||||
case let .idBased(maxIncomingReadId, _, _, _, markedUnread):
|
||||
@ -319,11 +307,13 @@ private func pushPeerReadState(network: Network, postbox: Postbox, stateManager:
|
||||
}
|
||||
|
||||
return pushSignal
|
||||
|> mapError { _ -> VerifyReadStateError in return VerifyReadStateError.Retry }
|
||||
|> mapToSignal { _ -> Signal<PeerReadState, VerifyReadStateError> in
|
||||
|> mapError { _ -> PeerReadStateValidationError in
|
||||
return .retry
|
||||
}
|
||||
|> mapToSignal { _ -> Signal<PeerReadState, PeerReadStateValidationError> in
|
||||
return .complete()
|
||||
}
|
||||
|> then(Signal<PeerReadState, VerifyReadStateError>.single(readState))
|
||||
|> then(Signal<PeerReadState, PeerReadStateValidationError>.single(readState))
|
||||
case .indexBased:
|
||||
return .single(readState)
|
||||
}
|
||||
@ -332,7 +322,7 @@ private func pushPeerReadState(network: Network, postbox: Postbox, stateManager:
|
||||
}
|
||||
}
|
||||
|
||||
private func pushPeerReadState(network: Network, postbox: Postbox, stateManager: AccountStateManager, peerId: PeerId) -> Signal<Void, NoError> {
|
||||
private func pushPeerReadState(network: Network, postbox: Postbox, stateManager: AccountStateManager, peerId: PeerId) -> Signal<Never, PeerReadStateValidationError> {
|
||||
let currentReadState = postbox.transaction { transaction -> (MessageId.Namespace, PeerReadState)? in
|
||||
if let readStates = transaction.getPeerReadStates(peerId) {
|
||||
for (namespace, readState) in readStates {
|
||||
@ -345,7 +335,7 @@ private func pushPeerReadState(network: Network, postbox: Postbox, stateManager:
|
||||
}
|
||||
|
||||
let pushedState = currentReadState
|
||||
|> mapToSignalPromotingError { namespaceAndReadState -> Signal<(MessageId.Namespace, PeerReadState), VerifyReadStateError> in
|
||||
|> mapToSignalPromotingError { namespaceAndReadState -> Signal<(MessageId.Namespace, PeerReadState), PeerReadStateValidationError> in
|
||||
if let (namespace, readState) = namespaceAndReadState {
|
||||
return pushPeerReadState(network: network, postbox: postbox, stateManager: stateManager, peerId: peerId, readState: readState)
|
||||
|> map { updatedReadState -> (MessageId.Namespace, PeerReadState) in
|
||||
@ -357,8 +347,8 @@ private func pushPeerReadState(network: Network, postbox: Postbox, stateManager:
|
||||
}
|
||||
|
||||
let verifiedState = pushedState
|
||||
|> mapToSignal { namespaceAndReadState -> Signal<Void, VerifyReadStateError> in
|
||||
return stateManager.addCustomOperation(postbox.transaction { transaction -> VerifyReadStateError? in
|
||||
|> mapToSignal { namespaceAndReadState -> Signal<Never, PeerReadStateValidationError> in
|
||||
return stateManager.addCustomOperation(postbox.transaction { transaction -> PeerReadStateValidationError? in
|
||||
if let readStates = transaction.getPeerReadStates(peerId) {
|
||||
for (namespace, currentReadState) in readStates where namespace == namespaceAndReadState.0 {
|
||||
if currentReadState == namespaceAndReadState.1 {
|
||||
@ -366,13 +356,13 @@ private func pushPeerReadState(network: Network, postbox: Postbox, stateManager:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return .Retry
|
||||
return .retry
|
||||
} else {
|
||||
transaction.confirmSynchronizedIncomingReadState(peerId)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|> mapToSignalPromotingError { error -> Signal<Void, VerifyReadStateError> in
|
||||
|> mapToSignalPromotingError { error -> Signal<Never, PeerReadStateValidationError> in
|
||||
if let error = error {
|
||||
return .fail(error)
|
||||
} else {
|
||||
@ -382,24 +372,17 @@ private func pushPeerReadState(network: Network, postbox: Postbox, stateManager:
|
||||
}
|
||||
|
||||
return verifiedState
|
||||
|> `catch` { error -> Signal<Void, VerifyReadStateError> in
|
||||
switch error {
|
||||
case .Abort:
|
||||
return .complete()
|
||||
case .Retry:
|
||||
return .fail(error)
|
||||
}
|
||||
}
|
||||
|> retry(0.1, maxDelay: 5.0, onQueue: Queue.concurrentDefaultQueue())
|
||||
}
|
||||
|
||||
func synchronizePeerReadState(network: Network, postbox: Postbox, stateManager: AccountStateManager, peerId: PeerId, push: Bool, validate: Bool) -> Signal<Void, NoError> {
|
||||
var signal: Signal<Void, NoError> = .complete()
|
||||
func synchronizePeerReadState(network: Network, postbox: Postbox, stateManager: AccountStateManager, peerId: PeerId, push: Bool, validate: Bool) -> Signal<Never, PeerReadStateValidationError> {
|
||||
var signal: Signal<Never, PeerReadStateValidationError> = .complete()
|
||||
if push {
|
||||
signal = signal |> then(pushPeerReadState(network: network, postbox: postbox, stateManager: stateManager, peerId: peerId))
|
||||
signal = signal
|
||||
|> then(pushPeerReadState(network: network, postbox: postbox, stateManager: stateManager, peerId: peerId))
|
||||
}
|
||||
if validate {
|
||||
signal = signal |> then(validatePeerReadState(network: network, postbox: postbox, stateManager: stateManager, peerId: peerId))
|
||||
signal = signal
|
||||
|> then(validatePeerReadState(network: network, postbox: postbox, stateManager: stateManager, peerId: peerId))
|
||||
}
|
||||
return signal
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user