Some MTSignal fixes

This commit is contained in:
Ali 2023-09-26 18:42:50 +04:00
parent 30639d9a88
commit 1b3cfcb124
9 changed files with 300 additions and 170 deletions

View File

@ -13,8 +13,13 @@
- (instancetype)initWithGenerator:(id<MTDisposable> (^)(MTSubscriber *))generator; - (instancetype)initWithGenerator:(id<MTDisposable> (^)(MTSubscriber *))generator;
- (id<MTDisposable>)startWithNext:(void (^)(id next))next error:(void (^)(id error))error completed:(void (^)())completed; - (id<MTDisposable>)startWithNext:(void (^)(id next))next error:(void (^)(id error))error completed:(void (^)())completed;
- (id<MTDisposable>)startWithNextStrict:(void (^)(id next))next error:(void (^)(id error))error completed:(void (^)())completed file:(const char *)file line:(int)line;
- (id<MTDisposable>)startWithNext:(void (^)(id next))next; - (id<MTDisposable>)startWithNext:(void (^)(id next))next;
- (id<MTDisposable>)startWithNextStrict:(void (^)(id next))next file:(const char *)file line:(int)line;
- (id<MTDisposable>)startWithNext:(void (^)(id next))next completed:(void (^)())completed; - (id<MTDisposable>)startWithNext:(void (^)(id next))next completed:(void (^)())completed;
- (id<MTDisposable>)startWithNextStrict:(void (^)(id next))next completed:(void (^)())completed file:(const char *)file line:(int)line;
+ (MTSignal *)single:(id)next; + (MTSignal *)single:(id)next;
+ (MTSignal *)fail:(id)error; + (MTSignal *)fail:(id)error;

View File

@ -1030,15 +1030,17 @@ static void copyKeychainDictionaryKey(NSString * _Nonnull group, NSString * _Non
__weak MTContext *weakSelf = self; __weak MTContext *weakSelf = self;
MTMetaDisposable *disposable = [[MTMetaDisposable alloc] init]; MTMetaDisposable *disposable = [[MTMetaDisposable alloc] init];
_fetchPublicKeysActions[@(datacenterId)] = disposable; _fetchPublicKeysActions[@(datacenterId)] = disposable;
[disposable setDisposable:[signal startWithNext:^(NSArray<NSDictionary *> *next) { [disposable setDisposable:[signal startWithNextStrict:^(NSArray<NSDictionary *> *next) {
[[MTContext contextQueue] dispatchOnQueue:^{ [[MTContext contextQueue] dispatchOnQueue:^{
__strong MTContext *strongSelf = weakSelf; __strong MTContext *strongSelf = weakSelf;
if (strongSelf != nil) { if (strongSelf != nil) {
id<MTDisposable> disposable = strongSelf->_fetchPublicKeysActions[@(datacenterId)];
[strongSelf->_fetchPublicKeysActions removeObjectForKey:@(datacenterId)]; [strongSelf->_fetchPublicKeysActions removeObjectForKey:@(datacenterId)];
[disposable dispose];
[strongSelf updatePublicKeysForDatacenterWithId:datacenterId publicKeys:next]; [strongSelf updatePublicKeysForDatacenterWithId:datacenterId publicKeys:next];
} }
} synchronous:false]; } synchronous:false];
}]]; } file:__FILE_NAME__ line:__LINE__]];
break; break;
} }
} }
@ -1157,10 +1159,12 @@ static void copyKeychainDictionaryKey(NSString * _Nonnull group, NSString * _Non
{ {
[[MTContext contextQueue] dispatchOnQueue:^ [[MTContext contextQueue] dispatchOnQueue:^
{ {
id<MTDisposable> disposable = strongSelf->_transportSchemeDisposableByDatacenterId[@(datacenterId)];
[strongSelf->_transportSchemeDisposableByDatacenterId removeObjectForKey:@(datacenterId)]; [strongSelf->_transportSchemeDisposableByDatacenterId removeObjectForKey:@(datacenterId)];
[disposable dispose];
}]; }];
} }
}] startWithNext:^(MTTransportScheme *next) }] startWithNextStrict:^(MTTransportScheme *next)
{ {
if (MTLogEnabled()) { if (MTLogEnabled()) {
MTLog(@"scheme: %@", next); MTLog(@"scheme: %@", next);
@ -1176,7 +1180,7 @@ static void copyKeychainDictionaryKey(NSString * _Nonnull group, NSString * _Non
} completed:^ } completed:^
{ {
}]; } file:__FILE_NAME__ line:__LINE__];
} }
}]; }];
} }
@ -1293,7 +1297,7 @@ static void copyKeychainDictionaryKey(NSString * _Nonnull group, NSString * _Non
[strongSelf->_backupAddressListDisposable dispose]; [strongSelf->_backupAddressListDisposable dispose];
strongSelf->_backupAddressListDisposable = nil; strongSelf->_backupAddressListDisposable = nil;
} }
}] startWithNext:nil]; }] startWithNextStrict:nil file:__FILE_NAME__ line:__LINE__];
} }
} }
@ -1491,7 +1495,7 @@ static void copyKeychainDictionaryKey(NSString * _Nonnull group, NSString * _Non
_datacenterCheckKeyRemovedActionTimestamps[@(datacenterId)] = currentTimestamp; _datacenterCheckKeyRemovedActionTimestamps[@(datacenterId)] = currentTimestamp;
[_datacenterCheckKeyRemovedActions[@(datacenterId)] dispose]; [_datacenterCheckKeyRemovedActions[@(datacenterId)] dispose];
__weak MTContext *weakSelf = self; __weak MTContext *weakSelf = self;
_datacenterCheckKeyRemovedActions[@(datacenterId)] = [[MTDiscoverConnectionSignals checkIfAuthKeyRemovedWithContext:self datacenterId:datacenterId authKey:[[MTDatacenterAuthKey alloc] initWithAuthKey:authInfo.authKey authKeyId:authInfo.authKeyId validUntilTimestamp:authInfo.validUntilTimestamp notBound:false]] startWithNext:^(NSNumber* isRemoved) { _datacenterCheckKeyRemovedActions[@(datacenterId)] = [[MTDiscoverConnectionSignals checkIfAuthKeyRemovedWithContext:self datacenterId:datacenterId authKey:[[MTDatacenterAuthKey alloc] initWithAuthKey:authInfo.authKey authKeyId:authInfo.authKeyId validUntilTimestamp:authInfo.validUntilTimestamp notBound:false]] startWithNextStrict:^(NSNumber* isRemoved) {
[[MTContext contextQueue] dispatchOnQueue:^{ [[MTContext contextQueue] dispatchOnQueue:^{
__strong MTContext *strongSelf = weakSelf; __strong MTContext *strongSelf = weakSelf;
if (strongSelf == nil) { if (strongSelf == nil) {
@ -1506,7 +1510,7 @@ static void copyKeychainDictionaryKey(NSString * _Nonnull group, NSString * _Non
} }
} }
}]; }];
}]; } file:__FILE_NAME__ line:__LINE__];
} }
}]; }];
} }

View File

@ -125,7 +125,7 @@
if (disposable != nil) { if (disposable != nil) {
__weak MTDNSContext *weakSelf = self; __weak MTDNSContext *weakSelf = self;
[disposable setDisposable:[[[self performLookup:host port:port] deliverOn:[MTDNSContext sharedQueue]] startWithNext:^(NSString *result) { [disposable setDisposable:[[[self performLookup:host port:port] deliverOn:[MTDNSContext sharedQueue]] startWithNextStrict:^(NSString *result) {
__strong MTDNSContext *strongSelf = weakSelf; __strong MTDNSContext *strongSelf = weakSelf;
if (strongSelf == nil) { if (strongSelf == nil) {
return; return;
@ -134,7 +134,7 @@
[strongSelf->_contexts[key] complete:result]; [strongSelf->_contexts[key] complete:result];
[strongSelf->_contexts removeObjectForKey:key]; [strongSelf->_contexts removeObjectForKey:key];
} }
}]]; } file:__FILE_NAME__ line:__LINE__]];
} }
__weak MTDNSContext *weakSelf = self; __weak MTDNSContext *weakSelf = self;

View File

@ -1,13 +1,11 @@
#import <MtProtoKit/MTDisposable.h> #import <MtProtoKit/MTDisposable.h>
#import <os/lock.h> #import <pthread/pthread.h>
#import <libkern/OSAtomic.h>
#import <stdatomic.h>
#import <objc/runtime.h> #import <objc/runtime.h>
@interface MTBlockDisposable () @interface MTBlockDisposable () {
{ void (^_action)();
void *_block; pthread_mutex_t _lock;
} }
@end @end
@ -19,47 +17,35 @@
self = [super init]; self = [super init];
if (self != nil) if (self != nil)
{ {
_block = (__bridge_retained void *)[block copy]; _action = [block copy];
pthread_mutex_init(&_lock, nil);
} }
return self; return self;
} }
- (void)dealloc - (void)dealloc {
{ void (^freeAction)() = nil;
void *block = _block; pthread_mutex_lock(&_lock);
if (block != NULL) freeAction = _action;
{ _action = nil;
#pragma clang diagnostic push pthread_mutex_unlock(&_lock);
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
if (OSAtomicCompareAndSwapPtr(block, 0, &_block)) if (freeAction) {
{
if (block != nil)
{
__unused __strong id strongBlock = (__bridge_transfer id)block;
strongBlock = nil;
}
}
#pragma clang diagnostic pop
} }
pthread_mutex_destroy(&_lock);
} }
- (void)dispose - (void)dispose {
{ void (^disposeAction)() = nil;
void *block = _block;
if (block != NULL) pthread_mutex_lock(&_lock);
{ disposeAction = _action;
#pragma clang diagnostic push _action = nil;
#pragma clang diagnostic ignored "-Wdeprecated-declarations" pthread_mutex_unlock(&_lock);
if (OSAtomicCompareAndSwapPtr(block, 0, &_block))
{ if (disposeAction) {
if (block != nil) disposeAction();
{
__strong id strongBlock = (__bridge_transfer id)block;
((dispatch_block_t)strongBlock)();
strongBlock = nil;
}
}
#pragma clang diagnostic pop
} }
} }
@ -67,7 +53,7 @@
@interface MTMetaDisposable () @interface MTMetaDisposable ()
{ {
os_unfair_lock _lock; pthread_mutex_t _lock;
bool _disposed; bool _disposed;
id<MTDisposable> _disposable; id<MTDisposable> _disposable;
} }
@ -76,128 +62,139 @@
@implementation MTMetaDisposable @implementation MTMetaDisposable
- (void)setDisposable:(id<MTDisposable>)disposable - (instancetype)init {
{ self = [super init];
id<MTDisposable> previousDisposable = nil; if (self != nil) {
bool dispose = false; pthread_mutex_init(&_lock, nil);
}
return self;
}
os_unfair_lock_lock(&_lock); - (void)dealloc {
dispose = _disposed; id<MTDisposable> freeDisposable = nil;
if (!dispose) pthread_mutex_lock(&_lock);
{ if (_disposable) {
freeDisposable = _disposable;
_disposable = nil;
}
pthread_mutex_unlock(&_lock);
if (freeDisposable) {
}
pthread_mutex_destroy(&_lock);
}
- (void)setDisposable:(id<MTDisposable>)disposable {
id<MTDisposable> previousDisposable = nil;
bool disposeImmediately = false;
pthread_mutex_lock(&_lock);
disposeImmediately = _disposed;
if (!disposeImmediately) {
previousDisposable = _disposable; previousDisposable = _disposable;
_disposable = disposable; _disposable = disposable;
} }
os_unfair_lock_unlock(&_lock); pthread_mutex_unlock(&_lock);
if (previousDisposable != nil) if (previousDisposable) {
[previousDisposable dispose]; [previousDisposable dispose];
}
if (dispose) if (disposeImmediately) {
[disposable dispose]; [disposable dispose];
}
} }
- (void)dispose - (void)dispose {
{
id<MTDisposable> disposable = nil; id<MTDisposable> disposable = nil;
os_unfair_lock_lock(&_lock); pthread_mutex_lock(&_lock);
if (!_disposed) if (!_disposed) {
{
disposable = _disposable;
_disposed = true; _disposed = true;
disposable = _disposable;
_disposable = nil;
} }
os_unfair_lock_unlock(&_lock); pthread_mutex_unlock(&_lock);
if (disposable != nil) if (disposable) {
[disposable dispose]; [disposable dispose];
}
} }
@end @end
@interface MTDisposableSet () @interface MTDisposableSet ()
{ {
os_unfair_lock _lock; pthread_mutex_t _lock;
bool _disposed; bool _disposed;
id<MTDisposable> _singleDisposable; NSMutableArray<id<MTDisposable>> *_disposables;
NSArray *_multipleDisposables;
} }
@end @end
@implementation MTDisposableSet @implementation MTDisposableSet
- (void)add:(id<MTDisposable>)disposable - (instancetype)init {
{ self = [super init];
if (disposable == nil) if (self != nil) {
return; pthread_mutex_init(&_lock, nil);
_disposables = [[NSMutableArray alloc] init];
}
return self;
}
bool dispose = false; - (void)dealloc {
NSArray<id<MTDisposable>> *disposables = nil;
pthread_mutex_lock(&_lock);
disposables = _disposables;
_disposables = nil;
pthread_mutex_unlock(&_lock);
os_unfair_lock_lock(&_lock); if (disposables) {
dispose = _disposed;
if (!dispose)
{
if (_multipleDisposables != nil)
{
NSMutableArray *multipleDisposables = [[NSMutableArray alloc] initWithArray:_multipleDisposables];
[multipleDisposables addObject:disposable];
_multipleDisposables = multipleDisposables;
} }
else if (_singleDisposable != nil) pthread_mutex_destroy(&_lock);
{ }
NSMutableArray *multipleDisposables = [[NSMutableArray alloc] initWithObjects:_singleDisposable, disposable, nil];
_multipleDisposables = multipleDisposables;
_singleDisposable = nil;
}
else
{
_singleDisposable = disposable;
}
}
os_unfair_lock_unlock(&_lock);
if (dispose) - (void)add:(id<MTDisposable>)disposable {
bool disposeImmediately = false;
pthread_mutex_lock(&_lock);
if (_disposed) {
disposeImmediately = true;
} else {
[_disposables addObject:disposable];
}
pthread_mutex_unlock(&_lock);
if (disposeImmediately) {
[disposable dispose]; [disposable dispose];
}
} }
- (void)remove:(id<MTDisposable>)disposable { - (void)remove:(id<MTDisposable>)disposable {
os_unfair_lock_lock(&_lock); pthread_mutex_lock(&_lock);
if (_multipleDisposables != nil) for (NSInteger i = 0; i < _disposables.count; i++) {
{ if (_disposables[i] == disposable) {
NSMutableArray *multipleDisposables = [[NSMutableArray alloc] initWithArray:_multipleDisposables]; [_disposables removeObjectAtIndex:i];
[multipleDisposables removeObject:disposable]; break;
_multipleDisposables = multipleDisposables;
} }
else if (_singleDisposable == disposable)
{
_singleDisposable = nil;
} }
os_unfair_lock_unlock(&_lock); pthread_mutex_unlock(&_lock);
} }
- (void)dispose - (void)dispose {
{ NSArray<id<MTDisposable>> *disposables = nil;
id<MTDisposable> singleDisposable = nil; pthread_mutex_lock(&_lock);
NSArray *multipleDisposables = nil; if (!_disposed) {
os_unfair_lock_lock(&_lock);
if (!_disposed)
{
_disposed = true; _disposed = true;
singleDisposable = _singleDisposable; disposables = _disposables;
multipleDisposables = _multipleDisposables; _disposables = nil;
_singleDisposable = nil;
_multipleDisposables = nil;
} }
os_unfair_lock_unlock(&_lock); pthread_mutex_unlock(&_lock);
if (singleDisposable != nil) if (disposables) {
[singleDisposable dispose]; for (id<MTDisposable> disposable in disposables) {
if (multipleDisposables != nil)
{
for (id<MTDisposable> disposable in multipleDisposables)
{
[disposable dispose]; [disposable dispose];
} }
} }

View File

@ -838,7 +838,7 @@ static const NSUInteger MTMaxUnacknowledgedMessageCount = 64;
__weak MTProto *weakSelf = self; __weak MTProto *weakSelf = self;
MTSignal *checkSignal = [[MTConnectionProbing probeProxyWithContext:_context datacenterId:_datacenterId settings:transport.proxySettings] delay:5.0 onQueue:[MTQueue concurrentDefaultQueue]]; MTSignal *checkSignal = [[MTConnectionProbing probeProxyWithContext:_context datacenterId:_datacenterId settings:transport.proxySettings] delay:5.0 onQueue:[MTQueue concurrentDefaultQueue]];
checkSignal = [[checkSignal then:[[MTSignal complete] delay:20.0 onQueue:[MTQueue concurrentDefaultQueue]]] restart]; checkSignal = [[checkSignal then:[[MTSignal complete] delay:20.0 onQueue:[MTQueue concurrentDefaultQueue]]] restart];
[_probingDisposable setDisposable:[checkSignal startWithNext:^(NSNumber *next) { [_probingDisposable setDisposable:[checkSignal startWithNextStrict:^(NSNumber *next) {
[[MTProto managerQueue] dispatchOnQueue:^{ [[MTProto managerQueue] dispatchOnQueue:^{
__strong MTProto *strongSelf = weakSelf; __strong MTProto *strongSelf = weakSelf;
if (strongSelf == nil) { if (strongSelf == nil) {
@ -849,7 +849,7 @@ static const NSUInteger MTMaxUnacknowledgedMessageCount = 64;
[strongSelf _updateConnectionIssuesStatus:[strongSelf->_probingStatus boolValue]]; [strongSelf _updateConnectionIssuesStatus:[strongSelf->_probingStatus boolValue]];
} }
}]; }];
}]]; } file:__FILE_NAME__ line:__LINE__]];
} }
} }
}]; }];

View File

@ -1,6 +1,6 @@
#import <MtProtoKit/MTSignal.h> #import <MtProtoKit/MTSignal.h>
#import <os/lock.h> #import <pthread/pthread.h>
#import <MtProtoKit/MTTimer.h> #import <MtProtoKit/MTTimer.h>
#import <MtProtoKit/MTQueue.h> #import <MtProtoKit/MTQueue.h>
#import <MtProtoKit/MTAtomic.h> #import <MtProtoKit/MTAtomic.h>
@ -8,28 +8,97 @@
@interface MTSubscriberDisposable : NSObject <MTDisposable> @interface MTSubscriberDisposable : NSObject <MTDisposable>
{ {
MTSubscriber *_subscriber; __weak MTSubscriber *_subscriber;
id<MTDisposable> _disposable; id<MTDisposable> _disposable;
pthread_mutex_t _lock;
} }
@end @end
@implementation MTSubscriberDisposable @implementation MTSubscriberDisposable
- (instancetype)initWithSubscriber:(MTSubscriber *)subscriber disposable:(id<MTDisposable>)disposable - (instancetype)initWithSubscriber:(MTSubscriber *)subscriber disposable:(id<MTDisposable>)disposable {
{
self = [super init]; self = [super init];
if (self != nil) if (self != nil) {
{
_subscriber = subscriber; _subscriber = subscriber;
_disposable = disposable; _disposable = disposable;
pthread_mutex_init(&_lock, nil);
} }
return self; return self;
} }
- (void)dispose - (void)dealloc {
{ pthread_mutex_destroy(&_lock);
[_subscriber _markTerminatedWithoutDisposal]; }
- (void)dispose {
MTSubscriber *subscriber = nil;
id<MTDisposable> disposeItem = nil;
pthread_mutex_lock(&_lock);
disposeItem = _disposable;
_disposable = nil;
subscriber = _subscriber;
_subscriber = nil;
pthread_mutex_unlock(&_lock);
[disposeItem dispose];
[subscriber _markTerminatedWithoutDisposal];
}
@end
@interface MTStrictDisposable : NSObject<MTDisposable> {
id<MTDisposable> _disposable;
const char *_file;
int _line;
#if DEBUG
pthread_mutex_t _lock;
bool _isDisposed;
#endif
}
- (instancetype)initWithDisposable:(id<MTDisposable>)disposable file:(const char *)file line:(int)line;
- (void)dispose;
@end
@implementation MTStrictDisposable
- (instancetype)initWithDisposable:(id<MTDisposable>)disposable file:(const char *)file line:(int)line {
self = [super init];
if (self != nil) {
_disposable = disposable;
_file = file;
_line = line;
#if DEBUG
pthread_mutex_init(&_lock, nil);
#endif
}
return self;
}
- (void)dealloc {
#if DEBUG
pthread_mutex_lock(&_lock);
if (!_isDisposed) {
NSLog(@"Leaked disposable from %s:%d", _file, _line);
assert(false);
}
pthread_mutex_unlock(&_lock);
pthread_mutex_destroy(&_lock);
#endif
}
- (void)dispose {
#if DEBUG
pthread_mutex_lock(&_lock);
_isDisposed = true;
pthread_mutex_unlock(&_lock);
#endif
[_disposable dispose]; [_disposable dispose];
} }
@ -55,7 +124,7 @@
@interface MTSignalQueueState : NSObject <MTDisposable> @interface MTSignalQueueState : NSObject <MTDisposable>
{ {
os_unfair_lock _lock; pthread_mutex_t _lock;
bool _executingSignal; bool _executingSignal;
bool _terminated; bool _terminated;
@ -76,6 +145,8 @@
self = [super init]; self = [super init];
if (self != nil) if (self != nil)
{ {
pthread_mutex_init(&_lock, nil);
_subscriber = subscriber; _subscriber = subscriber;
_currentDisposable = [[MTMetaDisposable alloc] init]; _currentDisposable = [[MTMetaDisposable alloc] init];
_queuedSignals = queueMode ? [[NSMutableArray alloc] init] : nil; _queuedSignals = queueMode ? [[NSMutableArray alloc] init] : nil;
@ -84,6 +155,10 @@
return self; return self;
} }
- (void)dealloc {
pthread_mutex_destroy(&_lock);
}
- (void)beginWithDisposable:(id<MTDisposable>)disposable - (void)beginWithDisposable:(id<MTDisposable>)disposable
{ {
_disposable = disposable; _disposable = disposable;
@ -92,7 +167,7 @@
- (void)enqueueSignal:(MTSignal *)signal - (void)enqueueSignal:(MTSignal *)signal
{ {
bool startSignal = false; bool startSignal = false;
os_unfair_lock_lock(&_lock); pthread_mutex_lock(&_lock);
if (_queueMode && _executingSignal) if (_queueMode && _executingSignal)
{ {
[_queuedSignals addObject:signal]; [_queuedSignals addObject:signal];
@ -102,7 +177,7 @@
_executingSignal = true; _executingSignal = true;
startSignal = true; startSignal = true;
} }
os_unfair_lock_unlock(&_lock); pthread_mutex_unlock(&_lock);
if (startSignal) if (startSignal)
{ {
@ -130,7 +205,7 @@
MTSignal *nextSignal = nil; MTSignal *nextSignal = nil;
bool terminated = false; bool terminated = false;
os_unfair_lock_lock(&_lock); pthread_mutex_lock(&_lock);
_executingSignal = false; _executingSignal = false;
if (_queueMode) if (_queueMode)
@ -146,7 +221,7 @@
} }
else else
terminated = _terminated; terminated = _terminated;
os_unfair_lock_unlock(&_lock); pthread_mutex_unlock(&_lock);
if (terminated) if (terminated)
[_subscriber putCompletion]; [_subscriber putCompletion];
@ -174,10 +249,10 @@
- (void)beginCompletion - (void)beginCompletion
{ {
bool executingSignal = false; bool executingSignal = false;
os_unfair_lock_lock(&_lock); pthread_mutex_lock(&_lock);
executingSignal = _executingSignal; executingSignal = _executingSignal;
_terminated = true; _terminated = true;
os_unfair_lock_unlock(&_lock); pthread_mutex_unlock(&_lock);
if (!executingSignal) if (!executingSignal)
[_subscriber putCompletion]; [_subscriber putCompletion];
@ -235,6 +310,14 @@
return [[MTSubscriberDisposable alloc] initWithSubscriber:subscriber disposable:disposable]; return [[MTSubscriberDisposable alloc] initWithSubscriber:subscriber disposable:disposable];
} }
- (id<MTDisposable>)startWithNextStrict:(void (^)(id next))next error:(void (^)(id error))error completed:(void (^)())completed file:(const char *)file line:(int)line
{
MTSubscriber *subscriber = [[MTSubscriber alloc] initWithNext:next error:error completed:completed];
id<MTDisposable> disposable = _generator(subscriber);
[subscriber _assignDisposable:disposable];
return [[MTStrictDisposable alloc] initWithDisposable:[[MTSubscriberDisposable alloc] initWithSubscriber:subscriber disposable:disposable] file:file line:line];
}
- (id<MTDisposable>)startWithNext:(void (^)(id next))next - (id<MTDisposable>)startWithNext:(void (^)(id next))next
{ {
MTSubscriber *subscriber = [[MTSubscriber alloc] initWithNext:next error:nil completed:nil]; MTSubscriber *subscriber = [[MTSubscriber alloc] initWithNext:next error:nil completed:nil];
@ -243,6 +326,14 @@
return [[MTSubscriberDisposable alloc] initWithSubscriber:subscriber disposable:disposable]; return [[MTSubscriberDisposable alloc] initWithSubscriber:subscriber disposable:disposable];
} }
- (id<MTDisposable>)startWithNextStrict:(void (^)(id next))next file:(const char *)file line:(int)line
{
MTSubscriber *subscriber = [[MTSubscriber alloc] initWithNext:next error:nil completed:nil];
id<MTDisposable> disposable = _generator(subscriber);
[subscriber _assignDisposable:disposable];
return [[MTStrictDisposable alloc] initWithDisposable:[[MTSubscriberDisposable alloc] initWithSubscriber:subscriber disposable:disposable] file:file line:line];
}
- (id<MTDisposable>)startWithNext:(void (^)(id next))next completed:(void (^)())completed - (id<MTDisposable>)startWithNext:(void (^)(id next))next completed:(void (^)())completed
{ {
MTSubscriber *subscriber = [[MTSubscriber alloc] initWithNext:next error:nil completed:completed]; MTSubscriber *subscriber = [[MTSubscriber alloc] initWithNext:next error:nil completed:completed];
@ -251,6 +342,14 @@
return [[MTSubscriberDisposable alloc] initWithSubscriber:subscriber disposable:disposable]; return [[MTSubscriberDisposable alloc] initWithSubscriber:subscriber disposable:disposable];
} }
- (id<MTDisposable>)startWithNextStrict:(void (^)(id next))next completed:(void (^)())completed file:(const char *)file line:(int)line
{
MTSubscriber *subscriber = [[MTSubscriber alloc] initWithNext:next error:nil completed:completed];
id<MTDisposable> disposable = _generator(subscriber);
[subscriber _assignDisposable:disposable];
return [[MTStrictDisposable alloc] initWithDisposable:[[MTSubscriberDisposable alloc] initWithSubscriber:subscriber disposable:disposable] file:file line:line];
}
+ (MTSignal *)single:(id)next + (MTSignal *)single:(id)next
{ {
return [[MTSignal alloc] initWithGenerator:^id<MTDisposable> (MTSubscriber *subscriber) return [[MTSignal alloc] initWithGenerator:^id<MTDisposable> (MTSubscriber *subscriber)
@ -324,11 +423,11 @@
{ {
return [[MTSignal alloc] initWithGenerator:^id<MTDisposable> (MTSubscriber *subscriber) return [[MTSignal alloc] initWithGenerator:^id<MTDisposable> (MTSubscriber *subscriber)
{ {
MTMetaDisposable *disposable = [[MTMetaDisposable alloc] init]; MTMetaDisposable *startDisposable = [[MTMetaDisposable alloc] init];
MTMetaDisposable *timerDisposable = [[MTMetaDisposable alloc] init];
MTTimer *timer = [[MTTimer alloc] initWithTimeout:seconds repeat:false completion:^ MTTimer *timer = [[MTTimer alloc] initWithTimeout:seconds repeat:false completion:^{
{ [startDisposable setDisposable:[self startWithNext:^(id next)
[disposable setDisposable:[self startWithNext:^(id next)
{ {
[subscriber putNext:next]; [subscriber putNext:next];
} error:^(id error) } error:^(id error)
@ -342,12 +441,15 @@
[timer start]; [timer start];
[disposable setDisposable:[[MTBlockDisposable alloc] initWithBlock:^ [timerDisposable setDisposable:[[MTBlockDisposable alloc] initWithBlock:^
{ {
[timer invalidate]; [timer invalidate];
}]]; }]];
return disposable; return [[MTBlockDisposable alloc] initWithBlock:^{
[startDisposable dispose];
[timerDisposable dispose];
}];
}]; }];
} }
@ -355,11 +457,11 @@
{ {
return [[MTSignal alloc] initWithGenerator:^id<MTDisposable> (MTSubscriber *subscriber) return [[MTSignal alloc] initWithGenerator:^id<MTDisposable> (MTSubscriber *subscriber)
{ {
MTMetaDisposable *disposable = [[MTMetaDisposable alloc] init]; MTMetaDisposable *startDisposable = [[MTMetaDisposable alloc] init];
MTMetaDisposable *timerDisposable = [[MTMetaDisposable alloc] init];
MTTimer *timer = [[MTTimer alloc] initWithTimeout:seconds repeat:false completion:^ MTTimer *timer = [[MTTimer alloc] initWithTimeout:seconds repeat:false completion:^{
{ [startDisposable setDisposable:[signal startWithNext:^(id next)
[disposable setDisposable:[signal startWithNext:^(id next)
{ {
[subscriber putNext:next]; [subscriber putNext:next];
} error:^(id error) } error:^(id error)
@ -372,7 +474,7 @@
} queue:queue.nativeQueue]; } queue:queue.nativeQueue];
[timer start]; [timer start];
[disposable setDisposable:[self startWithNext:^(id next) [timerDisposable setDisposable:[self startWithNext:^(id next)
{ {
[timer invalidate]; [timer invalidate];
[subscriber putNext:next]; [subscriber putNext:next];
@ -386,7 +488,10 @@
[subscriber putCompletion]; [subscriber putCompletion];
}]]; }]];
return disposable; return [[MTBlockDisposable alloc] initWithBlock:^{
[startDisposable dispose];
[timerDisposable dispose];
}];
}]; }];
} }

View File

@ -68,6 +68,7 @@
{ {
os_unfair_lock_lock(&_lock); os_unfair_lock_lock(&_lock);
MTSubscriberBlocks *blocks = nil; MTSubscriberBlocks *blocks = nil;
id<MTDisposable> disposable = _disposable;
if (!_terminated) if (!_terminated)
{ {
blocks = _blocks; blocks = _blocks;
@ -80,6 +81,8 @@
if (blocks) { if (blocks) {
blocks = nil; blocks = nil;
} }
if (disposable) {
}
} }
- (void)putNext:(id)next - (void)putNext:(id)next
@ -100,6 +103,7 @@
- (void)putError:(id)error - (void)putError:(id)error
{ {
bool shouldDispose = false; bool shouldDispose = false;
id<MTDisposable> disposable = nil;
MTSubscriberBlocks *blocks = nil; MTSubscriberBlocks *blocks = nil;
os_unfair_lock_lock(&_lock); os_unfair_lock_lock(&_lock);
@ -111,19 +115,23 @@
shouldDispose = true; shouldDispose = true;
_terminated = true; _terminated = true;
} }
disposable = _disposable;
_disposable = nil;
os_unfair_lock_unlock(&_lock); os_unfair_lock_unlock(&_lock);
if (blocks && blocks->_error) { if (blocks && blocks->_error) {
blocks->_error(error); blocks->_error(error);
} }
if (shouldDispose) if (shouldDispose) {
[self->_disposable dispose]; [disposable dispose];
}
} }
- (void)putCompletion - (void)putCompletion
{ {
bool shouldDispose = false; bool shouldDispose = false;
id<MTDisposable> disposable = nil;
MTSubscriberBlocks *blocks = nil; MTSubscriberBlocks *blocks = nil;
os_unfair_lock_lock(&_lock); os_unfair_lock_lock(&_lock);
@ -135,18 +143,30 @@
shouldDispose = true; shouldDispose = true;
_terminated = true; _terminated = true;
} }
disposable = _disposable;
_disposable = nil;
os_unfair_lock_unlock(&_lock); os_unfair_lock_unlock(&_lock);
if (blocks && blocks->_completed) if (blocks && blocks->_completed)
blocks->_completed(); blocks->_completed();
if (shouldDispose) if (shouldDispose) {
[self->_disposable dispose]; [disposable dispose];
}
} }
- (void)dispose - (void)dispose
{ {
[self->_disposable dispose]; id<MTDisposable> disposable = nil;
os_unfair_lock_lock(&_lock);
disposable = _disposable;
_disposable = nil;
os_unfair_lock_unlock(&_lock);
if (disposable) {
[disposable dispose];
}
} }
@end @end

View File

@ -947,7 +947,7 @@ struct ctr_state {
} }
__weak MTTcpConnection *weakSelf = self; __weak MTTcpConnection *weakSelf = self;
[_resolveDisposable setDisposable:[resolveSignal startWithNext:^(MTTcpConnectionData *connectionData) { [_resolveDisposable setDisposable:[resolveSignal startWithNextStrict:^(MTTcpConnectionData *connectionData) {
[[MTTcpConnection tcpQueue] dispatchOnQueue:^{ [[MTTcpConnection tcpQueue] dispatchOnQueue:^{
__strong MTTcpConnection *strongSelf = weakSelf; __strong MTTcpConnection *strongSelf = weakSelf;
if (strongSelf == nil || connectionData == nil) { if (strongSelf == nil || connectionData == nil) {
@ -1111,7 +1111,7 @@ struct ctr_state {
[strongSelf->_socket readDataToLength:sizeof(struct socks5_ident_resp) withTimeout:-1 tag:MTTcpSocksLogin]; [strongSelf->_socket readDataToLength:sizeof(struct socks5_ident_resp) withTimeout:-1 tag:MTTcpSocksLogin];
} }
}]; }];
}]]; } file:__FILE_NAME__ line:__LINE__]];
} }
}]; }];
} }

View File

@ -16,7 +16,6 @@ objc_library(
"Source", "Source",
], ],
deps = [ deps = [
"//submodules/SSignalKit/SwiftSignalKit"
], ],
sdk_frameworks = [ sdk_frameworks = [
"Foundation", "Foundation",