switchToLatest + tests

This commit is contained in:
Peter 2015-04-03 23:26:54 +03:00
parent a69c37fb74
commit f94fc005a7
3 changed files with 261 additions and 24 deletions

View File

@ -5,41 +5,114 @@
#import "SSignal+Mapping.h"
#import "SAtomic.h"
#import <libkern/OSAtomic.h>
@interface SSignalSwitchToLatestState : NSObject <SDisposable>
{
OSSpinLock _lock;
bool _didSwitch;
bool _terminated;
id<SDisposable> _disposable;
SMetaDisposable *_currentDisposable;
SSubscriber *_subscriber;
}
@end
@implementation SSignalSwitchToLatestState
- (instancetype)initWithSubscriber:(SSubscriber *)subscriber
{
self = [super init];
if (self != nil)
{
_subscriber = subscriber;
_currentDisposable = [[SMetaDisposable alloc] init];
}
return self;
}
- (void)beginWithDisposable:(id<SDisposable>)disposable
{
_disposable = disposable;
}
- (void)switchToSignal:(SSignal *)signal
{
OSSpinLockLock(&_lock);
_didSwitch = true;
OSSpinLockUnlock(&_lock);
id<SDisposable> disposable = [signal startWithNext:^(id next)
{
[_subscriber putNext:next];
} error:^(id error)
{
[_subscriber putError:error];
} completed:^
{
OSSpinLockLock(&_lock);
_didSwitch = false;
OSSpinLockUnlock(&_lock);
[self maybeComplete];
}];
[_currentDisposable setDisposable:disposable];
}
- (void)maybeComplete
{
bool terminated = false;
OSSpinLockLock(&_lock);
terminated = _terminated;
OSSpinLockUnlock(&_lock);
if (terminated)
[_subscriber putCompletion];
}
- (void)beginCompletion
{
bool didSwitch = false;
OSSpinLockLock(&_lock);
didSwitch = _didSwitch;
_terminated = true;
OSSpinLockUnlock(&_lock);
if (!didSwitch)
[_subscriber putCompletion];
}
- (void)dispose
{
[_disposable dispose];
[_currentDisposable dispose];
}
@end
@implementation SSignal (Meta)
- (SSignal *)switchToLatest
{
return [[SSignal alloc] initWithGenerator:^id<SDisposable> (SSubscriber *subscriber)
{
SDisposableSet *compositeDisposable = [[SDisposableSet alloc] init];
SSignalSwitchToLatestState *state = [[SSignalSwitchToLatestState alloc] initWithSubscriber:subscriber];
SMetaDisposable *currentDisposable = [[SMetaDisposable alloc] init];
[compositeDisposable add:currentDisposable];
SAtomic *didProduceNext = [[SAtomic alloc] initWithValue:nil];
[compositeDisposable add:[self startWithNext:^(SSignal *next)
[state beginWithDisposable:[self startWithNext:^(id next)
{
[didProduceNext swap:@1];
[currentDisposable setDisposable:[next startWithNext:^(id next)
{
[subscriber putNext:next];
} error:^(id error)
{
[subscriber putError:error];
} completed:^
{
[subscriber putCompletion];
}]];
[state switchToSignal:next];
} error:^(id error)
{
[subscriber putError:error];
} completed:^
{
if ([didProduceNext swap:@1] == NULL)
[subscriber putCompletion];
[state beginCompletion];
}]];
return compositeDisposable;
return state;
}];
}

View File

@ -27,7 +27,10 @@
- (void)_assignDisposable:(id<SDisposable>)disposable
{
_disposable = disposable;
if (_terminated)
[disposable dispose];
else
_disposable = disposable;
}
- (void)_markTerminatedWithoutDisposal

View File

@ -179,15 +179,15 @@
SSignal *signal = [[[SSignal alloc] initWithGenerator:^id<SDisposable>(SSubscriber *subscriber)
{
[subscriber putNext:@1];
[object description];
__unused id a0 = [object description];
return [[SBlockDisposable alloc] initWithBlock:^
{
[object description];
__unused id a1 = [object description];
disposed = true;
}];
}] _mapInplace:^id(id value)
{
[object description];
__unused id a1 = [object description];
return @([value intValue] * 2);
}];
@ -242,4 +242,165 @@
XCTAssertFalse(generated);
}
- (void)testThen
{
__block bool generatedFirst = false;
__block bool disposedFirst = false;
__block bool generatedSecond = false;
__block bool disposedSecond = false;
__block int result = 0;
SSignal *signal = [[SSignal alloc] initWithGenerator:^id<SDisposable>(SSubscriber *subscriber)
{
generatedFirst = true;
[subscriber putNext:@(1)];
[subscriber putCompletion];
return [[SBlockDisposable alloc] initWithBlock:^
{
disposedFirst = true;
}];
}];
signal = [signal then:[[SSignal alloc] initWithGenerator:^id<SDisposable>(SSubscriber *subscriber)
{
generatedSecond = true;
[subscriber putNext:@(2)];
[subscriber putCompletion];
return [[SBlockDisposable alloc] initWithBlock:^
{
disposedSecond = true;
}];
}]];
[signal startWithNext:^(id next)
{
result += [next intValue];
}];
XCTAssertTrue(generatedFirst);
XCTAssertTrue(disposedFirst);
XCTAssertTrue(generatedSecond);
XCTAssertTrue(disposedSecond);
XCTAssert(result == 3);
}
- (void)testSwitchToLatest
{
__block int result = 0;
__block bool disposedOne = false;
__block bool disposedTwo = false;
__block bool disposedThree = false;
__block bool completedAll = false;
bool deallocatedOne = false;
bool deallocatedTwo = false;
bool deallocatedThree = false;
@autoreleasepool
{
DeallocatingObject *objectOne = [[DeallocatingObject alloc] initWithDeallocated:&deallocatedOne];
DeallocatingObject *objectTwo = [[DeallocatingObject alloc] initWithDeallocated:&deallocatedTwo];
DeallocatingObject *objectThree = [[DeallocatingObject alloc] initWithDeallocated:&deallocatedThree];
SSignal *one = [[SSignal alloc] initWithGenerator:^id<SDisposable>(SSubscriber *subscriber)
{
[subscriber putNext:@(1)];
[subscriber putCompletion];
__unused id a0 = [objectOne description];
return [[SBlockDisposable alloc] initWithBlock:^
{
__unused id a0 = [objectOne description];
disposedOne = true;
}];
}];
SSignal *two = [[SSignal alloc] initWithGenerator:^id<SDisposable>(SSubscriber *subscriber)
{
[subscriber putNext:@(2)];
[subscriber putCompletion];
__unused id a1 = [objectTwo description];
return [[SBlockDisposable alloc] initWithBlock:^
{
__unused id a1 = [objectOne description];
disposedTwo = true;
}];
}];
SSignal *three = [[SSignal alloc] initWithGenerator:^id<SDisposable>(SSubscriber *subscriber)
{
[subscriber putNext:@(3)];
[subscriber putCompletion];
__unused id a0 = [objectThree description];
return [[SBlockDisposable alloc] initWithBlock:^
{
__unused id a1 = [objectOne description];
disposedThree = true;
}];
}];
SSignal *signal = [[[[SSignal single:one] then:[SSignal single:two]] then:[SSignal single:three]] switchToLatest];
[signal startWithNext:^(id next)
{
result += [next intValue];
} error:nil completed:^
{
completedAll = true;
}];
}
XCTAssert(result == 6);
XCTAssertTrue(disposedOne);
XCTAssertTrue(disposedTwo);
XCTAssertTrue(disposedThree);
XCTAssertTrue(deallocatedOne);
XCTAssertTrue(deallocatedTwo);
XCTAssertTrue(deallocatedThree);
XCTAssertTrue(completedAll);
}
- (void)testSwitchToLatestError
{
__block bool errorGenerated = false;
SSignal *one = [[SSignal alloc] initWithGenerator:^id<SDisposable>(SSubscriber *subscriber)
{
[subscriber putError:nil];
return nil;
}];
[one startWithNext:^(__unused id next)
{
} error:^(__unused id error)
{
errorGenerated = true;
} completed:^
{
}];
XCTAssertTrue(errorGenerated);
}
- (void)testSwitchToLatestCompleted
{
__block bool completedAll = false;
SSignal *one = [[SSignal alloc] initWithGenerator:^id<SDisposable>(SSubscriber *subscriber)
{
[subscriber putCompletion];
return nil;
}];
[one startWithNext:^(__unused id next)
{
} error:^(__unused id error)
{
} completed:^
{
completedAll = true;
}];
XCTAssertTrue(completedAll);
}
@end