no message

This commit is contained in:
Peter 2015-04-10 17:02:40 +03:00
parent 9eb975aa47
commit 6a06af76cb
5 changed files with 378 additions and 28 deletions

View File

@ -1,9 +1,12 @@
#import "SSignal.h" #import "SSignal.h"
@class SQueue;
@interface SSignal (Meta) @interface SSignal (Meta)
- (SSignal *)switchToLatest; - (SSignal *)switchToLatest;
- (SSignal *)mapToSignal:(SSignal *(^)(id))f; - (SSignal *)mapToSignal:(SSignal *(^)(id))f;
- (SSignal *)then:(SSignal *)signal; - (SSignal *)then:(SSignal *)signal;
- (SSignal *)queue;
@end @end

View File

@ -7,28 +7,34 @@
#import <libkern/OSAtomic.h> #import <libkern/OSAtomic.h>
@interface SSignalSwitchToLatestState : NSObject <SDisposable> @interface SSignalQueueState : NSObject <SDisposable>
{ {
OSSpinLock _lock; OSSpinLock _lock;
bool _didSwitch; bool _executingSignal;
bool _terminated; bool _terminated;
id<SDisposable> _disposable; id<SDisposable> _disposable;
SMetaDisposable *_currentDisposable; SMetaDisposable *_currentDisposable;
SSubscriber *_subscriber; SSubscriber *_subscriber;
NSMutableArray *_queuedSignals;
bool _queueMode;
} }
@end @end
@implementation SSignalSwitchToLatestState @implementation SSignalQueueState
- (instancetype)initWithSubscriber:(SSubscriber *)subscriber - (instancetype)initWithSubscriber:(SSubscriber *)subscriber queueMode:(bool)queueMode
{ {
self = [super init]; self = [super init];
if (self != nil) if (self != nil)
{ {
_subscriber = subscriber; _subscriber = subscriber;
_currentDisposable = [[SMetaDisposable alloc] init]; _currentDisposable = [[SMetaDisposable alloc] init];
_queuedSignals = queueMode ? [[NSMutableArray alloc] init] : nil;
_queueMode = queueMode;
} }
return self; return self;
} }
@ -38,50 +44,89 @@
_disposable = disposable; _disposable = disposable;
} }
- (void)switchToSignal:(SSignal *)signal - (void)enqueueSignal:(SSignal *)signal
{ {
bool startSignal = false;
OSSpinLockLock(&_lock); OSSpinLockLock(&_lock);
_didSwitch = true; if (_queueMode && _executingSignal)
{
[_queuedSignals addObject:signal];
}
else
{
_executingSignal = true;
startSignal = true;
}
OSSpinLockUnlock(&_lock); OSSpinLockUnlock(&_lock);
id<SDisposable> disposable = [signal startWithNext:^(id next) if (startSignal)
{ {
[_subscriber putNext:next]; id<SDisposable> disposable = [signal startWithNext:^(id next)
} error:^(id error) {
{ [_subscriber putNext:next];
[_subscriber putError:error]; } error:^(id error)
} completed:^ {
{ [_subscriber putError:error];
OSSpinLockLock(&_lock); } completed:^
_didSwitch = false; {
OSSpinLockUnlock(&_lock); [self headCompleted];
}];
[self maybeComplete]; [_currentDisposable setDisposable:disposable];
}]; }
[_currentDisposable setDisposable:disposable];
} }
- (void)maybeComplete - (void)headCompleted
{ {
SSignal *nextSignal = nil;
bool terminated = false; bool terminated = false;
OSSpinLockLock(&_lock); OSSpinLockLock(&_lock);
terminated = _terminated; _executingSignal = false;
if (_queueMode)
{
if (_queuedSignals.count != 0)
{
nextSignal = _queuedSignals[0];
[_queuedSignals removeObjectAtIndex:0];
_executingSignal = true;
}
else
terminated = _terminated;
}
else
terminated = _terminated;
OSSpinLockUnlock(&_lock); OSSpinLockUnlock(&_lock);
if (terminated) if (terminated)
[_subscriber putCompletion]; [_subscriber putCompletion];
else if (nextSignal != nil)
{
id<SDisposable> disposable = [nextSignal startWithNext:^(id next)
{
[_subscriber putNext:next];
} error:^(id error)
{
[_subscriber putError:error];
} completed:^
{
[self headCompleted];
}];
[_currentDisposable setDisposable:disposable];
}
} }
- (void)beginCompletion - (void)beginCompletion
{ {
bool didSwitch = false; bool executingSignal = false;
OSSpinLockLock(&_lock); OSSpinLockLock(&_lock);
didSwitch = _didSwitch; executingSignal = _executingSignal;
_terminated = true; _terminated = true;
OSSpinLockUnlock(&_lock); OSSpinLockUnlock(&_lock);
if (!didSwitch) if (!executingSignal)
[_subscriber putCompletion]; [_subscriber putCompletion];
} }
@ -99,11 +144,11 @@
{ {
return [[SSignal alloc] initWithGenerator:^id<SDisposable> (SSubscriber *subscriber) return [[SSignal alloc] initWithGenerator:^id<SDisposable> (SSubscriber *subscriber)
{ {
SSignalSwitchToLatestState *state = [[SSignalSwitchToLatestState alloc] initWithSubscriber:subscriber]; SSignalQueueState *state = [[SSignalQueueState alloc] initWithSubscriber:subscriber queueMode:false];
[state beginWithDisposable:[self startWithNext:^(id next) [state beginWithDisposable:[self startWithNext:^(id next)
{ {
[state switchToSignal:next]; [state enqueueSignal:next];
} error:^(id error) } error:^(id error)
{ {
[subscriber putError:error]; [subscriber putError:error];
@ -154,4 +199,25 @@
}]; }];
} }
- (SSignal *)queue
{
return [[SSignal alloc] initWithGenerator:^id<SDisposable> (SSubscriber *subscriber)
{
SSignalQueueState *state = [[SSignalQueueState alloc] initWithSubscriber:subscriber queueMode:true];
[state beginWithDisposable:[self startWithNext:^(id next)
{
[state enqueueSignal:next];
} error:^(id error)
{
[subscriber putError:error];
} completed:^
{
[state beginCompletion];
}]];
return state;
}];
}
@end @end

View File

@ -6,5 +6,6 @@
- (SSignal *)delay:(NSTimeInterval)seconds onQueue:(SQueue *)queue; - (SSignal *)delay:(NSTimeInterval)seconds onQueue:(SQueue *)queue;
- (SSignal *)timeout:(NSTimeInterval)seconds onQueue:(SQueue *)queue orSignal:(SSignal *)signal; - (SSignal *)timeout:(NSTimeInterval)seconds onQueue:(SQueue *)queue orSignal:(SSignal *)signal;
- (SSignal *)wait:(NSTimeInterval)seconds;
@end @end

View File

@ -4,6 +4,8 @@
#import "SDisposableSet.h" #import "SDisposableSet.h"
#import "SBlockDisposable.h" #import "SBlockDisposable.h"
#import "SSignal+Dispatch.h"
#import "STimer.h" #import "STimer.h"
@implementation SSignal (Timing) @implementation SSignal (Timing)
@ -78,4 +80,30 @@
}]; }];
} }
- (SSignal *)wait:(NSTimeInterval)seconds
{
return [[SSignal alloc] initWithGenerator:^id<SDisposable>(SSubscriber *subscriber)
{
dispatch_semaphore_t semaphore = dispatch_semaphore_create(0);
id<SDisposable> disposable = [self startWithNext:^(id next)
{
dispatch_semaphore_signal(semaphore);
[subscriber putNext:next];
} error:^(id error)
{
dispatch_semaphore_signal(semaphore);
[subscriber putError:error];
} completed:^
{
dispatch_semaphore_signal(semaphore);
[subscriber putCompletion];
}];
dispatch_semaphore_wait(semaphore, dispatch_time(DISPATCH_TIME_NOW, (int64_t)(seconds * NSEC_PER_SEC)));
return disposable;
}];
}
@end @end

View File

@ -216,7 +216,7 @@
{ {
dispatch_async(queue, ^ dispatch_async(queue, ^
{ {
usleep(100); usleep(200);
[subscriber putNext:@1]; [subscriber putNext:@1];
}); });
@ -403,4 +403,256 @@
XCTAssertTrue(completedAll); XCTAssertTrue(completedAll);
} }
- (void)testQueue
{
dispatch_queue_t queue = dispatch_queue_create(NULL, 0);
__block bool disposedFirst = false;
__block bool disposedSecond = false;
__block bool disposedThird = false;
__block int result = 0;
SSignal *firstSignal = [[SSignal alloc] initWithGenerator:^id<SDisposable>(SSubscriber *subscriber)
{
dispatch_async(queue, ^
{
usleep(100);
[subscriber putNext:@1];
[subscriber putCompletion];
});
return [[SBlockDisposable alloc] initWithBlock:^
{
disposedFirst = true;
}];
}];
SSignal *secondSignal = [[SSignal alloc] initWithGenerator:^id<SDisposable>(SSubscriber *subscriber)
{
dispatch_async(queue, ^
{
usleep(100);
[subscriber putNext:@2];
[subscriber putCompletion];
});
return [[SBlockDisposable alloc] initWithBlock:^
{
disposedSecond = true;
}];
}];
SSignal *thirdSignal = [[SSignal alloc] initWithGenerator:^id<SDisposable>(SSubscriber *subscriber)
{
dispatch_async(queue, ^
{
usleep(100);
[subscriber putNext:@3];
[subscriber putCompletion];
});
return [[SBlockDisposable alloc] initWithBlock:^
{
disposedThird = true;
}];
}];
SSignal *signal = [[[[SSignal single:firstSignal] then:[SSignal single:secondSignal]] then:[SSignal single:thirdSignal]] queue];
[signal startWithNext:^(id next)
{
result += [next intValue];
}];
usleep(1000);
XCTAssertEqual(result, 6);
XCTAssertTrue(disposedFirst);
XCTAssertTrue(disposedSecond);
XCTAssertTrue(disposedThird);
}
- (void)testQueueInterrupted
{
dispatch_queue_t queue = dispatch_queue_create(NULL, 0);
__block bool disposedFirst = false;
__block bool disposedSecond = false;
__block bool disposedThird = false;
__block bool startedThird = false;
__block int result = 0;
SSignal *firstSignal = [[SSignal alloc] initWithGenerator:^id<SDisposable>(SSubscriber *subscriber)
{
dispatch_async(queue, ^
{
usleep(100);
[subscriber putNext:@1];
[subscriber putCompletion];
});
return [[SBlockDisposable alloc] initWithBlock:^
{
disposedFirst = true;
}];
}];
SSignal *secondSignal = [[SSignal alloc] initWithGenerator:^id<SDisposable>(SSubscriber *subscriber)
{
dispatch_async(queue, ^
{
usleep(100);
[subscriber putNext:@2];
[subscriber putError:nil];
});
return [[SBlockDisposable alloc] initWithBlock:^
{
disposedSecond = true;
}];
}];
SSignal *thirdSignal = [[SSignal alloc] initWithGenerator:^id<SDisposable>(SSubscriber *subscriber)
{
startedThird = true;
dispatch_async(queue, ^
{
usleep(100);
[subscriber putNext:@3];
[subscriber putCompletion];
});
return [[SBlockDisposable alloc] initWithBlock:^
{
disposedThird = true;
}];
}];
SSignal *signal = [[[[SSignal single:firstSignal] then:[SSignal single:secondSignal]] then:[SSignal single:thirdSignal]] queue];
[signal startWithNext:^(id next)
{
result += [next intValue];
}];
usleep(1000);
XCTAssertEqual(result, 3);
XCTAssertTrue(disposedFirst);
XCTAssertTrue(disposedSecond);
XCTAssertFalse(startedThird);
XCTAssertFalse(disposedThird);
}
- (void)testQueueDisposed
{
dispatch_queue_t queue = dispatch_queue_create(NULL, 0);
__block bool disposedFirst = false;
__block bool disposedSecond = false;
__block bool disposedThird = false;
__block bool startedFirst = false;
__block bool startedSecond = false;
__block bool startedThird = false;
__block int result = 0;
SSignal *firstSignal = [[SSignal alloc] initWithGenerator:^id<SDisposable>(SSubscriber *subscriber)
{
startedFirst = true;
__block bool cancelled = false;
dispatch_async(queue, ^
{
if (!cancelled)
{
usleep(100);
[subscriber putNext:@1];
[subscriber putCompletion];
}
});
return [[SBlockDisposable alloc] initWithBlock:^
{
cancelled = true;
disposedFirst = true;
}];
}];
SSignal *secondSignal = [[SSignal alloc] initWithGenerator:^id<SDisposable>(SSubscriber *subscriber)
{
startedSecond = true;
__block bool cancelled = false;
dispatch_async(queue, ^
{
if (!cancelled)
{
usleep(100);
[subscriber putNext:@2];
[subscriber putError:nil];
}
});
return [[SBlockDisposable alloc] initWithBlock:^
{
cancelled = true;
disposedSecond = true;
}];
}];
SSignal *thirdSignal = [[SSignal alloc] initWithGenerator:^id<SDisposable>(SSubscriber *subscriber)
{
startedThird = true;
dispatch_async(queue, ^
{
usleep(100);
[subscriber putNext:@3];
[subscriber putCompletion];
});
return [[SBlockDisposable alloc] initWithBlock:^
{
disposedThird = true;
}];
}];
SSignal *signal = [[[[SSignal single:firstSignal] then:[SSignal single:secondSignal]] then:[SSignal single:thirdSignal]] queue];
[[signal startWithNext:^(id next)
{
result += [next intValue];
}] dispose];
usleep(1000);
XCTAssertEqual(result, 0);
XCTAssertTrue(disposedFirst);
XCTAssertFalse(disposedSecond);
XCTAssertFalse(disposedThird);
XCTAssertTrue(startedFirst);
XCTAssertFalse(startedSecond);
XCTAssertFalse(startedThird);
}
- (void)testWaitSameQueue
{
SSignal *signal = [[SSignal alloc] initWithGenerator:^id<SDisposable>(SSubscriber *subscriber)
{
dispatch_async(dispatch_get_main_queue(), ^
{
[subscriber putNext:@(1)];
[subscriber putCompletion];
});
return nil;
}];
CFAbsoluteTime startTime = CFAbsoluteTimeGetCurrent();
[[signal wait:2.0 onQueue:[SQueue concurrentDefaultQueue]] startWithNext:^(__unused id next)
{
}];
XCTAssert(startTime < 0.5);
}
@end @end