qemu-coroutine-lock.c

/*
 * coroutine queues and locks
 *
 * Copyright (c) 2011 Kevin Wolf <kwolf@redhat.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 *
 * The lock-free mutex implementation is based on OSv
 * (core/lfmutex.cc, include/lockfree/mutex.hh).
 * Copyright (C) 2013 Cloudius Systems, Ltd.
 */

#include "qemu/osdep.h"
#include "qemu/coroutine.h"
#include "qemu/coroutine_int.h"
#include "qemu/processor.h"
#include "qemu/queue.h"
#include "block/aio.h"
#include "trace.h"

void qemu_co_queue_init(CoQueue *queue)
{
    QSIMPLEQ_INIT(&queue->entries);
}

void coroutine_fn qemu_co_queue_wait_impl(CoQueue *queue, QemuLockable *lock)
{
    Coroutine *self = qemu_coroutine_self();
    QSIMPLEQ_INSERT_TAIL(&queue->entries, self, co_queue_next);

    if (lock) {
        qemu_lockable_unlock(lock);
    }

    /* There is no race condition here.  Other threads will call
     * aio_co_schedule on our AioContext, which can reenter this
     * coroutine but only after this yield and after the main loop
     * has gone through the next iteration.
     */
    qemu_coroutine_yield();
    assert(qemu_in_coroutine());

    /* TODO: OSv implements wait morphing here, where the wakeup
     * primitive automatically places the woken coroutine on the
     * mutex's queue.  This avoids the thundering herd effect.
     * This could be implemented for CoMutexes, but not really for
     * other cases of QemuLockable.
     */
    if (lock) {
        qemu_lockable_lock(lock);
    }
}
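
/*
 * Illustrative sketch, not part of the original file: the typical
 * producer/consumer pattern built on CoQueue.  A consumer coroutine
 * checks a condition under a CoMutex and calls qemu_co_queue_wait(),
 * which drops the lock while the coroutine sleeps and reacquires it
 * on wakeup; a producer wakes it with qemu_co_queue_next().
 * ExampleState and the example_* functions are hypothetical names
 * used only here; the lock and queue are assumed to have been set up
 * with qemu_co_mutex_init() and qemu_co_queue_init().
 */
typedef struct ExampleState {
    CoMutex lock;
    CoQueue waiters;
    bool data_ready;
} ExampleState;

static void coroutine_fn G_GNUC_UNUSED example_consume(ExampleState *s)
{
    qemu_co_mutex_lock(&s->lock);
    while (!s->data_ready) {
        /* Releases s->lock while waiting, takes it again before returning. */
        qemu_co_queue_wait(&s->waiters, &s->lock);
    }
    s->data_ready = false;
    qemu_co_mutex_unlock(&s->lock);
}

static void coroutine_fn G_GNUC_UNUSED example_produce(ExampleState *s)
{
    qemu_co_mutex_lock(&s->lock);
    s->data_ready = true;
    qemu_co_queue_next(&s->waiters);    /* wake at most one waiter */
    qemu_co_mutex_unlock(&s->lock);
}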

static bool qemu_co_queue_do_restart(CoQueue *queue, bool single)
{
    Coroutine *next;

    if (QSIMPLEQ_EMPTY(&queue->entries)) {
        return false;
    }

    while ((next = QSIMPLEQ_FIRST(&queue->entries)) != NULL) {
        QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
        aio_co_wake(next);
        if (single) {
            break;
        }
    }
    return true;
}

bool coroutine_fn qemu_co_queue_next(CoQueue *queue)
{
    assert(qemu_in_coroutine());
    return qemu_co_queue_do_restart(queue, true);
}

void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue)
{
    assert(qemu_in_coroutine());
    qemu_co_queue_do_restart(queue, false);
}

bool qemu_co_enter_next_impl(CoQueue *queue, QemuLockable *lock)
{
    Coroutine *next;

    next = QSIMPLEQ_FIRST(&queue->entries);
    if (!next) {
        return false;
    }

    QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
    if (lock) {
        qemu_lockable_unlock(lock);
    }
    aio_co_wake(next);
    if (lock) {
        qemu_lockable_lock(lock);
    }
    return true;
}
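
/*
 * Illustrative sketch, not part of the original file: unlike
 * qemu_co_queue_next(), qemu_co_enter_next() can be called outside
 * coroutine context, e.g. from a bottom half or timer callback, to
 * pop and wake the coroutine at the head of the queue.
 * example_drain_one() is a hypothetical name used only here.
 */
static void G_GNUC_UNUSED example_drain_one(CoQueue *queue, QemuMutex *lock)
{
    qemu_mutex_lock(lock);
    /* Wakes one queued coroutine, if any; drops and retakes lock around
     * the wakeup just like qemu_co_enter_next_impl() above.
     */
    qemu_co_enter_next(queue, lock);
    qemu_mutex_unlock(lock);
}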

bool qemu_co_queue_empty(CoQueue *queue)
{
    return QSIMPLEQ_FIRST(&queue->entries) == NULL;
}

/* The wait records are handled with a multiple-producer, single-consumer
 * lock-free queue.  There cannot be two concurrent pop_waiter() calls
 * because pop_waiter() can only be called while mutex->handoff is zero.
 * This can happen in three cases:
 * - in qemu_co_mutex_unlock, before the hand-off protocol has started.
 *   In this case, qemu_co_mutex_lock will see mutex->handoff == 0 and
 *   not take part in the handoff.
 * - in qemu_co_mutex_lock, if it steals the hand-off responsibility from
 *   qemu_co_mutex_unlock.  In this case, qemu_co_mutex_unlock will fail
 *   the cmpxchg (it will see either 0 or the next sequence value) and
 *   exit.  The next hand-off cannot begin until qemu_co_mutex_lock has
 *   woken up someone.
 * - in qemu_co_mutex_unlock, if it takes the hand-off token itself.
 *   In this case another iteration starts with mutex->handoff == 0;
 *   a concurrent qemu_co_mutex_lock will fail the cmpxchg, and
 *   qemu_co_mutex_unlock will go back to case (1).
 *
 * The following functions manage this queue.
 */
typedef struct CoWaitRecord {
    Coroutine *co;
    QSLIST_ENTRY(CoWaitRecord) next;
} CoWaitRecord;

static void push_waiter(CoMutex *mutex, CoWaitRecord *w)
{
    w->co = qemu_coroutine_self();
    QSLIST_INSERT_HEAD_ATOMIC(&mutex->from_push, w, next);
}

static void move_waiters(CoMutex *mutex)
{
    QSLIST_HEAD(, CoWaitRecord) reversed;
    QSLIST_MOVE_ATOMIC(&reversed, &mutex->from_push);
    while (!QSLIST_EMPTY(&reversed)) {
        CoWaitRecord *w = QSLIST_FIRST(&reversed);
        QSLIST_REMOVE_HEAD(&reversed, next);
        QSLIST_INSERT_HEAD(&mutex->to_pop, w, next);
    }
}

static CoWaitRecord *pop_waiter(CoMutex *mutex)
{
    CoWaitRecord *w;

    if (QSLIST_EMPTY(&mutex->to_pop)) {
        move_waiters(mutex);
        if (QSLIST_EMPTY(&mutex->to_pop)) {
            return NULL;
        }
    }
    w = QSLIST_FIRST(&mutex->to_pop);
    QSLIST_REMOVE_HEAD(&mutex->to_pop, next);
    return w;
}

/* True if at least one wait record is queued on either list. */
static bool has_waiters(CoMutex *mutex)
{
    return !QSLIST_EMPTY(&mutex->to_pop) || !QSLIST_EMPTY(&mutex->from_push);
}

void qemu_co_mutex_init(CoMutex *mutex)
{
    memset(mutex, 0, sizeof(*mutex));
}

static void coroutine_fn qemu_co_mutex_wake(CoMutex *mutex, Coroutine *co)
{
    /* Read co before co->ctx; pairs with smp_wmb() in
     * qemu_coroutine_enter().
     */
    smp_read_barrier_depends();
    mutex->ctx = co->ctx;
    aio_co_wake(co);
}

static void coroutine_fn qemu_co_mutex_lock_slowpath(AioContext *ctx,
                                                     CoMutex *mutex)
{
    Coroutine *self = qemu_coroutine_self();
    CoWaitRecord w;
    unsigned old_handoff;

    trace_qemu_co_mutex_lock_entry(mutex, self);
    w.co = self;
    push_waiter(mutex, &w);

    /* This is the "Responsibility Hand-Off" protocol; a lock() picks from
     * a concurrent unlock() the responsibility of waking somebody up.
     */
    old_handoff = atomic_mb_read(&mutex->handoff);
    if (old_handoff &&
        has_waiters(mutex) &&
        atomic_cmpxchg(&mutex->handoff, old_handoff, 0) == old_handoff) {
        /* There can be no concurrent pops, because there can be only
         * one active handoff at a time.
         */
        CoWaitRecord *to_wake = pop_waiter(mutex);
        Coroutine *co = to_wake->co;
        if (co == self) {
            /* We got the lock ourselves! */
            assert(to_wake == &w);
            mutex->ctx = ctx;
            return;
        }

        qemu_co_mutex_wake(mutex, co);
    }

    qemu_coroutine_yield();
    trace_qemu_co_mutex_lock_return(mutex, self);
}

void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
{
    AioContext *ctx = qemu_get_current_aio_context();
    Coroutine *self = qemu_coroutine_self();
    int waiters, i;

    /* Running a very small critical section on pthread_mutex_t and CoMutex
     * shows that pthread_mutex_t is much faster because it doesn't actually
     * go to sleep.  What happens is that the critical section is shorter
     * than the latency of entering the kernel and thus FUTEX_WAIT always
     * fails.  With CoMutex there is no such latency but you still want to
     * avoid wait and wakeup.  So introduce it artificially.
     */
    i = 0;
retry_fast_path:
    waiters = atomic_cmpxchg(&mutex->locked, 0, 1);
    if (waiters != 0) {
        while (waiters == 1 && ++i < 1000) {
            if (atomic_read(&mutex->ctx) == ctx) {
                break;
            }
            if (atomic_read(&mutex->locked) == 0) {
                goto retry_fast_path;
            }
            cpu_relax();
        }
        waiters = atomic_fetch_inc(&mutex->locked);
    }

    if (waiters == 0) {
        /* Uncontended. */
        trace_qemu_co_mutex_lock_uncontended(mutex, self);
        mutex->ctx = ctx;
    } else {
        qemu_co_mutex_lock_slowpath(ctx, mutex);
    }
    mutex->holder = self;
    self->locks_held++;
}

void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
{
    Coroutine *self = qemu_coroutine_self();

    trace_qemu_co_mutex_unlock_entry(mutex, self);

    assert(mutex->locked);
    assert(mutex->holder == self);
    assert(qemu_in_coroutine());

    mutex->ctx = NULL;
    mutex->holder = NULL;
    self->locks_held--;
    if (atomic_fetch_dec(&mutex->locked) == 1) {
        /* No waiting qemu_co_mutex_lock().  Pfew, that was easy! */
        return;
    }

    for (;;) {
        CoWaitRecord *to_wake = pop_waiter(mutex);
        unsigned our_handoff;

        if (to_wake) {
            qemu_co_mutex_wake(mutex, to_wake->co);
            break;
        }

        /* Some concurrent lock() is in progress (we know this because
         * mutex->locked was >1) but it hasn't yet put itself on the wait
         * queue.  Pick a sequence number for the handoff protocol (not 0).
         */
        if (++mutex->sequence == 0) {
            mutex->sequence = 1;
        }

        our_handoff = mutex->sequence;
        atomic_mb_set(&mutex->handoff, our_handoff);
        if (!has_waiters(mutex)) {
            /* The concurrent lock has not added itself yet, so it
             * will be able to pick our handoff.
             */
            break;
        }

        /* Try to do the handoff protocol ourselves; if somebody else has
         * already taken it, however, we're done and they're responsible.
         */
        if (atomic_cmpxchg(&mutex->handoff, our_handoff, 0) != our_handoff) {
            break;
        }
    }

    trace_qemu_co_mutex_unlock_return(mutex, self);
}
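
/*
 * Illustrative sketch, not part of the original file: a CoMutex protects
 * state that is accessed across yield points, where a plain QemuMutex
 * would block the whole thread instead of just the coroutine.
 * example_increment() is a hypothetical name used only here; the lock is
 * assumed to have been set up with qemu_co_mutex_init().
 */
static void coroutine_fn G_GNUC_UNUSED example_increment(CoMutex *lock,
                                                         int *counter)
{
    qemu_co_mutex_lock(lock);
    /* The critical section may yield, e.g. for I/O; the CoMutex keeps
     * other coroutines out without blocking the underlying thread.
     */
    (*counter)++;
    qemu_co_mutex_unlock(lock);
}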

void qemu_co_rwlock_init(CoRwlock *lock)
{
    memset(lock, 0, sizeof(*lock));
    qemu_co_queue_init(&lock->queue);
    qemu_co_mutex_init(&lock->mutex);
}

void qemu_co_rwlock_rdlock(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    qemu_co_mutex_lock(&lock->mutex);
    /* For fairness, wait if a writer is in line. */
    while (lock->pending_writer) {
        qemu_co_queue_wait(&lock->queue, &lock->mutex);
    }
    lock->reader++;
    qemu_co_mutex_unlock(&lock->mutex);

    /* The rest of the read-side critical section is run without the mutex. */
    self->locks_held++;
}

void qemu_co_rwlock_unlock(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    assert(qemu_in_coroutine());
    if (!lock->reader) {
        /* The critical section started in qemu_co_rwlock_wrlock. */
        qemu_co_queue_restart_all(&lock->queue);
    } else {
        self->locks_held--;

        qemu_co_mutex_lock(&lock->mutex);
        lock->reader--;
        assert(lock->reader >= 0);
        /* Wakeup only one waiting writer */
        if (!lock->reader) {
            qemu_co_queue_next(&lock->queue);
        }
    }
    qemu_co_mutex_unlock(&lock->mutex);
}

void qemu_co_rwlock_downgrade(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    /* lock->mutex critical section started in qemu_co_rwlock_wrlock or
     * qemu_co_rwlock_upgrade.
     */
    assert(lock->reader == 0);
    lock->reader++;
    qemu_co_mutex_unlock(&lock->mutex);

    /* The rest of the read-side critical section is run without the mutex. */
    self->locks_held++;
}

void qemu_co_rwlock_wrlock(CoRwlock *lock)
{
    qemu_co_mutex_lock(&lock->mutex);
    lock->pending_writer++;
    while (lock->reader) {
        qemu_co_queue_wait(&lock->queue, &lock->mutex);
    }
    lock->pending_writer--;

    /* The rest of the write-side critical section is run with
     * the mutex taken, so that lock->reader remains zero.
     * There is no need to update self->locks_held.
     */
}

void qemu_co_rwlock_upgrade(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    qemu_co_mutex_lock(&lock->mutex);
    assert(lock->reader > 0);
    lock->reader--;
    lock->pending_writer++;
    while (lock->reader) {
        qemu_co_queue_wait(&lock->queue, &lock->mutex);
    }
    lock->pending_writer--;

    /* The rest of the write-side critical section is run with
     * the mutex taken, similar to qemu_co_rwlock_wrlock.  Do
     * not account for the lock twice in self->locks_held.
     */
    self->locks_held--;
}
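
/*
 * Illustrative sketch, not part of the original file: typical CoRwlock
 * usage.  Readers run their critical sections concurrently; a writer
 * waits until all readers have left, and pending writers are favoured
 * for fairness.  ExampleTable and the example_* functions are
 * hypothetical names used only here; the lock is assumed to have been
 * set up with qemu_co_rwlock_init().
 */
typedef struct ExampleTable {
    CoRwlock lock;
    int entries;
} ExampleTable;

static int coroutine_fn G_GNUC_UNUSED example_read(ExampleTable *t)
{
    int n;

    qemu_co_rwlock_rdlock(&t->lock);
    n = t->entries;                     /* shared access, may yield */
    qemu_co_rwlock_unlock(&t->lock);
    return n;
}

static void coroutine_fn G_GNUC_UNUSED example_write(ExampleTable *t, int n)
{
    qemu_co_rwlock_wrlock(&t->lock);
    t->entries = n;                     /* exclusive access */
    qemu_co_rwlock_unlock(&t->lock);
}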