Browse Source

Adding optimization to reuse SRIOConsumers

Mike Lewis 13 years ago
parent
commit
01d3e34b20
1 changed files with 64 additions and 15 deletions
  1. 64 15
      SocketRocket/SRWebSocket.m

+ 64 - 15
SocketRocket/SRWebSocket.m

@@ -157,9 +157,6 @@ typedef void (^data_callback)(SRWebSocket *webSocket,  NSData *data);
     BOOL _readToCurrentFrame;
     BOOL _unmaskBytes;
 }
-
-- (id)initWithScanner:(stream_scanner)scanner handler:(data_callback)handler bytesNeeded:(size_t)bytesNeeded readToCurrentFrame:(BOOL)readToCurrentFrame unmaskBytes:(BOOL)unmaskBytes;
-
 @property (nonatomic, copy, readonly) stream_scanner consumer;
 @property (nonatomic, copy, readonly) data_callback handler;
 @property (nonatomic, assign) size_t bytesNeeded;
@@ -168,6 +165,15 @@ typedef void (^data_callback)(SRWebSocket *webSocket,  NSData *data);
 
 @end
 
+// This class is not thread-safe, and is expected to always be run on the same queue
+@interface SRIOConsumerPool : NSObject
+
+- (id)initWithBufferCapacity:(NSUInteger)poolSize;
+
+- (SRIOConsumer *)consumerWithScanner:(stream_scanner)scanner handler:(data_callback)handler bytesNeeded:(size_t)bytesNeeded readToCurrentFrame:(BOOL)readToCurrentFrame unmaskBytes:(BOOL)unmaskBytes;
+- (void)returnConsumer:(SRIOConsumer *)consumer;
+
+@end
 
 @interface SRWebSocket ()  <NSStreamDelegate>
 
@@ -258,6 +264,7 @@ typedef void (^data_callback)(SRWebSocket *webSocket,  NSData *data);
     __strong SRWebSocket *_selfRetain;
     
     NSArray *_requestedProtocols;
+    SRIOConsumerPool *_consumerPool;
 }
 
 @synthesize delegate = _delegate;
@@ -331,6 +338,7 @@ static __strong NSData *CRLFCRLF;
     _currentFrameData = [[NSMutableData alloc] init];
 
     _consumers = [[NSMutableArray alloc] init];
+    _consumerPool = [[SRIOConsumerPool alloc] init];
     
     [self _initializeStreams];
     
@@ -1050,14 +1058,14 @@ static const uint8_t SRPayloadLenMask   = 0x7F;
     assert(dispatch_get_current_queue() == _workQueue);
     assert(dataLength);
     
-    [_consumers addObject:[[SRIOConsumer alloc] initWithScanner:nil handler:callback bytesNeeded:dataLength readToCurrentFrame:readToCurrentFrame unmaskBytes:unmaskBytes]];
+    [_consumers addObject:[_consumerPool consumerWithScanner:nil handler:callback bytesNeeded:dataLength readToCurrentFrame:readToCurrentFrame unmaskBytes:unmaskBytes]];
     [self _pumpScanner];
 }
 
 - (void)_addConsumerWithScanner:(stream_scanner)consumer callback:(data_callback)callback dataLength:(size_t)dataLength;
 {    
     assert(dispatch_get_current_queue() == _workQueue);
-    [_consumers addObject:[[SRIOConsumer alloc] initWithScanner:consumer handler:callback bytesNeeded:dataLength readToCurrentFrame:NO unmaskBytes:NO]];
+    [_consumers addObject:[_consumerPool consumerWithScanner:consumer handler:callback bytesNeeded:dataLength readToCurrentFrame:NO unmaskBytes:NO]];
     [self _pumpScanner];
 }
 
@@ -1189,11 +1197,13 @@ static const char CRLFCRLFBytes[] = {'\r', '\n', '\r', '\n'};
             if (consumer.bytesNeeded == 0) {
                 [_consumers removeObjectAtIndex:0];
                 consumer.handler(self, nil);
+                [_consumerPool returnConsumer:consumer];
                 didWork = YES;
             }
         } else if (foundSize) {
             [_consumers removeObjectAtIndex:0];
             consumer.handler(self, slice);
+            [_consumerPool returnConsumer:consumer];
             didWork = YES;
         }
     }
@@ -1337,8 +1347,6 @@ static const size_t SRFrameHeaderOverhead = 32;
                 if (self.readyState >= SR_CLOSING) {
                     return;
                 }
-                
-
                 assert(_readBuffer);
                 
                 if (self.readyState == SR_CONNECTING && aStream == _inputStream) {
@@ -1417,7 +1425,6 @@ static const size_t SRFrameHeaderOverhead = 32;
                 break;
         }
     });
-
 }
 
 @end
@@ -1431,20 +1438,62 @@ static const size_t SRFrameHeaderOverhead = 32;
 @synthesize readToCurrentFrame = _readToCurrentFrame;
 @synthesize unmaskBytes = _unmaskBytes;
 
-- (id)initWithScanner:(stream_scanner)scanner handler:(data_callback)handler bytesNeeded:(size_t)bytesNeeded readToCurrentFrame:(BOOL)readToCurrentFrame unmaskBytes:(BOOL)unmaskBytes;
+- (void)setupWithScanner:(stream_scanner)scanner handler:(data_callback)handler bytesNeeded:(size_t)bytesNeeded readToCurrentFrame:(BOOL)readToCurrentFrame unmaskBytes:(BOOL)unmaskBytes;
+{
+    _scanner = [scanner copy];
+    _handler = [handler copy];
+    _bytesNeeded = bytesNeeded;
+    _readToCurrentFrame = readToCurrentFrame;
+    _unmaskBytes = unmaskBytes;
+    assert(_scanner || _bytesNeeded);
+}
+
+
+@end
+
+
+@implementation SRIOConsumerPool {
+    NSUInteger _poolSize;
+    NSMutableArray *_bufferedConsumers;
+}
+
+- (id)initWithBufferCapacity:(NSUInteger)poolSize;
 {
     self = [super init];
     if (self) {
-        _scanner = [scanner copy];
-        _handler = [handler copy];
-        _bytesNeeded = bytesNeeded;
-        _readToCurrentFrame = readToCurrentFrame;
-        _unmaskBytes = unmaskBytes;
-        assert(_scanner || _bytesNeeded);
+        _poolSize = poolSize;
+        _bufferedConsumers = [[NSMutableArray alloc] initWithCapacity:poolSize];
     }
     return self;
 }
 
+- (id)init
+{
+    return [self initWithBufferCapacity:8];
+}
+
+- (SRIOConsumer *)consumerWithScanner:(stream_scanner)scanner handler:(data_callback)handler bytesNeeded:(size_t)bytesNeeded readToCurrentFrame:(BOOL)readToCurrentFrame unmaskBytes:(BOOL)unmaskBytes;
+{
+    SRIOConsumer *consumer = nil;
+    if (_bufferedConsumers.count) {
+        consumer = [_bufferedConsumers lastObject];
+        [_bufferedConsumers removeLastObject];
+    } else {
+        consumer = [[SRIOConsumer alloc] init];
+    }
+    
+    [consumer setupWithScanner:scanner handler:handler bytesNeeded:bytesNeeded readToCurrentFrame:readToCurrentFrame unmaskBytes:unmaskBytes];
+    
+    return consumer;
+}
+
+- (void)returnConsumer:(SRIOConsumer *)consumer;
+{
+    if (_bufferedConsumers.count < _poolSize) {
+        [_bufferedConsumers addObject:consumer];
+    }
+}
+
 @end