Browse Source

WIP: Processor Queue

xcbosa-itx 2 years ago
parent
commit
8d727f2ac3

BIN
.DS_Store


+ 1 - 0
CMakeLists.txt

@@ -7,6 +7,7 @@ aux_source_directory(. DIR_SRCS)
 
 
 add_subdirectory(utils)
 add_subdirectory(utils)
 add_subdirectory(httpserver)
 add_subdirectory(httpserver)
+add_subdirectory(processor)
 
 
 add_executable(FRPCWebUI ${DIR_SRCS})
 add_executable(FRPCWebUI ${DIR_SRCS})
 
 

BIN
data/html/testimg.png


+ 1 - 1
httpserver/CMakeLists.txt

@@ -2,4 +2,4 @@ set(CMAKE_CXX_STANDARD 17)
 
 
 aux_source_directory(. DIR_SRC)
 aux_source_directory(. DIR_SRC)
 
 
-add_library(HttpServer ${DIR_SRC} http-server.h)
+add_library(HttpServer ${DIR_SRC})

+ 24 - 4
httpserver/ClientConnection.cpp

@@ -3,8 +3,10 @@
 //
 //
 
 
 #include "ClientConnection.h"
 #include "ClientConnection.h"
+#include "../processor/processor.h"
 
 
 using namespace std;
 using namespace std;
+using namespace xc::processor;
 
 
 namespace xc {
 namespace xc {
     namespace httpserver {
     namespace httpserver {
@@ -101,6 +103,28 @@ namespace xc {
 
 
                 RequestData requestData(url, method, headers, body.str());
                 RequestData requestData(url, method, headers, body.str());
                 cout << "[HTTPServer] Received " << method << " Request URL = " << url << endl;
                 cout << "[HTTPServer] Received " << method << " Request URL = " << url << endl;
+                RequestProcessTask task(requestData);
+                processor::enqueueTask(&task);
+                ::time_t start, now;
+                ::time(&start);
+                while (true) {
+                    usleep(1000 * 10);
+                    if (task.isFinish()) {
+                        ResponseData *taskResponse = task.getResponse();
+                        taskResponse->writeTo(clWrite);
+                        delete taskResponse;
+                        break;
+                    }
+                    ::time(&now);
+                    if (::difftime(now, start) > conf::taskProcessTimeoutSeconds) {
+                        cout << "[HTTPServer-Warn] Task failed because time out" << endl;
+                        conf::errorPageTimeout.writeTo(clWrite);
+                        processor::deleteTask(&task);
+                        break;
+                    }
+                }
+                cleanUpAndDestroy();
+                return;
             } else {
             } else {
                 conf::errorPage.applyReplacements(400, {
                 conf::errorPage.applyReplacements(400, {
                     Replacement("errorCode", "400"),
                     Replacement("errorCode", "400"),
@@ -109,10 +133,6 @@ namespace xc {
                 cleanUpAndDestroy();
                 cleanUpAndDestroy();
                 return;
                 return;
             }
             }
-
-            TextResponseData(200, string(requestBuff)).writeTo(clWrite);
-            cleanUpAndDestroy();
-            return;
         }
         }
 
 
         void ClientConnection::cleanUpAndDestroy() {
         void ClientConnection::cleanUpAndDestroy() {

+ 1 - 0
httpserver/httpserver-private.h

@@ -16,6 +16,7 @@
 #include <sys/socket.h>
 #include <sys/socket.h>
 #include <netinet/in.h>
 #include <netinet/in.h>
 #include <unistd.h>
 #include <unistd.h>
+#include <time.h>
 #include "../webui.h"
 #include "../webui.h"
 
 
 using namespace std;
 using namespace std;

+ 5 - 0
processor/CMakeLists.txt

@@ -0,0 +1,5 @@
+set(CMAKE_CXX_STANDARD 17)
+
+aux_source_directory(. DIR_SRC)
+
+add_library(Processor ${DIR_SRC})

+ 28 - 0
processor/RequestProcessTask.cpp

@@ -0,0 +1,28 @@
+//
+// Created by xcbosa on 2023/1/28.
+//
+
+#include "RequestProcessTask.h"
+
+namespace xc {
+    namespace processor {
+        RequestProcessTask::RequestProcessTask(RequestData requestData): request(requestData) {
+            this->response = nullptr;
+            this->finish = false;
+        }
+
+        void RequestProcessTask::processFinish(ResponseData *responseData) {
+            this->response = responseData;
+            this->finish = true;
+            // Todo-Fix: 当Task已脱离
+        }
+
+        bool RequestProcessTask::isFinish() {
+            return this->isFinish();
+        }
+
+        ResponseData *RequestProcessTask::getResponse() {
+            return this->response;
+        }
+    } // xc
+} // processor

+ 25 - 0
processor/RequestProcessTask.h

@@ -0,0 +1,25 @@
+//
+// Created by xcbosa on 2023/1/28.
+//
+
+#pragma once
+
+#include "processor-private.h"
+
+namespace xc {
+    namespace processor {
+
+        class RequestProcessTask {
+        public:
+            RequestProcessTask(RequestData requestData);
+            void processFinish(ResponseData *responseData);
+            bool isFinish();
+            ResponseData *getResponse();
+        private:
+            bool finish;
+            RequestData request;
+            ResponseData *response; // 由处理程序创建,由 ClientConnection 在发送后使用 delete 释放
+        };
+
+    } // xc
+} // processor

+ 37 - 0
processor/RequestProcessorManager.cpp

@@ -0,0 +1,37 @@
+//
+// Created by xcbosa on 2023/1/28.
+//
+
+#include "RequestProcessorManager.h"
+
+namespace xc {
+    namespace processor {
+        auto taskQueue = deque<RequestProcessTask *>();
+        auto taskQueueMutex = mutex();
+
+        void enqueueTask(RequestProcessTask *task) {
+            taskQueueMutex.lock();
+            taskQueue.push_back(task);
+            taskQueueMutex.unlock();
+        }
+
+        void deleteTask(RequestProcessTask *task) {
+            taskQueueMutex.lock();
+            std::remove_if(taskQueue.begin(), taskQueue.end(), [task](RequestProcessTask *t) { return task == t; });
+            taskQueueMutex.unlock();
+        }
+
+        RequestProcessTask dequeueTaskSync() {
+            while (true) {
+                taskQueueMutex.lock();
+                if (!taskQueue.empty()) {
+                    RequestProcessTask task = *taskQueue.front();
+                    taskQueueMutex.unlock();
+                    return task;
+                }
+                taskQueueMutex.unlock();
+                usleep(1000 * 10);
+            }
+        }
+    }
+}

+ 21 - 0
processor/RequestProcessorManager.h

@@ -0,0 +1,21 @@
+//
+// Created by xcbosa on 2023/1/28.
+//
+
+#pragma once
+
+#include "processor-private.h"
+#include "RequestProcessTask.h"
+
+namespace xc {
+    namespace processor {
+        /*由 ClientConnection 放入请求*/
+        void enqueueTask(RequestProcessTask *task);
+
+        /*当该请求超时,则由 ClientConnection 负责调用此方法删除请求*/
+        void deleteTask(RequestProcessTask *task);
+
+        /*从处理队列中取出一个,如果没有则等待*/
+        RequestProcessTask dequeueTaskSync();
+    }
+}

+ 21 - 0
processor/processor-private.h

@@ -0,0 +1,21 @@
+//
+// Created by xcbosa on 2023/1/28.
+//
+
+#pragma once
+
+#include <vector>
+#include <map>
+#include <deque>
+#include <string>
+#include <iostream>
+#include <thread>
+#include <mutex>
+#include <sstream>
+#include <fstream>
+#include <unistd.h>
+
+#include "../utils/utils.h"
+
+using namespace std;
+using namespace xc::utils;

+ 8 - 0
processor/processor.h

@@ -0,0 +1,8 @@
+//
+// Created by xcbosa on 2023/1/28.
+//
+
+#pragma once
+
+#include "RequestProcessorManager.h"
+#include "RequestProcessTask.h"

+ 1 - 0
utils/BinaryResponseData.h

@@ -5,6 +5,7 @@
 #pragma once
 #pragma once
 
 
 #include "utils-private.h"
 #include "utils-private.h"
+#include "ResponseData.h"
 
 
 namespace xc {
 namespace xc {
     namespace utils {
     namespace utils {

+ 20 - 0
webuiconf.h

@@ -5,6 +5,7 @@
 #pragma once
 #pragma once
 
 
 #include <vector>
 #include <vector>
+#include <map>
 #include "utils/utils.h"
 #include "utils/utils.h"
 
 
 using namespace std;
 using namespace std;
@@ -12,8 +13,22 @@ using namespace xc::utils;
 
 
 namespace xc::conf {
 namespace xc::conf {
     const int clientSocketTimeoutSeconds = 3;
     const int clientSocketTimeoutSeconds = 3;
+    const int taskProcessTimeoutSeconds = 1;
     const int mtu = 1536;
     const int mtu = 1536;
 
 
+    const map<string, string> fileExtensionToMimeTypes = {
+            { ".html", "text/html" },
+            { ".htm", "text/html" },
+            { ".js", "text/javascript" },
+            { ".xcnb", "text/xc-notebook" },
+            { ".ccdproj", "application/c-code-develop-project" },
+            { ".png", "image/png" },
+            { ".jpg", "image/jpeg" },
+            { ".jpeg", "image/jpeg" },
+            { ".tiff", "image/tiff" },
+            { "default", "application/octet-stream" },
+    };
+
     const IncompleteFileResponseData errorPage(FileResponseData(500, "html/error.html", "text/html"));
     const IncompleteFileResponseData errorPage(FileResponseData(500, "html/error.html", "text/html"));
 
 
     const auto errorPage400 = errorPage.applyReplacements(400, {
     const auto errorPage400 = errorPage.applyReplacements(400, {
@@ -30,4 +45,9 @@ namespace xc::conf {
         Replacement("errorMessage", "服务器内部错误,可能是服务器访问量过大,请稍后重试"),
         Replacement("errorMessage", "服务器内部错误,可能是服务器访问量过大,请稍后重试"),
         Replacement("errorCode", "500")
         Replacement("errorCode", "500")
     });
     });
+
+    const auto errorPageTimeout = errorPage.applyReplacements(550, {
+            Replacement("errorMessage", "服务器任务处理已超时,可能服务器访问量过大,请稍后重试"),
+            Replacement("errorCode", "550")
+    });
 }
 }