Эх сурвалжийг харах

Merge remote-tracking branch 'origin/threading-improvements'

Conflicts:
	SocketRocket.xcodeproj/project.pbxproj
	SocketRocket/SRWebSocket.m
Mike Lewis 13 жил өмнө
parent
commit
37f811eabf

BIN
Default-568h@2x.png


+ 0 - 2
SRWebSocketTests/SRTAutobahnTests.m

@@ -159,8 +159,6 @@
         for (NSException *e in run.exceptions) {
             [aRun addException:e];
         }
-        
-//        [aRun ]
     }
     [aRun stop];
     

+ 5 - 0
SRWebSocketTests/SRTWebSocketOperation.m

@@ -61,6 +61,11 @@
     _webSocket = nil;
 }
 
+- (void)webSocket:(SRWebSocket *)webSocket didReceiveMessage:(id)message;
+{
+    NSAssert(NO, @"Not implemented");
+}
+
 - (void)webSocket:(SRWebSocket *)webSocket didFailWithError:(NSError *)error;
 {
     _error = error;

+ 4 - 1
SocketRocket.xcodeproj/project.pbxproj

@@ -7,6 +7,7 @@
 	objects = {
 
 /* Begin PBXBuildFile section */
+		27FF2C2916066747006EF077 /* Default-568h@2x.png in Resources */ = {isa = PBXBuildFile; fileRef = 27FF2C2816066746006EF077 /* Default-568h@2x.png */; };
 		F6016C7C146124B20037BB3D /* base64.c in Sources */ = {isa = PBXBuildFile; fileRef = F6016C7B146124B20037BB3D /* base64.c */; };
 		F6016C7F146124ED0037BB3D /* base64.h in Headers */ = {isa = PBXBuildFile; fileRef = F6016C7E146124ED0037BB3D /* base64.h */; };
 		F6016C8814620EC70037BB3D /* Security.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = F6A12CD3145122FC00C1D980 /* Security.framework */; };
@@ -60,6 +61,7 @@
 /* End PBXContainerItemProxy section */
 
 /* Begin PBXFileReference section */
+		27FF2C2816066746006EF077 /* Default-568h@2x.png */ = {isa = PBXFileReference; lastKnownFileType = image.png; name = "Default-568h@2x.png"; path = "../Default-568h@2x.png"; sourceTree = "<group>"; };
 		F6016C7B146124B20037BB3D /* base64.c */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.c; path = base64.c; sourceTree = "<group>"; };
 		F6016C7E146124ED0037BB3D /* base64.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = base64.h; sourceTree = "<group>"; };
 		F60CC29F14D4EA0500A005E4 /* SRTWebSocketOperation.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = SRTWebSocketOperation.h; sourceTree = "<group>"; };
@@ -400,7 +402,7 @@
 		F6B208241450F597009315AF /* Project object */ = {
 			isa = PBXProject;
 			attributes = {
-				LastUpgradeCheck = 0440;
+				LastUpgradeCheck = 0450;
 			};
 			buildConfigurationList = F6B208271450F597009315AF /* Build configuration list for PBXProject "SocketRocket" */;
 			compatibilityVersion = "Xcode 3.2";
@@ -681,6 +683,7 @@
 				COPY_PHASE_STRIP = YES;
 				GCC_PREPROCESSOR_DEFINITIONS = (
 					"OS_OBJECT_USE_OBJC_RETAIN_RELEASE=0",
+					NDEBUG,
 					"$(inherited)",
 				);
 				GCC_SYMBOLS_PRIVATE_EXTERN = YES;

+ 1 - 2
SocketRocket.xcodeproj/xcshareddata/xcschemes/SocketRocket.xcscheme

@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <Scheme
-   LastUpgradeVersion = "0440"
+   LastUpgradeVersion = "0450"
    version = "1.7">
    <BuildAction
       parallelizeBuildables = "YES"
@@ -116,7 +116,6 @@
       buildConfiguration = "Debug"
       ignoresPersistentStateOnLaunch = "NO"
       debugDocumentVersioning = "YES"
-      enableOpenGLFrameCaptureMode = "0"
       allowLocationSimulation = "YES">
       <AdditionalOptions>
          <AdditionalOption

+ 1 - 1
SocketRocket.xcodeproj/xcshareddata/xcschemes/SocketRocketOSX.xcscheme

@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <Scheme
-   LastUpgradeVersion = "0440"
+   LastUpgradeVersion = "0450"
    version = "1.3">
    <BuildAction
       parallelizeBuildables = "YES"

+ 1 - 1
SocketRocket.xcodeproj/xcshareddata/xcschemes/SocketRocketTests.xcscheme

@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <Scheme
-   LastUpgradeVersion = "0440"
+   LastUpgradeVersion = "0450"
    version = "1.3">
    <BuildAction
       parallelizeBuildables = "YES"

+ 2 - 2
SocketRocket.xcodeproj/xcshareddata/xcschemes/TestChat.xcscheme

@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <Scheme
-   LastUpgradeVersion = "0440"
+   LastUpgradeVersion = "0450"
    version = "1.3">
    <BuildAction
       parallelizeBuildables = "YES"
@@ -23,7 +23,7 @@
       </BuildActionEntries>
    </BuildAction>
    <TestAction
-      selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.GDB"
+      selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
       selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.GDB"
       shouldUseLaunchSchemeArgsEnv = "YES"
       buildConfiguration = "Debug">

+ 15 - 0
SocketRocket/SRWebSocket.h

@@ -50,6 +50,15 @@ extern NSString *const SRWebSocketErrorDomain;
 - (id)initWithURL:(NSURL *)url protocols:(NSArray *)protocols;
 - (id)initWithURL:(NSURL *)url;
 
+// Delegage queue will be dispatch_main_queue by default
+// Only can set OperationQeuue or dispatch_queue
+- (void)setDelegateOperationQueue:(NSOperationQueue*) queue;
+- (void)setDelegateDispatchQueue:(dispatch_queue_t) queue;
+
+// By default, it will schedule itself on +[NSRunLoop SR_networkRunLoop] using defaultModes
+- (void)scheduleInRunLoop:(NSRunLoop *)aRunLoop forMode:(NSString *)mode;
+- (void)unscheduleFromRunLoop:(NSRunLoop *)aRunLoop forMode:(NSString *)mode;
+
 // SRWebSockets are intended one-time-use only.  Open should be called once and only once
 - (void)open;
 
@@ -88,3 +97,9 @@ extern NSString *const SRWebSocketErrorDomain;
 @property (nonatomic, retain) NSArray *SR_SSLPinnedCertificates;
 
 @end
+
+@interface NSRunLoop (SRWebSocket)
+
++ (NSRunLoop *)SR_networkRunLoop;
+
+@end

+ 237 - 60
SocketRocket/SRWebSocket.m

@@ -99,6 +99,13 @@ static inline void SRFastLog(NSString *format, ...);
 @end
 
 
+@interface _SRRunLoopThread : NSThread
+
+@property (nonatomic, readonly) NSRunLoop *runLoop;
+
+@end
+
+
 static NSString *newSHA1String(const char *bytes, size_t length) {
     uint8_t md[CC_SHA1_DIGEST_LENGTH];
     
@@ -151,9 +158,6 @@ typedef void (^data_callback)(SRWebSocket *webSocket,  NSData *data);
     BOOL _readToCurrentFrame;
     BOOL _unmaskBytes;
 }
-
-- (id)initWithScanner:(stream_scanner)scanner handler:(data_callback)handler bytesNeeded:(size_t)bytesNeeded readToCurrentFrame:(BOOL)readToCurrentFrame unmaskBytes:(BOOL)unmaskBytes;
-
 @property (nonatomic, copy, readonly) stream_scanner consumer;
 @property (nonatomic, copy, readonly) data_callback handler;
 @property (nonatomic, assign) size_t bytesNeeded;
@@ -162,6 +166,15 @@ typedef void (^data_callback)(SRWebSocket *webSocket,  NSData *data);
 
 @end
 
+// This class is not thread-safe, and is expected to always be run on the same queue
+@interface SRIOConsumerPool : NSObject
+
+- (id)initWithBufferCapacity:(NSUInteger)poolSize;
+
+- (SRIOConsumer *)consumerWithScanner:(stream_scanner)scanner handler:(data_callback)handler bytesNeeded:(size_t)bytesNeeded readToCurrentFrame:(BOOL)readToCurrentFrame unmaskBytes:(BOOL)unmaskBytes;
+- (void)returnConsumer:(SRIOConsumer *)consumer;
+
+@end
 
 @interface SRWebSocket ()  <NSStreamDelegate>
 
@@ -189,16 +202,23 @@ typedef void (^data_callback)(SRWebSocket *webSocket,  NSData *data);
 - (BOOL)_checkHandshake:(CFHTTPMessageRef)httpMessage;
 - (void)_SR_commonInit;
 
-- (void)_connectToHost:(NSString *)host port:(NSInteger)port;
+- (void)_initializeStreams;
+- (void)_connect;
 
 @property (nonatomic) SRReadyState readyState;
 
+@property (nonatomic) NSOperationQueue *delegateOperationQueue;
+@property (nonatomic) dispatch_queue_t delegateDispatchQueue;
+
 @end
 
 
 @implementation SRWebSocket {
     NSInteger _webSocketVersion;
-    dispatch_queue_t _callbackQueue;
+    
+    NSOperationQueue *_delegateOperationQueue;
+    dispatch_queue_t _delegateDispatchQueue;
+    
     dispatch_queue_t _workQueue;
     NSMutableArray *_consumers;
 
@@ -242,10 +262,13 @@ typedef void (^data_callback)(SRWebSocket *webSocket,  NSData *data);
     
     BOOL _isPumping;
     
+    BOOL _didSchedule;
+    
     // We use this to retain ourselves.
     __strong SRWebSocket *_selfRetain;
     
     NSArray *_requestedProtocols;
+    SRIOConsumerPool *_consumerPool;
 }
 
 @synthesize delegate = _delegate;
@@ -266,16 +289,9 @@ static __strong NSData *CRLFCRLF;
     if (self) {
         assert(request.URL);
         _url = request.URL;
-        NSString *scheme = [[_url scheme] lowercaseString];
-        
-        _requestedProtocols = [protocols copy];
-
-        assert([scheme isEqualToString:@"ws"] || [scheme isEqualToString:@"http"] || [scheme isEqualToString:@"wss"] || [scheme isEqualToString:@"https"]);
         _urlRequest = request;
         
-        if ([scheme isEqualToString:@"wss"] || [scheme isEqualToString:@"https"]) {
-            _secure = YES;
-        }
+        _requestedProtocols = [protocols copy];
         
         [self _SR_commonInit];
     }
@@ -301,6 +317,15 @@ static __strong NSData *CRLFCRLF;
 
 - (void)_SR_commonInit;
 {
+    
+    NSString *scheme = _url.scheme.lowercaseString;
+    assert([scheme isEqualToString:@"ws"] || [scheme isEqualToString:@"http"] || [scheme isEqualToString:@"wss"] || [scheme isEqualToString:@"https"]);
+    
+    if ([scheme isEqualToString:@"wss"] || [scheme isEqualToString:@"https"]) {
+        _secure = YES;
+    }
+    
+    
     _readyState = SR_CONNECTING;
 
     _consumerStopped = YES;
@@ -309,8 +334,8 @@ static __strong NSData *CRLFCRLF;
     
     _workQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
     
-    _callbackQueue = dispatch_get_main_queue();
-    dispatch_retain(_callbackQueue);
+    _delegateDispatchQueue = dispatch_get_main_queue();
+    dispatch_retain(_delegateDispatchQueue);
     
     _readBuffer = [[NSMutableData alloc] init];
     _outputBuffer = [[NSMutableData alloc] init];
@@ -318,6 +343,9 @@ static __strong NSData *CRLFCRLF;
     _currentFrameData = [[NSMutableData alloc] init];
 
     _consumers = [[NSMutableArray alloc] init];
+    _consumerPool = [[SRIOConsumerPool alloc] init];
+    
+    [self _initializeStreams];
     
     // default handlers
 }
@@ -330,13 +358,17 @@ static __strong NSData *CRLFCRLF;
     [_inputStream close];
     [_outputStream close];
     
-    dispatch_release(_callbackQueue);
     dispatch_release(_workQueue);
     
     if (_receivedHTTPHeaders) {
         CFRelease(_receivedHTTPHeaders);
         _receivedHTTPHeaders = NULL;
     }
+    
+    if (_delegateDispatchQueue) {
+        dispatch_release(_delegateDispatchQueue);
+        _delegateDispatchQueue = NULL;
+    }
 }
 
 #ifndef NDEBUG
@@ -354,23 +386,36 @@ static __strong NSData *CRLFCRLF;
 - (void)open;
 {
     assert(_url);
-    NSAssert(_readyState == SR_CONNECTING && _inputStream == nil && _outputStream == nil, @"Cannot call -(void)open on SRWebSocket more than once");
+    NSAssert(_readyState == SR_CONNECTING, @"Cannot call -(void)open on SRWebSocket more than once");
 
     _selfRetain = self;
     
-    NSInteger port = _url.port.integerValue;
-    if (port == 0) {
-        if (!_secure) {
-            port = 80;
-        } else {
-            port = 443;
-        }
-    }
-
-    [self _connectToHost:_url.host port:port];
+    [self _connect];
 }
 
+// Calls block on delegate queue
+- (void)_performDelegateBlock:(dispatch_block_t)block;
+{
+    if (_delegateOperationQueue) {
+        [_delegateOperationQueue addOperationWithBlock:block];
+    } else {
+        assert(_delegateDispatchQueue);
+        dispatch_async(_delegateDispatchQueue, block);
+    }
+}
 
+- (void)setDelegateDispatchQueue:(dispatch_queue_t)queue;
+{
+    if (queue) {
+        dispatch_retain(queue);
+    }
+    
+    if (_delegateDispatchQueue) {
+        dispatch_release(_delegateDispatchQueue);
+    }
+    
+    _delegateDispatchQueue = queue;
+}
 
 - (BOOL)_checkHandshake:(CFHTTPMessageRef)httpMessage;
 {
@@ -419,11 +464,11 @@ static __strong NSData *CRLFCRLF;
         [self _readFrameNew];
     }
 
-    dispatch_async(_callbackQueue, ^{
+    [self _performDelegateBlock:^{
         if ([self.delegate respondsToSelector:@selector(webSocketDidOpen:)]) {
             [self.delegate webSocketDidOpen:self];
-        }
-    });
+        };
+    }];
 }
 
 
@@ -481,8 +526,18 @@ static __strong NSData *CRLFCRLF;
     [self _readHTTPHeader];
 }
 
-- (void)_connectToHost:(NSString *)host port:(NSInteger)port;
-{    
+- (void)_initializeStreams;
+{
+    NSInteger port = _url.port.integerValue;
+    if (port == 0) {
+        if (!_secure) {
+            port = 80;
+        } else {
+            port = 443;
+        }
+    }
+    NSString *host = _url.host;
+    
     CFReadStreamRef readStream = NULL;
     CFWriteStreamRef writeStream = NULL;
     
@@ -494,7 +549,7 @@ static __strong NSData *CRLFCRLF;
     
     if (_secure) {
         NSMutableDictionary *SSLOptions = [[NSMutableDictionary alloc] init];
-             
+        
         [_outputStream setProperty:(__bridge id)kCFStreamSocketSecurityLevelNegotiatedSSL forKey:(__bridge id)kCFStreamPropertySocketSecurityLevel];
         
         // If we're using pinned certs, don't validate the certificate chain
@@ -502,10 +557,10 @@ static __strong NSData *CRLFCRLF;
             [SSLOptions setValue:[NSNumber numberWithBool:NO] forKey:(__bridge id)kCFStreamSSLValidatesCertificateChain];
         }
         
-        #if DEBUG
+#if DEBUG
         [SSLOptions setValue:[NSNumber numberWithBool:NO] forKey:(__bridge id)kCFStreamSSLValidatesCertificateChain];
         NSLog(@"SocketRocket: In debug mode.  Allowing connection to any root cert");
-        #endif
+#endif
         
         [_outputStream setProperty:SSLOptions
                             forKey:(__bridge id)kCFStreamPropertySSLSettings];
@@ -513,16 +568,34 @@ static __strong NSData *CRLFCRLF;
     
     _inputStream.delegate = self;
     _outputStream.delegate = self;
+}
+
+- (void)_connect;
+{
     
-    // TODO schedule in a better run loop
-    [_outputStream scheduleInRunLoop:[NSRunLoop mainRunLoop] forMode:NSDefaultRunLoopMode];
-    [_inputStream scheduleInRunLoop:[NSRunLoop mainRunLoop] forMode:NSDefaultRunLoopMode];
+    if (!_didSchedule) {
+        [self scheduleInRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode];
+    }
     
     
     [_outputStream open];
     [_inputStream open];
 }
 
+- (void)scheduleInRunLoop:(NSRunLoop *)aRunLoop forMode:(NSString *)mode;
+{
+    _didSchedule = YES;
+    
+    [_outputStream scheduleInRunLoop:aRunLoop forMode:mode];
+    [_inputStream scheduleInRunLoop:aRunLoop forMode:mode];
+}
+
+- (void)unscheduleFromRunLoop:(NSRunLoop *)aRunLoop forMode:(NSString *)mode;
+{
+    [_outputStream removeFromRunLoop:aRunLoop forMode:mode];
+    [_inputStream removeFromRunLoop:aRunLoop forMode:mode];
+}
+
 - (void)close;
 {
     [self closeWithCode:-1 reason:nil];
@@ -575,12 +648,12 @@ static __strong NSData *CRLFCRLF;
 - (void)_closeWithProtocolError:(NSString *)message;
 {
     // Need to shunt this on the _callbackQueue first to see if they received any messages 
-    dispatch_async(_callbackQueue, ^{
+    [self _performDelegateBlock:^{
         [self closeWithCode:SRStatusCodeProtocolError reason:message];
         dispatch_async(_workQueue, ^{
             [self _disconnect];
         });
-    });
+    }];
 }
 
 - (void)_failWithError:(NSError *)error;
@@ -588,11 +661,11 @@ static __strong NSData *CRLFCRLF;
     dispatch_async(_workQueue, ^{
         if (self.readyState != SR_CLOSED) {
             _failed = YES;
-            dispatch_async(_callbackQueue, ^{
+            [self _performDelegateBlock:^{
                 if ([self.delegate respondsToSelector:@selector(webSocket:didFailWithError:)]) {
                     [self.delegate webSocket:self didFailWithError:error];
                 }
-            });
+            }];
 
             self.readyState = SR_CLOSED;
             _selfRetain = nil;
@@ -635,11 +708,11 @@ static __strong NSData *CRLFCRLF;
 - (void)handlePing:(NSData *)pingData;
 {
     // Need to pingpong this off _callbackQueue first to make sure messages happen in order
-    dispatch_async(_callbackQueue, ^{
+    [self _performDelegateBlock:^{
         dispatch_async(_workQueue, ^{
             [self _sendFrameWithOpcode:SROpCodePong data:pingData];
         });
-    });
+    }];
 }
 
 - (void)handlePong;
@@ -650,9 +723,9 @@ static __strong NSData *CRLFCRLF;
 - (void)_handleMessage:(id)message
 {
     SRFastLog(@"Received message");
-    dispatch_async(_callbackQueue, ^{
+    [self _performDelegateBlock:^{
         [self.delegate webSocket:self didReceiveMessage:message];
-    });
+    }];
 }
 
 
@@ -995,11 +1068,11 @@ static const uint8_t SRPayloadLenMask   = 0x7F;
         [_inputStream close];
         
         if (!_failed) {
-            dispatch_async(_callbackQueue, ^{
+            [self _performDelegateBlock:^{
                 if ([self.delegate respondsToSelector:@selector(webSocket:didCloseWithCode:reason:wasClean:)]) {
                     [self.delegate webSocket:self didCloseWithCode:_closeCode reason:_closeReason wasClean:YES];
                 }
-            });
+            }];
         }
         
         _selfRetain = nil;
@@ -1017,14 +1090,14 @@ static const uint8_t SRPayloadLenMask   = 0x7F;
     assert(dispatch_get_current_queue() == _workQueue);
     assert(dataLength);
     
-    [_consumers addObject:[[SRIOConsumer alloc] initWithScanner:nil handler:callback bytesNeeded:dataLength readToCurrentFrame:readToCurrentFrame unmaskBytes:unmaskBytes]];
+    [_consumers addObject:[_consumerPool consumerWithScanner:nil handler:callback bytesNeeded:dataLength readToCurrentFrame:readToCurrentFrame unmaskBytes:unmaskBytes]];
     [self _pumpScanner];
 }
 
 - (void)_addConsumerWithScanner:(stream_scanner)consumer callback:(data_callback)callback dataLength:(size_t)dataLength;
 {    
     assert(dispatch_get_current_queue() == _workQueue);
-    [_consumers addObject:[[SRIOConsumer alloc] initWithScanner:consumer handler:callback bytesNeeded:dataLength readToCurrentFrame:NO unmaskBytes:NO]];
+    [_consumers addObject:[_consumerPool consumerWithScanner:consumer handler:callback bytesNeeded:dataLength readToCurrentFrame:NO unmaskBytes:NO]];
     [self _pumpScanner];
 }
 
@@ -1156,11 +1229,13 @@ static const char CRLFCRLFBytes[] = {'\r', '\n', '\r', '\n'};
             if (consumer.bytesNeeded == 0) {
                 [_consumers removeObjectAtIndex:0];
                 consumer.handler(self, nil);
+                [_consumerPool returnConsumer:consumer];
                 didWork = YES;
             }
         } else if (foundSize) {
             [_consumers removeObjectAtIndex:0];
             consumer.handler(self, slice);
+            [_consumerPool returnConsumer:consumer];
             didWork = YES;
         }
     }
@@ -1303,7 +1378,6 @@ static const size_t SRFrameHeaderOverhead = 32;
                 if (self.readyState >= SR_CLOSING) {
                     return;
                 }
-
                 assert(_readBuffer);
                 
                 if (self.readyState == SR_CONNECTING && aStream == _inputStream) {
@@ -1338,11 +1412,11 @@ static const size_t SRFrameHeaderOverhead = 32;
                     if (!_sentClose && !_failed) {
                         _sentClose = YES;
                         // If we get closed in this state it's probably not clean because we should be sending this when we send messages
-                        dispatch_async(_callbackQueue, ^{
+                        [self _performDelegateBlock:^{
                             if ([self.delegate respondsToSelector:@selector(webSocket:didCloseWithCode:reason:wasClean:)]) {
                                 [self.delegate webSocket:self didCloseWithCode:0 reason:@"Stream end encountered" wasClean:NO];
                             }
-                        });
+                        }];
                     }
                 }
                 
@@ -1395,20 +1469,62 @@ static const size_t SRFrameHeaderOverhead = 32;
 @synthesize readToCurrentFrame = _readToCurrentFrame;
 @synthesize unmaskBytes = _unmaskBytes;
 
-- (id)initWithScanner:(stream_scanner)scanner handler:(data_callback)handler bytesNeeded:(size_t)bytesNeeded readToCurrentFrame:(BOOL)readToCurrentFrame unmaskBytes:(BOOL)unmaskBytes;
+- (void)setupWithScanner:(stream_scanner)scanner handler:(data_callback)handler bytesNeeded:(size_t)bytesNeeded readToCurrentFrame:(BOOL)readToCurrentFrame unmaskBytes:(BOOL)unmaskBytes;
+{
+    _scanner = [scanner copy];
+    _handler = [handler copy];
+    _bytesNeeded = bytesNeeded;
+    _readToCurrentFrame = readToCurrentFrame;
+    _unmaskBytes = unmaskBytes;
+    assert(_scanner || _bytesNeeded);
+}
+
+
+@end
+
+
+@implementation SRIOConsumerPool {
+    NSUInteger _poolSize;
+    NSMutableArray *_bufferedConsumers;
+}
+
+- (id)initWithBufferCapacity:(NSUInteger)poolSize;
 {
     self = [super init];
     if (self) {
-        _scanner = [scanner copy];
-        _handler = [handler copy];
-        _bytesNeeded = bytesNeeded;
-        _readToCurrentFrame = readToCurrentFrame;
-        _unmaskBytes = unmaskBytes;
-        assert(_scanner || _bytesNeeded);
+        _poolSize = poolSize;
+        _bufferedConsumers = [[NSMutableArray alloc] initWithCapacity:poolSize];
     }
     return self;
 }
 
+- (id)init
+{
+    return [self initWithBufferCapacity:8];
+}
+
+- (SRIOConsumer *)consumerWithScanner:(stream_scanner)scanner handler:(data_callback)handler bytesNeeded:(size_t)bytesNeeded readToCurrentFrame:(BOOL)readToCurrentFrame unmaskBytes:(BOOL)unmaskBytes;
+{
+    SRIOConsumer *consumer = nil;
+    if (_bufferedConsumers.count) {
+        consumer = [_bufferedConsumers lastObject];
+        [_bufferedConsumers removeLastObject];
+    } else {
+        consumer = [[SRIOConsumer alloc] init];
+    }
+    
+    [consumer setupWithScanner:scanner handler:handler bytesNeeded:bytesNeeded readToCurrentFrame:readToCurrentFrame unmaskBytes:unmaskBytes];
+    
+    return consumer;
+}
+
+- (void)returnConsumer:(SRIOConsumer *)consumer;
+{
+    if (_bufferedConsumers.count < _poolSize) {
+        [_bufferedConsumers addObject:consumer];
+    }
+}
+
 @end
 
 
@@ -1544,4 +1660,65 @@ static inline int32_t validate_dispatch_data_partial_string(NSData *data) {
 
 #endif
 
+static _SRRunLoopThread *networkThread = nil;
+static NSRunLoop *networkRunLoop = nil;
+
+@implementation NSRunLoop (SRWebSocket)
+
++ (NSRunLoop *)SR_networkRunLoop {
+    static dispatch_once_t onceToken;
+    dispatch_once(&onceToken, ^{
+        networkThread = [[_SRRunLoopThread alloc] init];
+        networkThread.name = @"com.squareup.SocketRocket.NetworkThread";
+        [networkThread start];
+        networkRunLoop = networkThread.runLoop;
+    });
+    
+    return networkRunLoop;
+}
+
+@end
+
+
+@implementation _SRRunLoopThread {
+    dispatch_group_t _waitGroup;
+}
+
+@synthesize runLoop = _runLoop;
+
+- (void)dealloc
+{
+    dispatch_release(_waitGroup);
+}
+
+- (id)init
+{
+    self = [super init];
+    if (self) {
+        _waitGroup = dispatch_group_create();
+        dispatch_group_enter(_waitGroup);
+    }
+    return self;
+}
+
+- (void)main;
+{
+    _runLoop = [NSRunLoop currentRunLoop];
+    dispatch_group_leave(_waitGroup);
+    
+    NSTimer *timer = [[NSTimer alloc] initWithFireDate:[NSDate distantFuture] interval:0.0 target:nil selector:nil userInfo:nil repeats:NO];
+    [_runLoop addTimer:timer forMode:NSDefaultRunLoopMode];
+    
+    while ([_runLoop runMode:NSDefaultRunLoopMode beforeDate:[NSDate distantFuture]]) {
+        
+    }
+    assert(NO);
+}
+
+- (NSRunLoop *)runLoop;
+{
+    dispatch_group_wait(_waitGroup, DISPATCH_TIME_FOREVER);
+    return _runLoop;
+}
 
+@end