diff --git a/SSignalKit/SSignal+Meta.m b/SSignalKit/SSignal+Meta.m index 0adc4846ab..16ccfd7069 100644 --- a/SSignalKit/SSignal+Meta.m +++ b/SSignalKit/SSignal+Meta.m @@ -5,41 +5,114 @@ #import "SSignal+Mapping.h" #import "SAtomic.h" +#import + +@interface SSignalSwitchToLatestState : NSObject +{ + OSSpinLock _lock; + bool _didSwitch; + bool _terminated; + + id _disposable; + SMetaDisposable *_currentDisposable; + SSubscriber *_subscriber; +} + +@end + +@implementation SSignalSwitchToLatestState + +- (instancetype)initWithSubscriber:(SSubscriber *)subscriber +{ + self = [super init]; + if (self != nil) + { + _subscriber = subscriber; + _currentDisposable = [[SMetaDisposable alloc] init]; + } + return self; +} + +- (void)beginWithDisposable:(id)disposable +{ + _disposable = disposable; +} + +- (void)switchToSignal:(SSignal *)signal +{ + OSSpinLockLock(&_lock); + _didSwitch = true; + OSSpinLockUnlock(&_lock); + + id disposable = [signal startWithNext:^(id next) + { + [_subscriber putNext:next]; + } error:^(id error) + { + [_subscriber putError:error]; + } completed:^ + { + OSSpinLockLock(&_lock); + _didSwitch = false; + OSSpinLockUnlock(&_lock); + + [self maybeComplete]; + }]; + + [_currentDisposable setDisposable:disposable]; +} + +- (void)maybeComplete +{ + bool terminated = false; + OSSpinLockLock(&_lock); + terminated = _terminated; + OSSpinLockUnlock(&_lock); + + if (terminated) + [_subscriber putCompletion]; +} + +- (void)beginCompletion +{ + bool didSwitch = false; + OSSpinLockLock(&_lock); + didSwitch = _didSwitch; + _terminated = true; + OSSpinLockUnlock(&_lock); + + if (!didSwitch) + [_subscriber putCompletion]; +} + +- (void)dispose +{ + [_disposable dispose]; + [_currentDisposable dispose]; +} + +@end + @implementation SSignal (Meta) - (SSignal *)switchToLatest { return [[SSignal alloc] initWithGenerator:^id (SSubscriber *subscriber) { - SDisposableSet *compositeDisposable = [[SDisposableSet alloc] init]; + SSignalSwitchToLatestState *state = [[SSignalSwitchToLatestState alloc] initWithSubscriber:subscriber]; - SMetaDisposable *currentDisposable = [[SMetaDisposable alloc] init]; - [compositeDisposable add:currentDisposable]; - - SAtomic *didProduceNext = [[SAtomic alloc] initWithValue:nil]; - [compositeDisposable add:[self startWithNext:^(SSignal *next) + [state beginWithDisposable:[self startWithNext:^(id next) { - [didProduceNext swap:@1]; - [currentDisposable setDisposable:[next startWithNext:^(id next) - { - [subscriber putNext:next]; - } error:^(id error) - { - [subscriber putError:error]; - } completed:^ - { - [subscriber putCompletion]; - }]]; + [state switchToSignal:next]; } error:^(id error) { [subscriber putError:error]; } completed:^ { - if ([didProduceNext swap:@1] == NULL) - [subscriber putCompletion]; + [state beginCompletion]; }]]; - return compositeDisposable; + return state; }]; } diff --git a/SSignalKit/SSubscriber.m b/SSignalKit/SSubscriber.m index ef5979d5db..9c0e26cb5a 100644 --- a/SSignalKit/SSubscriber.m +++ b/SSignalKit/SSubscriber.m @@ -27,7 +27,10 @@ - (void)_assignDisposable:(id)disposable { - _disposable = disposable; + if (_terminated) + [disposable dispose]; + else + _disposable = disposable; } - (void)_markTerminatedWithoutDisposal diff --git a/SSignalKitTests/SSignalBasicTests.m b/SSignalKitTests/SSignalBasicTests.m index abe09c87a1..e4d871b348 100644 --- a/SSignalKitTests/SSignalBasicTests.m +++ b/SSignalKitTests/SSignalBasicTests.m @@ -179,15 +179,15 @@ SSignal *signal = [[[SSignal alloc] initWithGenerator:^id(SSubscriber *subscriber) { [subscriber putNext:@1]; - [object description]; + __unused id a0 = [object description]; return [[SBlockDisposable alloc] initWithBlock:^ { - [object description]; + __unused id a1 = [object description]; disposed = true; }]; }] _mapInplace:^id(id value) { - [object description]; + __unused id a1 = [object description]; return @([value intValue] * 2); }]; @@ -242,4 +242,165 @@ XCTAssertFalse(generated); } +- (void)testThen +{ + __block bool generatedFirst = false; + __block bool disposedFirst = false; + __block bool generatedSecond = false; + __block bool disposedSecond = false; + __block int result = 0; + + SSignal *signal = [[SSignal alloc] initWithGenerator:^id(SSubscriber *subscriber) + { + generatedFirst = true; + [subscriber putNext:@(1)]; + [subscriber putCompletion]; + return [[SBlockDisposable alloc] initWithBlock:^ + { + disposedFirst = true; + }]; + }]; + + signal = [signal then:[[SSignal alloc] initWithGenerator:^id(SSubscriber *subscriber) + { + generatedSecond = true; + [subscriber putNext:@(2)]; + [subscriber putCompletion]; + return [[SBlockDisposable alloc] initWithBlock:^ + { + disposedSecond = true; + }]; + }]]; + + [signal startWithNext:^(id next) + { + result += [next intValue]; + }]; + + XCTAssertTrue(generatedFirst); + XCTAssertTrue(disposedFirst); + XCTAssertTrue(generatedSecond); + XCTAssertTrue(disposedSecond); + XCTAssert(result == 3); +} + +- (void)testSwitchToLatest +{ + __block int result = 0; + __block bool disposedOne = false; + __block bool disposedTwo = false; + __block bool disposedThree = false; + __block bool completedAll = false; + + bool deallocatedOne = false; + bool deallocatedTwo = false; + bool deallocatedThree = false; + + @autoreleasepool + { + DeallocatingObject *objectOne = [[DeallocatingObject alloc] initWithDeallocated:&deallocatedOne]; + DeallocatingObject *objectTwo = [[DeallocatingObject alloc] initWithDeallocated:&deallocatedTwo]; + DeallocatingObject *objectThree = [[DeallocatingObject alloc] initWithDeallocated:&deallocatedThree]; + + SSignal *one = [[SSignal alloc] initWithGenerator:^id(SSubscriber *subscriber) + { + [subscriber putNext:@(1)]; + [subscriber putCompletion]; + __unused id a0 = [objectOne description]; + return [[SBlockDisposable alloc] initWithBlock:^ + { + __unused id a0 = [objectOne description]; + disposedOne = true; + }]; + }]; + SSignal *two = [[SSignal alloc] initWithGenerator:^id(SSubscriber *subscriber) + { + [subscriber putNext:@(2)]; + [subscriber putCompletion]; + __unused id a1 = [objectTwo description]; + return [[SBlockDisposable alloc] initWithBlock:^ + { + __unused id a1 = [objectOne description]; + disposedTwo = true; + }]; + }]; + SSignal *three = [[SSignal alloc] initWithGenerator:^id(SSubscriber *subscriber) + { + [subscriber putNext:@(3)]; + [subscriber putCompletion]; + __unused id a0 = [objectThree description]; + return [[SBlockDisposable alloc] initWithBlock:^ + { + __unused id a1 = [objectOne description]; + disposedThree = true; + }]; + }]; + + SSignal *signal = [[[[SSignal single:one] then:[SSignal single:two]] then:[SSignal single:three]] switchToLatest]; + [signal startWithNext:^(id next) + { + result += [next intValue]; + } error:nil completed:^ + { + completedAll = true; + }]; + } + + XCTAssert(result == 6); + XCTAssertTrue(disposedOne); + XCTAssertTrue(disposedTwo); + XCTAssertTrue(disposedThree); + XCTAssertTrue(deallocatedOne); + XCTAssertTrue(deallocatedTwo); + XCTAssertTrue(deallocatedThree); + XCTAssertTrue(completedAll); +} + +- (void)testSwitchToLatestError +{ + __block bool errorGenerated = false; + + SSignal *one = [[SSignal alloc] initWithGenerator:^id(SSubscriber *subscriber) + { + [subscriber putError:nil]; + return nil; + }]; + + [one startWithNext:^(__unused id next) + { + + } error:^(__unused id error) + { + errorGenerated = true; + } completed:^ + { + + }]; + + XCTAssertTrue(errorGenerated); +} + +- (void)testSwitchToLatestCompleted +{ + __block bool completedAll = false; + + SSignal *one = [[SSignal alloc] initWithGenerator:^id(SSubscriber *subscriber) + { + [subscriber putCompletion]; + return nil; + }]; + + [one startWithNext:^(__unused id next) + { + + } error:^(__unused id error) + { + } completed:^ + { + completedAll = true; + }]; + + XCTAssertTrue(completedAll); +} + @end