no message

This commit is contained in:
Peter 2015-03-27 13:14:50 +03:00
parent 5fa95f6145
commit c41a6d09cc
10 changed files with 232 additions and 172 deletions

View File

@ -80,6 +80,10 @@
D087632A1A839EDC00632240 /* SDisposableSet.h in Headers */ = {isa = PBXBuildFile; fileRef = D08763281A839EDC00632240 /* SDisposableSet.h */; settings = {ATTRIBUTES = (Public, ); }; };
D087632B1A839EDC00632240 /* SDisposableSet.m in Sources */ = {isa = PBXBuildFile; fileRef = D08763291A839EDC00632240 /* SDisposableSet.m */; };
D087632C1A839EE800632240 /* SDisposableSet.m in Sources */ = {isa = PBXBuildFile; fileRef = D08763291A839EDC00632240 /* SDisposableSet.m */; };
D089E0311AC48EA7009A744B /* SThreadPoolTask.m in Sources */ = {isa = PBXBuildFile; fileRef = D089E02D1AC48EA7009A744B /* SThreadPoolTask.m */; };
D089E0321AC48EA7009A744B /* SThreadPoolQueue.m in Sources */ = {isa = PBXBuildFile; fileRef = D089E02E1AC48EA7009A744B /* SThreadPoolQueue.m */; };
D089E0331AC48EA7009A744B /* SThreadPoolQueue.h in Headers */ = {isa = PBXBuildFile; fileRef = D089E02F1AC48EA7009A744B /* SThreadPoolQueue.h */; settings = {ATTRIBUTES = (Public, ); }; };
D089E0341AC48EA7009A744B /* SThreadPoolTask.h in Headers */ = {isa = PBXBuildFile; fileRef = D089E0301AC48EA7009A744B /* SThreadPoolTask.h */; settings = {ATTRIBUTES = (Public, ); }; };
/* End PBXBuildFile section */
/* Begin PBXContainerItemProxy section */
@ -161,6 +165,10 @@
D06F10721A85882000485185 /* SSignalPerformanceTests.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = SSignalPerformanceTests.m; sourceTree = "<group>"; };
D08763281A839EDC00632240 /* SDisposableSet.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = SDisposableSet.h; sourceTree = "<group>"; };
D08763291A839EDC00632240 /* SDisposableSet.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = SDisposableSet.m; sourceTree = "<group>"; };
D089E02D1AC48EA7009A744B /* SThreadPoolTask.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = SThreadPoolTask.m; sourceTree = "<group>"; };
D089E02E1AC48EA7009A744B /* SThreadPoolQueue.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = SThreadPoolQueue.m; sourceTree = "<group>"; };
D089E02F1AC48EA7009A744B /* SThreadPoolQueue.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = SThreadPoolQueue.h; sourceTree = "<group>"; };
D089E0301AC48EA7009A744B /* SThreadPoolTask.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = SThreadPoolTask.h; sourceTree = "<group>"; };
/* End PBXFileReference section */
/* Begin PBXFrameworksBuildPhase section */
@ -252,6 +260,10 @@
D0445E1D1A7C2D7300267924 /* SSubscriber.m */,
D0445E1E1A7C2D7300267924 /* SThreadPool.h */,
D0445E1F1A7C2D7300267924 /* SThreadPool.m */,
D089E02F1AC48EA7009A744B /* SThreadPoolQueue.h */,
D089E02E1AC48EA7009A744B /* SThreadPoolQueue.m */,
D089E0301AC48EA7009A744B /* SThreadPoolTask.h */,
D089E02D1AC48EA7009A744B /* SThreadPoolTask.m */,
D0445E4C1A7C2D8A00267924 /* SQueue.h */,
D0445E4D1A7C2D8A00267924 /* SQueue.m */,
D0445E521A7C2E9D00267924 /* STimer.h */,
@ -301,9 +313,11 @@
D0445E481A7C2D7300267924 /* SSubscriber.h in Headers */,
D0445E2E1A7C2D7300267924 /* SMulticastSignalManager.h in Headers */,
D0445E271A7C2D7300267924 /* SBlockDisposable.h in Headers */,
D089E0331AC48EA7009A744B /* SThreadPoolQueue.h in Headers */,
D0445E4A1A7C2D7300267924 /* SThreadPool.h in Headers */,
D0445E541A7C2E9D00267924 /* STimer.h in Headers */,
D0445E301A7C2D7300267924 /* SSignal.h in Headers */,
D089E0341AC48EA7009A744B /* SThreadPoolTask.h in Headers */,
D0445E321A7C2D7300267924 /* SSignal+Accumulate.h in Headers */,
D0445E361A7C2D7300267924 /* SSignal+Combine.h in Headers */,
D0445E2A1A7C2D7300267924 /* SDisposable.h in Headers */,
@ -449,6 +463,7 @@
D0445E241A7C2D7300267924 /* SAtomic.m in Sources */,
D0445E4B1A7C2D7300267924 /* SThreadPool.m in Sources */,
D0445E551A7C2E9D00267924 /* STimer.m in Sources */,
D089E0321AC48EA7009A744B /* SThreadPoolQueue.m in Sources */,
D0445E3D1A7C2D7300267924 /* SSignal+Mapping.m in Sources */,
D0445E351A7C2D7300267924 /* SSignal+Catch.m in Sources */,
D0445E411A7C2D7300267924 /* SSignal+Multicast.m in Sources */,
@ -458,6 +473,7 @@
D0445E471A7C2D7300267924 /* SSignal+Timing.m in Sources */,
D0445E311A7C2D7300267924 /* SSignal.m in Sources */,
D087632B1A839EDC00632240 /* SDisposableSet.m in Sources */,
D089E0311AC48EA7009A744B /* SThreadPoolTask.m in Sources */,
D0445E491A7C2D7300267924 /* SSubscriber.m in Sources */,
D0445E431A7C2D7300267924 /* SSignal+SideEffects.m in Sources */,
D0445E3F1A7C2D7300267924 /* SSignal+Meta.m in Sources */,

View File

@ -35,40 +35,31 @@
{
return [[SSignal alloc] initWithGenerator:^id<SDisposable> (SSubscriber *subscriber)
{
SAtomic *atomicLastTask = [[SAtomic alloc] initWithValue:nil];
SThreadPoolQueue *queue = [threadPool nextQueue];
return [self startWithNext:^(id next)
{
SThreadPoolTask *task = [threadPool prepareTask:^(bool (^cancelled)())
SThreadPoolTask *task = [[SThreadPoolTask alloc] initWithBlock:^(bool (^cancelled)())
{
if (!cancelled())
[subscriber putNext:next];
}];
SThreadPoolTask *lastTask = [atomicLastTask swap:task];
if (lastTask != nil)
[task addDependency:lastTask];
[threadPool startTask:task];
[queue addTask:task];
} error:^(id error)
{
SThreadPoolTask *task = [threadPool prepareTask:^(bool (^cancelled)())
SThreadPoolTask *task = [[SThreadPoolTask alloc] initWithBlock:^(bool (^cancelled)())
{
if (!cancelled())
[subscriber putError:error];
}];
SThreadPoolTask *lastTask = [atomicLastTask swap:task];
if (lastTask != nil)
[task addDependency:lastTask];
[threadPool startTask:task];
[queue addTask:task];
} completed:^
{
SThreadPoolTask *task = [threadPool prepareTask:^(bool (^cancelled)())
SThreadPoolTask *task = [[SThreadPoolTask alloc] initWithBlock:^(bool (^cancelled)())
{
if (!cancelled())
[subscriber putCompletion];
}];
SThreadPoolTask *lastTask = [atomicLastTask swap:task];
if (lastTask != nil)
[task addDependency:lastTask];
[threadPool startTask:task];
[queue addTask:task];
}];
}];
}
@ -111,7 +102,7 @@
{
SMetaDisposable *disposable = [[SMetaDisposable alloc] init];
id taskId = [threadPool addTask:^(bool (^cancelled)())
SThreadPoolTask *task = [[SThreadPoolTask alloc] initWithBlock:^(bool (^cancelled)())
{
if (cancelled && cancelled())
return;
@ -130,9 +121,11 @@
[disposable setDisposable:[[SBlockDisposable alloc] initWithBlock:^
{
[threadPool cancelTask:taskId];
[task cancel];
}]];
[threadPool addTask:task];
return disposable;
}];
}

View File

@ -2,6 +2,35 @@
#import "SBlockDisposable.h"
@interface SSubscriberDisposable : NSObject <SDisposable>
{
SSubscriber *_subscriber;
id<SDisposable> _disposable;
}
@end
@implementation SSubscriberDisposable
- (instancetype)initWithSubscriber:(SSubscriber *)subscriber disposable:(id<SDisposable>)disposable
{
self = [super init];
if (self != nil)
{
_subscriber = subscriber;
_disposable = disposable;
}
return self;
}
- (void)dispose
{
[_subscriber _markTerminatedWithoutDisposal];
[_disposable dispose];
}
@end
@interface SSignal ()
{
}
@ -25,11 +54,7 @@
SSubscriber *subscriber = [[SSubscriber alloc] initWithNext:next error:error completed:completed];
id<SDisposable> disposable = _generator(subscriber);
[subscriber _assignDisposable:disposable];
return [[SBlockDisposable alloc] initWithBlock:^
{
[subscriber _markTerminatedWithoutDisposal];
[disposable dispose];
}];
return [[SSubscriberDisposable alloc] initWithSubscriber:subscriber disposable:disposable];
}
- (id<SDisposable>)startWithNext:(void (^)(id next))next
@ -37,7 +62,7 @@
SSubscriber *subscriber = [[SSubscriber alloc] initWithNext:next error:nil completed:nil];
id<SDisposable> disposable = _generator(subscriber);
[subscriber _assignDisposable:disposable];
return subscriber;
return [[SSubscriberDisposable alloc] initWithSubscriber:subscriber disposable:disposable];
}
- (id<SDisposable>)startWithNext:(void (^)(id next))next completed:(void (^)())completed
@ -45,7 +70,7 @@
SSubscriber *subscriber = [[SSubscriber alloc] initWithNext:next error:nil completed:completed];
id<SDisposable> disposable = _generator(subscriber);
[subscriber _assignDisposable:disposable];
return subscriber;
return [[SSubscriberDisposable alloc] initWithSubscriber:subscriber disposable:disposable];
}
@end

View File

@ -1,18 +1,15 @@
#import <Foundation/Foundation.h>
@interface SThreadPoolTask : NSObject
- (void)addDependency:(SThreadPoolTask *)task;
@end
#import <SSignalKit/SThreadPoolTask.h>
#import <SSignalKit/SThreadPoolQueue.h>
@interface SThreadPool : NSObject
- (instancetype)initWithThreadCount:(NSUInteger)threadCount threadPriority:(double)threadPriority;
- (id)addTask:(void (^)(bool (^)()))task;
- (SThreadPoolTask *)prepareTask:(void (^)(bool (^)()))task;
- (id)startTask:(SThreadPoolTask *)task;
- (void)cancelTask:(id)taskId;
- (void)addTask:(SThreadPoolTask *)task;
- (SThreadPoolQueue *)nextQueue;
- (void)_workOnQueue:(SThreadPoolQueue *)queue block:(void (^)())block;
@end

View File

@ -4,82 +4,13 @@
#import <pthread.h>
#import "SQueue.h"
@class SThreadPoolOperation;
@interface SThreadPoolTask ()
@property (nonatomic, strong, readonly) SThreadPoolOperation *operation;
- (instancetype)initWithOperation:(SThreadPoolOperation *)operation;
@end
@interface SThreadPoolOperationCanelledHolder : NSObject
{
@public
volatile bool _cancelled;
}
@property (nonatomic, weak) SThreadPoolOperation *operation;
@end
@implementation SThreadPoolOperationCanelledHolder
@end
@interface SThreadPoolOperation : NSOperation
{
void (^_block)(bool (^)());
}
@property (nonatomic, strong, readonly) SThreadPoolOperationCanelledHolder *cancelledHolder;
@end
@implementation SThreadPoolOperation
- (instancetype)initWithBlock:(void (^)(bool (^)()))block
{
self = [super init];
if (self != nil)
{
_block = [block copy];
_cancelledHolder = [[SThreadPoolOperationCanelledHolder alloc] init];
_cancelledHolder.operation = self;
}
return self;
}
- (void)main
{
if (!_cancelledHolder->_cancelled)
{
SThreadPoolOperationCanelledHolder *cancelledHolder = _cancelledHolder;
_block(^bool
{
return cancelledHolder->_cancelled;
});
}
}
- (void)cancel
{
_cancelledHolder->_cancelled = true;
}
- (BOOL)isCancelled
{
return _cancelledHolder->_cancelled;
}
@end
@interface SThreadPool ()
{
SQueue *_managementQueue;
NSMutableArray *_threads;
NSMutableArray *_operations;
NSMutableArray *_queues;
NSMutableArray *_takenQueues;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
@ -91,39 +22,42 @@
+ (void)threadEntryPoint:(SThreadPool *)threadPool
{
SThreadPoolQueue *queue = nil;
while (true)
{
SThreadPoolOperation *operation = nil;
SThreadPoolTask *task = nil;
pthread_mutex_lock(&threadPool->_mutex);
if (queue != nil)
{
[threadPool->_takenQueues removeObject:queue];
if ([queue _hasTasks])
[threadPool->_queues addObject:queue];
}
while (true)
{
while (threadPool->_operations.count == 0)
while (threadPool->_queues.count == 0)
pthread_cond_wait(&threadPool->_cond, &threadPool->_mutex);
for (NSUInteger index = 0; index < threadPool->_operations.count; index++)
{
SThreadPoolOperation *maybeOperation = threadPool->_operations[index];
if ([maybeOperation isCancelled])
{
[threadPool->_operations removeObjectAtIndex:index];
index--;
}
else if ([maybeOperation isReady])
{
operation = maybeOperation;
[threadPool->_operations removeObjectAtIndex:index];
break;
}
}
queue = threadPool->_queues.firstObject;
task = [queue _popFirstTask];
if (queue != nil)
{
[threadPool->_takenQueues addObject:queue];
[threadPool->_queues removeObjectAtIndex:0];
if (operation != nil)
break;
}
}
pthread_mutex_unlock(&threadPool->_mutex);
@autoreleasepool
{
[operation main];
[task execute];
}
}
}
@ -146,7 +80,8 @@
[_managementQueue dispatch:^
{
_threads = [[NSMutableArray alloc] init];
_operations = [[NSMutableArray alloc] init];
_queues = [[NSMutableArray alloc] init];
_takenQueues = [[NSMutableArray alloc] init];
for (NSUInteger i = 0; i < threadCount; i++)
{
NSThread *thread = [[NSThread alloc] initWithTarget:[SThreadPool class] selector:@selector(threadEntryPoint:) object:self];
@ -166,62 +101,28 @@
pthread_cond_destroy(&_cond);
}
- (void)_addOperation:(SThreadPoolOperation *)operation
- (void)addTask:(SThreadPoolTask *)task
{
pthread_mutex_lock(&_mutex);
[_operations addObject:operation];
pthread_cond_signal(&_cond);
pthread_mutex_unlock(&_mutex);
SThreadPoolQueue *tempQueue = [self nextQueue];
[tempQueue addTask:task];
}
- (id)addTask:(void (^)(bool (^)()))task
- (SThreadPoolQueue *)nextQueue
{
SThreadPoolOperation *operation = [[SThreadPoolOperation alloc] initWithBlock:task];
[_managementQueue dispatch:^
{
[self _addOperation:operation];
}];
return operation.cancelledHolder;
return [[SThreadPoolQueue alloc] initWithThreadPool:self];
}
- (SThreadPoolTask *)prepareTask:(void (^)(bool (^)()))task
{
SThreadPoolOperation *operation = [[SThreadPoolOperation alloc] initWithBlock:task];
return [[SThreadPoolTask alloc] initWithOperation:operation];
}
- (id)startTask:(SThreadPoolTask *)task
- (void)_workOnQueue:(SThreadPoolQueue *)queue block:(void (^)())block
{
[_managementQueue dispatch:^
{
[self _addOperation:task.operation];
pthread_mutex_lock(&_mutex);
block();
if (![_queues containsObject:queue] && ![_takenQueues containsObject:queue])
[_queues addObject:queue];
pthread_cond_broadcast(&_cond);
pthread_mutex_unlock(&_mutex);
}];
return task.operation.cancelledHolder;
}
- (void)cancelTask:(id)taskId
{
if (taskId != nil)
((SThreadPoolOperationCanelledHolder *)taskId)->_cancelled = true;
}
@end
@implementation SThreadPoolTask
- (instancetype)initWithOperation:(SThreadPoolOperation *)operation
{
self = [super init];
if (self != nil)
{
_operation = operation;
}
return self;
}
- (void)addDependency:(SThreadPoolTask *)task
{
[_operation addDependency:task->_operation];
}
@end

View File

@ -0,0 +1,13 @@
#import <Foundation/Foundation.h>
@class SThreadPool;
@class SThreadPoolTask;
@interface SThreadPoolQueue : NSObject
- (instancetype)initWithThreadPool:(SThreadPool *)threadPool;
- (void)addTask:(SThreadPoolTask *)task;
- (SThreadPoolTask *)_popFirstTask;
- (bool)_hasTasks;
@end

View File

@ -0,0 +1,51 @@
#import "SThreadPoolQueue.h"
#import "SThreadPool.h"
@interface SThreadPoolQueue ()
{
__weak SThreadPool *_threadPool;
NSMutableArray *_tasks;
}
@end
@implementation SThreadPoolQueue
- (instancetype)initWithThreadPool:(SThreadPool *)threadPool
{
self = [super init];
if (self != nil)
{
_threadPool = threadPool;
_tasks = [[NSMutableArray alloc] init];
}
return self;
}
- (void)addTask:(SThreadPoolTask *)task
{
SThreadPool *threadPool = _threadPool;
[threadPool _workOnQueue:self block:^
{
[_tasks addObject:task];
}];
}
- (SThreadPoolTask *)_popFirstTask
{
if (_tasks.count != 0)
{
SThreadPoolTask *task = _tasks[0];
[_tasks removeObjectAtIndex:0];
return task;
}
return nil;
}
- (bool)_hasTasks
{
return _tasks.count != 0;
}
@end

View File

@ -0,0 +1,9 @@
#import <Foundation/Foundation.h>
@interface SThreadPoolTask : NSObject
- (instancetype)initWithBlock:(void (^)(bool (^)()))block;
- (void)execute;
- (void)cancel;
@end

View File

@ -0,0 +1,53 @@
#import "SThreadPoolTask.h"
@interface SThreadPoolTaskState : NSObject
{
@public
bool _cancelled;
}
@end
@implementation SThreadPoolTaskState
@end
@interface SThreadPoolTask ()
{
void (^_block)(bool (^)());
SThreadPoolTaskState *_state;
}
@end
@implementation SThreadPoolTask
- (instancetype)initWithBlock:(void (^)(bool (^)()))block
{
self = [super init];
if (self != nil)
{
_block = [block copy];
_state = [[SThreadPoolTaskState alloc] init];
}
return self;
}
- (void)execute
{
if (_state->_cancelled)
return;
SThreadPoolTaskState *state = _state;
_block(^bool
{
return state->_cancelled;
});
}
- (void)cancel
{
_state->_cancelled = true;
}
@end

View File

@ -216,6 +216,7 @@
{
dispatch_async(queue, ^
{
usleep(100);
[subscriber putNext:@1];
});
@ -229,6 +230,7 @@
{
generated = true;
} error:nil completed:nil];
NSLog(@"dispose");
[disposable dispose];
}