[ 
https://issues.apache.org/jira/browse/IGNITE-19692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Puchkovskiy updated IGNITE-19692:
---------------------------------------
    Description: 
We need a mechanism that allows us to do the following:
 # Execute an operation on all (or some of) partitions of a table
 # The whole operation is split into sub-operations (each of which operates on a single partition)
 # Each sub-operation must be resilient: that is, if the node that hosts it 
restarts or the partition moves to another node, the operation should proceed
 # When a sub-operation ends, it notifies the operation tracker/coordinator
 # When all sub-operations end, the tracker might take some action (like 
starting a subsequent operation)
 # The tracker is also resilient

We need such a mechanism in a few places in the system:
 # Transaction cleanup?
 # Index build
 # Table data validation as part of a schema change that requires validation (like a narrowing type change)

Probably, more applications of the mechanism will emerge.

 

On the possible implementation: the tracker could be colocated with the table's primary replica (which would guarantee that at most one tracker exists at any time). We could store the data needed to track the operation in the MetaStorage under a prefix corresponding to the table, like 'ops.<tableId>.<opType>.<opKey>'. We could store the completion status for each of the partitions there along with some operation-wide status.

h1. Approved design

h2. Definitions

An *operation* is run on a table as a whole; it consists of {*}jobs{*}, one job per partition.

A job might complete successfully or fail (that is, it finishes unsuccessfully; 
unexpected failures are a different matter).

If all jobs of an operation complete successfully, the operation itself 
completes successfully. If any job fails, the operation fails.

Some jobs might track their own progress and can be resumed. Others cannot and 
always start over when asked to start/resume.
h2. Goals
 # The operation must be resilient to a restart or departure of any participating node
 # It must also be resilient to ‘primary node’ status changes (i.e. if the primary of any of the table’s partitions changes, this should not stall the operation forever or fail it completely)

h3. Non-goals
 # A running operation is not supposed to prohibit changes to the cluster (for example, by preventing a node from restarting or leaving the cluster)
 # It is considered acceptable that some work may have to be repeated (for example, if a node ceases to be a primary while it executes a job that can only be run on a primary, the new primary might restart the job from scratch)

h2. General properties

An operation’s jobs are executed in parallel.

If a job that is still not finished is reassigned to another node (due to 
primary change or due to logical topology change), the old executor stops 
executing the job (and cleans up if the job wrote something to disk); the new 
executor adopts the job and starts it from scratch.

If a node restarts and finds that it has a non-finished resumable job to which 
it’s still assigned, it resumes the job.
h2. Operation attributes
 * Type ID (a string identifying operation type, like ‘validate’, ‘buildIndex’)
 * Whether more than one operation instance of the type may be run on a table (example: ‘build index’ might build a few indices at the same time) or not (in the first version, we might only support operations that allow multiple instances per table at the same time)
 * Job logic run when a job is started/resumed
 * Job logic run when a job is canceled
 * Operation logic run when an operation is completed (success or failure)

Some operations can only complete successfully (‘build index’ is an example); others can also finish unsuccessfully, that is, end up in the ‘failed’ state (‘validate table data’ is an example).

Some operations persist their progress on disk (‘build index’ does this); others are ‘volatile’ and always start over (‘validate table data’ might work in this way, but it could also persist the position of its ‘cursor’ over the data being validated).
h2. Provisioned operations
 # Build an index. Must be executed on a primary; a few operations might be run 
concurrently (each for its own index); cannot fail; persists its progress
 # Validate table data (for constraints where indices are not needed): checks 
whether every actual (i.e. not overwritten) tuple satisfies a restriction (NOT 
NULL/CHECK constraint/that it fits in a type range). May be executed on a 
secondary; a few operations might be run concurrently; can fail; might be 
volatile or persist its progress

h2. Roles

There are two roles:
 # Operation {*}coordinator{*}: there is at most one coordinator per operation instance (this is achieved by colocating the coordinator with the primary replica of partition 0 of the table); its responsibilities are:
 ## Assigning executors to the operation’s jobs (this should not be implemented in the first version, where only operations whose jobs must be run on primaries are supported)
 ## Tracking operation completion
 ## Executing the completion logic
 # {*}Executor{*} (any node may act as an executor); its responsibilities are:
 ## Start/resume job execution
 ## Cancel the job and clean up afterwards if needed
 ## Notify the coordinator about a job outcome (success or failure)

h2. Communication

All communication between participants happens via MetaStorage.
h2. MetaStorage

For operations that allow only one operation per table at a time, the operation root path is {_}distOp.<table-id>.<op-type>.<instance-id>{_}; for those that allow multiple operations at a time, it’s _distOp.<table-id>.<op-type>.<object-id>.<instance-id>_ (for example, for ‘build index’ <object-id> is the ID of the index being built).

The keys are listed below (a sketch of how they could be composed follows the list):
 * _distOp.<table-id>.runOps_: the set of type IDs of currently running operations; used for compatibility checks when starting a new operation
 * {_}distOp.<table-id>.<op-type>.nextInstanceId{_}: the ID to assign to the next operation instance (per table, per operation type)
 * {_}<rootPath>.arg{_}: the serialized operation argument (for instance, for ‘validate data’ it would contain the activation timestamp of the schema version being validated)
 * _<rootPath>.state_ (running/success/failed): the operation state
 * _<rootPath>.<partition-id>.state_ (<absent>/success/failed): a job state
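
To make the layout concrete, below is a minimal sketch of how these keys could be composed. The helper class and its method names are illustrative only; they are not part of the approved design.

{code}
// Illustrative only: composes the MetaStorage keys described above.
final class DistOpKeys {
    private DistOpKeys() {
    }

    static String runOpsKey(int tableId) {
        return "distOp." + tableId + ".runOps";
    }

    static String nextInstanceIdKey(int tableId, String opType) {
        return "distOp." + tableId + "." + opType + ".nextInstanceId";
    }

    /** Root path for operation types that allow a single operation per table at a time. */
    static String instanceRoot(int tableId, String opType, int instanceId) {
        return "distOp." + tableId + "." + opType + "." + instanceId;
    }

    /** Root path for operation types that allow multiple operations at a time (objectId is, for example, an index ID). */
    static String instanceRoot(int tableId, String opType, int objectId, int instanceId) {
        return "distOp." + tableId + "." + opType + "." + objectId + "." + instanceId;
    }

    static String operationStateKey(String rootPath) {
        return rootPath + ".state";
    }

    static String argKey(String rootPath) {
        return rootPath + ".arg";
    }

    static String jobStateKey(String rootPath, int partitionId) {
        return rootPath + "." + partitionId + ".state";
    }
}
{code}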

h2. How it works
h3. At startup
 # Subscribe to MetaStorage events to track operation completion (in particular, to detect operation failure)
 # If there are persisted jobs on the node, for each of them find the corresponding operation and job in the MetaStorage; if the operation does not exist anymore, OR the job is completed (has status success/failed), OR the current node is not assigned to the job anymore, then the local job must be cleaned up; otherwise, it should be resumed (see the sketch below).
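
A sketch of this startup check is given below. The LocalJob and MetaStorageView types are hypothetical placeholders introduced only for the illustration; they are not existing Ignite APIs.

{code}
import java.util.Collection;

// Hypothetical local handle of a persisted job (not an existing Ignite type).
interface LocalJob {
    String instanceRoot();   // MetaStorage root path of the operation instance
    int partitionId();
    String assignedNode();   // node currently assigned to this job
    void cleanupLocalState();
    void resume();
}

// Hypothetical read-only view of the MetaStorage (illustration only).
interface MetaStorageView {
    /** Returns the value for the key, or null if the key is absent. */
    String get(String key);
}

final class StartupRecovery {
    static void recoverLocalJobs(Collection<LocalJob> persistedJobs, MetaStorageView metaStorage, String localNode) {
        for (LocalJob job : persistedJobs) {
            boolean operationGone = metaStorage.get(job.instanceRoot() + ".state") == null;
            boolean jobCompleted = metaStorage.get(job.instanceRoot() + "." + job.partitionId() + ".state") != null;
            boolean reassigned = !localNode.equals(job.assignedNode());

            if (operationGone || jobCompleted || reassigned) {
                job.cleanupLocalState(); // the operation is gone, the job is done, or it moved elsewhere
            } else {
                job.resume();            // still ours and still running: pick up where we left off
            }
        }
    }
}
{code}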

h3. When becoming a primary of partition 0 of a table (including startup)

The node becomes the operation coordinator for the table.
 # Subscribe to changes of job statuses in the MetaStorage (to track operation completion)
 # For each operation of the table that is in the ‘running’ state, complete the operation if the jobs’ states indicate completion (a sketch of the completion check follows)
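
A minimal sketch of the completion check, deriving the operation outcome from the per-partition job states (the names are illustrative; success requires all jobs to have succeeded, any failed job fails the operation):

{code}
import java.util.Collection;
import java.util.Optional;

final class CompletionCheck {
    /**
     * Returns the final operation state ('failed' or 'success') if the completion criteria
     * are reached, or an empty Optional if the operation is still running.
     * jobStates contains the states of the jobs whose state keys are present.
     */
    static Optional<String> completedState(Collection<String> jobStates, int partitionCount) {
        if (jobStates.stream().anyMatch("failed"::equals)) {
            return Optional.of("failed"); // any failed job fails the whole operation
        }

        long succeeded = jobStates.stream().filter("success"::equals).count();

        return succeeded == partitionCount ? Optional.of("success") : Optional.empty();
    }
}
{code}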

h3. When becoming a primary of a partition of a table (including startup)

Start/resume the job that belongs to the partition if it is not finished
h3. When losing primary status for a partition

Cancel the job run by this node for the partition (if there is such a job)
h3. Coordinator: when seeing a change of the status of a job of a coordinated 
operation

If the completion criteria are reached, complete the operation.
h3. Coordinator: when losing primary status for partition 0 of a table
 # Unsubscribe from the corresponding events (see the previous section)

h3. When seeing an operation status key disappearing

If running any jobs of the operation, cancel them all locally.
h3. When a job finishes (succeeds or fails)
 # If it failed, execute the cancel/cleanup code
 # Mark the job’s final status (success/failed) in the MetaStorage using a conditional invoke: IF the operation’s status is ‘running’ AND the job status is <absent>, THEN set the job status to the new value (see the sketch below)
 # If the invoke fails, ignore the failure: the job has already been completed
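
A sketch of this conditional invoke is shown below, against a hypothetical conditional key-value facade (KvView/ConditionalStore). This is not the actual Ignite MetaStorage API; it is introduced only to illustrate the IF/THEN logic.

{code}
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical key-value facade used in these sketches (illustration only).
interface KvView {
    String get(String key);              // null if the key is absent
    void put(String key, String value);
    void removeByPrefix(String prefix);
}

interface ConditionalStore extends KvView {
    /** Atomically applies the update if the condition holds; returns whether it was applied. */
    boolean invoke(Predicate<KvView> condition, Consumer<KvView> update);
}

final class JobCompletion {
    /** Marks the job's final status; returns false if the job or the operation was already completed. */
    static boolean markJobFinished(ConditionalStore store, String rootPath, int partitionId, String finalStatus) {
        String opStateKey = rootPath + ".state";
        String jobStateKey = rootPath + "." + partitionId + ".state";

        // IF the operation state is 'running' AND the job state is absent, THEN set the job state.
        return store.invoke(
                kv -> "running".equals(kv.get(opStateKey)) && kv.get(jobStateKey) == null,
                kv -> kv.put(jobStateKey, finalStatus));
    }
}
{code}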

h3. How the coordinator completes an operation

An operation’s completion code may produce side effects, but they should be idempotent, as the completion code might be executed more than once (to make sure the operation gets completed in any case). A sketch of the procedure is given after the list.
 # Read the operation status and runOps, and compute a new runOps value with this operation removed
 # If the status is not ‘running’, exit the procedure: the operation has already been completed
 # Execute the operation’s completion code
 # Execute the following invoke: IF the operation is in the ‘running’ state AND runOps is the same as read in step 1, THEN remove all keys under this operation instance root
 # If the invoke fails, restart from step 1
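
The procedure could look roughly as follows, reusing the hypothetical ConditionalStore/KvView facade from the sketch above. runOps is modelled as a comma-separated string purely for illustration, and writing the new runOps value inside the invoke is an assumption based on step 1 computing it.

{code}
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Collectors;

final class OperationCompletion {
    static void completeOperation(ConditionalStore store, String runOpsKey, String rootPath,
            String opTypeId, Runnable idempotentCompletionCode) {
        while (true) {
            // Step 1: read the operation state and runOps; compute runOps without this operation.
            String state = store.get(rootPath + ".state");
            String runOps = store.get(runOpsKey);
            String newRunOps = withoutTypeId(runOps, opTypeId);

            // Step 2: someone has already completed the operation.
            if (!"running".equals(state)) {
                return;
            }

            // Step 3: run the completion code; it must be idempotent because we may loop.
            idempotentCompletionCode.run();

            // Step 4: IF still 'running' AND runOps unchanged, THEN update runOps (assumption, see above)
            // and remove all keys under the operation instance root.
            boolean applied = store.invoke(
                    kv -> "running".equals(kv.get(rootPath + ".state")) && Objects.equals(kv.get(runOpsKey), runOps),
                    kv -> {
                        kv.put(runOpsKey, newRunOps);
                        kv.removeByPrefix(rootPath);
                    });

            // Step 5: if the invoke did not apply, restart from step 1.
            if (applied) {
                return;
            }
        }
    }

    private static String withoutTypeId(String runOps, String typeId) {
        if (runOps == null) {
            return "";
        }

        return Arrays.stream(runOps.split(","))
                .filter(id -> !id.isEmpty() && !id.equals(typeId))
                .collect(Collectors.joining(","));
    }
}
{code}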

h3. How an operation is started
 # Read runOps and compute a new runOps value (with the new operation’s type ID added)
 # If the operation is incompatible with any member of runOps, return an error to the caller
 # Make the following invoke: IF runOps matches the old value, THEN change it to the new value AND set the new operation’s status to ‘running’
 # If the invoke fails, return an error to the caller (see the sketch below)
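
A sketch of the start procedure in the same style. The compatibility check and the runOps encoding are placeholders, and the ConditionalStore facade is the hypothetical one from the earlier sketch.

{code}
final class OperationStart {
    /** Returns true if the operation was started; false if it is incompatible with a running operation or the invoke did not apply. */
    static boolean startOperation(ConditionalStore store, String runOpsKey, String rootPath, String opTypeId) {
        // Step 1: read runOps and compute the new value with this operation's type ID added.
        String runOps = store.get(runOpsKey);
        String newRunOps = (runOps == null || runOps.isEmpty()) ? opTypeId : runOps + "," + opTypeId;

        // Step 2: reject the start if the new operation is incompatible with a running one.
        if (runOps != null && !compatibleWithAll(opTypeId, runOps)) {
            return false;
        }

        // Step 3: IF runOps still matches the old value, THEN switch it to the new value
        // and set the new operation's state to 'running'.
        boolean applied = store.invoke(
                kv -> java.util.Objects.equals(kv.get(runOpsKey), runOps),
                kv -> {
                    kv.put(runOpsKey, newRunOps);
                    kv.put(rootPath + ".state", "running");
                });

        // Step 4: if the invoke did not apply, the caller gets an error.
        return applied;
    }

    private static boolean compatibleWithAll(String opTypeId, String runOps) {
        // Placeholder: real compatibility rules depend on the operation types involved.
        return true;
    }
}
{code}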

h1. API

The following interface defines a distributed operation:

{code}
public interface DistributedOperation<A> {

    /** Returns the operation type ID (e.g. 'validate', 'buildIndex'). */
    String typeName();

    /** Starts the job for the given partition, or resumes it if it has persisted progress. */
    void startOrResumeJob(A arg, TablePartitionId partitionId, JobContext context);

    /** Cancels the job for the given partition and cleans up anything it wrote to disk. */
    void cancelJobAndCleanup(A arg, TablePartitionId partitionId);

    /** Runs the operation-level completion logic; must be idempotent. */
    void completeOperation(A arg, int tableId, boolean success);
}

public interface JobContext {
    /** Notifies the coordinator (via the MetaStorage) that the job has completed. */
    <A> void markJobComplete(A arg);
}
{code}
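
For illustration, a skeleton of a ‘validate table data’ operation built on this interface might look as follows. ValidateArg and the scan logic are placeholders, and the way a failed outcome is reported is not shown here.

{code}
// Hypothetical argument type; the design mentions the activation timestamp of the
// schema version being validated.
class ValidateArg {
    final long schemaActivationTimestamp;

    ValidateArg(long schemaActivationTimestamp) {
        this.schemaActivationTimestamp = schemaActivationTimestamp;
    }
}

class ValidateDataOperation implements DistributedOperation<ValidateArg> {
    @Override
    public String typeName() {
        return "validate";
    }

    @Override
    public void startOrResumeJob(ValidateArg arg, TablePartitionId partitionId, JobContext context) {
        // Scan the live tuples of the partition and check them against the constraint
        // described by arg; then notify the coordinator through the context.
        context.markJobComplete(arg);
    }

    @Override
    public void cancelJobAndCleanup(ValidateArg arg, TablePartitionId partitionId) {
        // Stop any in-progress scan for this partition; a volatile variant has nothing persisted to clean up.
    }

    @Override
    public void completeOperation(ValidateArg arg, int tableId, boolean success) {
        // Operation-level completion logic, e.g. letting the schema change that requested
        // the validation proceed or fail. Must be idempotent: it may run more than once.
    }
}
{code}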

The following is the user-facing interface used to start operations:

{code}
public interface DistributedOperationManager {

    /** Starts an operation of the given type on the given table; returns the operation instance ID. */
    <A> int startOperation(String typeId, int tableId, A arg) throws OperationStartException;
}
{code}

There is an internal method that is called by the JobContext to mark a job 
complete:

{code}
<A> void markJobComplete(int tableId, int operationInstanceId, int partitionId, A arg);
{code}

An implementation of DistributedOperationManager registers all the operations 
with itself. In the first version, we can just use hardcoding here; later, if 
needed, we can switch to loading the corresponding modules using ServiceLoader.
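
A minimal sketch of the hardcoded registration (class names are illustrative; BuildIndexOperation is assumed to exist alongside the ValidateDataOperation sketch above):

{code}
import java.util.Map;

// Illustration of the hardcoded first version: the manager implementation keeps a
// fixed map from type ID to operation definition.
class HardcodedOperationRegistry {
    private final Map<String, DistributedOperation<?>> operationsByTypeId = Map.<String, DistributedOperation<?>>of(
            "buildIndex", new BuildIndexOperation(),   // assumed implementation
            "validate", new ValidateDataOperation());  // see the sketch above

    DistributedOperation<?> operation(String typeId) {
        DistributedOperation<?> operation = operationsByTypeId.get(typeId);

        if (operation == null) {
            throw new IllegalArgumentException("Unknown operation type: " + typeId);
        }

        return operation;
    }
}
{code}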

> Design Resilient Distributed Operations mechanism
> -------------------------------------------------
>
>                 Key: IGNITE-19692
>                 URL: https://issues.apache.org/jira/browse/IGNITE-19692
>             Project: Ignite
>          Issue Type: Task
>            Reporter: Roman Puchkovskiy
>            Assignee: Roman Puchkovskiy
>            Priority: Major
>              Labels: ignite-3
>             Fix For: 3.0.0-beta2
>
>
> We need a mechanism that would allow to do the following:
>  # Execute an operation on all (or some of) partitions of a table
>  # The whole operation is split into sub-operations (each of which operate on 
> a single partition)
>  # Each sub-operation must be resilient: that is, if the node that hosts it 
> restarts or the partition moves to another node, the operation should proceed
>  # When a sub-operation ends, it notifies the operation tracker/coordinator
>  # When all sub-operations end, the tracker might take some action (like 
> starting a subsequent operation)
>  # The tracker is also resilient
> We need such a mechanism in a few places in the system:
>  # Transaction cleanup?
>  # Index build
>  # Table data validation as a part of a schema change that requires a 
> validation (like a narrowing type change)
> Probably, more applications of the mechanism will emerge.
>  
> On the possible implementation: the tracker could be collocated with table's 
> primary replica (that would guarantee that at most one tracker exists at all 
> times). We could store the data needed to track the operation in the 
> Meta-Storage under a prefix corresponding to the table, like 
> 'ops.<tableId>.<opType>.<opKey>'. We could store the completion status for 
> each of the partitions there along with some operation-wide status.
> h1. Approved design
> h2. Definitions
> An *operation* is run on a table as a whole, it consists of {*}jobs{*}, one 
> job per partition.
> A job might complete successfully or fail (that is, it finishes 
> unsuccessfully; unexpected failures are a different matter).
> If all jobs of an operation complete successfully, the operation itself 
> completes successfully. If any job fails, the operation fails.
> Some jobs might track their own progress and can be resumed. Others cannot 
> and always start over when asked to start/resume.
> h2. Goals
>  # The operation must be resilient to a reboot or a leave of any of 
> participating node
>  # Also, it must be resilient to ‘primary node’ status changes (i.e. if a 
> primary of any of table’s partition changes, this should not stall the 
> operation forever or fail it completely)
> h3. Non-goals
>  # It is not proposed that the fact that an operation is running prohibit 
> changes to the cluster (like holding a node from a restart or from leaving 
> the cluster)
>  # It is considered ok that some work would have to be repeated (if, for 
> example, a node ceases to be a primary while it executes an operation that 
> can only be run on a primary; in such a situation the new primary might 
> restart the job from scratch)
> h2. General properties
> An operation’s jobs are executed in parallel.
> If a job that is still not finished is reassigned to another node (due to 
> primary change or due to logical topology change), the old executor stops 
> executing the job (and cleans up if the job wrote something to disk); the new 
> executor adopts the job and starts it from scratch.
> If a node restarts and finds that it has a non-finished resumable job to 
> which it’s still assigned, it resumes the job.
> h2. Operation attributes
>  * Type ID (a string identifying operation type, like ‘validate’, 
> ‘buildIndex’)
>  * Whether more than one operation instances of the type may be run on a 
> table (example: ‘build index’ might build a few indices at the same time) or 
> not (in the first version, we might only support operations that support many 
> operation instances on a table at the same time)
>  * Job logic run when a job is started/resumed
>  * Job logic run when a job is canceled
>  * Operation logic run when an operation is completed (success or failure)
> Some operations can only succeed normally (‘build index’ is an example), 
> others can also finish unsuccessfully [that is, end up in the ‘failed’ state] 
> (‘validate table data’ is an example).
> Some operations persist their progress on the disk (‘build index’ does this), 
> others are ‘volatile’ and always start over (‘validate table data’ might work 
> in this way, but it could also persist the position in the ‘cursor’ over the 
> data being validated).
> h2. Provisioned operations
>  # Build an index. Must be executed on a primary; a few operations might be 
> run concurrently (each for its own index); cannot fail; persists its progress
>  # Validate table data (for constraints where indices are not needed): checks 
> whether every actual (i.e. not overwritten) tuple satisfies a restriction 
> (NOT NULL/CHECK constraint/that it fits in a type range). May be executed on 
> a secondary; a few operations might be run concurrently; can fail; might be 
> volatile or persist its progress
> h2. Roles
> There are two roles:
>  # Operation {*}coordinator{*}, there is at most one coordinator per 
> operation instance (this is achieved by colocating a coordinator with the 
> primary replica of partition 0 of the table); its responsibilities are:
>  ## Assigning executors to operation’s jobs (should not be implemented in the 
> first version where only operations which jobs must be run on primaries are 
> supported)
>  ## Tracking operation completion
>  ## Executing the completion logic
>  # {*}Executor{*}, (any node may act as an executor); its responsibilities 
> are:
>  ## Start/resume job execution
>  ## Cancel job and cleanup afterwards if this is needed
>  ## Notify the coordinator about a job outcome (success or failure)
> h2. Communication
> All communication between participants happens via MetaStorage.
> h2. MetaStorage
> For operations which only allow one operation per table at a time, the 
> operation root path is {_}distOp.<table-id>.<op-type>.<instance-id>{_}; for 
> those that allow multiple operations at a time it’s 
> _distOp.<table-id>.<op-type>.<object-id>.<instance-id>_ (for example, for 
> ‘build index’ <object-id> is an ID of an index).
> The keys are:
>  * _distOp.<table-id>.runOps_ contains a set of type IDs of currently run 
> operations, used to maintain compatibility
>  * {_}distOp.<table-id>.<op-type>.nextInstanceId{_}: ID of the next operation 
> instance (per table per operation type)
>  * {_}<rootPath>.arg{_}: serialized operation argument (for instance, for 
> ‘validate data’ it would contain the activation ts of the validating schema 
> version)
>  * _<rootPath>.state_ (running/success/failed): the operation state
>  * _<rootPath>.<partition-id>.state_ (<absent>/success/failed): a job state
> h2. How it works
> h3. At startup
>  # Subscribe to MetaStorage events to track operation completion (to detect 
> its failure)
>  # If there are persisted jobs on the node, for each of them find the 
> corresponding operation and job in the MetaStorage; if the operation does not 
> exist anymore OR the job is completed (has status success/failed) OR the 
> current node is not assigned to the job anymore, then the local job must be 
> cleaned up; otherwise, it should be resumed.
> h3. When becoming a primary of partition 0 of a table (including startup)
> The node becomes an operation coordinator for the table.
>  # Subscribe to changes of job statuses in the MetaStorage (to track 
> operation completion)
>  # For each operation of the table that is in the ‘running’ state, if jobs’ 
> states indicate completion, complete the operation
> h3. When becoming a primary of a partition of a table (including startup)
> Start/resume the job that belongs to the partition if it is not finished
> h3. When losing primary status for a partition
> Cancel the job run by this node for the partition (if there is such a job)
> h3. Coordinator: when seeing a change of the status of a job of a coordinated 
> operation
> If the completion criteria are reached, complete the operation.
> h3. Coordinator: when losing status of a primary of partition 0 of a table
>  # Unsubscribe from the corresponding events (see the previous section)
> h3. When seeing an operation status key disappearing
> If running any jobs of the operation, cancel them all locally.
> h3. When a job finishes (successfully or fails)
>  # If it failed, execute cancel/cleanup code
>  # Mark the job final status (success/failed) in the MetaStorage using a 
> conditional invoke: if the operation’s status is ‘running’, AND the job 
> status is <absent>, THEN set the job status to the new value
>  # If the invoke fails, ignore the failure: the job has already been completed
> h3. How the coordinator completes an operation
> Operation’s completion code may produce any side-effects, but they should be 
> idempotent as the operation’s completion code might be executed more than 
> once (to make sure the operation gets completed in any case).
>  # Read the operation status and runOps and compute runOps without the 
> operation
>  # If the status is not ‘running’, exit the procedure: the operation has 
> already been completed
>  # Execute the operation’s completion code
>  # Execute the following invoke: IF the operation is in the ‘running’ state 
> AND runOps is same as we read on step 1, THEN remove any keys under this 
> operation instance root
>  # If the invoke fails, restart from step 1
> h3. How an operation is started
>  # Take runOps and compute new runOps (with the new operation added)
>  # If the operation is not compatible with any of members of runOps, return 
> an error to the caller
>  # Make the following invoke: IF runOps matches the old value, THEN change it 
> to the new value AND set new operation status to ‘running’
>  # If the invoke fails, return an error to the caller
> h1. API
> The following interface defines a distributed operation:
> {code}
> public interface DistributedOperation<A> {
>     String typeName();
>     void startOrResumeJob(A arg, TablePartitionId partitionId, JobContext 
> context);
>     void cancelJobAndCleanup(A arg, TablePartitionId partitionId);
>     void completeOperation(A arg, int tableId, boolean success);
> }
>  
> public interface JobContext {
>     <A> void markJobComplete(A arg);
> }
> {code}
> The following is the user-facing interface that allows to start operations:
> {code}
> public interface DistributedOperationManager {
>     /** Returns operation instanceId. */
>      <A> int startOperation(String typeId, int tableId, A arg) throws 
> OperationStartException;
> }
> {code}
> There is an internal method that is called by the JobContext to mark a job 
> complete:
> {code}
> <A> void markJobComplete(int tableId, int operationInstanceId, int 
> partitionId, A arg);
> {code}
> An implementation of DistributedOperationManager registers all the operations 
> with itself. In the first version, we can just use hardcoding here; later, if 
> needed, we can switch to loading the corresponding modules using 
> ServiceLoader.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to