Browse Source

Fixing missing last messages when connection closed immediately after

Mike Lewis 13 years ago
parent
commit
9e37c74527
1 changed files with 41 additions and 30 deletions
  1. 41 30
      SocketRocket/SRWebSocket.m

+ 41 - 30
SocketRocket/SRWebSocket.m

@@ -646,6 +646,7 @@ static __strong NSData *CRLFCRLF;
 
 - (void)_handleMessage:(id)message
 {
+    SRFastLog(@"Received message");
     dispatch_async(_callbackQueue, ^{
         [self.delegate webSocket:self didReceiveMessage:message];
     });
@@ -1004,28 +1005,27 @@ static const uint8_t SRPayloadLenMask   = 0x7F;
 
 - (void)_addConsumerWithScanner:(stream_scanner)consumer callback:(data_callback)callback;
 {
+    assert(dispatch_get_current_queue() == _workQueue);
     [self _addConsumerWithScanner:consumer callback:callback dataLength:0];
 }
 
 - (void)_addConsumerWithDataLength:(size_t)dataLength callback:(data_callback)callback readToCurrentFrame:(BOOL)readToCurrentFrame unmaskBytes:(BOOL)unmaskBytes;
 {   
+    assert(dispatch_get_current_queue() == _workQueue);
     assert(dataLength);
     
-    dispatch_async(_workQueue, ^{
-        [_consumers addObject:[[SRIOConsumer alloc] initWithScanner:nil handler:callback bytesNeeded:dataLength readToCurrentFrame:readToCurrentFrame unmaskBytes:unmaskBytes]];
-        [self _pumpScanner];
-    });
+    [_consumers addObject:[[SRIOConsumer alloc] initWithScanner:nil handler:callback bytesNeeded:dataLength readToCurrentFrame:readToCurrentFrame unmaskBytes:unmaskBytes]];
+    [self _pumpScanner];
 }
 
 - (void)_addConsumerWithScanner:(stream_scanner)consumer callback:(data_callback)callback dataLength:(size_t)dataLength;
 {    
-    dispatch_async(_workQueue, ^{
-        [_consumers addObject:[[SRIOConsumer alloc] initWithScanner:consumer handler:callback bytesNeeded:dataLength readToCurrentFrame:NO unmaskBytes:NO]];
-        [self _pumpScanner];
-    });
+    assert(dispatch_get_current_queue() == _workQueue);
+    [_consumers addObject:[[SRIOConsumer alloc] initWithScanner:consumer handler:callback bytesNeeded:dataLength readToCurrentFrame:NO unmaskBytes:NO]];
+    [self _pumpScanner];
 }
 
-     
+
 static const char CRLFCRLFBytes[] = {'\r', '\n', '\r', '\n'};
 
 - (void)_readUntilHeaderCompleteWithCallback:(data_callback)dataHandler;
@@ -1058,27 +1058,29 @@ static const char CRLFCRLFBytes[] = {'\r', '\n', '\r', '\n'};
     [self _addConsumerWithScanner:consumer callback:dataHandler];
 }
 
--(void)_pumpScanner;
-{
-    assert(dispatch_get_current_queue() == _workQueue);
 
+// Returns true if did work
+- (BOOL)_innerPumpScanner {
+    
+    BOOL didWork = NO;
+    
     if (self.readyState >= SR_CLOSING) {
-        return;
+        return didWork;
     }
     
     if (!_consumers.count) {
-        return;
+        return didWork;
     }
     
     size_t curSize = _readBuffer.length - _readBufferOffset;
     if (!curSize) {
-        return;
+        return didWork;
     }
-
+    
     SRIOConsumer *consumer = [_consumers objectAtIndex:0];
     
     size_t bytesNeeded = consumer.bytesNeeded;
-   
+    
     size_t foundSize = 0;
     if (consumer.consumer) {
         NSData *tempView = [NSData dataWithBytesNoCopy:(char *)_readBuffer.bytes + _readBufferOffset length:_readBuffer.length - _readBufferOffset freeWhenDone:NO];  
@@ -1091,24 +1093,24 @@ static const char CRLFCRLFBytes[] = {'\r', '\n', '\r', '\n'};
             foundSize = curSize;
         }
     }
-
+    
     NSData *slice = nil;
     if (consumer.readToCurrentFrame || foundSize) {
         NSRange sliceRange = NSMakeRange(_readBufferOffset, foundSize);
         slice = [_readBuffer subdataWithRange:sliceRange];
         
         _readBufferOffset += foundSize;
-
+        
         if (_readBufferOffset > 4096 && _readBufferOffset > (_readBuffer.length >> 1)) {
             _readBuffer = [[NSMutableData alloc] initWithBytes:(char *)_readBuffer.bytes + _readBufferOffset length:_readBuffer.length - _readBufferOffset];            _readBufferOffset = 0;
         }
         
         if (consumer.unmaskBytes) {
             NSMutableData *mutableSlice = [slice mutableCopy];
-           
+            
             NSUInteger len = mutableSlice.length;
             uint8_t *bytes = mutableSlice.mutableBytes;
-
+            
             for (int i = 0; i < len; i++) {
                 bytes[i] = bytes[i] ^ _currentReadMaskKey[_currentReadMaskOffset % sizeof(_currentReadMaskKey)];
                 _currentReadMaskOffset += 1;
@@ -1116,12 +1118,12 @@ static const char CRLFCRLFBytes[] = {'\r', '\n', '\r', '\n'};
             
             slice = mutableSlice;
         }
-
+        
         if (consumer.readToCurrentFrame) {
             [_currentFrameData appendData:slice];
             
             _readOpCount += 1;
-
+            
             if (_currentFrameOpcode == SROpCodeTextFrame) {
                 // Validate UTF8 stuff.
                 size_t currentDataSize = _currentFrameData.length;
@@ -1138,28 +1140,36 @@ static const char CRLFCRLFBytes[] = {'\r', '\n', '\r', '\n'};
                         dispatch_async(_workQueue, ^{
                             [self _disconnect];
                         });
-                        return;
+                        return didWork;
                     } else {
                         _currentStringScanPosition += valid_utf8_size;
                     }
                 } 
-
+                
             }
             
             consumer.bytesNeeded -= foundSize;
             
             if (consumer.bytesNeeded == 0) {
-                consumer.handler(self, nil);
                 [_consumers removeObjectAtIndex:0];
+                consumer.handler(self, nil);
+                didWork = YES;
             }
         } else if (foundSize) {
-            consumer.handler(self, slice);
             [_consumers removeObjectAtIndex:0];
+            consumer.handler(self, slice);
+            didWork = YES;
         }
+    }
+    return didWork;
+}
+
+-(void)_pumpScanner;
+{
+    assert(dispatch_get_current_queue() == _workQueue);
+    
+    while ([self _innerPumpScanner]) {
         
-        dispatch_async(_workQueue, ^{
-            [self _pumpScanner];
-        });
     }
 }
 
@@ -1306,6 +1316,7 @@ static const size_t SRFrameHeaderOverhead = 32;
             }
                 
             case NSStreamEventEndEncountered: {
+                [self _pumpScanner];
                 SRFastLog(@"NSStreamEventEndEncountered %@", aStream);
                 if (aStream.streamError) {
                     [self _failWithError:aStream.streamError];