@@ -136,17 +136,38 @@ static void job_txn_del_job(Job *job)
     }
 }
 
-static int job_txn_apply(JobTxn *txn, int fn(Job *))
+static int job_txn_apply(Job *job, int fn(Job *))
 {
-    Job *job, *next;
+    AioContext *inner_ctx;
+    Job *other_job, *next;
+    JobTxn *txn = job->txn;
     int rc = 0;
 
-    QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) {
-        rc = fn(job);
+    /*
+     * Similar to job_completed_txn_abort, we take each job's lock before
+     * applying fn, but since we assume that outer_ctx is held by the caller,
+     * we need to release it here to avoid holding the lock twice - which would
+     * break AIO_WAIT_WHILE from within fn.
+     */
+    job_ref(job);
+    aio_context_release(job->aio_context);
+
+    QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
+        inner_ctx = other_job->aio_context;
+        aio_context_acquire(inner_ctx);
+        rc = fn(other_job);
+        aio_context_release(inner_ctx);
         if (rc) {
             break;
         }
     }
+
+    /*
+     * Note that job->aio_context might have been changed by calling fn, so we
+     * can't use a local variable to cache it.
+     */
+    aio_context_acquire(job->aio_context);
+    job_unref(job);
     return rc;
 }
 
@@ -774,11 +795,11 @@ static void job_do_finalize(Job *job)
     assert(job && job->txn);
 
     /* prepare the transaction to complete */
-    rc = job_txn_apply(job->txn, job_prepare);
+    rc = job_txn_apply(job, job_prepare);
     if (rc) {
         job_completed_txn_abort(job);
     } else {
-        job_txn_apply(job->txn, job_finalize_single);
+        job_txn_apply(job, job_finalize_single);
     }
 }
 
@@ -824,10 +845,10 @@ static void job_completed_txn_success(Job *job)
         assert(other_job->ret == 0);
     }
 
-    job_txn_apply(txn, job_transition_to_pending);
+    job_txn_apply(job, job_transition_to_pending);
 
     /* If no jobs need manual finalization, automatically do so */
-    if (job_txn_apply(txn, job_needs_finalize) == 0) {
+    if (job_txn_apply(job, job_needs_finalize) == 0) {
         job_do_finalize(job);
     }
 }
@@ -849,9 +870,10 @@ static void job_completed(Job *job)
 static void job_exit(void *opaque)
 {
     Job *job = (Job *)opaque;
-    AioContext *ctx = job->aio_context;
+    AioContext *ctx;
 
-    aio_context_acquire(ctx);
+    job_ref(job);
+    aio_context_acquire(job->aio_context);
 
     /* This is a lie, we're not quiescent, but still doing the completion
      * callbacks. However, completion callbacks tend to involve operations that
@@ -862,6 +884,14 @@ static void job_exit(void *opaque)
 
     job_completed(job);
 
+    /*
+     * Note that calling job_completed can move the job to a different
+     * aio_context, so we cannot cache from above. job_txn_apply takes care of
+     * acquiring the new lock, and we ref/unref to avoid job_completed freeing
+     * the job underneath us.
+     */
+    ctx = job->aio_context;
+    job_unref(job);
     aio_context_release(ctx);
 }
 