diff --git a/SSignalKit/SQueue.m b/SSignalKit/SQueue.m index 91a7336c89..2d0c86296a 100644 --- a/SSignalKit/SQueue.m +++ b/SSignalKit/SQueue.m @@ -80,7 +80,12 @@ static const void *SQueueSpecificKey = &SQueueSpecificKey; - (void)dispatch:(dispatch_block_t)block { - dispatch_async(_queue, block); + if (_queueSpecific != NULL && dispatch_get_specific(SQueueSpecificKey) == _queueSpecific) + block(); + else if (_specialIsMainQueue && [NSThread isMainThread]) + block(); + else + dispatch_async(_queue, block); } - (void)dispatchSync:(dispatch_block_t)block diff --git a/SSignalKit/SSignal+Accumulate.h b/SSignalKit/SSignal+Accumulate.h index 55e534dbd5..9b0e8e9d47 100644 --- a/SSignalKit/SSignal+Accumulate.h +++ b/SSignalKit/SSignal+Accumulate.h @@ -3,5 +3,6 @@ @interface SSignal (Accumulate) - (SSignal *)reduceLeft:(id)value with:(id (^)(id, id))f; +- (SSignal *)reduceLeftWithPassthrough:(id)value with:(id (^)(id, id, void (^)(id)))f; @end diff --git a/SSignalKit/SSignal+Accumulate.m b/SSignalKit/SSignal+Accumulate.m index e19eda98aa..e237e204d0 100644 --- a/SSignalKit/SSignal+Accumulate.m +++ b/SSignalKit/SSignal+Accumulate.m @@ -23,4 +23,30 @@ }]; } +- (SSignal *)reduceLeftWithPassthrough:(id)value with:(id (^)(id, id, void (^)(id)))f +{ + return [[SSignal alloc] initWithGenerator:^(SSubscriber *subscriber) + { + __block id intermediateResult = value; + + void (^emit)(id) = ^(id next) + { + [subscriber putNext:next]; + }; + + return [self startWithNext:^(id next) + { + intermediateResult = f(intermediateResult, next, emit); + } error:^(id error) + { + [subscriber putError:error]; + } completed:^ + { + if (intermediateResult != nil) + [subscriber putNext:intermediateResult]; + [subscriber putCompletion]; + }]; + }]; +} + @end