nzw921rx commented on issue #10606:
URL: https://github.com/apache/seatunnel/issues/10606#issuecomment-4072191368

   Thanks for the great questions! Here's my proposed design for each:
   
   ### 1. Version field behavior
   
   The version field should be **included in both SET and WHERE**, but **NOT auto-incremented**.
   
   In CDC scenarios, the version value comes from the **source database**, which is authoritative, so the generated SQL should look like this:
   
   ```sql
   -- Current behavior (without optimistic locking)
   UPDATE table SET col1 = :col1, version = :version WHERE pk = :pk
   
   -- With optimistic.locking.field = "version"
   UPDATE table SET col1 = :col1, version = :version WHERE pk = :pk AND version < :version
   ```
   
   The key difference is adding `AND version < :version` to the WHERE clause. This ensures that only **newer** data (with a strictly higher version) can overwrite an existing row. An incoming record with an equal or lower version matches no row and is not applied.
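To make the WHERE-clause semantics concrete, here is a minimal in-memory sketch (not SeaTunnel code) of what the guarded UPDATE does when two events for the same key arrive out of order. The names `VersionGuardDemo` and `updateIfNewer` are hypothetical, and the return value stands in for the JDBC update count.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the proposed statement:
//   UPDATE t SET col1 = :col1, version = :version WHERE pk = :pk AND version < :version
// It only illustrates the WHERE-clause semantics; it is not SeaTunnel code.
class VersionGuardDemo {

    // One target row: a payload column plus the version column.
    static final class Row {
        final String col1;
        final long version;

        Row(String col1, long version) {
            this.col1 = col1;
            this.version = version;
        }
    }

    // Returns the "rows affected" count (0 or 1), as executeUpdate() would.
    static int updateIfNewer(Map<Long, Row> table, long pk, String col1, long version) {
        Row existing = table.get(pk);
        // WHERE pk = :pk AND version < :version
        if (existing == null || !(existing.version < version)) {
            return 0;
        }
        table.put(pk, new Row(col1, version));
        return 1;
    }

    public static void main(String[] args) {
        Map<Long, Row> table = new HashMap<>();
        table.put(1L, new Row("v1", 1L));

        // Events for pk = 1 arrive out of order from two parallel writers.
        System.out.println(updateIfNewer(table, 1L, "v3", 3L)); // 1: 3 > 1, applied
        System.out.println(updateIfNewer(table, 1L, "v2", 2L)); // 0: stale, skipped
        System.out.println(table.get(1L).col1);                 // v3
    }
}
```

Whatever order the events arrive in, the row converges on the highest version. The stale event shows up as a 0-row update.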
   
   If the field is a timestamp type (e.g., `update_time`), the same logic applies: only update when the incoming timestamp is later than the existing one.
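The check is a plain `<` comparison, so it works for any ordered column type. The sketch below uses a hypothetical helper, `isNewer`, and uses `java.time.Instant` to stand in for a timestamp column. It mirrors the SQL predicate, including two edge cases that follow from SQL semantics:

- An incoming value equal to the stored one is not applied.
- A NULL stored version never satisfies `<`, so this statement never updates such rows.

```java
import java.time.Instant;

// Hypothetical helper mirroring the SQL predicate `existing < incoming` for any
// Comparable column type (numeric versions or timestamps such as update_time).
class VersionComparison {

    // In SQL, `existing < incoming` is UNKNOWN when either side is NULL, so the
    // row is not updated; this returns false in that case to match.
    static <V extends Comparable<? super V>> boolean isNewer(V existing, V incoming) {
        if (existing == null || incoming == null) {
            return false;
        }
        return existing.compareTo(incoming) < 0;
    }

    public static void main(String[] args) {
        Instant stored = Instant.parse("2024-01-01T10:00:00Z");
        System.out.println(isNewer(stored, Instant.parse("2024-01-01T10:00:01Z"))); // true
        System.out.println(isNewer(stored, stored));                                // false: equal
        System.out.println(isNewer(5L, 7L));                                        // true
        System.out.println(isNewer(null, 7L));                                      // false: NULL
    }
}
```

The equal-value case matters for timestamp columns with coarse precision, where two distinct changes can carry the same `update_time`.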
   
   No auto-increment is needed because:
   - In CDC mode, the source row already carries the correct version value
   - Auto-increment would conflict with the source-of-truth principle
   
   ### 2. Conflict resolution (0 rows affected)
   
   When the UPDATE affects **0 rows**, the default behavior should be to **log a warning and skip** the record.
   
   Reasoning:
   - 0 rows affected means the target row already holds a version **equal to or newer than** the incoming record's (a NULL stored version also fails the `<` comparison)
   - This is **expected behavior** in multi-parallelism CDC, not an error
   - Retrying would be pointless (the same stale data would still fail)
   - Throwing an exception would kill the job unnecessarily
   
   Additionally, I suggest adding an optional config to control this:
   
   | configKey | Type | Default | Description |
   | --- | --- | --- | --- |
   | `optimistic.locking.on-conflict` | Enum | `LOG_AND_SKIP` | Behavior when the optimistic lock check fails. Options: `LOG_AND_SKIP`, `THROW_EXCEPTION` |
   
   The default, `LOG_AND_SKIP`, covers the common case. `THROW_EXCEPTION` is available for users who want strict consistency and prefer to fail fast.
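Here is a minimal sketch of how the proposed policy could be applied to one definite update count. It assumes `java.util.logging` in place of the project's logger. `OnConflictPolicy` and `OptimisticLockConflictHandler` are hypothetical names; the enum constants come from the table above.

```java
import java.util.logging.Logger;

// Hypothetical sketch of the proposed `optimistic.locking.on-conflict` option.
enum OnConflictPolicy {
    LOG_AND_SKIP,
    THROW_EXCEPTION
}

class OptimisticLockConflictHandler {
    private static final Logger LOG =
            Logger.getLogger(OptimisticLockConflictHandler.class.getName());

    private final OnConflictPolicy policy;

    OptimisticLockConflictHandler(OnConflictPolicy policy) {
        this.policy = policy;
    }

    /**
     * Takes the definite update count (0 or more) of one optimistic-lock UPDATE.
     * Returns true if the record was applied, false if it was skipped as stale;
     * throws under THROW_EXCEPTION.
     */
    boolean handle(int rowsAffected, Object primaryKey, Object incomingVersion) {
        if (rowsAffected > 0) {
            return true;
        }
        String message = String.format(
                "Optimistic lock check failed for key %s: stored version is not older than %s",
                primaryKey, incomingVersion);
        if (policy == OnConflictPolicy.THROW_EXCEPTION) {
            throw new IllegalStateException(message);
        }
        LOG.warning(message + "; record skipped");
        return false;
    }
}
```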
   
   ### 3. Configuration constraints
   
   **Yes, it should require `enable_upsert = true`** (as the original proposal stated).
   
   Reasoning:
   - Optimistic locking only makes sense for UPDATE operations
   - In INSERT-only mode, there's no existing row to compare versions against
   - The version check is part of the UPDATE WHERE clause
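The constraint can be enforced when the sink config is read. A misconfiguration then fails at startup instead of running in INSERT-only mode, where the version check is never applied. This is a minimal sketch. `OptimisticLockConfigValidator.validate` is a hypothetical helper, not an existing `JdbcSinkConfig` method, and treating a blank field name as "not set" is an assumption.

```java
// Hypothetical validation of the proposed constraint:
// optimistic.locking.field requires enable_upsert = true.
class OptimisticLockConfigValidator {

    static void validate(boolean enableUpsert, String optimisticLockingField) {
        // Assumption: null or blank means optimistic locking is not configured.
        boolean lockingEnabled =
                optimisticLockingField != null && !optimisticLockingField.trim().isEmpty();
        if (lockingEnabled && !enableUpsert) {
            throw new IllegalArgumentException(
                    "optimistic.locking.field ('" + optimisticLockingField
                            + "') requires enable_upsert = true, because the version"
                            + " check is part of the UPDATE path");
        }
    }
}
```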
   
   When `optimistic.locking.field` is set, the implementation should **bypass native upsert** (e.g., MySQL's `INSERT ... ON DUPLICATE KEY UPDATE`) and use the **query-based insert-or-update path** (`InsertOrUpdateByQueryExecutor`), because:
   
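   - Native upsert SQL is not uniform across dialects. MySQL's `ON DUPLICATE KEY UPDATE` has no WHERE clause on the update part, so the version comparison cannot be added there. PostgreSQL's `ON CONFLICT ... DO UPDATE ... WHERE` does accept one, but relying on it would mean per-dialect special cases.
   - The query-based path (`getUpdateStatement`) already has a WHERE clause that we can extend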
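Path selection could then look roughly like this. The `WritePath` enum and `WritePathSelector.select` are hypothetical. The proposal places this decision in `JdbcOutputFormatBuilder`. The insert-only branch reflects the `enable_upsert = false` case described above.

```java
// Hypothetical write paths, named for illustration only.
enum WritePath {
    INSERT_ONLY,        // enable_upsert = false
    NATIVE_UPSERT,      // e.g. INSERT ... ON DUPLICATE KEY UPDATE
    QUERY_BASED_UPSERT  // existence check, then UPDATE ... WHERE or INSERT
}

class WritePathSelector {

    static WritePath select(boolean enableUpsert, boolean dialectHasNativeUpsert,
                            String optimisticLockingField) {
        if (!enableUpsert) {
            return WritePath.INSERT_ONLY;
        }
        if (optimisticLockingField != null) {
            // Native upsert cannot carry the version predicate portably, so force
            // the query-based path, whose UPDATE statement has a WHERE clause.
            return WritePath.QUERY_BASED_UPSERT;
        }
        return dialectHasNativeUpsert ? WritePath.NATIVE_UPSERT : WritePath.QUERY_BASED_UPSERT;
    }
}
```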
   
   ### Implementation Summary
   
   **Changes to `JdbcDialect.getUpdateStatement()`:**
   
   ```java
   default String getUpdateStatement(
           String database, String tableName, String[] fieldNames,
           String[] conditionFields, boolean isPrimaryKeyUpdated,
           String optimisticLockField) {  // new parameter
   
       // ... existing SET-clause logic (builds setClause) ...
       
       String conditionClause = Arrays.stream(conditionFields)
               .map(f -> format("%s = :%s", quoteIdentifier(f), f))
               .collect(Collectors.joining(" AND "));
   
       // Append optimistic lock condition
       if (optimisticLockField != null) {
           conditionClause += format(" AND %s < :%s",
                   quoteIdentifier(optimisticLockField), optimisticLockField);
       }
       
       return format("UPDATE %s SET %s WHERE %s",
               tableIdentifier(database, tableName), setClause, conditionClause);
   }
   ```
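The method above depends on `JdbcDialect` internals, so here is a standalone version of the same string-building logic that runs on its own. It makes three assumptions that are not in the proposal: MySQL-style backtick quoting, a `database.table` identifier, and key columns excluded from the SET clause (the `isPrimaryKeyUpdated = false` case). The class name `UpdateStatementSketch` is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Standalone, hypothetical re-creation of the proposed getUpdateStatement() logic.
// Quoting and identifier format are assumptions; real dialects may differ.
class UpdateStatementSketch {

    static String quoteIdentifier(String id) {
        return "`" + id + "`";
    }

    static String getUpdateStatement(String database, String tableName, String[] fieldNames,
                                     String[] conditionFields, String optimisticLockField) {
        List<String> keys = Arrays.asList(conditionFields);
        String setClause = Arrays.stream(fieldNames)
                .filter(f -> !keys.contains(f)) // key columns are not rewritten
                .map(f -> String.format("%s = :%s", quoteIdentifier(f), f))
                .collect(Collectors.joining(", "));
        String conditionClause = Arrays.stream(conditionFields)
                .map(f -> String.format("%s = :%s", quoteIdentifier(f), f))
                .collect(Collectors.joining(" AND "));
        if (optimisticLockField != null) {
            // Only overwrite rows whose stored version is strictly older.
            conditionClause += String.format(" AND %s < :%s",
                    quoteIdentifier(optimisticLockField), optimisticLockField);
        }
        return String.format("UPDATE %s.%s SET %s WHERE %s",
                quoteIdentifier(database), quoteIdentifier(tableName), setClause, conditionClause);
    }

    public static void main(String[] args) {
        // prints: UPDATE `db`.`orders` SET `col1` = :col1, `version` = :version WHERE `pk` = :pk AND `version` < :version
        System.out.println(getUpdateStatement("db", "orders",
                new String[] {"pk", "col1", "version"}, new String[] {"pk"}, "version"));
    }
}
```

Note that `:version` appears twice, once in SET and once in WHERE. Whatever binds the named parameters must fill both positions with the same value. It is worth confirming that the existing named-parameter statement wrapper supports a name that appears more than once.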
   
   **Changes to `InsertOrUpdateBatchStatementExecutor`:**
   
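   Add an affected-row check after each optimistic-lock UPDATE runs. Use the count returned by `executeUpdate()`, or the matching entry of the `int[]` returned by `executeBatch()` if the updates are batched. If 0 rows are affected and optimistic locking is enabled, handle the record according to the `on-conflict` policy.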
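If the conditional UPDATEs are sent as a JDBC batch, the per-row counts come from the `int[]` returned by `executeBatch()`. The JDBC API lets a driver report `Statement.SUCCESS_NO_INFO` instead of a count. In that case a 0-row conflict cannot be detected for that entry. Here is a minimal sketch; `BatchUpdateCountChecker` is a hypothetical name.

```java
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that classifies the int[] returned by executeBatch()
// for a batch of optimistic-lock UPDATEs, using standard JDBC constants only.
class BatchUpdateCountChecker {

    /** Returns the batch indices whose UPDATE definitely matched no row. */
    static List<Integer> conflictingIndices(int[] updateCounts) {
        List<Integer> conflicts = new ArrayList<>();
        for (int i = 0; i < updateCounts.length; i++) {
            if (updateCounts[i] == 0) {
                conflicts.add(i);
            }
        }
        return conflicts;
    }

    /** True if any entry is SUCCESS_NO_INFO, so some per-row results are unknown. */
    static boolean hasUnknownCounts(int[] updateCounts) {
        for (int count : updateCounts) {
            if (count == Statement.SUCCESS_NO_INFO) {
                return true;
            }
        }
        return false;
    }
}
```

When `hasUnknownCounts` returns true, one option is to run optimistic-lock updates individually instead of batching them, at some throughput cost.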
   
   **Files to modify:**
   1. `JdbcSinkOptions`: add `OPTIMISTIC_LOCKING_FIELD` and `OPTIMISTIC_LOCKING_ON_CONFLICT`
   2. `JdbcSinkConfig`: read the new options
   3. `JdbcDialect.getUpdateStatement()`: accept and apply the optimistic lock field
   4. `JdbcOutputFormatBuilder`: pass the optimistic lock config to the executor and force the query-based path when it is enabled
   5. `InsertOrUpdateBatchStatementExecutor`: handle the 0-rows-affected case
   6. Docs (en & zh)
   7. Tests

