This is an automated email from the ASF dual-hosted git repository.
JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 66922d06bd0 [fix](streaming-job) fix postgres cdc multi-table publication data loss and binlog duplicate key (#64075)
66922d06bd0 is described below
commit 66922d06bd02e37edaa3add3908cb2b8ca08075c
Author: wudi <[email protected]>
AuthorDate: Fri Jun 5 14:08:19 2026 +0800
[fix](streaming-job) fix postgres cdc multi-table publication data loss and binlog duplicate key (#64075)
## What problem does this PR solve?
Several fixes and hardening changes for from-to (at-least-once) PostgreSQL /
MySQL CDC streaming jobs.
### 1. Multi-table snapshot data loss (PostgreSQL)
For a job with **multiple tables** and `offset=initial`, each
snapshot/backfill split rewrote the shared per-job publication to only
its own table (`ALTER PUBLICATION ... SET TABLE <one table>`), so during
snapshot the publication kept flipping between single tables. Logical
decoding evaluates publication membership at the WAL position of each
change, so a row written to a table while the publication temporarily
excluded it is filtered out and **permanently lost**, violating
at-least-once. Single-table jobs and user-provided publications are not
affected.
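The loss mechanism can be illustrated with a toy model. This is a sketch under stated assumptions, not PostgreSQL's decoder: longs stand in for LSNs, `PublicationTimelineSketch` and its members are hypothetical, and a change is kept only if its table belonged to the publication definition in effect at that change's own position. With per-split `SET TABLE` rewrites, a change that lands while its table is temporarily excluded is never emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Toy model only (hypothetical): longs stand in for LSNs, strings for table names.
class PublicationTimelineSketch {

    // One row change recorded in the WAL at position 'lsn'.
    static final class Change {
        final long lsn;
        final String table;

        Change(long lsn, String table) {
            this.lsn = lsn;
            this.table = table;
        }
    }

    // 'membership' maps the LSN at which a publication definition took effect
    // to the set of tables it contained from that point on.
    // A change is emitted only if its table was published at the change's own LSN.
    static List<Change> decode(List<Change> wal, TreeMap<Long, Set<String>> membership) {
        List<Change> emitted = new ArrayList<>();
        for (Change c : wal) {
            Map.Entry<Long, Set<String>> active = membership.floorEntry(c.lsn);
            if (active != null && active.getValue().contains(c.table)) {
                emitted.add(c);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Change> wal = List.of(
                new Change(5, "a"), new Change(12, "a"), new Change(15, "b"), new Change(25, "b"));

        // Per-split rewrites: SET TABLE a, then b, then a again.
        TreeMap<Long, Set<String>> flipping = new TreeMap<>();
        flipping.put(0L, Set.of("a"));
        flipping.put(10L, Set.of("b"));
        flipping.put(20L, Set.of("a"));

        // One publication over the full table set, defined up front.
        TreeMap<Long, Set<String>> fullSet = new TreeMap<>();
        fullSet.put(0L, Set.of("a", "b"));

        System.out.println(decode(wal, flipping).size() + " of " + wal.size());
        System.out.println(decode(wal, fullSet).size() + " of " + wal.size());
    }
}
```

Running `main` prints `2 of 4` for the flipping timeline (`a@12` and `b@25` are dropped) and `4 of 4` when one publication covers both tables from the start.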
Fix: Doris now creates and owns the publication, covering the full
`include_tables` set, up front in `PostgresSourceReader.initialize`, and
publication autocreate is always DISABLED, so the publication is never
rewritten per split.
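A minimal sketch of building such an up-front publication statement, modeled on the FILTERED branch of the deleted `PostgresReplicationConnection` (which joins double-quoted table ids into `CREATE PUBLICATION %s FOR TABLE %s`). The class and method names, the sample publication name, and the single-schema assumption are hypothetical and are not Doris's code. The point is that one statement lists every included table and nothing `ALTER`s it per split.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not the Doris implementation.
class PublicationDdlSketch {

    // Double-quote a PostgreSQL identifier, escaping embedded quotes by doubling them.
    static String quoteIdent(String ident) {
        return "\"" + ident.replace("\"", "\"\"") + "\"";
    }

    // Builds one CREATE PUBLICATION covering every included table, intended to be
    // issued once before any split runs (never rewritten with ALTER ... SET TABLE).
    static String createPublicationSql(
            String publication, String schema, List<String> tables, boolean viaPartitionRoot) {
        if (tables.isEmpty()) {
            throw new IllegalArgumentException("include_tables must not be empty");
        }
        String tableList = tables.stream()
                .map(t -> quoteIdent(schema) + "." + quoteIdent(t))
                .collect(Collectors.joining(", "));
        // Mirrors the optional suffix the deleted code added on PostgreSQL 13+.
        String suffix = viaPartitionRoot ? " WITH (publish_via_partition_root = true)" : "";
        return "CREATE PUBLICATION " + quoteIdent(publication) + " FOR TABLE " + tableList + suffix;
    }

    public static void main(String[] args) {
        System.out.println(
                createPublicationSql("doris_pub_1", "public", List.of("orders", "users"), false));
    }
}
```

`main` prints `CREATE PUBLICATION "doris_pub_1" FOR TABLE "public"."orders", "public"."users"`. With autocreate DISABLED, the connector only checks that this publication exists and never rewrites it.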
### 2. Binlog "Duplicate key" when a same-named table exists in another schema
`doReadTableColumn` filtered the columns returned by `getColumns` only
by TABLE_NAME. Because the schema argument of `getColumns` is a LIKE
pattern, another schema whose name matches it via a wildcard (e.g. `_`)
and that contains a same-named table leaks its columns into the result;
they then collide on column name and throw
`IllegalStateException: Duplicate key` in the binlog reader. Fix: also
compare SCHEMA_NAME.
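The failure can be reproduced without a database. This sketch is a model, not pgjdbc or Debezium code: it emulates LIKE matching for `_` and `%` in plain Java, treats rows as (TABLE_SCHEM, TABLE_NAME, COLUMN_NAME) triples like those returned by JDBC `DatabaseMetaData.getColumns`, and collects columns with `Collectors.toMap`, which throws `IllegalStateException` with a "Duplicate key" message on a repeated key. All class and method names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical model of the getColumns() lookup; not pgjdbc or Debezium code.
class SchemaFilterSketch {

    // One result row: TABLE_SCHEM, TABLE_NAME, COLUMN_NAME (JDBC columns 2, 3, 4).
    static final class ColumnRow {
        final String schema;
        final String table;
        final String column;

        ColumnRow(String schema, String table, String column) {
            this.schema = schema;
            this.table = table;
            this.column = column;
        }
    }

    // Minimal LIKE emulation: '_' matches one char, '%' any run, the rest literally.
    static boolean like(String pattern, String value) {
        StringBuilder regex = new StringBuilder();
        for (char c : pattern.toCharArray()) {
            if (c == '_') {
                regex.append('.');
            } else if (c == '%') {
                regex.append(".*");
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.matches(regex.toString(), value);
    }

    // What a pattern-based lookup returns for (schemaPattern, tablePattern).
    static List<ColumnRow> lookup(List<ColumnRow> catalog, String schemaPattern, String tablePattern) {
        return catalog.stream()
                .filter(r -> like(schemaPattern, r.schema) && like(tablePattern, r.table))
                .collect(Collectors.toList());
    }

    // Column name -> row. Without the schema check, a decoy schema's same-named
    // table contributes a second "id" and toMap throws IllegalStateException.
    static Map<String, ColumnRow> columns(
            List<ColumnRow> rows, String schema, String table, boolean requireSchema) {
        return rows.stream()
                .filter(r -> table.equals(r.table))
                .filter(r -> !requireSchema || schema == null || schema.equals(r.schema))
                .collect(Collectors.toMap(r -> r.column, r -> r));
    }
}
```

With `requireSchema=false`, the `cdcXtest.users` row matched by the `cdc_test` pattern collides on `id`; with `requireSchema=true`, only the `cdc_test.users` column survives. Escaping the pattern (for example with `DatabaseMetaData.getSearchStringEscape()`) would be another way to narrow the lookup; the commit instead compares the exact schema name.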
### 3. cdc_client stream-load writer thread leak
`DorisBatchStreamLoad.close()` previously only called `shutdown()` on the
consumer, leaving any producer parked on a full `flushQueue`. Every job
close leaked one writer thread, eventually exhausting the pool
(`RejectedExecutionException`). `close()` now sets the exit flag and
`loadThreadAlive=false`, `clear()`s the queue to release the parked
`put()`, and calls `shutdownNow()` to interrupt a consumer stuck in a
stream-load HTTP call.
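A minimal sketch of that close ordering, assuming a single producer, a capacity-1 `ArrayBlockingQueue`, and a consumer that hangs after taking one batch. `BatchLoaderSketch` and its members are hypothetical stand-ins that only borrow the names `flushQueue` and `loadThreadAlive` from the description; this is not the `DorisBatchStreamLoad` source. `ArrayBlockingQueue.clear()` signals threads waiting for space, so the parked `put()` returns, and `shutdownNow()` interrupts the consumer's blocking call.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a batch stream-load writer: one producer side,
// one consumer thread draining a bounded flush queue.
class BatchLoaderSketch {
    private final BlockingQueue<String> flushQueue = new ArrayBlockingQueue<>(1);
    private final ExecutorService loadExecutor = Executors.newSingleThreadExecutor();
    private final AtomicBoolean exit = new AtomicBoolean(false);
    private volatile boolean loadThreadAlive = true;

    // Consumer: takes one batch, then blocks as if a stream-load HTTP call hung.
    void startConsumer() {
        loadExecutor.execute(() -> {
            try {
                while (loadThreadAlive) {
                    flushQueue.take();
                    Thread.sleep(Long.MAX_VALUE); // stand-in for a stuck request
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // woken by shutdownNow()
            }
        });
    }

    // Producer: parks in put() while the queue is full; returns false once closed.
    boolean offerBatch(String batch) throws InterruptedException {
        if (exit.get()) {
            return false;
        }
        flushQueue.put(batch);
        return !exit.get();
    }

    void close() throws InterruptedException {
        exit.set(true);              // producers stop submitting
        loadThreadAlive = false;     // consumer loop stops after its current batch
        flushQueue.clear();          // frees capacity so a parked put() returns
        loadExecutor.shutdownNow();  // interrupts a consumer blocked mid-request
        loadExecutor.awaitTermination(5, TimeUnit.SECONDS);
    }

    boolean isTerminated() {
        return loadExecutor.isTerminated();
    }
}
```

With only `shutdown()`, the hung consumer would keep its thread and the producer would stay parked, which is the leak described above. The sketch assumes one producer; with several, another producer could refill the queue after `clear()` and park again, so real code needs a stronger guarantee.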
### Stabilize flaky cases & add a new regression test
- `test_streaming_insert_job_fetch_meta_error`: assert PAUSED status and
error message inside the poll (the job is auto-resumable and
oscillates), instead of a separate read afterwards.
- `test_streaming_postgres_job_special_offset`: use a UNIQUE-key table,
wait for the full snapshot before asserting, and explicitly assert rows
before the ALTER LSN are skipped.
- New `test_streaming_postgres_job_snapshot_with_concurrent_dml_multi_table`:
multi-table snapshot with concurrent DML on every table, asserting that no
row of either table is lost.
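The polling change can be sketched generically. This is written in Java for illustration; the actual regression cases are Groovy, and `PollSketch`, `JobSnapshot`, and the sample error text are hypothetical. The idea is to read once per attempt and test status and error message on that same read, rather than polling for PAUSED and then issuing a second read that may already see the job auto-resumed.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical polling helper; not the Doris regression framework.
class PollSketch {

    // One status read of a streaming job (hypothetical shape).
    static final class JobSnapshot {
        final String status;
        final String errorMsg;

        JobSnapshot(String status, String errorMsg) {
            this.status = status;
            this.errorMsg = errorMsg;
        }
    }

    // Re-reads until a single snapshot satisfies 'done', then returns that snapshot,
    // so every condition is checked against the same read.
    static <T> T pollUntil(Supplier<T> read, Predicate<T> done, int maxAttempts, long sleepMillis)
            throws InterruptedException {
        for (int i = 0; i < maxAttempts; i++) {
            T snapshot = read.get();
            if (done.test(snapshot)) {
                return snapshot;
            }
            Thread.sleep(sleepMillis);
        }
        throw new AssertionError("condition not met after " + maxAttempts + " attempts");
    }
}
```

If the job oscillates RUNNING, PAUSED, RUNNING, the poll returns the PAUSED snapshot with its message, while any later read may already report RUNNING.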
---
.../postgresql/connection/PostgresConnection.java | 9 +-
.../connection/PostgresReplicationConnection.java | 932 ---------------------
.../cdcclient/service/PipelineCoordinator.java | 9 +-
.../doris/cdcclient/sink/DorisBatchStreamLoad.java | 7 +-
.../source/reader/mysql/MySqlSourceReader.java | 7 +-
.../reader/postgres/PostgresSourceReader.java | 93 +-
.../src/main/resources/application.properties | 4 +-
...ob_snapshot_with_concurrent_dml_multi_table.out | 43 +
...snapshot_with_concurrent_dml_multi_table.groovy | 163 ++++
...st_streaming_postgres_job_special_offset.groovy | 11 +-
...st_streaming_insert_job_fetch_meta_error.groovy | 9 +-
11 files changed, 327 insertions(+), 960 deletions(-)
diff --git a/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java b/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
index 2f6ca5756dd..dbaf60f2a04 100644
--- a/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
+++ b/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
@@ -60,6 +60,7 @@ import java.util.regex.Pattern;
* Copied from Flink Cdc 3.6.0
*
* <p>Line 820~854: modified getColumnValue method to fix FLINK-39748.
+ * <p>Line 699-703, 705-706: doReadTableColumn also matches SCHEMA_NAME (besides TABLE_NAME, FLINK-38965) to avoid Duplicate key from a decoy schema's same-named table.
*/
public class PostgresConnection extends JdbcConnection {
@@ -695,8 +696,14 @@ public class PostgresConnection extends JdbcConnection {
// - When querying 'user_sink', the pattern may also match 'userbsink' (due to '_')
// - When querying 'user%data' (where % is literal), it may match 'user_test_data' (due to
// '%')
+ // The schema name passed to getColumns is also a LIKE pattern, so a decoy schema
+ // (e.g. 'cdcXtest' matched by 'cdc_test' via '_') can return a same-named table whose
+ // TABLE_NAME still equals tableId.table(); compare the schema too, otherwise those
+ // columns merge with the real table's and collide on column name (Duplicate key).
+ final String resultSchemaName = columnMetadata.getString(2);
final String resultTableName = columnMetadata.getString(3);
- if (!tableId.table().equals(resultTableName)) {
+ if (!tableId.table().equals(resultTableName)
+ || (tableId.schema() != null && !tableId.schema().equals(resultSchemaName))) {
return Optional.empty();
}
diff --git a/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java
deleted file mode 100644
index eeee1e725ef..00000000000
--- a/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java
+++ /dev/null
@@ -1,932 +0,0 @@
-/*
- * Copyright Debezium Authors.
- *
- * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
- */
-
-package io.debezium.connector.postgresql.connection;
-
-import io.debezium.DebeziumException;
-import io.debezium.connector.postgresql.PostgresConnectorConfig;
-import io.debezium.connector.postgresql.PostgresSchema;
-import io.debezium.connector.postgresql.TypeRegistry;
-import io.debezium.connector.postgresql.spi.SlotCreationResult;
-import io.debezium.jdbc.JdbcConfiguration;
-import io.debezium.jdbc.JdbcConnection;
-import io.debezium.jdbc.JdbcConnectionException;
-import io.debezium.relational.RelationalTableFilters;
-import io.debezium.relational.TableId;
-import io.debezium.util.Clock;
-import io.debezium.util.Metronome;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.postgresql.core.BaseConnection;
-import org.postgresql.core.ServerVersion;
-import org.postgresql.replication.PGReplicationStream;
-import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
-import org.postgresql.util.PSQLException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.SQLWarning;
-import java.sql.Statement;
-import java.time.Duration;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import static java.lang.Math.toIntExact;
-
-/**
- * Copied from Flink Cdc 3.5.0
- *
- * <p>Line 248~251, 258: add publish_via_partition_root for partition table (FILTERED mode).
- */
-public class PostgresReplicationConnection extends JdbcConnection implements ReplicationConnection {
-
- private static Logger LOGGER = LoggerFactory.getLogger(PostgresReplicationConnection.class);
-
- private final String slotName;
- private final String publicationName;
- private final RelationalTableFilters tableFilter;
- private final PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode;
- private final PostgresConnectorConfig.LogicalDecoder plugin;
- private final boolean dropSlotOnClose;
- private final PostgresConnectorConfig connectorConfig;
- private final Duration statusUpdateInterval;
- private final MessageDecoder messageDecoder;
- private final PostgresConnection jdbcConnection;
- private final TypeRegistry typeRegistry;
- private final Properties streamParams;
-
- private Lsn defaultStartingPos;
- private SlotCreationResult slotCreationInfo;
- private boolean hasInitedSlot;
-
- private Lsn endingPos;
-
- /**
- * Creates a new replication connection with the given params.
- *
- * @param config the JDBC configuration for the connection; may not be null
- * @param slotName the name of the DB slot for logical replication; may not be null
- * @param publicationName the name of the DB publication for logical replication; may not be
- * null
- * @param tableFilter the tables to watch of the DB publication for logical replication; may not
- * be null
- * @param publicationAutocreateMode the mode for publication autocreation; may not be null
- * @param plugin decoder matching the server side plug-in used for streaming changes; may not be
- * null
- * @param dropSlotOnClose whether the replication slot should be dropped once the connection is
- * closed
- * @param statusUpdateInterval the interval at which the replication connection should
- * periodically send status
- * @param doSnapshot whether the connector is doing snapshot
- * @param jdbcConnection general PostgreSQL JDBC connection
- * @param typeRegistry registry with PostgreSQL types
- * @param streamParams additional parameters to pass to the replication stream
- * @param schema the schema; must not be null
- * <p>updates to the server
- */
- private PostgresReplicationConnection(
- PostgresConnectorConfig config,
- String slotName,
- String publicationName,
- RelationalTableFilters tableFilter,
- PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode,
- PostgresConnectorConfig.LogicalDecoder plugin,
- boolean dropSlotOnClose,
- boolean doSnapshot,
- Duration statusUpdateInterval,
- PostgresConnection jdbcConnection,
- TypeRegistry typeRegistry,
- Properties streamParams,
- PostgresSchema schema) {
- super(
- addDefaultSettings(config.getJdbcConfig()),
- PostgresConnection.FACTORY,
- null,
- null,
- "\"",
- "\"");
-
- this.connectorConfig = config;
- this.slotName = slotName;
- this.publicationName = publicationName;
- this.tableFilter = tableFilter;
- this.publicationAutocreateMode = publicationAutocreateMode;
- this.plugin = plugin;
- this.dropSlotOnClose = dropSlotOnClose;
- this.statusUpdateInterval = statusUpdateInterval;
- this.messageDecoder =
- plugin.messageDecoder(new MessageDecoderContext(config, schema), jdbcConnection);
- this.jdbcConnection = jdbcConnection;
- this.typeRegistry = typeRegistry;
- this.streamParams = streamParams;
- this.slotCreationInfo = null;
- this.hasInitedSlot = false;
- }
-
- private static JdbcConfiguration addDefaultSettings(JdbcConfiguration configuration) {
- // first copy the parent's default settings...
- // then set some additional replication specific settings
- return JdbcConfiguration.adapt(
- PostgresConnection.addDefaultSettings(
- configuration, PostgresConnection.CONNECTION_STREAMING)
- .edit()
- .with("replication", "database")
- .with(
- "preferQueryMode",
- "simple") // replication protocol only supports simple query mode
- .build());
- }
-
- private ServerInfo.ReplicationSlot getSlotInfo() throws SQLException, InterruptedException {
- try (PostgresConnection connection =
- new PostgresConnection(
- connectorConfig.getJdbcConfig(), PostgresConnection.CONNECTION_SLOT_INFO)) {
- return connection.readReplicationSlotInfo(slotName, plugin.getPostgresPluginName());
- }
- }
-
- protected void initPublication() {
- String createPublicationStmt;
- String tableFilterString = null;
- if (PostgresConnectorConfig.LogicalDecoder.PGOUTPUT.equals(plugin)) {
- LOGGER.info("Initializing PgOutput logical decoder publication");
- try {
- // Unless the autocommit is disabled the SELECT publication query will stay running
- Connection conn = pgConnection();
- conn.setAutoCommit(false);
-
- String selectPublication =
- String.format(
- "SELECT COUNT(1) FROM pg_publication WHERE pubname = '%s'",
- publicationName);
- try (Statement stmt = conn.createStatement();
- ResultSet rs = stmt.executeQuery(selectPublication)) {
- if (rs.next()) {
- Long count = rs.getLong(1);
- // Close eagerly as the transaction might stay running
- if (count == 0L) {
- LOGGER.info(
- "Creating new publication '{}' for plugin '{}'",
- publicationName,
- plugin);
- switch (publicationAutocreateMode) {
- case DISABLED:
- throw new ConnectException(
- "Publication autocreation is disabled, please create one and restart the connector.");
- case ALL_TABLES:
- createPublicationStmt =
- String.format(
- "CREATE PUBLICATION %s FOR ALL TABLES;",
- publicationName);
- LOGGER.info(
- "Creating Publication with statement '{}'",
- createPublicationStmt);
- // Publication doesn't exist, create it.
- stmt.execute(createPublicationStmt);
- break;
- case FILTERED:
- createOrUpdatePublicationModeFilterted(
- tableFilterString, stmt, false);
- break;
- }
- } else {
- switch (publicationAutocreateMode) {
- case FILTERED:
- createOrUpdatePublicationModeFilterted(
- tableFilterString, stmt, true);
- break;
- default:
- LOGGER.trace(
- "A logical publication named '{}' for plugin '{}' and database '{}' is already active on the server "
- + "and will be used by the plugin",
- publicationName,
- plugin,
- database());
- }
- }
- }
- }
- conn.commit();
- conn.setAutoCommit(true);
- } catch (SQLException e) {
- throw new JdbcConnectionException(e);
- }
- }
- }
-
- private void createOrUpdatePublicationModeFilterted(
- String tableFilterString, Statement stmt, boolean isUpdate) {
- String createOrUpdatePublicationStmt;
- try {
- Set<TableId> tablesToCapture = determineCapturedTables();
- tableFilterString =
- tablesToCapture.stream()
- .map(TableId::toDoubleQuotedString)
- .collect(Collectors.joining(", "));
- if (tableFilterString.isEmpty()) {
- throw new DebeziumException(
- String.format(
- "No table filters found for filtered publication %s",
- publicationName));
- }
- boolean supportPartitionRoot = !isUpdate
- && ((BaseConnection) pgConnection()).haveMinimumServerVersion(ServerVersion.v13);
- String pubViaRootSuffix = supportPartitionRoot
- ? " WITH (publish_via_partition_root = true)" : "";
- createOrUpdatePublicationStmt =
- isUpdate
- ? String.format(
- "ALTER PUBLICATION %s SET TABLE %s;",
- publicationName, tableFilterString)
- : String.format(
- "CREATE PUBLICATION %s FOR TABLE %s%s;",
- publicationName, tableFilterString, pubViaRootSuffix);
- LOGGER.info(
- isUpdate
- ? "Updating Publication with statement '{}'"
- : "Creating Publication with statement '{}'",
- createOrUpdatePublicationStmt);
- stmt.execute(createOrUpdatePublicationStmt);
- } catch (Exception e) {
- throw new ConnectException(
- String.format(
- "Unable to %s filtered publication %s for %s",
- isUpdate ? "update" : "create", publicationName, tableFilterString),
- e);
- }
- }
-
- private Set<TableId> determineCapturedTables() throws Exception {
- Set<TableId> allTableIds = jdbcConnection.getAllTableIds(connectorConfig.databaseName());
-
- Set<TableId> capturedTables = new HashSet<>();
-
- for (TableId tableId : allTableIds) {
- if (tableFilter.dataCollectionFilter().isIncluded(tableId)) {
- LOGGER.trace("Adding table {} to the list of captured tables", tableId);
- capturedTables.add(tableId);
- } else {
- LOGGER.trace(
- "Ignoring table {} as it's not included in the filter configuration",
- tableId);
- }
- }
-
- return capturedTables.stream()
- .sorted()
- .collect(Collectors.toCollection(LinkedHashSet::new));
- }
-
- protected void initReplicationSlot() throws SQLException, InterruptedException {
- ServerInfo.ReplicationSlot slotInfo = getSlotInfo();
-
- boolean shouldCreateSlot = ServerInfo.ReplicationSlot.INVALID == slotInfo;
- try {
- // there's no info for this plugin and slot so create a new slot
- if (shouldCreateSlot) {
- this.createReplicationSlot();
- }
-
- // replication connection does not support parsing of SQL statements so we need to
- // create
- // the connection without executing on connect statements - see JDBC opt
- // preferQueryMode=simple
- pgConnection();
- final String identifySystemStatement = "IDENTIFY_SYSTEM";
- LOGGER.debug(
- "running '{}' to validate replication connection", identifySystemStatement);
- final Lsn xlogStart =
- queryAndMap(
- identifySystemStatement,
- rs -> {
- if (!rs.next()) {
- throw new IllegalStateException(
- "The DB connection is not a valid replication connection");
- }
- String xlogpos = rs.getString("xlogpos");
- LOGGER.debug("received latest xlogpos '{}'", xlogpos);
- return Lsn.valueOf(xlogpos);
- });
-
- if (slotCreationInfo != null) {
- this.defaultStartingPos = slotCreationInfo.startLsn();
- } else if (shouldCreateSlot || !slotInfo.hasValidFlushedLsn()) {
- // this is a new slot or we weren't able to read a valid flush LSN pos, so we always
- // start from the xlog pos that was reported
- this.defaultStartingPos = xlogStart;
- } else {
- Lsn latestFlushedLsn = slotInfo.latestFlushedLsn();
- this.defaultStartingPos =
- latestFlushedLsn.compareTo(xlogStart) < 0 ? latestFlushedLsn : xlogStart;
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("found previous flushed LSN '{}'", latestFlushedLsn);
- }
- }
- hasInitedSlot = true;
- } catch (SQLException e) {
- throw new JdbcConnectionException(e);
- }
- }
-
- // Temporary replication slots is a new feature of PostgreSQL 10
- private boolean useTemporarySlot() throws SQLException {
- // Temporary replication slots cannot be used due to connection restart
- // when finding WAL position
- // return dropSlotOnClose && pgConnection().haveMinimumServerVersion(ServerVersion.v10);
- return false;
- }
-
- /**
- * creating a replication connection and starting to stream involves a few steps: 1. we create
- * the connection and ensure that a. the slot exists b. the slot isn't currently being used 2.
- * we query to get our potential start position in the slot (lsn) 3. we try and start streaming,
- * depending on our options (such as in wal2json) this may fail, which can result in the
- * connection being killed and we need to start the process over if we are using a temporary
- * slot 4. actually start the streamer
- *
- * <p>This method takes care of all of these and this method queries for a default starting
- * position If you know where you are starting from you should call {@link #startStreaming(Lsn,
- * WalPositionLocator)}, this method delegates to that method
- *
- * @return
- * @throws SQLException
- * @throws InterruptedException
- */
- @Override
- public ReplicationStream startStreaming(WalPositionLocator walPosition)
- throws SQLException, InterruptedException {
- return startStreaming(null, walPosition);
- }
-
- @Override
- public ReplicationStream startStreaming(Lsn offset, WalPositionLocator walPosition)
- throws SQLException, InterruptedException {
- initConnection();
-
- connect();
- if (offset == null || !offset.isValid()) {
- offset = defaultStartingPos;
- }
- Lsn lsn = offset;
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("starting streaming from LSN '{}'", lsn);
- }
-
- final int maxRetries = connectorConfig.maxRetries();
- final Duration delay = connectorConfig.retryDelay();
- int tryCount = 0;
- while (true) {
- try {
- return createReplicationStream(lsn, walPosition);
- } catch (Exception e) {
- String message = "Failed to start replication stream at " + lsn;
- if (++tryCount > maxRetries) {
- if (e.getMessage().matches(".*replication slot .* is active.*")) {
- message +=
- "; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.";
- }
- throw new DebeziumException(message, e);
- } else {
- LOGGER.warn(
- message + ", waiting for {} ms and retrying, attempt number {} over {}",
- delay,
- tryCount,
- maxRetries);
- final Metronome metronome = Metronome.sleeper(delay, Clock.SYSTEM);
- metronome.pause();
- }
- }
- }
- }
-
- @Override
- public void initConnection() throws SQLException, InterruptedException {
- // See https://www.postgresql.org/docs/current/logical-replication-quick-setup.html
- // For pgoutput specifically, the publication must be created before the slot.
- initPublication();
- if (!hasInitedSlot) {
- initReplicationSlot();
- }
- }
-
- @Override
- public Optional<SlotCreationResult> createReplicationSlot() throws SQLException {
- // note that some of these options are only supported in Postgres 9.4+, additionally
- // the options are not yet exported by the jdbc api wrapper, therefore, we just do
- // this ourselves but eventually this should be moved back to the jdbc API
- // see https://github.com/pgjdbc/pgjdbc/issues/1305
-
- LOGGER.debug("Creating new replication slot '{}' for plugin '{}'", slotName, plugin);
- String tempPart = "";
- // Exported snapshots are supported in Postgres 9.4+
- boolean canExportSnapshot = pgConnection().haveMinimumServerVersion(ServerVersion.v9_4);
- if ((dropSlotOnClose) && !canExportSnapshot) {
- LOGGER.warn(
- "A slot marked as temporary or with an exported snapshot was created, "
- + "but not on a supported version of Postgres, ignoring!");
- }
- if (useTemporarySlot()) {
- tempPart = "TEMPORARY";
- }
-
- // See https://www.postgresql.org/docs/current/logical-replication-quick-setup.html
- // For pgoutput specifically, the publication must be created prior to the slot.
- initPublication();
-
- try (Statement stmt = pgConnection().createStatement()) {
- String createCommand =
- String.format(
- "CREATE_REPLICATION_SLOT \"%s\" %s LOGICAL %s",
- slotName, tempPart, plugin.getPostgresPluginName());
- LOGGER.info("Creating replication slot with command {}", createCommand);
- stmt.execute(createCommand);
- // when we are in Postgres 9.4+, we can parse the slot creation info,
- // otherwise, it returns nothing
- if (canExportSnapshot) {
- this.slotCreationInfo = parseSlotCreation(stmt.getResultSet());
- }
-
- return Optional.ofNullable(slotCreationInfo);
- }
- }
-
- protected BaseConnection pgConnection() throws SQLException {
- return (BaseConnection) connection(false);
- }
-
- private SlotCreationResult parseSlotCreation(ResultSet rs) {
- try {
- if (rs.next()) {
- String slotName = rs.getString("slot_name");
- String startPoint = rs.getString("consistent_point");
- String snapName = rs.getString("snapshot_name");
- String pluginName = rs.getString("output_plugin");
-
- return new SlotCreationResult(slotName, startPoint, snapName, pluginName);
- } else {
- throw new ConnectException("No replication slot found");
- }
- } catch (SQLException ex) {
- throw new ConnectException("Unable to parse create_replication_slot response", ex);
- }
- }
-
- private ReplicationStream createReplicationStream(
- final Lsn startLsn, WalPositionLocator walPosition)
- throws SQLException, InterruptedException {
- PGReplicationStream s;
-
- try {
- try {
- s =
- startPgReplicationStream(
- startLsn,
- plugin.forceRds()
- ? messageDecoder::optionsWithoutMetadata
- : messageDecoder::optionsWithMetadata);
- messageDecoder.setContainsMetadata(plugin.forceRds() ? false : true);
- } catch (PSQLException e) {
- LOGGER.debug(
- "Could not register for streaming, retrying without optional options", e);
-
- // re-init the slot after a failed start of slot, as this
- // may have closed the slot
- if (useTemporarySlot()) {
- initReplicationSlot();
- }
-
- s =
- startPgReplicationStream(
- startLsn,
- plugin.forceRds()
- ? messageDecoder::optionsWithoutMetadata
- : messageDecoder::optionsWithMetadata);
- messageDecoder.setContainsMetadata(plugin.forceRds() ? false : true);
- }
- } catch (PSQLException e) {
- if (e.getMessage().matches("(?s)ERROR: option .* is unknown.*")) {
- // It is possible we are connecting to an old wal2json plug-in
- LOGGER.warn(
- "Could not register for streaming with metadata in messages, falling back to messages without metadata");
-
- // re-init the slot after a failed start of slot, as this
- // may have closed the slot
- if (useTemporarySlot()) {
- initReplicationSlot();
- }
-
- s = startPgReplicationStream(startLsn, messageDecoder::optionsWithoutMetadata);
- messageDecoder.setContainsMetadata(false);
- } else if (e.getMessage()
- .matches("(?s)ERROR: requested WAL segment .* has already been removed.*")) {
- LOGGER.error("Cannot rewind to last processed WAL position", e);
- throw new ConnectException(
- "The offset to start reading from has been removed from the database write-ahead log. Create a new snapshot and consider setting of PostgreSQL parameter wal_keep_segments = 0.");
- } else {
- throw e;
- }
- }
-
- final PGReplicationStream stream = s;
-
- return new ReplicationStream() {
-
- private static final int CHECK_WARNINGS_AFTER_COUNT = 100;
- private int warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
- private ExecutorService keepAliveExecutor = null;
- private AtomicBoolean keepAliveRunning;
- private final Metronome metronome =
- Metronome.sleeper(statusUpdateInterval, Clock.SYSTEM);
-
- // make sure this is volatile since multiple threads may be interested in this value
- private volatile Lsn lastReceivedLsn;
-
- @Override
- public void read(ReplicationMessageProcessor processor)
- throws SQLException, InterruptedException {
- processWarnings(false);
- ByteBuffer read = stream.read();
- final Lsn lastReceiveLsn = Lsn.valueOf(stream.getLastReceiveLSN());
- LOGGER.trace(
- "Streaming requested from LSN {}, received LSN {}",
- startLsn,
- lastReceiveLsn);
- if (reachEnd(lastReceivedLsn)) {
- lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
- LOGGER.trace("Received message at LSN {}", lastReceivedLsn);
- processor.process(new ReplicationMessage.NoopMessage(null, null));
- return;
- }
- if (messageDecoder.shouldMessageBeSkipped(
- read, lastReceiveLsn, startLsn, walPosition)) {
- return;
- }
- deserializeMessages(read, processor);
- }
-
- @Override
- public boolean readPending(ReplicationMessageProcessor processor)
- throws SQLException, InterruptedException {
- processWarnings(false);
- ByteBuffer read = stream.readPending();
- final Lsn lastReceiveLsn = Lsn.valueOf(stream.getLastReceiveLSN());
- LOGGER.trace(
- "Streaming requested from LSN {}, received LSN {}",
- startLsn,
- lastReceiveLsn);
-
- if (reachEnd(lastReceiveLsn)) {
- lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
- LOGGER.trace("Received message at LSN {}", lastReceivedLsn);
- processor.process(new ReplicationMessage.NoopMessage(null, null));
- return true;
- }
-
- if (read == null) {
- return false;
- }
-
- if (messageDecoder.shouldMessageBeSkipped(
- read, lastReceiveLsn, startLsn, walPosition)) {
- return true;
- }
-
- deserializeMessages(read, processor);
-
- return true;
- }
-
- private void deserializeMessages(
- ByteBuffer buffer, ReplicationMessageProcessor processor)
- throws SQLException, InterruptedException {
- lastReceivedLsn = Lsn.valueOf(stream.getLastReceiveLSN());
- LOGGER.trace("Received message at LSN {}", lastReceivedLsn);
- messageDecoder.processMessage(buffer, processor, typeRegistry);
- }
-
- @Override
- public void close() throws SQLException {
- processWarnings(true);
- stream.close();
- }
-
- @Override
- public void flushLsn(Lsn lsn) throws SQLException {
- doFlushLsn(lsn);
- }
-
- private void doFlushLsn(Lsn lsn) throws SQLException {
- stream.setFlushedLSN(lsn.asLogSequenceNumber());
- stream.setAppliedLSN(lsn.asLogSequenceNumber());
-
- stream.forceUpdateStatus();
- }
-
- @Override
- public Lsn lastReceivedLsn() {
- return lastReceivedLsn;
- }
-
- @Override
- public void startKeepAlive(ExecutorService service) {
- if (keepAliveExecutor == null) {
- keepAliveExecutor = service;
- keepAliveRunning = new AtomicBoolean(true);
- keepAliveExecutor.submit(
- () -> {
- while (keepAliveRunning.get()) {
- try {
- LOGGER.trace(
- "Forcing status update with replication stream");
- stream.forceUpdateStatus();
- metronome.pause();
- } catch (Exception exp) {
- throw new RuntimeException(
- "received unexpected exception will perform keep alive",
- exp);
- }
- }
- });
- }
- }
-
- @Override
- public void stopKeepAlive() {
- if (keepAliveExecutor != null) {
- keepAliveRunning.set(false);
- keepAliveExecutor.shutdownNow();
- keepAliveExecutor = null;
- }
- }
-
- private void processWarnings(final boolean forced) throws SQLException {
- if (--warningCheckCounter == 0 || forced) {
- warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
- for (SQLWarning w = connection().getWarnings();
- w != null;
- w = w.getNextWarning()) {
- LOGGER.debug(
- "Server-side message: '{}', state = {}, code = {}",
- w.getMessage(),
- w.getSQLState(),
- w.getErrorCode());
- }
- connection().clearWarnings();
- }
- }
-
- @Override
- public Lsn startLsn() {
- return startLsn;
- }
-
- private boolean reachEnd(Lsn receivedLsn) {
- if (receivedLsn == null) {
- return false;
- }
- return endingPos != null
- && (!endingPos.isNonStopping())
- && endingPos.compareTo(receivedLsn) < 0;
- }
- };
- }
-
- public void setEndingPos(Lsn endingPos) {
- this.endingPos = endingPos;
- }
-
- private PGReplicationStream startPgReplicationStream(
- final Lsn lsn,
- BiFunction<
- ChainedLogicalStreamBuilder,
- Function<Integer, Boolean>,
- ChainedLogicalStreamBuilder>
- configurator)
- throws SQLException {
- assert lsn != null;
- ChainedLogicalStreamBuilder streamBuilder =
- pgConnection()
- .getReplicationAPI()
- .replicationStream()
- .logical()
- .withSlotName("\"" + slotName + "\"")
- .withStartPosition(lsn.asLogSequenceNumber())
- .withSlotOptions(streamParams);
- streamBuilder = configurator.apply(streamBuilder, this::hasMinimumVersion);
-
- if (statusUpdateInterval != null && statusUpdateInterval.toMillis() > 0) {
- streamBuilder.withStatusInterval(
- toIntExact(statusUpdateInterval.toMillis()), TimeUnit.MILLISECONDS);
- }
-
- PGReplicationStream stream = streamBuilder.start();
-
- // TODO DBZ-508 get rid of this
- // Needed by tests when connections are opened and closed in a fast sequence
- try {
- Thread.sleep(10);
- } catch (Exception e) {
- }
- stream.forceUpdateStatus();
- return stream;
- }
-
- private Boolean hasMinimumVersion(int version) {
- try {
- return pgConnection().haveMinimumServerVersion(version);
- } catch (SQLException e) {
- throw new DebeziumException(e);
- }
- }
-
- @Override
- public synchronized void close() {
- close(true);
- }
-
- public synchronized void close(boolean dropSlot) {
- try {
- LOGGER.debug("Closing message decoder");
- messageDecoder.close();
- } catch (Throwable e) {
- LOGGER.error("Unexpected error while closing message decoder", e);
- }
-
- try {
- LOGGER.debug("Closing replication connection");
- super.close();
- } catch (Throwable e) {
- LOGGER.error("Unexpected error while closing Postgres connection", e);
- }
- if (dropSlotOnClose && dropSlot) {
- // we're dropping the replication slot via a regular - i.e. not a replication -
- // connection
- try (PostgresConnection connection =
- new PostgresConnection(
- connectorConfig.getJdbcConfig(),
- PostgresConnection.CONNECTION_DROP_SLOT)) {
- connection.dropReplicationSlot(slotName);
- } catch (Throwable e) {
- LOGGER.error("Unexpected error while dropping replication slot", e);
- }
- }
- }
-
- @Override
- public void reconnect() throws SQLException {
- close(false);
- // Don't re-execute initial commands on reconnection
- connection(false);
- }
-
- protected static class ReplicationConnectionBuilder implements Builder {
-
- private final PostgresConnectorConfig config;
- private String slotName = DEFAULT_SLOT_NAME;
- private String publicationName = DEFAULT_PUBLICATION_NAME;
- private RelationalTableFilters tableFilter;
- private PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode =
- PostgresConnectorConfig.AutoCreateMode.ALL_TABLES;
- private PostgresConnectorConfig.LogicalDecoder plugin =
- PostgresConnectorConfig.LogicalDecoder.DECODERBUFS;
- private boolean dropSlotOnClose = DEFAULT_DROP_SLOT_ON_CLOSE;
- private Duration statusUpdateIntervalVal;
- private boolean doSnapshot;
- private TypeRegistry typeRegistry;
- private PostgresSchema schema;
- private Properties slotStreamParams = new Properties();
- private PostgresConnection jdbcConnection;
-
- protected ReplicationConnectionBuilder(PostgresConnectorConfig config) {
- assert config != null;
- this.config = config;
- }
-
- @Override
- public ReplicationConnectionBuilder withSlot(final String slotName) {
- assert slotName != null;
- this.slotName = slotName;
- return this;
- }
-
- @Override
- public Builder withPublication(String publicationName) {
- assert publicationName != null;
- this.publicationName = publicationName;
- return this;
- }
-
- @Override
- public Builder withTableFilter(RelationalTableFilters tableFilter) {
- assert tableFilter != null;
- this.tableFilter = tableFilter;
- return this;
- }
-
- @Override
- public Builder withPublicationAutocreateMode(
- PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode) {
- assert publicationName != null;
- this.publicationAutocreateMode = publicationAutocreateMode;
- return this;
- }
-
- @Override
- public ReplicationConnectionBuilder withPlugin(
- final PostgresConnectorConfig.LogicalDecoder plugin) {
- assert plugin != null;
- this.plugin = plugin;
- return this;
- }
-
- @Override
- public ReplicationConnectionBuilder dropSlotOnClose(final boolean dropSlotOnClose) {
- this.dropSlotOnClose = dropSlotOnClose;
- return this;
- }
-
- @Override
- public ReplicationConnectionBuilder streamParams(final String slotStreamParams) {
- if (slotStreamParams != null && !slotStreamParams.isEmpty()) {
- this.slotStreamParams = new Properties();
- String[] paramsWithValues = slotStreamParams.split(";");
- for (String paramsWithValue : paramsWithValues) {
- String[] paramAndValue = paramsWithValue.split("=");
- if (paramAndValue.length == 2) {
- this.slotStreamParams.setProperty(paramAndValue[0], paramAndValue[1]);
- } else {
- LOGGER.warn(
- "The following STREAM_PARAMS value is invalid: {}",
- paramsWithValue);
- }
- }
- }
- return this;
- }
-
- @Override
- public ReplicationConnectionBuilder statusUpdateInterval(
- final Duration statusUpdateInterval) {
- this.statusUpdateIntervalVal = statusUpdateInterval;
- return this;
- }
-
- @Override
- public Builder doSnapshot(boolean doSnapshot) {
- this.doSnapshot = doSnapshot;
- return this;
- }
-
- @Override
- public Builder jdbcMetadataConnection(PostgresConnection jdbcConnection) {
- this.jdbcConnection = jdbcConnection;
- return this;
- }
-
- @Override
- public ReplicationConnection build() {
- assert plugin != null : "Decoding plugin name is not set";
- return new PostgresReplicationConnection(
- config,
- slotName,
- publicationName,
- tableFilter,
- publicationAutocreateMode,
- plugin,
- dropSlotOnClose,
- doSnapshot,
- statusUpdateIntervalVal,
- jdbcConnection,
- typeRegistry,
- slotStreamParams,
- schema);
- }
-
- @Override
- public Builder withTypeRegistry(TypeRegistry typeRegistry) {
- this.typeRegistry = typeRegistry;
- return this;
- }
-
- @Override
- public Builder withSchema(PostgresSchema schema) {
- this.schema = schema;
- return this;
- }
- }
-}
\ No newline at end of file
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 9505290d55e..ebbd9c4acf1 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -64,6 +64,7 @@ import com.google.common.base.Preconditions;
import io.debezium.data.Envelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
@@ -80,16 +81,16 @@ public class PipelineCoordinator {
// taskId -> writeFailReason
private final Map<String, String> taskErrorMaps = new ConcurrentHashMap<>();
private final ThreadPoolExecutor executor;
- private static final int MAX_CONCURRENT_TASKS = 10;
private static final int QUEUE_CAPACITY = 128;
private static final ObjectMapper objectMapper = new ObjectMapper();
private final byte[] LINE_DELIMITER = "\n".getBytes(StandardCharsets.UTF_8);
- public PipelineCoordinator() {
+ public PipelineCoordinator(
+ @Value("${pipeline.max-concurrent-tasks:10}") int maxConcurrentTasks) {
this.executor =
new ThreadPoolExecutor(
- MAX_CONCURRENT_TASKS,
- MAX_CONCURRENT_TASKS,
+ maxConcurrentTasks,
+ maxConcurrentTasks,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(QUEUE_CAPACITY),
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
index f853f931646..b1d1cd8ba03 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
@@ -270,9 +270,12 @@ public class DorisBatchStreamLoad implements Serializable {
}
public void close() {
- // close async executor
- this.loadExecutorService.shutdown();
+ // Wake up any blocked producer and stop the loader to avoid writer thread leak.
this.started.set(false);
+ this.loadThreadAlive = false;
+ this.flushQueue.clear();
+ this.currentCacheBytes.set(0);
+ this.loadExecutorService.shutdownNow();
}
@VisibleForTesting
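The `close()` change above replaces `shutdown()` with `shutdownNow()` on `loadExecutorService`. Here is a minimal, self-contained sketch of why that matters. It assumes a loader thread that sits blocked on an empty queue, and the class and method names are hypothetical, not Doris code. `shutdown()` only stops new submissions and lets a running task keep waiting. `shutdownNow()` interrupts that task, so a blocking call such as `BlockingQueue.take()` returns with `InterruptedException` and the thread can exit.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class ShutdownNowSketch {
    // Starts one hypothetical loader thread blocked on an empty queue, then stops it.
    // Returns true if the executor terminated within the timeout.
    static boolean stopBlockedLoader(boolean useShutdownNow) throws InterruptedException {
        BlockingQueue<String> flushQueue = new LinkedBlockingQueue<>();
        ExecutorService loader = Executors.newSingleThreadExecutor();
        loader.submit(() -> {
            try {
                // Blocks forever: nothing is ever offered to the queue.
                flushQueue.take();
            } catch (InterruptedException e) {
                // shutdownNow() interrupts the worker; restore the flag and exit.
                Thread.currentThread().interrupt();
            }
        });
        Thread.sleep(100); // give the worker time to reach take()
        if (useShutdownNow) {
            loader.shutdownNow();
        } else {
            loader.shutdown();
        }
        boolean terminated = loader.awaitTermination(500, TimeUnit.MILLISECONDS);
        loader.shutdownNow(); // clean up the stuck thread in the shutdown() case
        return terminated;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("shutdown():    terminated=" + stopBlockedLoader(false));
        System.out.println("shutdownNow(): terminated=" + stopBlockedLoader(true));
    }
}
```

With plain `shutdown()` the blocked task never finishes, which is the thread leak the new comment refers to.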
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index 1bc7db23fd4..a4c19aedb9e 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -849,7 +849,12 @@ public class MySqlSourceReader extends AbstractCdcSourceReader {
DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
final StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
- return new BinlogSplitReader(statefulTaskContext, 0);
+ int readerTag = Math.abs(config.getJobId().hashCode());
+ LOG.info(
+ "create binlog reader for job {}, thread tag = binlog-reader-{}",
+ config.getJobId(),
+ readerTag);
+ return new BinlogSplitReader(statefulTaskContext, readerTag);
}
private MySqlSourceConfig getSourceConfig(JobBaseConfig config) {
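Both the MySQL and PostgreSQL readers now derive their tag from `Math.abs(config.getJobId().hashCode())`, and the log lines suggest the tag ends up in the reader thread name. One edge case is worth knowing. `Math.abs(Integer.MIN_VALUE)` overflows and returns `Integer.MIN_VALUE`, so the tag is negative for the rare job ID whose hash is exactly that value. Distinct job IDs can also share a hash, so the tag is a naming aid, not a unique key.

The sketch below shows a sign-safe alternative. The helper names are hypothetical. Masking changes the tag for jobs with other negative hashes (for example, -1 maps to `Integer.MAX_VALUE`, not 1), which only matters if something depends on the exact thread name.

```java
final class ReaderTagSketch {
    // As in the diff: negative when hashCode() == Integer.MIN_VALUE,
    // because Math.abs(Integer.MIN_VALUE) overflows back to Integer.MIN_VALUE.
    static int absTag(String jobId) {
        return Math.abs(jobId.hashCode());
    }

    // Clearing the sign bit always yields a value in [0, Integer.MAX_VALUE].
    static int maskedTag(int hash) {
        return hash & Integer.MAX_VALUE;
    }

    static int maskedTag(String jobId) {
        return maskedTag(jobId.hashCode());
    }

    public static void main(String[] args) {
        System.out.println(Math.abs(Integer.MIN_VALUE));  // -2147483648
        System.out.println(maskedTag(Integer.MIN_VALUE)); // 0
        // For any non-negative hash both variants agree.
        System.out.println(maskedTag("12345") == absTag("12345")); // true
    }
}
```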
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 2e09e48957a..027a4c7af94 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -55,14 +55,17 @@ import org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils
import org.apache.flink.table.types.DataType;
import java.sql.ResultSet;
+import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.time.Instant;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.stream.Collectors;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
@@ -80,6 +83,8 @@ import io.debezium.relational.history.TableChanges;
import io.debezium.time.Conversions;
import lombok.Data;
import org.postgresql.Driver;
+import org.postgresql.core.BaseConnection;
+import org.postgresql.core.ServerVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,6 +103,11 @@ public class PostgresSourceReader extends JdbcIncrementalSourceReader {
public void initialize(String jobId, DataSource dataSource, Map<String, String> config) {
PostgresSourceConfig sourceConfig = generatePostgresConfig(config, jobId, 0);
PostgresDialect dialect = new PostgresDialect(sourceConfig);
+ // Doris-owned publication: pre-create it covering all include_tables (autocreate is
+ // DISABLED).
+ if (isPublicationDorisOwned(config, jobId)) {
+ createPublicationForDorisOwned(dialect, config, jobId);
+ }
// Only create the slot when Doris owns it (name == default); user-provided slots must
// pre-exist, validated at CREATE JOB.
if (isSlotDorisOwned(config, jobId)) {
@@ -150,6 +160,61 @@ public class PostgresSourceReader extends JdbcIncrementalSourceReader {
}
}
+ /**
+ * Create/ensure the Doris-owned publication for all include_tables (idempotent, multi-BE safe).
+ */
+ private void createPublicationForDorisOwned(
+ PostgresDialect dialect, Map<String, String> config, String jobId) {
+ String pubName = resolvePublicationName(config, jobId);
+ String schema = config.get(DataSourceConfigKeys.SCHEMA);
+ String[] qualified = ConfigUtil.getTableList(schema, config);
+ if (qualified.length == 0) {
+ throw new CdcClientException("No tables to create publication " + pubName);
+ }
+ String tableList =
+ Arrays.stream(qualified)
+ .map(
+ q ->
+ new TableId(null, schema, q.substring(q.indexOf('.') + 1))
+ .toDoubleQuotedString())
+ .collect(Collectors.joining(", "));
+ // Mirrors debezium PostgresReplicationConnection#initPublication: check existence, then
+ // CREATE ... FOR TABLE / ALTER ... SET TABLE (here always the full include_tables set).
+ try (PostgresConnection conn = dialect.openJdbcConnection();
+ Statement stmt = conn.connection().createStatement()) {
+ long count;
+ try (ResultSet rs =
+ stmt.executeQuery(
+ "SELECT COUNT(1) FROM pg_publication WHERE pubname = '"
+ + pubName
+ + "'")) {
+ rs.next();
+ count = rs.getLong(1);
+ }
+ if (count == 0) {
+ // Preserve debezium FILTERED behavior: on PG 13+ publish partitioned-root changes
+ // as the root table, matching configFactory.setIncludePartitionedTables(true).
+ String pubViaRootSuffix =
+ ((BaseConnection) conn.connection())
+ .haveMinimumServerVersion(ServerVersion.v13)
+ ? " WITH (publish_via_partition_root = true)"
+ : "";
+ stmt.execute(
+ "CREATE PUBLICATION "
+ + pubName
+ + " FOR TABLE "
+ + tableList
+ + pubViaRootSuffix);
+ } else {
+ stmt.execute("ALTER PUBLICATION " + pubName + " SET TABLE " + tableList);
+ }
+ LOG.info("Ensured publication {} for tables {}", pubName, tableList);
+ } catch (SQLException e) {
+ throw new CdcClientException(
+ "Failed to create publication " + pubName + ": " + e.getMessage(), e);
+ }
+ }
+
@Override
protected PostgresSourceConfig getSourceConfig(JobBaseConfig config) {
return generatePostgresConfig(config);
@@ -242,18 +307,17 @@ public class PostgresSourceReader extends JdbcIncrementalSourceReader {
Properties dbzProps = ConfigUtil.getDefaultDebeziumProps();
dbzProps.put("interval.handling.mode", "string");
- // Doris-owned = FILTERED (auto-create per-table publication); otherwise DISABLED
- // (user-provided or legacy dbz_publication already present on PG).
+ // Always DISABLED; the publication always pre-exists: Doris creates it for all
+ // include_tables
+ // in initialize(); user-provided / legacy (dbz_publication) ones are already present on PG.
+ // FILTERED would make each split SET TABLE its single table -> flip publication -> data
+ // loss.
String publicationName = resolvePublicationName(cdcConfig, jobId);
String slotName = resolveSlotName(cdcConfig, jobId);
- AutoCreateMode autocreateMode =
- isPublicationDorisOwned(cdcConfig, jobId)
- ? AutoCreateMode.FILTERED
- : AutoCreateMode.DISABLED;
dbzProps.put(PostgresConnectorConfig.PUBLICATION_NAME.name(), publicationName);
dbzProps.put(
PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE.name(),
- autocreateMode.getValue());
+ AutoCreateMode.DISABLED.getValue());
configFactory.debeziumProperties(dbzProps);
@@ -279,7 +343,8 @@ public class PostgresSourceReader extends JdbcIncrementalSourceReader {
// support scan partition table
configFactory.setIncludePartitionedTables(true);
- // FE injects "true" on TVF path; from-to leaves it absent → default false.
+ // from-to: FE forces "true" (at-least-once, skip backfill); TVF: absent → false
+ // (exactly-once needs backfill).
configFactory.skipSnapshotBackfill(
Boolean.parseBoolean(cdcConfig.get(DataSourceConfigKeys.SKIP_SNAPSHOT_BACKFILL)));
@@ -319,6 +384,10 @@ public class PostgresSourceReader extends JdbcIncrementalSourceReader {
PostgresDialect dialect = new PostgresDialect(sourceConfig);
PostgresSourceFetchTaskContext taskContext =
new PostgresSourceFetchTaskContext(sourceConfig, dialect);
+ LOG.info(
+ "create snapshot reader for job {}, thread tag = debezium-snapshot-reader-{}",
+ config.getJobId(),
+ subtaskId);
IncrementalSourceScanFetcher snapshotReader =
new IncrementalSourceScanFetcher(taskContext, subtaskId);
return snapshotReader;
@@ -330,9 +399,13 @@ public class PostgresSourceReader extends JdbcIncrementalSourceReader {
PostgresDialect dialect = new PostgresDialect(sourceConfig);
PostgresSourceFetchTaskContext taskContext =
new PostgresSourceFetchTaskContext(sourceConfig, dialect);
- // subTaskId maybe add jobId?
+ int readerTag = Math.abs(config.getJobId().hashCode());
+ LOG.info(
+ "create binlog reader for job {}, thread tag = debezium-reader-{}",
+ config.getJobId(),
+ readerTag);
IncrementalSourceStreamFetcher binlogReader =
- new IncrementalSourceStreamFetcher(taskContext, 0);
+ new IncrementalSourceStreamFetcher(taskContext, readerTag);
return binlogReader;
}
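`createPublicationForDorisOwned` builds its SQL by string concatenation, including the `'...'` literal used in the `pg_publication` existence check. The sketch below makes the same CREATE-vs-ALTER decision with two changes:

- The lookup uses a `PreparedStatement` bind parameter.
- Identifiers are quoted the way PostgreSQL expects: wrapped in double quotes, with any embedded `"` doubled. DDL statements cannot take bind parameters for identifiers.

`PublicationDdlSketch` and its methods are hypothetical, and the publication and table names in the example are illustrative. Quoting the publication name preserves its case, so this matches the unquoted original only for lower-case names.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.stream.Collectors;

final class PublicationDdlSketch {
    // Quote a PostgreSQL identifier: wrap in double quotes and double any embedded quote.
    static String quoteIdent(String ident) {
        return "\"" + ident.replace("\"", "\"\"") + "\"";
    }

    // Render "schema"."table" for every included table.
    static String tableList(String schema, List<String> tables) {
        return tables.stream()
                .map(t -> quoteIdent(schema) + "." + quoteIdent(t))
                .collect(Collectors.joining(", "));
    }

    // CREATE when absent; otherwise reset the publication to the full table set.
    static String publicationDdl(
            String pubName, String schema, List<String> tables, boolean exists, boolean pg13OrLater) {
        if (tables.isEmpty()) {
            throw new IllegalArgumentException("No tables to create publication " + pubName);
        }
        String list = tableList(schema, tables);
        if (exists) {
            return "ALTER PUBLICATION " + quoteIdent(pubName) + " SET TABLE " + list;
        }
        String viaRoot = pg13OrLater ? " WITH (publish_via_partition_root = true)" : "";
        return "CREATE PUBLICATION " + quoteIdent(pubName) + " FOR TABLE " + list + viaRoot;
    }

    // Existence check with a bind parameter instead of a concatenated literal.
    static boolean publicationExists(Connection conn, String pubName) throws SQLException {
        try (PreparedStatement ps =
                conn.prepareStatement("SELECT 1 FROM pg_publication WHERE pubname = ?")) {
            ps.setString(1, pubName);
            try (ResultSet rs = ps.executeQuery()) {
                return rs.next();
            }
        }
    }

    // Ensure the publication covers exactly the given tables (needs a live connection).
    static void ensurePublication(
            Connection conn, String pubName, String schema, List<String> tables, boolean pg13OrLater)
            throws SQLException {
        boolean exists = publicationExists(conn, pubName);
        try (Statement stmt = conn.createStatement()) {
            stmt.execute(publicationDdl(pubName, schema, tables, exists, pg13OrLater));
        }
    }

    public static void main(String[] args) {
        System.out.println(publicationDdl("dbz_pub_1", "cdc_test", List.of("t1", "t2"), false, true));
    }
}
```

The check-then-create pair is still not atomic, so two processes racing on a fresh job can both observe "absent".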
diff --git a/fs_brokers/cdc_client/src/main/resources/application.properties b/fs_brokers/cdc_client/src/main/resources/application.properties
index d22c98d434b..a8d22bed9dd 100644
--- a/fs_brokers/cdc_client/src/main/resources/application.properties
+++ b/fs_brokers/cdc_client/src/main/resources/application.properties
@@ -20,4 +20,6 @@ server.port=9096
backend.http.port=8040
# see doris-meta/image/VERSION
cluster.token=cluster-token
-spring.mvc.async.request-timeout=300000
\ No newline at end of file
+spring.mvc.async.request-timeout=300000
+# Max concurrent write-record tasks (override via be.conf cdc_client_java_opts=-Dpipeline.max-concurrent-tasks=N)
+# pipeline.max-concurrent-tasks=10
\ No newline at end of file
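`PipelineCoordinator` now takes `${pipeline.max-concurrent-tasks:10}`, and the properties comment says it can be overridden with `-Dpipeline.max-concurrent-tasks=N` through `cdc_client_java_opts`. The sketch below is a plain-Java approximation of that lookup. It assumes only the JVM system property matters, whereas Spring also consults other property sources. Unlike Spring, which would fail to convert a non-numeric value, `Integer.getInteger` silently falls back to the default. The class name is hypothetical. The pool shape (core == max, 60 s keep-alive, bounded queue of 128) copies the visible part of the constructor.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class MaxConcurrentTasksSketch {
    static final String PROPERTY = "pipeline.max-concurrent-tasks";
    static final int DEFAULT_MAX_CONCURRENT_TASKS = 10;
    static final int QUEUE_CAPACITY = 128;

    // Approximates ${pipeline.max-concurrent-tasks:10} using only the JVM system property.
    static int resolveMaxConcurrentTasks() {
        return Integer.getInteger(PROPERTY, DEFAULT_MAX_CONCURRENT_TASKS);
    }

    // Core == max, so at most maxConcurrentTasks tasks run at once; extra tasks wait in a
    // bounded queue, and once it is full this sketch's default AbortPolicy rejects them.
    static ThreadPoolExecutor newExecutor(int maxConcurrentTasks) {
        return new ThreadPoolExecutor(
                maxConcurrentTasks,
                maxConcurrentTasks,
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(QUEUE_CAPACITY));
    }

    public static void main(String[] args) {
        // e.g. java -Dpipeline.max-concurrent-tasks=4 MaxConcurrentTasksSketch
        ThreadPoolExecutor executor = newExecutor(resolveMaxConcurrentTasks());
        System.out.println("pool size = " + executor.getMaximumPoolSize());
        executor.shutdown();
    }
}
```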
diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_with_concurrent_dml_multi_table.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_with_concurrent_dml_multi_table.out
new file mode 100644
index 00000000000..9da45918b71
--- /dev/null
+++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_with_concurrent_dml_multi_table.out
@@ -0,0 +1,43 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_count_t1 --
+1007
+
+-- !select_count_t2 --
+1007
+
+-- !select_updates_t1 --
+1 99
+100 99
+500 99
+999 99
+
+-- !select_updates_t2 --
+1 99
+100 99
+500 99
+999 99
+
+-- !select_inserts_t1 --
+1001 concurrent_ins 1
+1002 concurrent_ins 1
+1003 concurrent_ins 1
+1004 concurrent_ins 1
+1005 concurrent_ins 1
+1006 concurrent_ins 1
+1007 concurrent_ins 1
+1008 concurrent_ins 1
+1009 concurrent_ins 1
+1010 concurrent_ins 1
+
+-- !select_inserts_t2 --
+1001 concurrent_ins 1
+1002 concurrent_ins 1
+1003 concurrent_ins 1
+1004 concurrent_ins 1
+1005 concurrent_ins 1
+1006 concurrent_ins 1
+1007 concurrent_ins 1
+1008 concurrent_ins 1
+1009 concurrent_ins 1
+1010 concurrent_ins 1
+
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_with_concurrent_dml_multi_table.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_with_concurrent_dml_multi_table.groovy
new file mode 100644
index 00000000000..6ecdffdea8d
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_with_concurrent_dml_multi_table.groovy
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Multi-table from-to snapshot + concurrent DML: with >=2 tables each snapshot split flips the
+// shared publication to its single table, so a row written to a table while the publication
+// temporarily excludes it would be filtered on stream replay and lost. The fix keeps the
+// publication full-table, so no row is lost. Asserts every row of BOTH tables is synced.
+suite("test_streaming_postgres_job_snapshot_with_concurrent_dml_multi_table",
+ "p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def jobName = "test_streaming_pg_snapshot_concurrent_dml_multi_table_name"
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "streaming_snapshot_dml_multi_pg_t1"
+ def table2 = "streaming_snapshot_dml_multi_pg_t2"
+ def tables = [table1, table2]
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+ def totalRows = 1000
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+ tables.each { sql """drop table if exists ${currentDb}.${it} force""" }
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // ===== Prepare PG side: two tables, each 1000 snapshot rows =====
+ connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ tables.each { t ->
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${t}"""
+ sql """
+ create table ${pgDB}.${pgSchema}.${t} (
+ id integer PRIMARY KEY,
+ tag varchar(64),
+ version integer
+ );
+ """
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${t} (id, tag, version)
+ SELECT g, 'snap', 0 FROM generate_series(1, ${totalRows}) g"""
+ }
+ }
+
+ // snapshot_split_size=10 + snapshot_parallelism=1 -> 100 serial splits per table, slow
+ // enough that the concurrent DML overlaps snapshot while the publication keeps flipping.
+ sql """CREATE JOB ${jobName}
+ ON STREAMING
+ FROM POSTGRES (
+ "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "include_tables" = "${table1},${table2}",
+ "offset" = "initial",
+ "snapshot_split_size" = "10",
+ "snapshot_parallelism" = "1"
+ )
+ TO DATABASE ${currentDb} (
+ "table.create.properties.replication_num" = "1"
+ )
+ """
+
+ // Wait until the first snapshot split commits (slot created, snapshot in progress) so the
+ // DML below lands inside the snapshot window and overlaps the publication flipping.
+ Awaitility.await().atMost(120, SECONDS).pollInterval(1, SECONDS).until({
+ def c = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}' and ExecuteType='STREAMING'"""
+ c.size() == 1 && (c.get(0).get(0).toString() as long) >= 1
+ })
+
+ // Concurrent DML on BOTH tables while still snapshotting. Same DML shape on each table.
+ connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ tables.each { t ->
+ for (int i = 1; i <= 10; i++) {
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${t} (id, tag, version) VALUES (${totalRows + i}, 'concurrent_ins', 1)"""
+ }
+ sql """UPDATE ${pgDB}.${pgSchema}.${t} SET version=99 WHERE id IN (1, 100, 500, 999)"""
+ sql """DELETE FROM ${pgDB}.${pgSchema}.${t} WHERE id IN (2, 200, 800)"""
+ }
+ }
+
+ // Each table: 1000 + 10 inserts - 3 deletes = 1007 rows, updates/deletes/inserts applied.
+ def expectedRows = totalRows + 10 - 3
+ try {
+ Awaitility.await().atMost(600, SECONDS)
+ .pollInterval(2, SECONDS).until(
+ {
+ boolean allOk = true
+ for (def t : tables) {
+ def showTbl = sql """show tables from ${currentDb} like '${t}'"""
+ if (showTbl.size() == 0) {
+ allOk = false
+ break
+ }
+ def cnt = sql """select count(1) from ${currentDb}.${t}"""
+ def upd1 = sql """select version from ${currentDb}.${t} where id=1"""
+ def upd999 = sql """select version from ${currentDb}.${t} where id=999"""
+ def del2 = sql """select count(1) from ${currentDb}.${t} where id=2"""
+ def del800 = sql """select count(1) from ${currentDb}.${t} where id=800"""
+ def ins = sql """select count(1) from ${currentDb}.${t} where id=${totalRows + 10}"""
+ def v1 = upd1.size() == 0 ? null : upd1.get(0).get(0)
+ def v999 = upd999.size() == 0 ? null : upd999.get(0).get(0)
+ log.info("table=${t} cnt=${cnt} v1=${v1} v999=${v999} del2=${del2} del800=${del800} ins=${ins}")
+ boolean ok = cnt.get(0).get(0) == expectedRows &&
+ v1 != null && v1.toString() == '99' &&
+ v999 != null && v999.toString() == '99' &&
+ del2.get(0).get(0) == 0 &&
+ del800.get(0).get(0) == 0 &&
+ ins.get(0).get(0) == 1
+ if (!ok) {
+ allOk = false
+ break
+ }
+ }
+ allOk
+ }
+ )
+ } catch (Exception ex) {
+ def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex
+ }
+
+ qt_select_count_t1 """select count(1) from ${currentDb}.${table1}"""
+ qt_select_count_t2 """select count(1) from ${currentDb}.${table2}"""
+ qt_select_updates_t1 """select id, version from ${currentDb}.${table1} where id in (1, 100, 500, 999) order by id"""
+ qt_select_updates_t2 """select id, version from ${currentDb}.${table2} where id in (1, 100, 500, 999) order by id"""
+ qt_select_inserts_t1 """select id, tag, version from ${currentDb}.${table1} where id > ${totalRows} order by id"""
+ qt_select_inserts_t2 """select id, tag, version from ${currentDb}.${table2} where id > ${totalRows} order by id"""
+
+ sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+ }
+}
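The multi-table test targets the mechanism described in the commit message: logical decoding evaluates publication membership at each change's WAL position. The toy model below is not PostgreSQL code; `Change`, `decode` and the LSN values are made up. It shows why flipping a shared publication between single tables drops rows, while a publication fixed to the full `include_tables` set keeps them all.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class PublicationFlipSketch {
    // A change written to the WAL at position lsn for the given table.
    static final class Change {
        final long lsn;
        final String table;

        Change(long lsn, String table) {
            this.lsn = lsn;
            this.table = table;
        }
    }

    // Toy decoder: emit a change only if its table belonged to the publication in effect
    // at the change's LSN. history maps "effective from LSN" -> member tables.
    static List<Change> decode(List<Change> wal, TreeMap<Long, Set<String>> history) {
        List<Change> emitted = new ArrayList<>();
        for (Change c : wal) {
            Map.Entry<Long, Set<String>> active = history.floorEntry(c.lsn);
            if (active != null && active.getValue().contains(c.table)) {
                emitted.add(c);
            }
        }
        return emitted;
    }

    // Concurrent DML on both tables while snapshot splits are running.
    static List<Change> sampleWal() {
        return List.of(
                new Change(10, "t1"), new Change(20, "t2"), new Change(30, "t1"), new Change(40, "t2"));
    }

    // Old behavior: each split re-pointed the shared publication at its own table.
    static TreeMap<Long, Set<String>> flippingHistory() {
        TreeMap<Long, Set<String>> h = new TreeMap<>();
        h.put(0L, Set.of("t1"));  // a split of t1 is running
        h.put(25L, Set.of("t2")); // a split of t2 is running
        return h;
    }

    // New behavior: the publication covers all include_tables from the start.
    static TreeMap<Long, Set<String>> fullHistory() {
        TreeMap<Long, Set<String>> h = new TreeMap<>();
        h.put(0L, Set.of("t1", "t2"));
        return h;
    }

    public static void main(String[] args) {
        System.out.println("flipping: " + decode(sampleWal(), flippingHistory()).size() + " of 4"); // 2 of 4
        System.out.println("full:     " + decode(sampleWal(), fullHistory()).size() + " of 4");     // 4 of 4
    }
}
```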
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.groovy
index 03af48213aa..c4214145151 100644
--- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.groovy
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.groovy
@@ -95,13 +95,13 @@ suite("test_streaming_postgres_job_special_offset", "p0,external,pg,external_doc
sql """drop table if exists ${currentDb}.${table1} force"""
// ===== Test 2: CREATE with initial, then ALTER with JSON LSN offset =====
- // Pre-create a DUPLICATE KEY table so duplicate rows from re-consuming are visible
+ // UNIQUE KEY table: dedup at-least-once re-consume; this case only verifies ALTER offset LSN filtering.
sql """
CREATE TABLE IF NOT EXISTS ${currentDb}.${table1} (
`id` int NULL,
`name` varchar(100) NULL
) ENGINE=OLAP
- DUPLICATE KEY(`id`)
+ UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
PROPERTIES ("replication_allocation" = "tag.location.default: 1")
"""
@@ -126,7 +126,7 @@ suite("test_streaming_postgres_job_special_offset", "p0,external,pg,external_doc
"""
Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until({
def result = sql """SELECT count(*) FROM ${currentDb}.${table1}"""
- return result[0][0] >= 2
+ return result[0][0] >= 3
})
qt_select_after_create """ SELECT * FROM ${currentDb}.${table1} ORDER BY id """
@@ -169,8 +169,11 @@ suite("test_streaming_postgres_job_special_offset", "p0,external,pg,external_doc
// After ALTER to LSN mark, only data AFTER that LSN (id 30,31) should be synced
Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until({
def result = sql """SELECT count(*) FROM ${currentDb}.${table1} WHERE id IN (30, 31)"""
- return result[0][0] >= 2
+ return result[0][0] == 2
})
+ // mark LSN sits between 21 and 30; before-mark rows must never be read.
+ def beforeMark = sql """SELECT count(*) FROM ${currentDb}.${table1} WHERE id IN (20, 21)"""
+ assert beforeMark[0][0] == 0 : "rows before ALTER LSN must be skipped, found ${beforeMark[0][0]}"
qt_select_after_alter """ SELECT * FROM ${currentDb}.${table1} ORDER BY id """
// Step 3: ALTER with named mode should fail for CDC
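The special-offset test relies on ordering PostgreSQL LSNs: rows written before the mark LSN must be skipped. An LSN's text form is two hexadecimal 32-bit halves separated by `/`, so comparing the strings directly is wrong. For example, `"0/9"` sorts after `"0/10"` as text, but 0x9 < 0x10.

The sketch below parses and compares LSNs. The helper names are hypothetical. `Long.compareUnsigned` keeps positions whose high half is 0x80000000 or more in order, even though they set the sign bit of a Java `long`.

```java
final class PgLsnSketch {
    // Parse PostgreSQL's text LSN "HI/LO" (two 32-bit hex halves) into a 64-bit position.
    static long parse(String lsn) {
        int slash = lsn.indexOf('/');
        if (slash <= 0 || slash == lsn.length() - 1) {
            throw new IllegalArgumentException("Not an LSN: " + lsn);
        }
        long hi = Long.parseLong(lsn.substring(0, slash), 16);
        long lo = Long.parseLong(lsn.substring(slash + 1), 16);
        if (hi < 0 || hi > 0xFFFFFFFFL || lo < 0 || lo > 0xFFFFFFFFL) {
            throw new IllegalArgumentException("LSN half out of range: " + lsn);
        }
        return (hi << 32) | lo;
    }

    // Compare as unsigned 64-bit values: HI >= 0x80000000 sets the sign bit of the long.
    static int compare(String a, String b) {
        return Long.compareUnsigned(parse(a), parse(b));
    }

    public static void main(String[] args) {
        System.out.println(Long.toHexString(parse("16/B374D848"))); // 16b374d848
        System.out.println(compare("0/9", "0/10") < 0);              // true, unlike String.compareTo
        System.out.println(compare("FFFFFFFF/0", "1/0") > 0);        // true only with unsigned compare
    }
}
```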
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_fetch_meta_error.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_fetch_meta_error.groovy
index 4be3d3ac8cb..986b893299d 100644
--- a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_fetch_meta_error.groovy
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_fetch_meta_error.groovy
@@ -61,13 +61,16 @@ suite("test_streaming_insert_job_fetch_meta_error", "nonConcurrent") {
);
"""
+ // GET_REMOTE_DATA_ERROR auto-resumes (PAUSED->PENDING->PAUSED); assert in the poll snapshot, not a separate read.
try {
Awaitility.await().atMost(120, SECONDS)
.pollInterval(2, SECONDS).until(
{
- def jobRes = sql """ select Status from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+ def jobRes = sql """ select Status, ErrorMsg from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
log.info("jobRes: " + jobRes)
jobRes.size() == 1 &&
'PAUSED'.equals(jobRes.get(0).get(0))
+ && jobRes.get(0).get(1) != null
+ && jobRes.get(0).get(1).toString().contains("simulated S3 auth error")
}
)
} catch (Exception ex) {
@@ -78,10 +81,6 @@ suite("test_streaming_insert_job_fetch_meta_error", "nonConcurrent") {
throw ex
}
- def jobStatus = sql """select Status, ErrorMsg from jobs("type"="insert") where Name='${jobName}'"""
- assert jobStatus.get(0).get(0) == "PAUSED"
- assert jobStatus.get(0).get(1).contains("simulated S3 auth error")
-
sql """
DROP JOB IF EXISTS where jobname = '${jobName}'
"""
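The `fetch_meta_error` change moves the `ErrorMsg` check into the polled snapshot. The job auto-resumes (PAUSED->PENDING->PAUSED), so a second, separate read can land on PENDING. The sketch below shows that pattern, with a scripted sequence of job rows standing in for `jobs("type"="insert")`; all names are hypothetical. The whole condition is evaluated on one snapshot, and that same snapshot is returned for any further checks.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

final class PollSnapshotSketch {
    // One row as returned by a single status query (hypothetical shape).
    static final class JobRow {
        final String status;
        final String errorMsg;

        JobRow(String status, String errorMsg) {
            this.status = status;
            this.errorMsg = errorMsg;
        }
    }

    // Read repeatedly until one snapshot satisfies the whole condition; return that snapshot.
    static <T> T pollUntil(Supplier<T> read, Predicate<T> done, int maxAttempts) {
        for (int i = 0; i < maxAttempts; i++) {
            T snapshot = read.get();
            if (done.test(snapshot)) {
                return snapshot;
            }
        }
        throw new IllegalStateException("condition not met after " + maxAttempts + " polls");
    }

    // Scripted job that auto-resumes: PENDING -> PAUSED -> PENDING -> PAUSED.
    static Supplier<JobRow> scriptedJob() {
        JobRow[] rows = {
            new JobRow("PENDING", null),
            new JobRow("PAUSED", "simulated S3 auth error"),
            new JobRow("PENDING", null),
            new JobRow("PAUSED", "simulated S3 auth error"),
        };
        int[] next = {0};
        return () -> rows[Math.min(next[0]++, rows.length - 1)];
    }

    // Status and error are checked on the same row, so a resume between reads cannot split them.
    static boolean pausedWithError(JobRow r) {
        return "PAUSED".equals(r.status)
                && r.errorMsg != null
                && r.errorMsg.contains("simulated S3 auth error");
    }

    public static void main(String[] args) {
        // Racy pattern: poll for PAUSED, then re-read; the second read sees PENDING.
        Supplier<JobRow> job = scriptedJob();
        pollUntil(job, r -> "PAUSED".equals(r.status), 10);
        System.out.println("separate read: " + job.get().status);

        // Single-snapshot pattern: the returned row already satisfied both checks.
        JobRow row = pollUntil(scriptedJob(), PollSnapshotSketch::pausedWithError, 10);
        System.out.println("snapshot: " + row.status + " / " + row.errorMsg);
    }
}
```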