Browse Source

Scheduling improvements

By default now, SRWebSocket will schedule the network on a shared
background thread

It can also be configured to call the delegate methods on a non-main
queue
Mike Lewis 13 năm trước cách đây
mục cha
commit
75fd01cb41
3 tập tin đã thay đổi với 160 bổ sung48 xóa
  1. 5 1
      SocketRocket.xcodeproj/project.pbxproj
  2. 13 0
      SocketRocket/SRWebSocket.h
  3. 142 47
      SocketRocket/SRWebSocket.m

+ 5 - 1
SocketRocket.xcodeproj/project.pbxproj

@@ -7,6 +7,7 @@
 	objects = {
 
 /* Begin PBXBuildFile section */
+		27FF2C2916066747006EF077 /* Default-568h@2x.png in Resources */ = {isa = PBXBuildFile; fileRef = 27FF2C2816066746006EF077 /* Default-568h@2x.png */; };
 		F6016C7C146124B20037BB3D /* base64.c in Sources */ = {isa = PBXBuildFile; fileRef = F6016C7B146124B20037BB3D /* base64.c */; };
 		F6016C7F146124ED0037BB3D /* base64.h in Headers */ = {isa = PBXBuildFile; fileRef = F6016C7E146124ED0037BB3D /* base64.h */; };
 		F6016C8814620EC70037BB3D /* Security.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = F6A12CD3145122FC00C1D980 /* Security.framework */; };
@@ -59,6 +60,7 @@
 /* End PBXContainerItemProxy section */
 
 /* Begin PBXFileReference section */
+		27FF2C2816066746006EF077 /* Default-568h@2x.png */ = {isa = PBXFileReference; lastKnownFileType = image.png; name = "Default-568h@2x.png"; path = "../Default-568h@2x.png"; sourceTree = "<group>"; };
 		F6016C7B146124B20037BB3D /* base64.c */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.c; path = base64.c; sourceTree = "<group>"; };
 		F6016C7E146124ED0037BB3D /* base64.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = base64.h; sourceTree = "<group>"; };
 		F60CC29F14D4EA0500A005E4 /* SRTWebSocketOperation.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = SRTWebSocketOperation.h; sourceTree = "<group>"; };
@@ -175,6 +177,7 @@
 		F62417EB14D52F3C003CE997 /* Supporting Files */ = {
 			isa = PBXGroup;
 			children = (
+				27FF2C2816066746006EF077 /* Default-568h@2x.png */,
 				F62417EC14D52F3C003CE997 /* TestChat-Info.plist */,
 				F62417ED14D52F3C003CE997 /* InfoPlist.strings */,
 				F62417F014D52F3C003CE997 /* main.m */,
@@ -397,7 +400,7 @@
 		F6B208241450F597009315AF /* Project object */ = {
 			isa = PBXProject;
 			attributes = {
-				LastUpgradeCheck = 0440;
+				LastUpgradeCheck = 0450;
 			};
 			buildConfigurationList = F6B208271450F597009315AF /* Build configuration list for PBXProject "SocketRocket" */;
 			compatibilityVersion = "Xcode 3.2";
@@ -426,6 +429,7 @@
 			files = (
 				F62417EF14D52F3C003CE997 /* InfoPlist.strings in Resources */,
 				F62417F814D52F3C003CE997 /* MainStoryboard.storyboard in Resources */,
+				27FF2C2916066747006EF077 /* Default-568h@2x.png in Resources */,
 			);
 			runOnlyForDeploymentPostprocessing = 0;
 		};

+ 13 - 0
SocketRocket/SRWebSocket.h

@@ -50,6 +50,13 @@ extern NSString *const SRWebSocketErrorDomain;
 - (id)initWithURL:(NSURL *)url protocols:(NSArray *)protocols;
 - (id)initWithURL:(NSURL *)url;
 
+// Delegage queue will be +[NSOperationQueue mainQueue] by default
+- (void)setDelegateQueue:(NSOperationQueue*) queue;
+
+// By default, it will schedule itself on +[NSRunLoop SR_networkRunLoop] using defaultModes
+- (void)scheduleInRunLoop:(NSRunLoop *)aRunLoop forMode:(NSString *)mode;
+- (void)unscheduleFromRunLoop:(NSRunLoop *)aRunLoop forMode:(NSString *)mode;
+
 // SRWebSockets are intended one-time-use only.  Open should be called once and only once
 - (void)open;
 
@@ -88,3 +95,9 @@ extern NSString *const SRWebSocketErrorDomain;
 @property (nonatomic, retain) NSArray *SR_SSLPinnedCertificates;
 
 @end
+
+@interface NSRunLoop (SRWebSocket)
+
++ (NSRunLoop *)SR_networkRunLoop;
+
+@end

+ 142 - 47
SocketRocket/SRWebSocket.m

@@ -98,6 +98,13 @@ static inline void SRFastLog(NSString *format, ...);
 @end
 
 
+@interface _SRRunLoopThread : NSThread
+
+@property (nonatomic, readonly) NSRunLoop *runLoop;
+
+@end
+
+
 static NSString *newSHA1String(const char *bytes, size_t length) {
     uint8_t md[CC_SHA1_DIGEST_LENGTH];
     
@@ -188,16 +195,20 @@ typedef void (^data_callback)(SRWebSocket *webSocket,  NSData *data);
 - (BOOL)_checkHandshake:(CFHTTPMessageRef)httpMessage;
 - (void)_SR_commonInit;
 
-- (void)_connectToHost:(NSString *)host port:(NSInteger)port;
+- (void)_initializeStreams;
+- (void)_connect;
 
 @property (nonatomic) SRReadyState readyState;
 
+@property (nonatomic) NSOperationQueue *delegateQueue;
+
+
 @end
 
 
 @implementation SRWebSocket {
     NSInteger _webSocketVersion;
-    dispatch_queue_t _callbackQueue;
+    NSOperationQueue *_delegateQueue;
     dispatch_queue_t _workQueue;
     NSMutableArray *_consumers;
 
@@ -241,6 +252,8 @@ typedef void (^data_callback)(SRWebSocket *webSocket,  NSData *data);
     
     BOOL _isPumping;
     
+    BOOL _didSchedule;
+    
     // We use this to retain ourselves.
     __strong SRWebSocket *_selfRetain;
     
@@ -265,16 +278,9 @@ static __strong NSData *CRLFCRLF;
     if (self) {
         assert(request.URL);
         _url = request.URL;
-        NSString *scheme = [_url scheme];
-        
-        _requestedProtocols = [protocols copy];
-
-        assert([scheme isEqualToString:@"ws"] || [scheme isEqualToString:@"http"] || [scheme isEqualToString:@"wss"] || [scheme isEqualToString:@"https"]);
         _urlRequest = request;
         
-        if ([scheme isEqualToString:@"wss"] || [scheme isEqualToString:@"https"]) {
-            _secure = YES;
-        }
+        _requestedProtocols = [protocols copy];
         
         [self _SR_commonInit];
     }
@@ -300,6 +306,15 @@ static __strong NSData *CRLFCRLF;
 
 - (void)_SR_commonInit;
 {
+    
+    NSString *scheme = [_url scheme];
+    assert([scheme isEqualToString:@"ws"] || [scheme isEqualToString:@"http"] || [scheme isEqualToString:@"wss"] || [scheme isEqualToString:@"https"]);
+    
+    if ([scheme isEqualToString:@"wss"] || [scheme isEqualToString:@"https"]) {
+        _secure = YES;
+    }
+    
+    
     _readyState = SR_CONNECTING;
 
     _consumerStopped = YES;
@@ -308,8 +323,7 @@ static __strong NSData *CRLFCRLF;
     
     _workQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
     
-    _callbackQueue = dispatch_get_main_queue();
-    dispatch_retain(_callbackQueue);
+    _delegateQueue = [NSOperationQueue mainQueue];
     
     _readBuffer = [[NSMutableData alloc] init];
     _outputBuffer = [[NSMutableData alloc] init];
@@ -318,6 +332,8 @@ static __strong NSData *CRLFCRLF;
 
     _consumers = [[NSMutableArray alloc] init];
     
+    [self _initializeStreams];
+    
     // default handlers
 }
 
@@ -329,7 +345,6 @@ static __strong NSData *CRLFCRLF;
     [_inputStream close];
     [_outputStream close];
     
-    dispatch_release(_callbackQueue);
     dispatch_release(_workQueue);
     
     if (_receivedHTTPHeaders) {
@@ -353,20 +368,11 @@ static __strong NSData *CRLFCRLF;
 - (void)open;
 {
     assert(_url);
-    NSAssert(_readyState == SR_CONNECTING && _inputStream == nil && _outputStream == nil, @"Cannot call -(void)open on SRWebSocket more than once");
+    NSAssert(_readyState == SR_CONNECTING, @"Cannot call -(void)open on SRWebSocket more than once");
 
     _selfRetain = self;
     
-    NSInteger port = _url.port.integerValue;
-    if (port == 0) {
-        if (!_secure) {
-            port = 80;
-        } else {
-            port = 443;
-        }
-    }
-
-    [self _connectToHost:_url.host port:port];
+    [self _connect];
 }
 
 
@@ -418,11 +424,11 @@ static __strong NSData *CRLFCRLF;
         [self _readFrameNew];
     }
 
-    dispatch_async(_callbackQueue, ^{
+    [_delegateQueue addOperationWithBlock:^{
         if ([self.delegate respondsToSelector:@selector(webSocketDidOpen:)]) {
             [self.delegate webSocketDidOpen:self];
-        }
-    });
+        };
+    }];
 }
 
 
@@ -480,8 +486,18 @@ static __strong NSData *CRLFCRLF;
     [self _readHTTPHeader];
 }
 
-- (void)_connectToHost:(NSString *)host port:(NSInteger)port;
-{    
+- (void)_initializeStreams;
+{
+    NSInteger port = _url.port.integerValue;
+    if (port == 0) {
+        if (!_secure) {
+            port = 80;
+        } else {
+            port = 443;
+        }
+    }
+    NSString *host = _url.host;
+    
     CFReadStreamRef readStream = NULL;
     CFWriteStreamRef writeStream = NULL;
     
@@ -493,7 +509,7 @@ static __strong NSData *CRLFCRLF;
     
     if (_secure) {
         NSMutableDictionary *SSLOptions = [[NSMutableDictionary alloc] init];
-             
+        
         [_outputStream setProperty:(__bridge id)kCFStreamSocketSecurityLevelNegotiatedSSL forKey:(__bridge id)kCFStreamPropertySocketSecurityLevel];
         
         // If we're using pinned certs, don't validate the certificate chain
@@ -501,10 +517,10 @@ static __strong NSData *CRLFCRLF;
             [SSLOptions setValue:[NSNumber numberWithBool:NO] forKey:(__bridge id)kCFStreamSSLValidatesCertificateChain];
         }
         
-        #if DEBUG
+#if DEBUG
         [SSLOptions setValue:[NSNumber numberWithBool:NO] forKey:(__bridge id)kCFStreamSSLValidatesCertificateChain];
         NSLog(@"SocketRocket: In debug mode.  Allowing connection to any root cert");
-        #endif
+#endif
         
         [_outputStream setProperty:SSLOptions
                             forKey:(__bridge id)kCFStreamPropertySSLSettings];
@@ -512,16 +528,34 @@ static __strong NSData *CRLFCRLF;
     
     _inputStream.delegate = self;
     _outputStream.delegate = self;
+}
+
+- (void)_connect;
+{
     
-    // TODO schedule in a better run loop
-    [_outputStream scheduleInRunLoop:[NSRunLoop mainRunLoop] forMode:NSDefaultRunLoopMode];
-    [_inputStream scheduleInRunLoop:[NSRunLoop mainRunLoop] forMode:NSDefaultRunLoopMode];
+    if (!_didSchedule) {
+        [self scheduleInRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode];
+    }
     
     
     [_outputStream open];
     [_inputStream open];
 }
 
+- (void)scheduleInRunLoop:(NSRunLoop *)aRunLoop forMode:(NSString *)mode;
+{
+    _didSchedule = YES;
+    
+    [_outputStream scheduleInRunLoop:aRunLoop forMode:mode];
+    [_inputStream scheduleInRunLoop:aRunLoop forMode:mode];
+}
+
+- (void)unscheduleFromRunLoop:(NSRunLoop *)aRunLoop forMode:(NSString *)mode;
+{
+    [_outputStream removeFromRunLoop:aRunLoop forMode:mode];
+    [_inputStream removeFromRunLoop:aRunLoop forMode:mode];
+}
+
 - (void)close;
 {
     [self closeWithCode:-1 reason:nil];
@@ -574,12 +608,12 @@ static __strong NSData *CRLFCRLF;
 - (void)_closeWithProtocolError:(NSString *)message;
 {
     // Need to shunt this on the _callbackQueue first to see if they received any messages 
-    dispatch_async(_callbackQueue, ^{
+    [_delegateQueue addOperationWithBlock:^{
         [self closeWithCode:SRStatusCodeProtocolError reason:message];
         dispatch_async(_workQueue, ^{
             [self _disconnect];
         });
-    });
+    }];
 }
 
 - (void)_failWithError:(NSError *)error;
@@ -587,11 +621,11 @@ static __strong NSData *CRLFCRLF;
     dispatch_async(_workQueue, ^{
         if (self.readyState != SR_CLOSED) {
             _failed = YES;
-            dispatch_async(_callbackQueue, ^{
+            [_delegateQueue addOperationWithBlock:^{
                 if ([self.delegate respondsToSelector:@selector(webSocket:didFailWithError:)]) {
                     [self.delegate webSocket:self didFailWithError:error];
                 }
-            });
+            }];
 
             self.readyState = SR_CLOSED;
             _selfRetain = nil;
@@ -634,11 +668,11 @@ static __strong NSData *CRLFCRLF;
 - (void)handlePing:(NSData *)pingData;
 {
     // Need to pingpong this off _callbackQueue first to make sure messages happen in order
-    dispatch_async(_callbackQueue, ^{
+    [_delegateQueue addOperationWithBlock:^{
         dispatch_async(_workQueue, ^{
             [self _sendFrameWithOpcode:SROpCodePong data:pingData];
         });
-    });
+    }];
 }
 
 - (void)handlePong;
@@ -649,9 +683,9 @@ static __strong NSData *CRLFCRLF;
 - (void)_handleMessage:(id)message
 {
     SRFastLog(@"Received message");
-    dispatch_async(_callbackQueue, ^{
+    [_delegateQueue addOperationWithBlock:^{
         [self.delegate webSocket:self didReceiveMessage:message];
-    });
+    }];
 }
 
 
@@ -994,11 +1028,11 @@ static const uint8_t SRPayloadLenMask   = 0x7F;
         [_inputStream close];
         
         if (!_failed) {
-            dispatch_async(_callbackQueue, ^{
+            [_delegateQueue addOperationWithBlock:^{
                 if ([self.delegate respondsToSelector:@selector(webSocket:didCloseWithCode:reason:wasClean:)]) {
                     [self.delegate webSocket:self didCloseWithCode:_closeCode reason:_closeReason wasClean:YES];
                 }
-            });
+            }];
         }
         
         _selfRetain = nil;
@@ -1339,11 +1373,11 @@ static const size_t SRFrameHeaderOverhead = 32;
                     if (!_sentClose && !_failed) {
                         _sentClose = YES;
                         // If we get closed in this state it's probably not clean because we should be sending this when we send messages
-                        dispatch_async(_callbackQueue, ^{
+                        [_delegateQueue addOperationWithBlock:^{
                             if ([self.delegate respondsToSelector:@selector(webSocket:didCloseWithCode:reason:wasClean:)]) {
                                 [self.delegate webSocket:self didCloseWithCode:0 reason:@"Stream end encountered" wasClean:NO];
                             }
-                        });
+                        }];
                     }
                 }
                 
@@ -1550,4 +1584,65 @@ static inline int32_t validate_dispatch_data_partial_string(NSData *data) {
 
 #endif
 
+static _SRRunLoopThread *networkThread = nil;
+static NSRunLoop *networkRunLoop = nil;
+
+@implementation NSRunLoop (SRWebSocket)
+
++ (NSRunLoop *)SR_networkRunLoop {
+    static dispatch_once_t onceToken;
+    dispatch_once(&onceToken, ^{
+        networkThread = [[_SRRunLoopThread alloc] init];
+        networkThread.name = @"com.squareup.SocketRocket.NetworkThread";
+        [networkThread start];
+        networkRunLoop = networkThread.runLoop;
+    });
+    
+    return networkRunLoop;
+}
+
+@end
+
+
+@implementation _SRRunLoopThread {
+    dispatch_group_t _waitGroup;
+}
+
+@synthesize runLoop = _runLoop;
+
+- (void)dealloc
+{
+    dispatch_release(_waitGroup);
+}
+
+- (id)init
+{
+    self = [super init];
+    if (self) {
+        _waitGroup = dispatch_group_create();
+        dispatch_group_enter(_waitGroup);
+    }
+    return self;
+}
+
+- (void)main;
+{
+    _runLoop = [NSRunLoop currentRunLoop];
+    dispatch_group_leave(_waitGroup);
+    
+    NSTimer *timer = [[NSTimer alloc] initWithFireDate:[NSDate distantFuture] interval:0.0 target:nil selector:nil userInfo:nil repeats:NO];
+    [_runLoop addTimer:timer forMode:NSDefaultRunLoopMode];
+    
+    while ([_runLoop runMode:NSDefaultRunLoopMode beforeDate:[NSDate distantFuture]]) {
+        
+    }
+    assert(NO);
+}
+
+- (NSRunLoop *)runLoop;
+{
+    dispatch_group_wait(_waitGroup, DISPATCH_TIME_FOREVER);
+    return _runLoop;
+}
 
+@end