no message

This commit is contained in:
Peter
2017-02-19 23:20:10 +03:00
parent 00ad214144
commit b076cf4dd1
29 changed files with 1110 additions and 257 deletions

View File

@@ -14,7 +14,7 @@ private enum AccountStateManagerOperation {
case collectUpdateGroups([UpdateGroup], Double)
case processUpdateGroups([UpdateGroup])
case custom(Int32, Signal<Void, NoError>)
case pollCompletion(Int32, [(Int32, () -> Void)])
case pollCompletion(Int32, [MessageId], [(Int32, ([MessageId]) -> Void)])
case processEvents(Int32, AccountFinalStateEvents)
}
@@ -161,11 +161,13 @@ public final class AccountStateManager {
}
private func replaceOperations(with operation: AccountStateManagerOperation) {
var collectedPollCompletionSubscribers: [(Int32, () -> Void)] = []
var collectedMessageIds: [MessageId] = []
var collectedPollCompletionSubscribers: [(Int32, ([MessageId]) -> Void)] = []
if !self.operations.isEmpty {
for operation in self.operations {
if case let .pollCompletion(_, subscribers) = operation {
if case let .pollCompletion(_, messageIds, subscribers) = operation {
collectedMessageIds.append(contentsOf: messageIds)
collectedPollCompletionSubscribers.append(contentsOf: subscribers)
}
}
@@ -173,8 +175,9 @@ public final class AccountStateManager {
self.operations.removeAll()
self.operations.append(operation)
for (id, f) in collectedPollCompletionSubscribers {
let _ = self.addPollCompletion(f, id: id)
if !collectedPollCompletionSubscribers.isEmpty || !collectedMessageIds.isEmpty {
self.operations.append(.pollCompletion(self.getNextId(), collectedMessageIds, collectedPollCompletionSubscribers))
}
}
@@ -215,7 +218,7 @@ public final class AccountStateManager {
return initialStateWithDifference(account, difference: difference)
|> mapToSignal { state -> Signal<(Api.updates.Difference?, AccountReplayedFinalState?), NoError> in
if state.initialState.state != authorizedState {
trace("State", what: "pollDifference initial state \(authorizedState) != current state \(state.initialState.state)")
Logger.shared.log("State", "pollDifference initial state \(authorizedState) != current state \(state.initialState.state)")
return .single((nil, nil))
} else {
return finalStateWithDifference(account: account, state: state, difference: difference)
@@ -286,7 +289,7 @@ public final class AccountStateManager {
}
}, error: { _ in
assertionFailure()
trace("AccountStateManager", what: "processUpdateGroups signal completed with error")
Logger.shared.log("AccountStateManager", "processUpdateGroups signal completed with error")
})
case let .collectUpdateGroups(_, timeout):
self.operationTimer?.invalidate()
@@ -296,7 +299,7 @@ public final class AccountStateManager {
if timeout.isEqual(to: 0.0) {
strongSelf.operations[0] = .processUpdateGroups(groups)
} else {
trace("AccountStateManager", what: "timeout while waiting for updates")
Logger.shared.log("AccountStateManager", "timeout while waiting for updates")
strongSelf.replaceOperations(with: .pollDifference(AccountFinalStateEvents()))
}
strongSelf.startFirstOperation()
@@ -350,7 +353,7 @@ public final class AccountStateManager {
}
}, error: { _ in
assertionFailure()
trace("AccountStateManager", what: "processUpdateGroups signal completed with error")
Logger.shared.log("AccountStateManager", "processUpdateGroups signal completed with error")
})
case let .custom(operationId, signal):
self.operationTimer?.invalidate()
@@ -388,6 +391,16 @@ public final class AccountStateManager {
}
}
strongSelf.operations.removeFirst()
var pollCount = 0
for i in 0 ..< strongSelf.operations.count {
if case let .pollCompletion(pollId, messageIds, subscribers) = strongSelf.operations[i] {
pollCount += 1
var updatedMessageIds = messageIds
updatedMessageIds.append(contentsOf: events.addedIncomingMessageIds)
strongSelf.operations[i] = .pollCompletion(pollId, updatedMessageIds, subscribers)
}
}
assert(pollCount <= 1)
strongSelf.startFirstOperation()
} else {
assertionFailure()
@@ -396,45 +409,11 @@ public final class AccountStateManager {
}
let signal = self.account.postbox.modify { modifier -> [Message] in
let timestamp = Int32(self.account.network.context.globalTime())
var messages: [Message] = []
for id in events.addedIncomingMessageIds {
var notify = true
if let notificationSettings = modifier.getPeerNotificationSettings(id.peerId) as? TelegramPeerNotificationSettings {
switch notificationSettings.muteState {
case let .muted(until):
if until >= timestamp {
notify = false
}
case .unmuted:
break
}
} else {
trace("AccountStateManager", what: "notification settings for \(id.peerId) are undefined")
}
if notify {
if let message = modifier.getMessage(id) {
var foundReadState = false
var isUnread = true
if let readState = modifier.getCombinedPeerReadState(id.peerId) {
if readState.isIncomingMessageIndexRead(MessageIndex(message)) {
isUnread = false
}
foundReadState = true
}
if !foundReadState {
trace("AccountStateManager", what: "read state for \(id.peerId) is undefined")
}
if isUnread {
messages.append(message)
}
} else {
trace("AccountStateManager", what: "notification message doesn't exist")
}
let (message, notify) = messageForNotification(modifier: modifier, id: id, alwaysReturnMessage: false)
if let message = message, notify {
messages.append(message)
}
}
return messages
@@ -443,7 +422,7 @@ public final class AccountStateManager {
let _ = (signal |> deliverOn(self.queue)).start(next: { [weak self] messages in
if let strongSelf = self {
for message in messages {
print("notify: \(String(describing: messageMainPeer(message)?.displayTitle)): \(message.id)")
Logger.shared.log("State" , "notify: \(String(describing: messageMainPeer(message)?.displayTitle)): \(message.id)")
}
strongSelf.notificationMessagesPipe.putNext(messages)
@@ -453,12 +432,10 @@ public final class AccountStateManager {
}, completed: {
completed()
})
case let .pollCompletion(pollId, preSubscribers):
case let .pollCompletion(pollId, preMessageIds, preSubscribers):
if self.operations.count > 1 {
self.operations.removeFirst()
for (id, f) in preSubscribers {
let _ = self.addPollCompletion(f, id: id)
}
self.postponePollCompletionOperation(messageIds: preMessageIds, subscribers: preSubscribers)
self.startFirstOperation()
} else {
self.operationTimer?.invalidate()
@@ -466,18 +443,16 @@ public final class AccountStateManager {
|> deliverOn(self.queue)
let completed: () -> Void = { [weak self] in
if let strongSelf = self {
if let topOperation = strongSelf.operations.first, case let .pollCompletion(topPollId, subscribers) = topOperation {
if let topOperation = strongSelf.operations.first, case let .pollCompletion(topPollId, messageIds, subscribers) = topOperation {
assert(topPollId == pollId)
strongSelf.operations.removeFirst()
if strongSelf.operations.isEmpty {
for (_, f) in subscribers {
f()
f(messageIds)
}
} else {
for (id, f) in subscribers {
let _ = strongSelf.addPollCompletion(f, id: id)
}
strongSelf.postponePollCompletionOperation(messageIds: messageIds, subscribers: subscribers)
}
strongSelf.startFirstOperation()
} else {
@@ -506,29 +481,34 @@ public final class AccountStateManager {
}
}
private func addPollCompletion(_ f: @escaping () -> Void, id: Int32?) -> Int32 {
private func postponePollCompletionOperation(messageIds: [MessageId], subscribers: [(Int32, ([MessageId]) -> Void)]) {
self.operations.append(.pollCompletion(self.getNextId(), messageIds, subscribers))
for i in 0 ..< self.operations.count {
if case .pollCompletion = self.operations[i] {
if i != self.operations.count - 1 {
assertionFailure()
}
}
}
}
private func addPollCompletion(_ f: @escaping ([MessageId]) -> Void) -> Int32 {
assert(self.queue.isCurrent())
let updatedId: Int32
if let id = id {
updatedId = id
} else {
updatedId = self.getNextId()
}
let updatedId: Int32 = self.getNextId()
if !self.operations.isEmpty {
for i in 1 ..< self.operations.count {
if case let .pollCompletion(pollId, subscribers) = self.operations[i] {
var subscribers = subscribers
subscribers.append((updatedId, f))
self.operations[i] = .pollCompletion(pollId, subscribers)
return updatedId
}
for i in 0 ..< self.operations.count {
if case let .pollCompletion(pollId, messageIds, subscribers) = self.operations[i] {
var subscribers = subscribers
subscribers.append((updatedId, f))
self.operations[i] = .pollCompletion(pollId, messageIds, subscribers)
return updatedId
}
}
let beginFirst = self.operations.isEmpty
self.operations.append(.pollCompletion(self.getNextId(), [(updatedId, f)]))
self.operations.append(.pollCompletion(self.getNextId(), [], [(updatedId, f)]))
if beginFirst {
self.startFirstOperation()
}
@@ -538,12 +518,12 @@ public final class AccountStateManager {
private func removePollCompletion(_ id: Int32) {
for i in 0 ..< self.operations.count {
if case let .pollCompletion(pollId, subscribers) = self.operations[i] {
if case let .pollCompletion(pollId, messages, subscribers) = self.operations[i] {
for j in 0 ..< subscribers.count {
if subscribers[j].0 == id {
var subscribers = subscribers
subscribers.remove(at: j)
self.operations[i] = .pollCompletion(pollId, subscribers)
self.operations[i] = .pollCompletion(pollId, messages, subscribers)
break
}
}
@@ -551,14 +531,15 @@ public final class AccountStateManager {
}
}
public func wakeup() -> Signal<Void, NoError> {
public func pollStateUpdateCompletion() -> Signal<[MessageId], NoError> {
return Signal { [weak self] subscriber in
let disposable = MetaDisposable()
if let strongSelf = self {
strongSelf.queue.async {
let id = strongSelf.addPollCompletion({
let id = strongSelf.addPollCompletion({ messageIds in
subscriber.putNext(messageIds)
subscriber.putCompletion()
}, id: nil)
})
disposable.set(ActionDisposable {
if let strongSelf = self {
@@ -573,3 +554,52 @@ public final class AccountStateManager {
}
}
}
public func messageForNotification(modifier: Modifier, id: MessageId, alwaysReturnMessage: Bool) -> (message: Message?, notify: Bool) {
var notify = true
let timestamp = Int32(CFAbsoluteTimeGetCurrent() + NSTimeIntervalSince1970)
if let notificationSettings = modifier.getPeerNotificationSettings(id.peerId) as? TelegramPeerNotificationSettings {
switch notificationSettings.muteState {
case let .muted(until):
if until >= timestamp {
notify = false
}
case .unmuted:
break
}
} else {
Logger.shared.log("AccountStateManager", "notification settings for \(id.peerId) are undefined")
}
if notify {
let message = modifier.getMessage(id)
if let message = message {
var foundReadState = false
var isUnread = true
if let readState = modifier.getCombinedPeerReadState(id.peerId) {
if readState.isIncomingMessageIndexRead(MessageIndex(message)) {
isUnread = false
}
foundReadState = true
}
if !foundReadState {
Logger.shared.log("AccountStateManager", "read state for \(id.peerId) is undefined")
}
return (message, isUnread)
} else {
Logger.shared.log("AccountStateManager", "notification message doesn't exist")
return (nil, false)
}
} else {
var message: Message?
if alwaysReturnMessage {
message = modifier.getMessage(id)
}
return (message, false)
}
}