|
@@ -58,7 +58,6 @@ struct ThreadPool {
|
|
QemuMutex lock;
|
|
QemuMutex lock;
|
|
QemuCond worker_stopped;
|
|
QemuCond worker_stopped;
|
|
QemuSemaphore sem;
|
|
QemuSemaphore sem;
|
|
- int max_threads;
|
|
|
|
QEMUBH *new_thread_bh;
|
|
QEMUBH *new_thread_bh;
|
|
|
|
|
|
/* The following variables are only accessed from one AioContext. */
|
|
/* The following variables are only accessed from one AioContext. */
|
|
@@ -71,8 +70,27 @@ struct ThreadPool {
|
|
int new_threads; /* backlog of threads we need to create */
|
|
int new_threads; /* backlog of threads we need to create */
|
|
int pending_threads; /* threads created but not running yet */
|
|
int pending_threads; /* threads created but not running yet */
|
|
bool stopping;
|
|
bool stopping;
|
|
|
|
+ int min_threads;
|
|
|
|
+ int max_threads;
|
|
};
|
|
};
|
|
|
|
|
|
|
|
+static inline bool back_to_sleep(ThreadPool *pool, int ret)
|
|
|
|
+{
|
|
|
|
+ /*
|
|
|
|
+ * The semaphore timed out, we should exit the loop except when:
|
|
|
|
+ * - There is work to do, we raced with the signal.
|
|
|
|
+ * - The max threads threshold just changed, we raced with the signal.
|
|
|
|
+ * - The thread pool forces a minimum number of readily available threads.
|
|
|
|
+ */
|
|
|
|
+ if (ret == -1 && (!QTAILQ_EMPTY(&pool->request_list) ||
|
|
|
|
+ pool->cur_threads > pool->max_threads ||
|
|
|
|
+ pool->cur_threads <= pool->min_threads)) {
|
|
|
|
+ return true;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return false;
|
|
|
|
+}
|
|
|
|
+
|
|
static void *worker_thread(void *opaque)
|
|
static void *worker_thread(void *opaque)
|
|
{
|
|
{
|
|
ThreadPool *pool = opaque;
|
|
ThreadPool *pool = opaque;
|
|
@@ -91,8 +109,9 @@ static void *worker_thread(void *opaque)
|
|
ret = qemu_sem_timedwait(&pool->sem, 10000);
|
|
ret = qemu_sem_timedwait(&pool->sem, 10000);
|
|
qemu_mutex_lock(&pool->lock);
|
|
qemu_mutex_lock(&pool->lock);
|
|
pool->idle_threads--;
|
|
pool->idle_threads--;
|
|
- } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
|
|
|
|
- if (ret == -1 || pool->stopping) {
|
|
|
|
|
|
+ } while (back_to_sleep(pool, ret));
|
|
|
|
+ if (ret == -1 || pool->stopping ||
|
|
|
|
+ pool->cur_threads > pool->max_threads) {
|
|
break;
|
|
break;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -294,6 +313,33 @@ void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
|
|
thread_pool_submit_aio(pool, func, arg, NULL, NULL);
|
|
thread_pool_submit_aio(pool, func, arg, NULL, NULL);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
|
|
|
|
+{
|
|
|
|
+ qemu_mutex_lock(&pool->lock);
|
|
|
|
+
|
|
|
|
+ pool->min_threads = ctx->thread_pool_min;
|
|
|
|
+ pool->max_threads = ctx->thread_pool_max;
|
|
|
|
+
|
|
|
|
+ /*
|
|
|
|
+ * We either have to:
|
|
|
|
+ * - Increase the number available of threads until over the min_threads
|
|
|
|
+ * threshold.
|
|
|
|
+ * - Decrease the number of available threads until under the max_threads
|
|
|
|
+ * threshold.
|
|
|
|
+ * - Do nothing. The current number of threads fall in between the min and
|
|
|
|
+ * max thresholds. We'll let the pool manage itself.
|
|
|
|
+ */
|
|
|
|
+ for (int i = pool->cur_threads; i < pool->min_threads; i++) {
|
|
|
|
+ spawn_thread(pool);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ for (int i = pool->cur_threads; i > pool->max_threads; i--) {
|
|
|
|
+ qemu_sem_post(&pool->sem);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ qemu_mutex_unlock(&pool->lock);
|
|
|
|
+}
|
|
|
|
+
|
|
static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
|
|
static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
|
|
{
|
|
{
|
|
if (!ctx) {
|
|
if (!ctx) {
|
|
@@ -306,11 +352,12 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
|
|
qemu_mutex_init(&pool->lock);
|
|
qemu_mutex_init(&pool->lock);
|
|
qemu_cond_init(&pool->worker_stopped);
|
|
qemu_cond_init(&pool->worker_stopped);
|
|
qemu_sem_init(&pool->sem, 0);
|
|
qemu_sem_init(&pool->sem, 0);
|
|
- pool->max_threads = 64;
|
|
|
|
pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
|
|
pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
|
|
|
|
|
|
QLIST_INIT(&pool->head);
|
|
QLIST_INIT(&pool->head);
|
|
QTAILQ_INIT(&pool->request_list);
|
|
QTAILQ_INIT(&pool->request_list);
|
|
|
|
+
|
|
|
|
+ thread_pool_update_params(pool, ctx);
|
|
}
|
|
}
|
|
|
|
|
|
ThreadPool *thread_pool_new(AioContext *ctx)
|
|
ThreadPool *thread_pool_new(AioContext *ctx)
|