The goal here is to add a new method to transactions that allows
developers to specify a callback that will get invoked only once
all jobs spawned by a transaction are completed, allowing developers
the chance to perform actions conditionally pending complete success,
partial failure, or complete failure.

In order to register the new callback to be invoked, a user must request
a callback pointer and closure by calling new_action_cb_wrapper, which
creates a wrapper around an opaque pointer and callback that would have
originally been passed to e.g. backup_start().

The function will return a function pointer and a new opaque pointer to
be passed instead. The transaction system will effectively intercept the
original callbacks and perform book-keeping on the transaction after it
has delivered the original enveloped callback.

This means that Transaction Action callback methods will be called after
all callbacks triggered by all Actions in the Transactional group have
been received.

This feature has no knowledge of any jobs spawned by Actions that do not
inform the system via new_action_cb_wrapper().

For an example of how to use the feature, please skip ahead to:
'block: drive_backup transaction callback support' which serves as an example
for how to hook up a post-transaction callback to the Drive Backup action.


Note 1: Defining a callback method alone is not sufficient to have the new
        method invoked. You must call new_action_cb_wrapper() AND ensure the
        callback it returns is the one used as the callback for the job
        launched by the action.

Note 2: You can use this feature for any system that registers completions of
        an asynchronous task via a callback of the form
        (void *opaque, int ret), not just block job callbacks.

Signed-off-by: John Snow <js...@redhat.com>
---
 blockdev.c | 191 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 187 insertions(+), 4 deletions(-)

diff --git a/blockdev.c b/blockdev.c
index f806d40..d404251 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -1239,6 +1239,8 @@ typedef struct BlkActionState BlkActionState;
  * @abort: Abort the changes on fail, can be NULL.
  * @clean: Clean up resources after all transaction actions have called
  *         commit() or abort(). Can be NULL.
+ * @cb: Executed after all jobs launched by actions in the transaction finish,
+ *      but only if requested by new_action_cb_wrapper() prior to clean().
  *
  * Only prepare() may fail. In a single transaction, only one of commit() or
  * abort() will be called. clean() will always be called if it is present.
@@ -1249,6 +1251,7 @@ typedef struct BlkActionOps {
     void (*commit)(BlkActionState *common);
     void (*abort)(BlkActionState *common);
     void (*clean)(BlkActionState *common);
+    void (*cb)(BlkActionState *common);
 } BlkActionOps;
 
 /**
@@ -1257,19 +1260,46 @@ typedef struct BlkActionOps {
  * by a transaction group.
  *
  * @jobs: A reference count that tracks how many jobs still need to complete.
+ * @status: A cumulative return code for all actions that have reported
+ *          a return code via callback in the transaction.
  * @actions: A list of all Actions in the Transaction.
+ *           However, once the transaction has completed, it will be only a 
list
+ *           of transactions that have registered a post-transaction callback.
  */
 typedef struct BlkTransactionState {
     int jobs;
+    int status;
     QTAILQ_HEAD(actions, BlkActionState) actions;
 } BlkTransactionState;
 
+typedef void (CallbackFn)(void *opaque, int ret);
+
+/**
+ * BlkActionCallbackData:
+ * Necessary state for intercepting and
+ * re-delivering a callback triggered by an Action.
+ *
+ * @opaque: The data to be given to the encapsulated callback when
+ *          a job launched by an Action completes.
+ * @ret: The status code that was delivered to the encapsulated callback.
+ * @callback: The encapsulated callback to invoke upon completion of
+ *            the Job launched by the Action.
+ */
+typedef struct BlkActionCallbackData {
+    void *opaque;
+    int ret;
+    CallbackFn *callback;
+} BlkActionCallbackData;
+
 /**
  * BlkActionState:
  * Describes one Action's state within a Transaction.
  *
  * @action: QAPI-defined enum identifying which Action to perform.
  * @ops: Table of ActionOps this Action can perform.
+ * @transaction: A pointer back to the Transaction this Action belongs to.
+ * @cb_data: Information on this Action's encapsulated callback, if any.
+ * @refcount: reference count, allowing access to this state beyond clean().
  * @entry: List membership for all Actions in this Transaction.
  *
  * This structure must be arranged as first member in a subclassed type,
@@ -1279,6 +1309,9 @@ typedef struct BlkTransactionState {
 struct BlkActionState {
     TransactionAction *action;
     const BlkActionOps *ops;
+    BlkTransactionState *transaction;
+    BlkActionCallbackData *cb_data;
+    int refcount;
     QTAILQ_ENTRY(BlkActionState) entry;
 };
 
@@ -1294,6 +1327,26 @@ static BlkTransactionState 
*new_blk_transaction_state(void)
     return bts;
 }
 
+static BlkActionState *put_blk_action_state(BlkActionState *state)
+{
+    BlkActionState *bas = state;
+
+    state->refcount--;
+    if (state->refcount == 0) {
+
+        QTAILQ_FOREACH(bas, &state->transaction->actions, entry) {
+            if (bas == state) {
+                QTAILQ_REMOVE(&state->transaction->actions, bas, entry);
+                break;
+            }
+        }
+        g_free(state->cb_data);
+        g_free(state);
+        return NULL;
+    }
+    return state;
+}
+
 static void destroy_blk_transaction_state(BlkTransactionState *bts)
 {
     BlkActionState *bas, *bas_next;
@@ -1301,23 +1354,151 @@ static void 
destroy_blk_transaction_state(BlkTransactionState *bts)
     /* The list should in normal cases be empty,
      * but in case someone really just wants to kibosh the whole deal: */
     QTAILQ_FOREACH_SAFE(bas, &bts->actions, entry, bas_next) {
-        QTAILQ_REMOVE(&bts->actions, bas, entry);
-        g_free(bas);
+        put_blk_action_state(bas);
     }
 
     g_free(bts);
 }
 
+static void transaction_run_cb_action_ops(BlkTransactionState *bts)
+{
+    BlkActionState *bas, *bas_next;
+
+    QTAILQ_FOREACH_SAFE(bas, &bts->actions, entry, bas_next) {
+        if (bas->ops->cb) {
+            bas->ops->cb(bas);
+        }
+
+        g_free(bas->cb_data);
+        bas->cb_data = NULL;
+        put_blk_action_state(bas);
+    }
+}
+
 static BlkTransactionState *transaction_job_complete(BlkTransactionState *bts)
 {
     bts->jobs--;
     if (bts->jobs == 0) {
+        transaction_run_cb_action_ops(bts);
         destroy_blk_transaction_state(bts);
         return NULL;
     }
     return bts;
 }
 
+static void transaction_job_cancel(BlkActionState *bas)
+{
+    assert(bas->transaction->jobs > 1);
+    transaction_job_complete(bas->transaction);
+}
+
+/**
+ * Intercept a callback that was issued due to a transactional action.
+ */
+static void transaction_action_callback(void *opaque, int ret)
+{
+    BlkActionState *common = opaque;
+    BlkActionCallbackData *cb_data = common->cb_data;
+    CallbackFn *callback = cb_data->callback;
+
+    /* Prepare data for ops->cb() */
+    cb_data->callback = NULL;
+    cb_data->ret = ret;
+
+    /* Cumulative return code will track the first error discovered */
+    if (!common->transaction->status) {
+        common->transaction->status = ret;
+    }
+
+    /* Deliver the intercepted callback prior to marking it as completed */
+    callback(cb_data->opaque, cb_data->ret);
+    transaction_job_complete(common->transaction);
+}
+
+/**
+ * Create a new transactional callback wrapper.
+ *
+ * Given a callback and a closure, generate a new
+ * callback and closure that will invoke the
+ * given callback with the given closure.
+ *
+ * After all wrappers in the transactional group have
+ * been processed, each action's .cb() method will be
+ * invoked.
+ *
+ * @common The transaction action state to set a callback for.
+ * @opaque A closure intended for the encapsulated callback.
+ * @callback The callback we are encapsulating.
+ * @new_opaque[out]: The closure to be used instead of @opaque.
+ *
+ * @return The callback to be used instead of @callback.
+ */
+__attribute__((__unused__))
+static CallbackFn *new_action_cb_wrapper(BlkActionState *common,
+                                         void *opaque,
+                                         CallbackFn *callback,
+                                         void **new_opaque)
+{
+    BlkActionCallbackData *cb_data = g_new0(BlkActionCallbackData, 1);
+    assert(new_opaque);
+
+    /* Stash the original callback information */
+    cb_data->opaque = opaque;
+    cb_data->callback = callback;
+    common->cb_data = cb_data;
+
+    /* The BAS itself will serve as our new closure */
+    *new_opaque = common;
+    common->refcount++;
+
+    /* Inform the transaction to expect one more return */
+    common->transaction->jobs++;
+
+    /* Lastly, the actual callback function to handle the interception. */
+    return transaction_action_callback;
+}
+
+/**
+ * Undo any actions performed by the above call.
+ */
+__attribute__((__unused__))
+static void cancel_action_cb_wrapper(BlkActionState *common)
+{
+    /* Stage 0: Wrapper was never created: */
+    if (common->cb_data == NULL) {
+        assert(common->refcount == 1);
+        return;
+    }
+
+    /* Stage 1: Callback was created, but the job never launched.
+     * (We assume that the job cannot possibly be running, because
+     *  we assume it has been synchronously canceled if it was.)
+     *
+     * We also assume that any cleanup normally handled by cb()
+     * is not needed, because it should have been prepared after
+     * the job launched.
+     */
+    if (common->cb_data && common->cb_data->callback) {
+        transaction_job_cancel(common);
+        goto cleanup;
+    }
+
+    /* Stage 2: Job already completed, or was canceled.
+     *
+     * Force an error in the callback data and just invoke the completion
+     * handler to perform appropriate cleanup for us.
+     */
+    if (common->cb_data && !common->cb_data->callback) {
+        common->cb_data->ret = -ECANCELED;
+        common->ops->cb(common);
+    }
+
+ cleanup:
+    g_free(common->cb_data);
+    common->cb_data = NULL;
+    put_blk_action_state(common);
+}
+
 /* internal snapshot private data */
 typedef struct InternalSnapshotState {
     BlkActionState common;
@@ -1927,8 +2108,10 @@ void qmp_transaction(TransactionActionList *dev_list, 
Error **errp)
         assert(ops->instance_size > 0);
 
         state = g_malloc0(ops->instance_size);
+        state->refcount = 1;
         state->ops = ops;
         state->action = dev_info;
+        state->transaction = bts;
         QTAILQ_INSERT_TAIL(&bts->actions, state, entry);
 
         state->ops->prepare(state, &local_err);
@@ -1959,10 +2142,10 @@ exit:
         if (state->ops->clean) {
             state->ops->clean(state);
         }
-        QTAILQ_REMOVE(&bts->actions, state, entry);
-        g_free(state);
+        put_blk_action_state(state);
     }
 
+    /* The implicit 'job' of the transaction itself is complete: */
     transaction_job_complete(bts);
 }
 
-- 
2.1.0


Reply via email to