Hisoka-X commented on code in PR #9412:
URL: https://github.com/apache/seatunnel/pull/9412#discussion_r2135161152


##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java:
##########
@@ -0,0 +1,1242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.debezium.connector.oracle.logminer.processor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.debezium.DebeziumException;
+import io.debezium.connector.oracle.OracleConnection;
+import 
io.debezium.connector.oracle.OracleConnection.NonRelationalTableException;
+import io.debezium.connector.oracle.OracleConnectorConfig;
+import io.debezium.connector.oracle.OracleDatabaseSchema;
+import io.debezium.connector.oracle.OracleOffsetContext;
+import io.debezium.connector.oracle.OraclePartition;
+import io.debezium.connector.oracle.OracleSchemaChangeEventEmitter;
+import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
+import io.debezium.connector.oracle.Scn;
+import io.debezium.connector.oracle.logminer.LogMinerChangeRecordEmitter;
+import io.debezium.connector.oracle.logminer.events.DmlEvent;
+import io.debezium.connector.oracle.logminer.events.EventType;
+import io.debezium.connector.oracle.logminer.events.LobEraseEvent;
+import io.debezium.connector.oracle.logminer.events.LobWriteEvent;
+import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
+import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
+import io.debezium.connector.oracle.logminer.events.SelectLobLocatorEvent;
+import io.debezium.connector.oracle.logminer.events.TruncateEvent;
+import io.debezium.connector.oracle.logminer.parser.DmlParserException;
+import io.debezium.connector.oracle.logminer.parser.LogMinerDmlEntry;
+import io.debezium.connector.oracle.logminer.parser.LogMinerDmlEntryImpl;
+import io.debezium.connector.oracle.logminer.parser.LogMinerDmlParser;
+import io.debezium.connector.oracle.logminer.parser.SelectLobParser;
+import io.debezium.data.Envelope;
+import io.debezium.pipeline.EventDispatcher;
+import 
io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.util.Clock;
+import io.debezium.util.Strings;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.function.Supplier;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * An abstract implementation of {@link LogMinerEventProcessor} that all 
processors should extend.
+ *
+ * @author Chris Cranford
+ */
+public abstract class AbstractLogMinerEventProcessor<T extends 
AbstractTransaction>
+        implements LogMinerEventProcessor {
+
    private static final Logger LOGGER =
            LoggerFactory.getLogger(AbstractLogMinerEventProcessor.class);
    // NOTE(review): suffix presumably marks transaction ids that carry no
    // sequence component — not referenced in this chunk, confirm against callers.
    private static final String NO_SEQUENCE_TRX_ID_SUFFIX = "ffffffff";

    private final ChangeEventSourceContext context;
    private final OracleConnectorConfig connectorConfig;
    private final OracleDatabaseSchema schema;
    private final OraclePartition partition;
    private final OracleOffsetContext offsetContext;
    private final EventDispatcher<OraclePartition, TableId> dispatcher;
    private final OracleStreamingChangeEventSourceMetrics metrics;
    private final LogMinerDmlParser dmlParser;
    private final SelectLobParser selectLobParser;

    // Per-batch row/DML/commit/rollback counters; reset at the start of process().
    protected final Counters counters;

    // Offset SCN observed after the previous batch; compared by
    // warnPotentiallyStuckScn() in process() to detect a non-advancing offset.
    private Scn currentOffsetScn = Scn.NULL;
    // Per-redo-thread commit SCNs captured after the previous batch.
    private Map<Integer, Scn> currentOffsetCommitScns = new HashMap<>();
    // SCN of the most recent non-MISSING_SCN row seen by processRow().
    private Scn lastProcessedScn = Scn.NULL;
    private boolean sequenceUnavailable = false;
+
    /**
     * Creates the event processor.
     *
     * @param context the change event source context, used to check if the source is running
     * @param connectorConfig the Oracle connector configuration
     * @param schema the database schema
     * @param partition the Oracle partition being processed
     * @param offsetContext the offset context tracking SCN/commit-SCN state
     * @param dispatcher the dispatcher used to emit change events
     * @param metrics the streaming metrics updated during processing
     */
    public AbstractLogMinerEventProcessor(
            ChangeEventSourceContext context,
            OracleConnectorConfig connectorConfig,
            OracleDatabaseSchema schema,
            OraclePartition partition,
            OracleOffsetContext offsetContext,
            EventDispatcher<OraclePartition, TableId> dispatcher,
            OracleStreamingChangeEventSourceMetrics metrics) {
        this.context = context;
        this.connectorConfig = connectorConfig;
        this.schema = schema;
        this.partition = partition;
        this.offsetContext = offsetContext;
        this.dispatcher = dispatcher;
        this.metrics = metrics;
        this.counters = new Counters();
        this.dmlParser = new LogMinerDmlParser();
        this.selectLobParser = new SelectLobParser();
    }
+
    /**
     * Returns the connector configuration.
     *
     * @return the Oracle connector configuration, never {@code null}
     */
    protected OracleConnectorConfig getConfig() {
        return connectorConfig;
    }
+
    /**
     * Returns the database schema.
     *
     * @return the Oracle database schema, never {@code null}
     */
    protected OracleDatabaseSchema getSchema() {
        return schema;
    }
+
    /**
     * Check whether a transaction has been recently processed through either a commit or rollback.
     *
     * @param transactionId the unique transaction id
     * @return true if the transaction has been recently processed, false otherwise
     */
    protected boolean isRecentlyProcessed(String transactionId) {
        // Base implementation keeps no history of processed transactions;
        // subclasses that track recently committed/rolled-back transactions override this.
        return false;
    }
+
    /**
     * Checks whether the LogMinerEvent row for a schema change can be emitted.
     *
     * @param row the result set row
     * @return true if the schema change has been seen, false otherwise.
     */
    protected boolean hasSchemaChangeBeenSeen(LogMinerEventRow row) {
        // Base implementation never deduplicates schema changes; subclasses may override.
        return false;
    }
+
    /**
     * Return the last processed system change number handled by the processor.
     *
     * <p>Updated by {@code processRow} for every row whose event type is not
     * {@code MISSING_SCN}; starts as {@code Scn.NULL}.
     *
     * @return the last processed system change number, never {@code null}.
     */
    protected Scn getLastProcessedScn() {
        return lastProcessedScn;
    }
+
    /**
     * Returns the {@code TransactionCache} implementation, keyed by transaction id.
     *
     * @return the transaction cache, never {@code null}
     */
    protected abstract Map<String, T> getTransactionCache();
+
    /**
     * Creates a new transaction based on the supplied {@code START} event.
     *
     * @param row the event row, must not be {@code null}
     * @return the implementation-specific {@link Transaction} instance
     */
    protected abstract T createTransaction(LogMinerEventRow row);
+
    /**
     * Removes a specific transaction event by database row identifier.
     *
     * @param row the event row that contains the row identifier, must not be {@code null}
     */
    protected abstract void removeEventWithRowId(LogMinerEventRow row);
+
    /**
     * Returns the number of events associated with the specified transaction.
     *
     * @param transaction the transaction, must not be {@code null}
     * @return the number of events in the transaction
     */
    protected abstract int getTransactionEventCount(T transaction);
+
    // todo: can this be removed in favor of a single implementation?
    /**
     * Whether transaction ids are read from the mining view as raw values;
     * passed to {@code LogMinerEventRow.fromResultSet} when materializing rows.
     *
     * @return true if the transaction id is treated as a raw value
     */
    protected boolean isTrxIdRawValue() {
        return true;
    }
+
+    @Override
+    public Scn process(OraclePartition partition, Scn startScn, Scn endScn)
+            throws SQLException, InterruptedException {
+        counters.reset();
+
+        try (PreparedStatement statement = createQueryStatement()) {
+            LOGGER.debug("Fetching results for SCN [{}, {}]", startScn, 
endScn);
+            statement.setFetchSize(getConfig().getLogMiningViewFetchSize());
+            statement.setFetchDirection(ResultSet.FETCH_FORWARD);
+            statement.setString(1, startScn.toString());
+            statement.setString(2, endScn.toString());
+
+            Instant queryStart = Instant.now();
+            try (ResultSet resultSet = statement.executeQuery()) {
+                metrics.setLastDurationOfBatchCapturing(
+                        Duration.between(queryStart, Instant.now()));
+
+                Instant startProcessTime = Instant.now();
+                processResults(this.partition, resultSet);
+
+                Duration totalTime = Duration.between(startProcessTime, 
Instant.now());
+                metrics.setLastCapturedDmlCount(counters.dmlCount);
+
+                if (counters.dmlCount > 0
+                        || counters.commitCount > 0
+                        || counters.rollbackCount > 0) {
+                    warnPotentiallyStuckScn(currentOffsetScn, 
currentOffsetCommitScns);
+
+                    currentOffsetScn = offsetContext.getScn();
+                    if (offsetContext.getCommitScn() != null) {
+                        currentOffsetCommitScns =
+                                
offsetContext.getCommitScn().getCommitScnForAllRedoThreads();
+                    }
+                }
+
+                LOGGER.debug("{}.", counters);
+                LOGGER.debug(
+                        "Processed in {} ms. Lag: {}. Offset SCN: {}, Offset 
Commit SCN: {}, Active Transactions: {}, Sleep: {}",
+                        totalTime.toMillis(),
+                        metrics.getLagFromSourceInMilliseconds(),
+                        offsetContext.getScn(),
+                        offsetContext.getCommitScn(),
+                        metrics.getNumberOfActiveTransactions(),
+                        metrics.getMillisecondToSleepBetweenMiningQuery());
+
+                metrics.addProcessedRows(counters.rows);
+                return calculateNewStartScn(
+                        endScn, 
offsetContext.getCommitScn().getMaxCommittedScn());
+            }
+        }
+    }
+
    /**
     * Create the JDBC query that will be used to fetch the mining result set.
     *
     * <p>The statement must accept the range start SCN as parameter 1 and the
     * range end SCN as parameter 2, both bound as strings by {@code process}.
     *
     * @return a prepared query statement, never {@code null}
     * @throws SQLException if a database exception occurred creating the statement
     */
    protected abstract PreparedStatement createQueryStatement() throws SQLException;
+
    /**
     * Calculates the new starting system change number based on the current processing range.
     *
     * @param endScn the end system change number for the previously mined range, never {@code null}
     * @param maxCommittedScn the maximum committed system change number, never {@code null}
     * @return the system change number to start the next mining iteration from, never {@code null}
     * @throws InterruptedException if the current thread is interrupted
     */
    protected abstract Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn)
            throws InterruptedException;
+
+    /**
+     * Processes the LogMiner results.
+     *
+     * @param resultSet the result set from a LogMiner query
+     * @throws SQLException if a database exception occurred
+     * @throws InterruptedException if the dispatcher was interrupted sending 
an event
+     */
+    protected void processResults(OraclePartition partition, ResultSet 
resultSet)
+            throws SQLException, InterruptedException {
+        while (context.isRunning() && hasNextWithMetricsUpdate(resultSet)) {
+            counters.rows++;
+            processRow(
+                    partition,
+                    LogMinerEventRow.fromResultSet(
+                            resultSet, getConfig().getCatalogName(), 
isTrxIdRawValue()));
+        }
+    }
+
+    /**
+     * Processes a single LogMinerEventRow.
+     *
+     * @param row the event row, must not be {@code null}
+     * @throws SQLException if a database exception occurred
+     * @throws InterruptedException if the dispatcher was interrupted sending 
an event
+     */
+    protected void processRow(OraclePartition partition, LogMinerEventRow row)
+            throws SQLException, InterruptedException {
+        if (!row.getEventType().equals(EventType.MISSING_SCN)) {
+            lastProcessedScn = row.getScn();
+        }
+        // filter out all events that are captured as part of the initial 
snapshot
+        if (row.getScn().compareTo(offsetContext.getSnapshotScn()) < 0) {
+            Map<String, Scn> snapshotPendingTransactions =
+                    offsetContext.getSnapshotPendingTransactions();
+            if (snapshotPendingTransactions == null
+                    || 
!snapshotPendingTransactions.containsKey(row.getTransactionId())) {
+                LOGGER.debug(
+                        "Skipping event {} (SCN {}) because it is already 
encompassed by the initial snapshot",
+                        row.getEventType(),
+                        row.getScn());
+                return;
+            }
+        }
+        switch (row.getEventType()) {
+            case MISSING_SCN:
+                handleMissingScn(row);
+            case START:
+                handleStart(row);
+                break;
+            case COMMIT:
+                handleCommit(partition, row);
+                break;
+            case ROLLBACK:
+                handleRollback(row);
+                break;
+            case DDL:
+                handleSchemaChange(row);
+                break;
+            case SELECT_LOB_LOCATOR:
+                handleSelectLobLocator(row);
+                break;
+            case LOB_WRITE:
+                handleLobWrite(row);
+                break;
+            case LOB_ERASE:
+                handleLobErase(row);
+                break;
+            case INSERT:
+            case UPDATE:
+            case DELETE:
+                handleDataEvent(row);
+                break;
+            case UNSUPPORTED:
+                handleUnsupportedEvent(row);
+                break;
+        }
+    }
+
    /**
     * Handle processing a LogMinerEventRow for a {@code MISSING_SCN} event.
     *
     * <p>The base implementation only logs a warning; no state is mutated.
     *
     * @param row the result set row
     */
    protected void handleMissingScn(LogMinerEventRow row) {
        LOGGER.warn("Missing SCN detected. {}", row);
    }
+
+    /**
+     * Handle processing a LogMinerEventRow for a {@code START} event.
+     *
+     * @param row the result set row
+     */
+    protected void handleStart(LogMinerEventRow row) {
+        final String transactionId = row.getTransactionId();
+        final AbstractTransaction transaction = 
getTransactionCache().get(transactionId);
+        if (transaction == null && !isRecentlyProcessed(transactionId)) {
+            getTransactionCache().put(transactionId, createTransaction(row));
+            metrics.setActiveTransactions(getTransactionCache().size());
+        } else if (transaction != null && !isRecentlyProcessed(transactionId)) 
{
+            LOGGER.trace(
+                    "Transaction {} is not yet committed and START event 
detected.", transactionId);
+            transaction.start();
+        }
+    }
+
+    /**
+     * Handle processing a LogMinerEventRow for a {@code COMMIT} event.
+     *
+     * @param row the result set row
+     * @throws InterruptedException if the event dispatcher was interrupted 
sending events
+     */
+    protected void handleCommit(OraclePartition partition, LogMinerEventRow 
row)
+            throws InterruptedException {
+        final String transactionId = row.getTransactionId();
+        if (isRecentlyProcessed(transactionId)) {
+            LOGGER.debug("\tTransaction is already committed, skipped.");
+            return;
+        }
+
+        final T transaction = getAndRemoveTransactionFromCache(transactionId);
+        if (transaction == null) {
+            LOGGER.trace("Transaction {} not found, commit skipped.", 
transactionId);
+            return;
+        }
+
+        // Calculate the smallest SCN that remains in the transaction cache
+        final Scn smallestScn = getTransactionCacheMinimumScn();
+        metrics.setOldestScn(smallestScn.isNull() ? Scn.valueOf(-1) : 
smallestScn);
+
+        final Scn commitScn = row.getScn();
+        if (offsetContext.getCommitScn().hasCommitAlreadyBeenHandled(row)) {
+            final Scn lastCommittedScn =
+                    
offsetContext.getCommitScn().getCommitScnForRedoThread(row.getThread());
+            LOGGER.debug(
+                    "Transaction {} has already been processed. "
+                            + "Offset Commit SCN {}, Transaction Commit SCN 
{}, Last Seen Commit SCN {}.",
+                    transactionId,
+                    offsetContext.getCommitScn(),
+                    commitScn,
+                    lastCommittedScn);
+            removeTransactionAndEventsFromCache(transaction);
+            metrics.setActiveTransactions(getTransactionCache().size());
+            return;
+        }
+
+        counters.commitCount++;
+
+        int numEvents = getTransactionEventCount(transaction);
+        LOGGER.trace("Commit (smallest SCN {}) {}", smallestScn, row);
+        LOGGER.trace("Transaction {} has {} events", transactionId, numEvents);
+
+        final ZoneOffset databaseOffset = metrics.getDatabaseOffset();
+
+        final boolean skipExcludedUserName = 
isTransactionUserExcluded(transaction);
+        TransactionCommitConsumer.Handler<LogMinerEvent> delegate =
+                new TransactionCommitConsumer.Handler<LogMinerEvent>() {
+
+                    @Override
+                    public void accept(LogMinerEvent event, long 
eventsProcessed)
+                            throws InterruptedException {
+                        // Update SCN in offset context only if processed SCN 
less than SCN of other
+                        // transactions
+                        if (smallestScn.isNull() || 
commitScn.compareTo(smallestScn) < 0) {
+                            offsetContext.setScn(event.getScn());
+                            metrics.setOldestScn(event.getScn());
+                        }
+
+                        offsetContext.setEventScn(event.getScn());
+                        offsetContext.setTransactionId(transactionId);
+                        offsetContext.setSourceTime(
+                                event.getChangeTime()
+                                        
.minusSeconds(databaseOffset.getTotalSeconds()));
+                        offsetContext.setTableId(event.getTableId());
+                        offsetContext.setRedoThread(row.getThread());
+
+                        final DmlEvent dmlEvent = (DmlEvent) event;
+                        if (!skipExcludedUserName) {
+                            LogMinerChangeRecordEmitter 
logMinerChangeRecordEmitter;
+                            if (dmlEvent instanceof TruncateEvent) {
+                                // a truncate event is seen by logminer as a 
DDL event type.
+                                // So force this here to be a Truncate 
Operation.
+                                logMinerChangeRecordEmitter =
+                                        new LogMinerChangeRecordEmitter(
+                                                connectorConfig,
+                                                partition,
+                                                offsetContext,
+                                                Envelope.Operation.TRUNCATE,
+                                                
dmlEvent.getDmlEntry().getOldValues(),
+                                                
dmlEvent.getDmlEntry().getNewValues(),
+                                                
getSchema().tableFor(event.getTableId()),
+                                                getSchema(),
+                                                Clock.system());
+                            } else {
+                                logMinerChangeRecordEmitter =
+                                        new LogMinerChangeRecordEmitter(
+                                                connectorConfig,
+                                                partition,
+                                                offsetContext,
+                                                dmlEvent.getEventType(),
+                                                
dmlEvent.getDmlEntry().getOldValues(),
+                                                
dmlEvent.getDmlEntry().getNewValues(),
+                                                
getSchema().tableFor(event.getTableId()),
+                                                getSchema(),
+                                                Clock.system());
+                            }
+                            dispatcher.dispatchDataChangeEvent(
+                                    partition, event.getTableId(), 
logMinerChangeRecordEmitter);
+                        }
+                    }
+                };
+
+        // When a COMMIT is received, regardless of the number of events it 
has, it still
+        // must be recorded in the commit scn for the node to guarantee 
updates to the
+        // offsets. This must be done prior to dispatching the 
transaction-commit or the
+        // heartbeat event that follows commit dispatch.
+        offsetContext.getCommitScn().recordCommit(row);

Review Comment:
   Record the commit SCN even when the transaction has no events.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to