From c41a6d09cc04f7b7ac7da9cec6042825e5a285c4 Mon Sep 17 00:00:00 2001 From: Peter Date: Fri, 27 Mar 2015 13:14:50 +0300 Subject: [PATCH] no message --- SSignalKit.xcodeproj/project.pbxproj | 16 +++ SSignalKit/SSignal+Dispatch.m | 29 ++--- SSignalKit/SSignal.m | 39 ++++-- SSignalKit/SThreadPool.h | 15 +-- SSignalKit/SThreadPool.m | 177 ++++++--------------------- SSignalKit/SThreadPoolQueue.h | 13 ++ SSignalKit/SThreadPoolQueue.m | 51 ++++++++ SSignalKit/SThreadPoolTask.h | 9 ++ SSignalKit/SThreadPoolTask.m | 53 ++++++++ SSignalKitTests/SSignalBasicTests.m | 2 + 10 files changed, 232 insertions(+), 172 deletions(-) create mode 100644 SSignalKit/SThreadPoolQueue.h create mode 100644 SSignalKit/SThreadPoolQueue.m create mode 100644 SSignalKit/SThreadPoolTask.h create mode 100644 SSignalKit/SThreadPoolTask.m diff --git a/SSignalKit.xcodeproj/project.pbxproj b/SSignalKit.xcodeproj/project.pbxproj index 0633d6faa2..f9d10025eb 100644 --- a/SSignalKit.xcodeproj/project.pbxproj +++ b/SSignalKit.xcodeproj/project.pbxproj @@ -80,6 +80,10 @@ D087632A1A839EDC00632240 /* SDisposableSet.h in Headers */ = {isa = PBXBuildFile; fileRef = D08763281A839EDC00632240 /* SDisposableSet.h */; settings = {ATTRIBUTES = (Public, ); }; }; D087632B1A839EDC00632240 /* SDisposableSet.m in Sources */ = {isa = PBXBuildFile; fileRef = D08763291A839EDC00632240 /* SDisposableSet.m */; }; D087632C1A839EE800632240 /* SDisposableSet.m in Sources */ = {isa = PBXBuildFile; fileRef = D08763291A839EDC00632240 /* SDisposableSet.m */; }; + D089E0311AC48EA7009A744B /* SThreadPoolTask.m in Sources */ = {isa = PBXBuildFile; fileRef = D089E02D1AC48EA7009A744B /* SThreadPoolTask.m */; }; + D089E0321AC48EA7009A744B /* SThreadPoolQueue.m in Sources */ = {isa = PBXBuildFile; fileRef = D089E02E1AC48EA7009A744B /* SThreadPoolQueue.m */; }; + D089E0331AC48EA7009A744B /* SThreadPoolQueue.h in Headers */ = {isa = PBXBuildFile; fileRef = D089E02F1AC48EA7009A744B /* SThreadPoolQueue.h */; settings = {ATTRIBUTES = (Public, ); }; }; + D089E0341AC48EA7009A744B /* SThreadPoolTask.h in Headers */ = {isa = PBXBuildFile; fileRef = D089E0301AC48EA7009A744B /* SThreadPoolTask.h */; settings = {ATTRIBUTES = (Public, ); }; }; /* End PBXBuildFile section */ /* Begin PBXContainerItemProxy section */ @@ -161,6 +165,10 @@ D06F10721A85882000485185 /* SSignalPerformanceTests.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = SSignalPerformanceTests.m; sourceTree = ""; }; D08763281A839EDC00632240 /* SDisposableSet.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = SDisposableSet.h; sourceTree = ""; }; D08763291A839EDC00632240 /* SDisposableSet.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = SDisposableSet.m; sourceTree = ""; }; + D089E02D1AC48EA7009A744B /* SThreadPoolTask.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = SThreadPoolTask.m; sourceTree = ""; }; + D089E02E1AC48EA7009A744B /* SThreadPoolQueue.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = SThreadPoolQueue.m; sourceTree = ""; }; + D089E02F1AC48EA7009A744B /* SThreadPoolQueue.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = SThreadPoolQueue.h; sourceTree = ""; }; + D089E0301AC48EA7009A744B /* SThreadPoolTask.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = SThreadPoolTask.h; sourceTree = ""; }; /* End PBXFileReference section */ /* Begin PBXFrameworksBuildPhase section */ @@ -252,6 +260,10 @@ D0445E1D1A7C2D7300267924 /* SSubscriber.m */, D0445E1E1A7C2D7300267924 /* SThreadPool.h */, D0445E1F1A7C2D7300267924 /* SThreadPool.m */, + D089E02F1AC48EA7009A744B /* SThreadPoolQueue.h */, + D089E02E1AC48EA7009A744B /* SThreadPoolQueue.m */, + D089E0301AC48EA7009A744B /* SThreadPoolTask.h */, + D089E02D1AC48EA7009A744B /* SThreadPoolTask.m */, D0445E4C1A7C2D8A00267924 /* SQueue.h */, D0445E4D1A7C2D8A00267924 /* SQueue.m */, D0445E521A7C2E9D00267924 /* STimer.h */, @@ -301,9 +313,11 @@ D0445E481A7C2D7300267924 /* SSubscriber.h in Headers */, D0445E2E1A7C2D7300267924 /* SMulticastSignalManager.h in Headers */, D0445E271A7C2D7300267924 /* SBlockDisposable.h in Headers */, + D089E0331AC48EA7009A744B /* SThreadPoolQueue.h in Headers */, D0445E4A1A7C2D7300267924 /* SThreadPool.h in Headers */, D0445E541A7C2E9D00267924 /* STimer.h in Headers */, D0445E301A7C2D7300267924 /* SSignal.h in Headers */, + D089E0341AC48EA7009A744B /* SThreadPoolTask.h in Headers */, D0445E321A7C2D7300267924 /* SSignal+Accumulate.h in Headers */, D0445E361A7C2D7300267924 /* SSignal+Combine.h in Headers */, D0445E2A1A7C2D7300267924 /* SDisposable.h in Headers */, @@ -449,6 +463,7 @@ D0445E241A7C2D7300267924 /* SAtomic.m in Sources */, D0445E4B1A7C2D7300267924 /* SThreadPool.m in Sources */, D0445E551A7C2E9D00267924 /* STimer.m in Sources */, + D089E0321AC48EA7009A744B /* SThreadPoolQueue.m in Sources */, D0445E3D1A7C2D7300267924 /* SSignal+Mapping.m in Sources */, D0445E351A7C2D7300267924 /* SSignal+Catch.m in Sources */, D0445E411A7C2D7300267924 /* SSignal+Multicast.m in Sources */, @@ -458,6 +473,7 @@ D0445E471A7C2D7300267924 /* SSignal+Timing.m in Sources */, D0445E311A7C2D7300267924 /* SSignal.m in Sources */, D087632B1A839EDC00632240 /* SDisposableSet.m in Sources */, + D089E0311AC48EA7009A744B /* SThreadPoolTask.m in Sources */, D0445E491A7C2D7300267924 /* SSubscriber.m in Sources */, D0445E431A7C2D7300267924 /* SSignal+SideEffects.m in Sources */, D0445E3F1A7C2D7300267924 /* SSignal+Meta.m in Sources */, diff --git a/SSignalKit/SSignal+Dispatch.m b/SSignalKit/SSignal+Dispatch.m index b23337e3fa..2a72201ae4 100644 --- a/SSignalKit/SSignal+Dispatch.m +++ b/SSignalKit/SSignal+Dispatch.m @@ -35,40 +35,31 @@ { return [[SSignal alloc] initWithGenerator:^id (SSubscriber *subscriber) { - SAtomic *atomicLastTask = [[SAtomic alloc] initWithValue:nil]; + SThreadPoolQueue *queue = [threadPool nextQueue]; return [self startWithNext:^(id next) { - SThreadPoolTask *task = [threadPool prepareTask:^(bool (^cancelled)()) + SThreadPoolTask *task = [[SThreadPoolTask alloc] initWithBlock:^(bool (^cancelled)()) { if (!cancelled()) [subscriber putNext:next]; }]; - SThreadPoolTask *lastTask = [atomicLastTask swap:task]; - if (lastTask != nil) - [task addDependency:lastTask]; - [threadPool startTask:task]; + [queue addTask:task]; } error:^(id error) { - SThreadPoolTask *task = [threadPool prepareTask:^(bool (^cancelled)()) + SThreadPoolTask *task = [[SThreadPoolTask alloc] initWithBlock:^(bool (^cancelled)()) { if (!cancelled()) [subscriber putError:error]; }]; - SThreadPoolTask *lastTask = [atomicLastTask swap:task]; - if (lastTask != nil) - [task addDependency:lastTask]; - [threadPool startTask:task]; + [queue addTask:task]; } completed:^ { - SThreadPoolTask *task = [threadPool prepareTask:^(bool (^cancelled)()) + SThreadPoolTask *task = [[SThreadPoolTask alloc] initWithBlock:^(bool (^cancelled)()) { if (!cancelled()) [subscriber putCompletion]; }]; - SThreadPoolTask *lastTask = [atomicLastTask swap:task]; - if (lastTask != nil) - [task addDependency:lastTask]; - [threadPool startTask:task]; + [queue addTask:task]; }]; }]; } @@ -111,7 +102,7 @@ { SMetaDisposable *disposable = [[SMetaDisposable alloc] init]; - id taskId = [threadPool addTask:^(bool (^cancelled)()) + SThreadPoolTask *task = [[SThreadPoolTask alloc] initWithBlock:^(bool (^cancelled)()) { if (cancelled && cancelled()) return; @@ -130,9 +121,11 @@ [disposable setDisposable:[[SBlockDisposable alloc] initWithBlock:^ { - [threadPool cancelTask:taskId]; + [task cancel]; }]]; + [threadPool addTask:task]; + return disposable; }]; } diff --git a/SSignalKit/SSignal.m b/SSignalKit/SSignal.m index 1cae763a69..8c7d52d877 100644 --- a/SSignalKit/SSignal.m +++ b/SSignalKit/SSignal.m @@ -2,6 +2,35 @@ #import "SBlockDisposable.h" +@interface SSubscriberDisposable : NSObject +{ + SSubscriber *_subscriber; + id _disposable; +} + +@end + +@implementation SSubscriberDisposable + +- (instancetype)initWithSubscriber:(SSubscriber *)subscriber disposable:(id)disposable +{ + self = [super init]; + if (self != nil) + { + _subscriber = subscriber; + _disposable = disposable; + } + return self; +} + +- (void)dispose +{ + [_subscriber _markTerminatedWithoutDisposal]; + [_disposable dispose]; +} + +@end + @interface SSignal () { } @@ -25,11 +54,7 @@ SSubscriber *subscriber = [[SSubscriber alloc] initWithNext:next error:error completed:completed]; id disposable = _generator(subscriber); [subscriber _assignDisposable:disposable]; - return [[SBlockDisposable alloc] initWithBlock:^ - { - [subscriber _markTerminatedWithoutDisposal]; - [disposable dispose]; - }]; + return [[SSubscriberDisposable alloc] initWithSubscriber:subscriber disposable:disposable]; } - (id)startWithNext:(void (^)(id next))next @@ -37,7 +62,7 @@ SSubscriber *subscriber = [[SSubscriber alloc] initWithNext:next error:nil completed:nil]; id disposable = _generator(subscriber); [subscriber _assignDisposable:disposable]; - return subscriber; + return [[SSubscriberDisposable alloc] initWithSubscriber:subscriber disposable:disposable]; } - (id)startWithNext:(void (^)(id next))next completed:(void (^)())completed @@ -45,7 +70,7 @@ SSubscriber *subscriber = [[SSubscriber alloc] initWithNext:next error:nil completed:completed]; id disposable = _generator(subscriber); [subscriber _assignDisposable:disposable]; - return subscriber; + return [[SSubscriberDisposable alloc] initWithSubscriber:subscriber disposable:disposable]; } @end diff --git a/SSignalKit/SThreadPool.h b/SSignalKit/SThreadPool.h index f2342081f7..69d0565938 100644 --- a/SSignalKit/SThreadPool.h +++ b/SSignalKit/SThreadPool.h @@ -1,18 +1,15 @@ #import -@interface SThreadPoolTask : NSObject - -- (void)addDependency:(SThreadPoolTask *)task; - -@end +#import +#import @interface SThreadPool : NSObject - (instancetype)initWithThreadCount:(NSUInteger)threadCount threadPriority:(double)threadPriority; -- (id)addTask:(void (^)(bool (^)()))task; -- (SThreadPoolTask *)prepareTask:(void (^)(bool (^)()))task; -- (id)startTask:(SThreadPoolTask *)task; -- (void)cancelTask:(id)taskId; +- (void)addTask:(SThreadPoolTask *)task; + +- (SThreadPoolQueue *)nextQueue; +- (void)_workOnQueue:(SThreadPoolQueue *)queue block:(void (^)())block; @end diff --git a/SSignalKit/SThreadPool.m b/SSignalKit/SThreadPool.m index b39ef64da3..060ee29c92 100644 --- a/SSignalKit/SThreadPool.m +++ b/SSignalKit/SThreadPool.m @@ -4,82 +4,13 @@ #import #import "SQueue.h" -@class SThreadPoolOperation; - -@interface SThreadPoolTask () - -@property (nonatomic, strong, readonly) SThreadPoolOperation *operation; - -- (instancetype)initWithOperation:(SThreadPoolOperation *)operation; - -@end - -@interface SThreadPoolOperationCanelledHolder : NSObject -{ - @public - volatile bool _cancelled; -} - -@property (nonatomic, weak) SThreadPoolOperation *operation; - -@end - -@implementation SThreadPoolOperationCanelledHolder - -@end - -@interface SThreadPoolOperation : NSOperation -{ - void (^_block)(bool (^)()); -} - -@property (nonatomic, strong, readonly) SThreadPoolOperationCanelledHolder *cancelledHolder; - -@end - -@implementation SThreadPoolOperation - -- (instancetype)initWithBlock:(void (^)(bool (^)()))block -{ - self = [super init]; - if (self != nil) - { - _block = [block copy]; - _cancelledHolder = [[SThreadPoolOperationCanelledHolder alloc] init]; - _cancelledHolder.operation = self; - } - return self; -} - -- (void)main -{ - if (!_cancelledHolder->_cancelled) - { - SThreadPoolOperationCanelledHolder *cancelledHolder = _cancelledHolder; - _block(^bool - { - return cancelledHolder->_cancelled; - }); - } -} - -- (void)cancel -{ - _cancelledHolder->_cancelled = true; -} - -- (BOOL)isCancelled -{ - return _cancelledHolder->_cancelled; -} - -@end - @interface SThreadPool () { SQueue *_managementQueue; NSMutableArray *_threads; - NSMutableArray *_operations; + + NSMutableArray *_queues; + NSMutableArray *_takenQueues; pthread_mutex_t _mutex; pthread_cond_t _cond; @@ -91,39 +22,42 @@ + (void)threadEntryPoint:(SThreadPool *)threadPool { + SThreadPoolQueue *queue = nil; + while (true) { - SThreadPoolOperation *operation = nil; + SThreadPoolTask *task = nil; pthread_mutex_lock(&threadPool->_mutex); + + if (queue != nil) + { + [threadPool->_takenQueues removeObject:queue]; + if ([queue _hasTasks]) + [threadPool->_queues addObject:queue]; + } + while (true) { - while (threadPool->_operations.count == 0) + while (threadPool->_queues.count == 0) pthread_cond_wait(&threadPool->_cond, &threadPool->_mutex); - for (NSUInteger index = 0; index < threadPool->_operations.count; index++) - { - SThreadPoolOperation *maybeOperation = threadPool->_operations[index]; - if ([maybeOperation isCancelled]) - { - [threadPool->_operations removeObjectAtIndex:index]; - index--; - } - else if ([maybeOperation isReady]) - { - operation = maybeOperation; - [threadPool->_operations removeObjectAtIndex:index]; - break; - } - } + + queue = threadPool->_queues.firstObject; + task = [queue _popFirstTask]; + + if (queue != nil) + { + [threadPool->_takenQueues addObject:queue]; + [threadPool->_queues removeObjectAtIndex:0]; - if (operation != nil) break; + } } pthread_mutex_unlock(&threadPool->_mutex); @autoreleasepool { - [operation main]; + [task execute]; } } } @@ -146,7 +80,8 @@ [_managementQueue dispatch:^ { _threads = [[NSMutableArray alloc] init]; - _operations = [[NSMutableArray alloc] init]; + _queues = [[NSMutableArray alloc] init]; + _takenQueues = [[NSMutableArray alloc] init]; for (NSUInteger i = 0; i < threadCount; i++) { NSThread *thread = [[NSThread alloc] initWithTarget:[SThreadPool class] selector:@selector(threadEntryPoint:) object:self]; @@ -166,62 +101,28 @@ pthread_cond_destroy(&_cond); } -- (void)_addOperation:(SThreadPoolOperation *)operation +- (void)addTask:(SThreadPoolTask *)task { - pthread_mutex_lock(&_mutex); - [_operations addObject:operation]; - pthread_cond_signal(&_cond); - pthread_mutex_unlock(&_mutex); + SThreadPoolQueue *tempQueue = [self nextQueue]; + [tempQueue addTask:task]; } -- (id)addTask:(void (^)(bool (^)()))task +- (SThreadPoolQueue *)nextQueue { - SThreadPoolOperation *operation = [[SThreadPoolOperation alloc] initWithBlock:task]; - [_managementQueue dispatch:^ - { - [self _addOperation:operation]; - }]; - return operation.cancelledHolder; + return [[SThreadPoolQueue alloc] initWithThreadPool:self]; } -- (SThreadPoolTask *)prepareTask:(void (^)(bool (^)()))task -{ - SThreadPoolOperation *operation = [[SThreadPoolOperation alloc] initWithBlock:task]; - return [[SThreadPoolTask alloc] initWithOperation:operation]; -} - -- (id)startTask:(SThreadPoolTask *)task +- (void)_workOnQueue:(SThreadPoolQueue *)queue block:(void (^)())block { [_managementQueue dispatch:^ { - [self _addOperation:task.operation]; + pthread_mutex_lock(&_mutex); + block(); + if (![_queues containsObject:queue] && ![_takenQueues containsObject:queue]) + [_queues addObject:queue]; + pthread_cond_broadcast(&_cond); + pthread_mutex_unlock(&_mutex); }]; - return task.operation.cancelledHolder; -} - -- (void)cancelTask:(id)taskId -{ - if (taskId != nil) - ((SThreadPoolOperationCanelledHolder *)taskId)->_cancelled = true; -} - -@end - -@implementation SThreadPoolTask - -- (instancetype)initWithOperation:(SThreadPoolOperation *)operation -{ - self = [super init]; - if (self != nil) - { - _operation = operation; - } - return self; -} - -- (void)addDependency:(SThreadPoolTask *)task -{ - [_operation addDependency:task->_operation]; } @end diff --git a/SSignalKit/SThreadPoolQueue.h b/SSignalKit/SThreadPoolQueue.h new file mode 100644 index 0000000000..3d8d53b00c --- /dev/null +++ b/SSignalKit/SThreadPoolQueue.h @@ -0,0 +1,13 @@ +#import + +@class SThreadPool; +@class SThreadPoolTask; + +@interface SThreadPoolQueue : NSObject + +- (instancetype)initWithThreadPool:(SThreadPool *)threadPool; +- (void)addTask:(SThreadPoolTask *)task; +- (SThreadPoolTask *)_popFirstTask; +- (bool)_hasTasks; + +@end diff --git a/SSignalKit/SThreadPoolQueue.m b/SSignalKit/SThreadPoolQueue.m new file mode 100644 index 0000000000..ff857e642d --- /dev/null +++ b/SSignalKit/SThreadPoolQueue.m @@ -0,0 +1,51 @@ +#import "SThreadPoolQueue.h" + +#import "SThreadPool.h" + +@interface SThreadPoolQueue () +{ + __weak SThreadPool *_threadPool; + NSMutableArray *_tasks; +} + +@end + +@implementation SThreadPoolQueue + +- (instancetype)initWithThreadPool:(SThreadPool *)threadPool +{ + self = [super init]; + if (self != nil) + { + _threadPool = threadPool; + _tasks = [[NSMutableArray alloc] init]; + } + return self; +} + +- (void)addTask:(SThreadPoolTask *)task +{ + SThreadPool *threadPool = _threadPool; + [threadPool _workOnQueue:self block:^ + { + [_tasks addObject:task]; + }]; +} + +- (SThreadPoolTask *)_popFirstTask +{ + if (_tasks.count != 0) + { + SThreadPoolTask *task = _tasks[0]; + [_tasks removeObjectAtIndex:0]; + return task; + } + return nil; +} + +- (bool)_hasTasks +{ + return _tasks.count != 0; +} + +@end diff --git a/SSignalKit/SThreadPoolTask.h b/SSignalKit/SThreadPoolTask.h new file mode 100644 index 0000000000..e8da985ca0 --- /dev/null +++ b/SSignalKit/SThreadPoolTask.h @@ -0,0 +1,9 @@ +#import + +@interface SThreadPoolTask : NSObject + +- (instancetype)initWithBlock:(void (^)(bool (^)()))block; +- (void)execute; +- (void)cancel; + +@end diff --git a/SSignalKit/SThreadPoolTask.m b/SSignalKit/SThreadPoolTask.m new file mode 100644 index 0000000000..c967022e01 --- /dev/null +++ b/SSignalKit/SThreadPoolTask.m @@ -0,0 +1,53 @@ +#import "SThreadPoolTask.h" + +@interface SThreadPoolTaskState : NSObject +{ + @public + bool _cancelled; +} + +@end + +@implementation SThreadPoolTaskState + +@end + +@interface SThreadPoolTask () +{ + void (^_block)(bool (^)()); + SThreadPoolTaskState *_state; +} + +@end + +@implementation SThreadPoolTask + +- (instancetype)initWithBlock:(void (^)(bool (^)()))block +{ + self = [super init]; + if (self != nil) + { + _block = [block copy]; + _state = [[SThreadPoolTaskState alloc] init]; + } + return self; +} + +- (void)execute +{ + if (_state->_cancelled) + return; + + SThreadPoolTaskState *state = _state; + _block(^bool + { + return state->_cancelled; + }); +} + +- (void)cancel +{ + _state->_cancelled = true; +} + +@end diff --git a/SSignalKitTests/SSignalBasicTests.m b/SSignalKitTests/SSignalBasicTests.m index be66425d44..abe09c87a1 100644 --- a/SSignalKitTests/SSignalBasicTests.m +++ b/SSignalKitTests/SSignalBasicTests.m @@ -216,6 +216,7 @@ { dispatch_async(queue, ^ { + usleep(100); [subscriber putNext:@1]; }); @@ -229,6 +230,7 @@ { generated = true; } error:nil completed:nil]; + NSLog(@"dispose"); [disposable dispose]; }