diff --git a/TelegramCore/Api.swift b/TelegramCore/Api.swift index fc6a026630..6c5277cd02 100644 --- a/TelegramCore/Api.swift +++ b/TelegramCore/Api.swift @@ -533,6 +533,7 @@ fileprivate let parsers: [Int32 : (BufferReader) -> Any?] = { dict[-1892568281] = { return Api.MessageAction.parse_messageActionPaymentSentMe($0) } dict[1080663248] = { return Api.MessageAction.parse_messageActionPaymentSent($0) } dict[-2132731265] = { return Api.MessageAction.parse_messageActionPhoneCall($0) } + dict[1200788123] = { return Api.MessageAction.parse_messageActionScreenshotTaken($0) } dict[1399245077] = { return Api.PhoneCall.parse_phoneCallEmpty($0) } dict[462375633] = { return Api.PhoneCall.parse_phoneCallWaiting($0) } dict[-2089411356] = { return Api.PhoneCall.parse_phoneCallRequested($0) } @@ -15357,6 +15358,7 @@ public struct Api { case messageActionPaymentSentMe(flags: Int32, currency: String, totalAmount: Int64, payload: Buffer, info: Api.PaymentRequestedInfo?, shippingOptionId: String?, charge: Api.PaymentCharge) case messageActionPaymentSent(currency: String, totalAmount: Int64) case messageActionPhoneCall(flags: Int32, callId: Int64, reason: Api.PhoneCallDiscardReason?, duration: Int32?) + case messageActionScreenshotTaken public func serialize(_ buffer: Buffer, _ boxed: Swift.Bool) -> Swift.Bool { switch self { @@ -15482,6 +15484,12 @@ public struct Api { serializeInt64(callId, buffer: buffer, boxed: false) if Int(flags) & Int(1 << 0) != 0 {reason!.serialize(buffer, true)} if Int(flags) & Int(1 << 1) != 0 {serializeInt32(duration!, buffer: buffer, boxed: false)} + break + case .messageActionScreenshotTaken: + if boxed { + buffer.appendInt32(1200788123) + } + break } return true @@ -15693,6 +15701,9 @@ public struct Api { return nil } } + fileprivate static func parse_messageActionScreenshotTaken(_ reader: BufferReader) -> MessageAction? { + return Api.MessageAction.messageActionScreenshotTaken + } public var description: String { get { @@ -15731,6 +15742,8 @@ public struct Api { return "(messageActionPaymentSent currency: \(currency), totalAmount: \(totalAmount))" case .messageActionPhoneCall(let flags, let callId, let reason, let duration): return "(messageActionPhoneCall flags: \(flags), callId: \(callId), reason: \(reason), duration: \(duration))" + case .messageActionScreenshotTaken: + return "(messageActionScreenshotTaken)" } } } @@ -21436,6 +21449,22 @@ public struct Api { return result }) } + + public static func sendScreenshotNotification(peer: Api.InputPeer, replyToMsgId: Int32, randomId: Int64) -> (CustomStringConvertible, Buffer, (Buffer) -> Api.Updates?) { + let buffer = Buffer() + buffer.appendInt32(-914493408) + peer.serialize(buffer, true) + serializeInt32(replyToMsgId, buffer: buffer, boxed: false) + serializeInt64(randomId, buffer: buffer, boxed: false) + return (FunctionDescription({return "(messages.sendScreenshotNotification peer: \(peer), replyToMsgId: \(replyToMsgId), randomId: \(randomId))"}), buffer, { (buffer: Buffer) -> Api.Updates? in + let reader = BufferReader(buffer) + var result: Api.Updates? + if let signature = reader.readInt32() { + result = Api.parse(reader, signature: signature) as? Api.Updates + } + return result + }) + } } public struct channels { public static func readHistory(channel: Api.InputChannel, maxId: Int32) -> (CustomStringConvertible, Buffer, (Buffer) -> Api.Bool?) { @@ -22835,16 +22864,16 @@ public struct Api { }) } - public static func reuploadCdnFile(fileToken: Buffer, requestToken: Buffer) -> (CustomStringConvertible, Buffer, (Buffer) -> Api.Bool?) { + public static func reuploadCdnFile(fileToken: Buffer, requestToken: Buffer) -> (CustomStringConvertible, Buffer, (Buffer) -> [Api.CdnFileHash]?) { let buffer = Buffer() - buffer.appendInt32(779755552) + buffer.appendInt32(452533257) serializeBytes(fileToken, buffer: buffer, boxed: false) serializeBytes(requestToken, buffer: buffer, boxed: false) - return (FunctionDescription({return "(upload.reuploadCdnFile fileToken: \(fileToken), requestToken: \(requestToken))"}), buffer, { (buffer: Buffer) -> Api.Bool? in + return (FunctionDescription({return "(upload.reuploadCdnFile fileToken: \(fileToken), requestToken: \(requestToken))"}), buffer, { (buffer: Buffer) -> [Api.CdnFileHash]? in let reader = BufferReader(buffer) - var result: Api.Bool? - if let signature = reader.readInt32() { - result = Api.parse(reader, signature: signature) as? Api.Bool + var result: [Api.CdnFileHash]? + if let _ = reader.readInt32() { + result = Api.parseVector(reader, elementSignature: 0, elementType: Api.CdnFileHash.self) } return result }) diff --git a/TelegramCore/MultipartFetch.swift b/TelegramCore/MultipartFetch.swift index 73ef557ddd..eee7ce9baf 100644 --- a/TelegramCore/MultipartFetch.swift +++ b/TelegramCore/MultipartFetch.swift @@ -127,60 +127,162 @@ private func roundUp(_ value: Int, to multiple: Int) -> Int { return value + multiple - remainder } -private final class MultipartCdnHashSourceState { - private var hashes: [Int32: Data] - private var requestOffsetAndDisposable: (Int32, Disposable)? - private var requestedOffsets = Set() - - init(hashes: [Int32: Data]) { - self.hashes = hashes - } - - func dispose() -> Disposable? { - let disposable = self.requestOffsetAndDisposable?.1 - self.requestOffsetAndDisposable = nil - return disposable - } - - func get(offset: Int32) -> (Data?, MetaDisposable?) { - if let data = self.hashes[offset] { - return (data, nil) - } else { - requestedOffsets.insert(offset) - if self.requestOffsetAndDisposable == nil { - let disposable = MetaDisposable() - self.requestOffsetAndDisposable = (offset, disposable) - return (nil, disposable) - } else { - return (nil, nil) - } - } - } - - func add(requestedOffset: Int32, addedHashes: [Int32: Data]) -> (Int32, MetaDisposable)? { - - return nil - } -} +private let dataHashLength: Int32 = 128 * 1024 private final class MultipartCdnHashSource { - private let state: Atomic + private let queue: Queue + + private let fileToken: Data private let masterDownload: DownloadWrapper - init(hashes: [Int32: Data], masterDownload: DownloadWrapper) { - self.state = Atomic(value: MultipartCdnHashSourceState(hashes: hashes)) + private var knownUpperBound: Int32 + private var hashes: [Int32: Data] + private var requestOffsetAndDisposable: (Int32, Disposable)? + private var requestedUpperBound: Int32? + + private var subscribers = Bag<(Int32, Int32, ([Int32: Data]) -> Void)>() + + init(queue: Queue, fileToken: Data, hashes: [Int32: Data], masterDownload: DownloadWrapper) { + assert(queue.isCurrent()) + + self.queue = queue + self.fileToken = fileToken self.masterDownload = masterDownload + + self.hashes = hashes + var knownUpperBound: Int32 = 0 + /*for (offset, _) in hashes { + assert(offset % dataHashLength == 0) + knownUpperBound = max(knownUpperBound, offset + dataHashLength) + }*/ + self.knownUpperBound = knownUpperBound } deinit { - let disposable = self.state.with { - return $0.dispose() - } - disposable?.dispose() + assert(self.queue.isCurrent()) + + self.requestOffsetAndDisposable?.1.dispose() } - func get(offset: Int32) -> Signal { - return .never() + private func take(offset: Int32, limit: Int32) -> [Int32: Data]? { + assert(offset % dataHashLength == 0) + assert(limit % dataHashLength == 0) + + var result: [Int32: Data] = [:] + + var localOffset: Int32 = 0 + while localOffset < limit { + if let hash = self.hashes[offset + localOffset] { + result[offset + localOffset] = hash + } else { + return nil + } + localOffset += dataHashLength + } + + return result + } + + func get(offset: Int32, limit: Int32) -> Signal<[Int32: Data], MultipartFetchDownloadError> { + assert(self.queue.isCurrent()) + + let queue = self.queue + return Signal { [weak self] subscriber in + let disposable = MetaDisposable() + + queue.async { + if let strongSelf = self { + if let result = strongSelf.take(offset: offset, limit: limit) { + subscriber.putNext(result) + subscriber.putCompletion() + } else { + let index = strongSelf.subscribers.add((offset, limit, { result in + subscriber.putNext(result) + subscriber.putCompletion() + })) + + disposable.set(ActionDisposable { + queue.async { + if let strongSelf = self { + strongSelf.subscribers.remove(index) + } + } + }) + + if let requestedUpperBound = strongSelf.requestedUpperBound { + strongSelf.requestedUpperBound = max(requestedUpperBound, offset + limit) + } else { + strongSelf.requestedUpperBound = offset + limit + } + + if strongSelf.requestOffsetAndDisposable == nil { + strongSelf.requestMore() + } else { + if let requestedUpperBound = strongSelf.requestedUpperBound { + strongSelf.requestedUpperBound = max(requestedUpperBound, offset + limit) + } else { + strongSelf.requestedUpperBound = offset + limit + } + } + } + } + } + + return disposable + } + } + + private func requestMore() { + assert(self.queue.isCurrent()) + + let requestOffset = self.knownUpperBound + let disposable = MetaDisposable() + self.requestOffsetAndDisposable = (requestOffset, disposable) + let queue = self.queue + let fileToken = self.fileToken + disposable.set((self.masterDownload.get() |> mapToSignal { download -> Signal<[Int32: Data], NoError> in + return download.request(Api.functions.upload.getCdnFileHashes(fileToken: Buffer(data: fileToken), offset: requestOffset)) + |> map { partHashes -> [Int32: Data] in + var parsedPartHashes: [Int32: Data] = [:] + for part in partHashes { + switch part { + case let .cdnFileHash(offset, limit, bytes): + assert(limit == 128 * 1024) + parsedPartHashes[offset] = bytes.makeData() + } + } + return parsedPartHashes + } + |> `catch` { _ -> Signal<[Int32: Data], NoError> in + return .single([:]) + } + } |> deliverOn(queue)).start(next: { [weak self] result in + if let strongSelf = self { + if strongSelf.requestOffsetAndDisposable?.0 == requestOffset { + strongSelf.requestOffsetAndDisposable = nil + + for (hashOffset, hashData) in result { + assert(hashOffset % dataHashLength == 0) + strongSelf.knownUpperBound = max(strongSelf.knownUpperBound, hashOffset + dataHashLength) + strongSelf.hashes[hashOffset] = hashData + } + + for (index, item) in strongSelf.subscribers.copyItemsWithIndices() { + let (offset, limit, subscriber) = item + if let data = strongSelf.take(offset: offset, limit: limit) { + strongSelf.subscribers.remove(index) + subscriber(data) + } + } + + if let requestedUpperBound = strongSelf.requestedUpperBound, requestedUpperBound > strongSelf.knownUpperBound { + strongSelf.requestMore() + } + } else { + assertionFailure() + } + } + })) } } @@ -274,7 +376,7 @@ private enum MultipartFetchSource { } } } - return combineLatest(part, hashSource.get(offset: offset)) + return combineLatest(part, hashSource.get(offset: offset, limit: limit)) |> mapToSignal { partData, hashData -> Signal in return .single(partData) } @@ -285,6 +387,7 @@ private enum MultipartFetchSource { private final class MultipartFetchManager { let parallelParts: Int let defaultPartSize = 128 * 1024 + let partAlignment = 128 * 1024 let queue = Queue() @@ -314,7 +417,6 @@ private final class MultipartFetchManager { self.completeSize = size if let size = size { if size <= range.lowerBound { - //assertionFailure() self.range = range self.parallelParts = 0 } else { @@ -433,7 +535,7 @@ private final class MultipartFetchManager { case let .switchToCdn(id, token, key, iv, partHashes): switch strongSelf.source { case let .master(location, download): - strongSelf.source = .cdn(masterDatacenterId: location.datacenterId, fileToken: token, key: key, iv: iv, download: DownloadWrapper(id: id, cdn: true, take: strongSelf.takeDownloader), masterDownload: download, hashSource: MultipartCdnHashSource(hashes: partHashes, masterDownload: download)) + strongSelf.source = .cdn(masterDatacenterId: location.datacenterId, fileToken: token, key: key, iv: iv, download: DownloadWrapper(id: id, cdn: true, take: strongSelf.takeDownloader), masterDownload: download, hashSource: MultipartCdnHashSource(queue: strongSelf.queue, fileToken: token, hashes: partHashes, masterDownload: download)) strongSelf.checkState() case .cdn, .none: break @@ -445,13 +547,13 @@ private final class MultipartFetchManager { case let .cdn(_, fileToken, _, _, _, masterDownload, _): if !strongSelf.reuploadingToCdn { strongSelf.reuploadingToCdn = true - let reupload: Signal = masterDownload.get() |> mapToSignal { download -> Signal in + let reupload: Signal<[Api.CdnFileHash], NoError> = masterDownload.get() |> mapToSignal { download -> Signal<[Api.CdnFileHash], NoError> in return download.request(Api.functions.upload.reuploadCdnFile(fileToken: Buffer(data: fileToken), requestToken: Buffer(data: token))) - |> `catch` { _ -> Signal in - return .single(.boolFalse) + |> `catch` { _ -> Signal<[Api.CdnFileHash], NoError> in + return .single([]) } } - strongSelf.reuploadToCdnDisposable.set((reupload |> deliverOn(strongSelf.queue)).start(next: { result in + strongSelf.reuploadToCdnDisposable.set((reupload |> deliverOn(strongSelf.queue)).start(next: { _ in if let strongSelf = self { strongSelf.reuploadingToCdn = false strongSelf.checkState() diff --git a/TelegramCore/StoreMessage_Telegram.swift b/TelegramCore/StoreMessage_Telegram.swift index 15f560bd3d..d317248ff5 100644 --- a/TelegramCore/StoreMessage_Telegram.swift +++ b/TelegramCore/StoreMessage_Telegram.swift @@ -185,7 +185,7 @@ extension Api.Message { } switch action { - case .messageActionChannelCreate, .messageActionChatDeletePhoto, .messageActionChatEditPhoto, .messageActionChatEditTitle, .messageActionEmpty, .messageActionPinMessage, .messageActionHistoryClear, .messageActionHistoryClear, .messageActionGameScore: + case .messageActionChannelCreate, .messageActionChatDeletePhoto, .messageActionChatEditPhoto, .messageActionChatEditTitle, .messageActionEmpty, .messageActionPinMessage, .messageActionHistoryClear, .messageActionGameScore, .messageActionPaymentSent, .messageActionPaymentSentMe, .messageActionPhoneCall, .messageActionScreenshotTaken: break case let .messageActionChannelMigrateFrom(_, chatId): result.append(PeerId(namespace: Namespaces.Peer.CloudGroup, id: chatId)) @@ -203,12 +203,6 @@ extension Api.Message { result.append(PeerId(namespace: Namespaces.Peer.CloudUser, id: inviterId)) case let .messageActionChatMigrateTo(channelId): result.append(PeerId(namespace: Namespaces.Peer.CloudChannel, id: channelId)) - case let .messageActionPhoneCall(flags, callId, reason, duration): - break - case let .messageActionPaymentSent(currency, totalAmount): - break - case .messageActionPaymentSentMe: - break } return result diff --git a/TelegramCore/TelegramMediaAction.swift b/TelegramCore/TelegramMediaAction.swift index 0adfb829df..09a83ffcf2 100644 --- a/TelegramCore/TelegramMediaAction.swift +++ b/TelegramCore/TelegramMediaAction.swift @@ -328,6 +328,8 @@ func telegramMediaActionFromApiAction(_ action: Api.MessageAction) -> TelegramMe return TelegramMediaAction(action: .paymentSent(currency: currency, totalAmount: totalAmount)) case .messageActionPaymentSentMe: return nil + case .messageActionScreenshotTaken: + return TelegramMediaAction(action: .historyScreenshot) } }