diff --git a/SSignalKit/SSignal+Meta.h b/SSignalKit/SSignal+Meta.h index a9ab8c6a7c..2400695ad9 100644 --- a/SSignalKit/SSignal+Meta.h +++ b/SSignalKit/SSignal+Meta.h @@ -1,9 +1,12 @@ #import "SSignal.h" +@class SQueue; + @interface SSignal (Meta) - (SSignal *)switchToLatest; - (SSignal *)mapToSignal:(SSignal *(^)(id))f; - (SSignal *)then:(SSignal *)signal; +- (SSignal *)queue; @end diff --git a/SSignalKit/SSignal+Meta.m b/SSignalKit/SSignal+Meta.m index 16ccfd7069..79c0652c92 100644 --- a/SSignalKit/SSignal+Meta.m +++ b/SSignalKit/SSignal+Meta.m @@ -7,28 +7,34 @@ #import -@interface SSignalSwitchToLatestState : NSObject +@interface SSignalQueueState : NSObject { OSSpinLock _lock; - bool _didSwitch; + bool _executingSignal; bool _terminated; id _disposable; SMetaDisposable *_currentDisposable; SSubscriber *_subscriber; + + NSMutableArray *_queuedSignals; + bool _queueMode; + } @end -@implementation SSignalSwitchToLatestState +@implementation SSignalQueueState -- (instancetype)initWithSubscriber:(SSubscriber *)subscriber +- (instancetype)initWithSubscriber:(SSubscriber *)subscriber queueMode:(bool)queueMode { self = [super init]; if (self != nil) { _subscriber = subscriber; _currentDisposable = [[SMetaDisposable alloc] init]; + _queuedSignals = queueMode ? [[NSMutableArray alloc] init] : nil; + _queueMode = queueMode; } return self; } @@ -38,50 +44,89 @@ _disposable = disposable; } -- (void)switchToSignal:(SSignal *)signal +- (void)enqueueSignal:(SSignal *)signal { + bool startSignal = false; OSSpinLockLock(&_lock); - _didSwitch = true; + if (_queueMode && _executingSignal) + { + [_queuedSignals addObject:signal]; + } + else + { + _executingSignal = true; + startSignal = true; + } OSSpinLockUnlock(&_lock); - id disposable = [signal startWithNext:^(id next) + if (startSignal) { - [_subscriber putNext:next]; - } error:^(id error) - { - [_subscriber putError:error]; - } completed:^ - { - OSSpinLockLock(&_lock); - _didSwitch = false; - OSSpinLockUnlock(&_lock); + id disposable = [signal startWithNext:^(id next) + { + [_subscriber putNext:next]; + } error:^(id error) + { + [_subscriber putError:error]; + } completed:^ + { + [self headCompleted]; + }]; - [self maybeComplete]; - }]; - - [_currentDisposable setDisposable:disposable]; + [_currentDisposable setDisposable:disposable]; + } } -- (void)maybeComplete +- (void)headCompleted { + SSignal *nextSignal = nil; + bool terminated = false; OSSpinLockLock(&_lock); - terminated = _terminated; + _executingSignal = false; + + if (_queueMode) + { + if (_queuedSignals.count != 0) + { + nextSignal = _queuedSignals[0]; + [_queuedSignals removeObjectAtIndex:0]; + _executingSignal = true; + } + else + terminated = _terminated; + } + else + terminated = _terminated; OSSpinLockUnlock(&_lock); if (terminated) [_subscriber putCompletion]; + else if (nextSignal != nil) + { + id disposable = [nextSignal startWithNext:^(id next) + { + [_subscriber putNext:next]; + } error:^(id error) + { + [_subscriber putError:error]; + } completed:^ + { + [self headCompleted]; + }]; + + [_currentDisposable setDisposable:disposable]; + } } - (void)beginCompletion { - bool didSwitch = false; + bool executingSignal = false; OSSpinLockLock(&_lock); - didSwitch = _didSwitch; + executingSignal = _executingSignal; _terminated = true; OSSpinLockUnlock(&_lock); - if (!didSwitch) + if (!executingSignal) [_subscriber putCompletion]; } @@ -99,11 +144,11 @@ { return [[SSignal alloc] initWithGenerator:^id (SSubscriber *subscriber) { - SSignalSwitchToLatestState *state = [[SSignalSwitchToLatestState alloc] initWithSubscriber:subscriber]; + SSignalQueueState *state = [[SSignalQueueState alloc] initWithSubscriber:subscriber queueMode:false]; [state beginWithDisposable:[self startWithNext:^(id next) { - [state switchToSignal:next]; + [state enqueueSignal:next]; } error:^(id error) { [subscriber putError:error]; @@ -154,4 +199,25 @@ }]; } +- (SSignal *)queue +{ + return [[SSignal alloc] initWithGenerator:^id (SSubscriber *subscriber) + { + SSignalQueueState *state = [[SSignalQueueState alloc] initWithSubscriber:subscriber queueMode:true]; + + [state beginWithDisposable:[self startWithNext:^(id next) + { + [state enqueueSignal:next]; + } error:^(id error) + { + [subscriber putError:error]; + } completed:^ + { + [state beginCompletion]; + }]]; + + return state; + }]; +} + @end diff --git a/SSignalKit/SSignal+Timing.h b/SSignalKit/SSignal+Timing.h index beadb39fd4..68c0fe6ca8 100644 --- a/SSignalKit/SSignal+Timing.h +++ b/SSignalKit/SSignal+Timing.h @@ -6,5 +6,6 @@ - (SSignal *)delay:(NSTimeInterval)seconds onQueue:(SQueue *)queue; - (SSignal *)timeout:(NSTimeInterval)seconds onQueue:(SQueue *)queue orSignal:(SSignal *)signal; +- (SSignal *)wait:(NSTimeInterval)seconds; @end diff --git a/SSignalKit/SSignal+Timing.m b/SSignalKit/SSignal+Timing.m index 09e1bfad1e..3d0dbc4a97 100644 --- a/SSignalKit/SSignal+Timing.m +++ b/SSignalKit/SSignal+Timing.m @@ -4,6 +4,8 @@ #import "SDisposableSet.h" #import "SBlockDisposable.h" +#import "SSignal+Dispatch.h" + #import "STimer.h" @implementation SSignal (Timing) @@ -78,4 +80,30 @@ }]; } +- (SSignal *)wait:(NSTimeInterval)seconds +{ + return [[SSignal alloc] initWithGenerator:^id(SSubscriber *subscriber) + { + dispatch_semaphore_t semaphore = dispatch_semaphore_create(0); + + id disposable = [self startWithNext:^(id next) + { + dispatch_semaphore_signal(semaphore); + [subscriber putNext:next]; + } error:^(id error) + { + dispatch_semaphore_signal(semaphore); + [subscriber putError:error]; + } completed:^ + { + dispatch_semaphore_signal(semaphore); + [subscriber putCompletion]; + }]; + + dispatch_semaphore_wait(semaphore, dispatch_time(DISPATCH_TIME_NOW, (int64_t)(seconds * NSEC_PER_SEC))); + + return disposable; + }]; +} + @end diff --git a/SSignalKitTests/SSignalBasicTests.m b/SSignalKitTests/SSignalBasicTests.m index e4d871b348..acbef9e2d1 100644 --- a/SSignalKitTests/SSignalBasicTests.m +++ b/SSignalKitTests/SSignalBasicTests.m @@ -216,7 +216,7 @@ { dispatch_async(queue, ^ { - usleep(100); + usleep(200); [subscriber putNext:@1]; }); @@ -403,4 +403,256 @@ XCTAssertTrue(completedAll); } +- (void)testQueue +{ + dispatch_queue_t queue = dispatch_queue_create(NULL, 0); + + __block bool disposedFirst = false; + __block bool disposedSecond = false; + __block bool disposedThird = false; + __block int result = 0; + + SSignal *firstSignal = [[SSignal alloc] initWithGenerator:^id(SSubscriber *subscriber) + { + dispatch_async(queue, ^ + { + usleep(100); + [subscriber putNext:@1]; + [subscriber putCompletion]; + }); + + return [[SBlockDisposable alloc] initWithBlock:^ + { + disposedFirst = true; + }]; + }]; + + SSignal *secondSignal = [[SSignal alloc] initWithGenerator:^id(SSubscriber *subscriber) + { + dispatch_async(queue, ^ + { + usleep(100); + [subscriber putNext:@2]; + [subscriber putCompletion]; + }); + + return [[SBlockDisposable alloc] initWithBlock:^ + { + disposedSecond = true; + }]; + }]; + + SSignal *thirdSignal = [[SSignal alloc] initWithGenerator:^id(SSubscriber *subscriber) + { + dispatch_async(queue, ^ + { + usleep(100); + [subscriber putNext:@3]; + [subscriber putCompletion]; + }); + + return [[SBlockDisposable alloc] initWithBlock:^ + { + disposedThird = true; + }]; + }]; + + SSignal *signal = [[[[SSignal single:firstSignal] then:[SSignal single:secondSignal]] then:[SSignal single:thirdSignal]] queue]; + [signal startWithNext:^(id next) + { + result += [next intValue]; + }]; + + usleep(1000); + + XCTAssertEqual(result, 6); + XCTAssertTrue(disposedFirst); + XCTAssertTrue(disposedSecond); + XCTAssertTrue(disposedThird); +} + +- (void)testQueueInterrupted +{ + dispatch_queue_t queue = dispatch_queue_create(NULL, 0); + + __block bool disposedFirst = false; + __block bool disposedSecond = false; + __block bool disposedThird = false; + __block bool startedThird = false; + __block int result = 0; + + SSignal *firstSignal = [[SSignal alloc] initWithGenerator:^id(SSubscriber *subscriber) + { + dispatch_async(queue, ^ + { + usleep(100); + [subscriber putNext:@1]; + [subscriber putCompletion]; + }); + + return [[SBlockDisposable alloc] initWithBlock:^ + { + disposedFirst = true; + }]; + }]; + + SSignal *secondSignal = [[SSignal alloc] initWithGenerator:^id(SSubscriber *subscriber) + { + dispatch_async(queue, ^ + { + usleep(100); + [subscriber putNext:@2]; + [subscriber putError:nil]; + }); + + return [[SBlockDisposable alloc] initWithBlock:^ + { + disposedSecond = true; + }]; + }]; + + SSignal *thirdSignal = [[SSignal alloc] initWithGenerator:^id(SSubscriber *subscriber) + { + startedThird = true; + + dispatch_async(queue, ^ + { + usleep(100); + [subscriber putNext:@3]; + [subscriber putCompletion]; + }); + + return [[SBlockDisposable alloc] initWithBlock:^ + { + disposedThird = true; + }]; + }]; + + SSignal *signal = [[[[SSignal single:firstSignal] then:[SSignal single:secondSignal]] then:[SSignal single:thirdSignal]] queue]; + [signal startWithNext:^(id next) + { + result += [next intValue]; + }]; + + usleep(1000); + + XCTAssertEqual(result, 3); + XCTAssertTrue(disposedFirst); + XCTAssertTrue(disposedSecond); + XCTAssertFalse(startedThird); + XCTAssertFalse(disposedThird); +} + +- (void)testQueueDisposed +{ + dispatch_queue_t queue = dispatch_queue_create(NULL, 0); + + __block bool disposedFirst = false; + __block bool disposedSecond = false; + __block bool disposedThird = false; + __block bool startedFirst = false; + __block bool startedSecond = false; + __block bool startedThird = false; + __block int result = 0; + + SSignal *firstSignal = [[SSignal alloc] initWithGenerator:^id(SSubscriber *subscriber) + { + startedFirst = true; + + __block bool cancelled = false; + dispatch_async(queue, ^ + { + if (!cancelled) + { + usleep(100); + [subscriber putNext:@1]; + [subscriber putCompletion]; + } + }); + + return [[SBlockDisposable alloc] initWithBlock:^ + { + cancelled = true; + disposedFirst = true; + }]; + }]; + + SSignal *secondSignal = [[SSignal alloc] initWithGenerator:^id(SSubscriber *subscriber) + { + startedSecond = true; + + __block bool cancelled = false; + dispatch_async(queue, ^ + { + if (!cancelled) + { + usleep(100); + [subscriber putNext:@2]; + [subscriber putError:nil]; + } + }); + + return [[SBlockDisposable alloc] initWithBlock:^ + { + cancelled = true; + disposedSecond = true; + }]; + }]; + + SSignal *thirdSignal = [[SSignal alloc] initWithGenerator:^id(SSubscriber *subscriber) + { + startedThird = true; + + dispatch_async(queue, ^ + { + usleep(100); + [subscriber putNext:@3]; + [subscriber putCompletion]; + }); + + return [[SBlockDisposable alloc] initWithBlock:^ + { + disposedThird = true; + }]; + }]; + + SSignal *signal = [[[[SSignal single:firstSignal] then:[SSignal single:secondSignal]] then:[SSignal single:thirdSignal]] queue]; + [[signal startWithNext:^(id next) + { + result += [next intValue]; + }] dispose]; + + usleep(1000); + + XCTAssertEqual(result, 0); + XCTAssertTrue(disposedFirst); + XCTAssertFalse(disposedSecond); + XCTAssertFalse(disposedThird); + + XCTAssertTrue(startedFirst); + XCTAssertFalse(startedSecond); + XCTAssertFalse(startedThird); +} + +- (void)testWaitSameQueue +{ + SSignal *signal = [[SSignal alloc] initWithGenerator:^id(SSubscriber *subscriber) + { + dispatch_async(dispatch_get_main_queue(), ^ + { + [subscriber putNext:@(1)]; + [subscriber putCompletion]; + }); + + return nil; + }]; + + CFAbsoluteTime startTime = CFAbsoluteTimeGetCurrent(); + [[signal wait:2.0 onQueue:[SQueue concurrentDefaultQueue]] startWithNext:^(__unused id next) + { + + }]; + XCTAssert(startTime < 0.5); +} + @end