timoninmaxim commented on code in PR #311: URL: https://github.com/apache/ignite-extensions/pull/311#discussion_r2176594559
########## modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcConsumer.java: ########## @@ -0,0 +1,265 @@ +package org.apache.ignite.cdc.postgresql; + +import java.sql.Connection; +import java.util.Collection; +import java.util.Iterator; +import java.util.Set; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import javax.sql.DataSource; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.binary.BinaryType; +import org.apache.ignite.cdc.CdcCacheEvent; +import org.apache.ignite.cdc.CdcConsumer; +import org.apache.ignite.cdc.CdcEvent; +import org.apache.ignite.cdc.TypeMapping; +import org.apache.ignite.internal.processors.metric.MetricRegistryImpl; +import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.metric.MetricRegistry; +import org.apache.ignite.resources.LoggerResource; + +/** + * This class represents a consumer component that replicates cache changes from Apache Ignite to PostgreSQL using + * Change Data Capture (CDC) mechanism. It applies events to PostgreSQL via batch-prepared SQL statements, ensuring + * efficient handling of large volumes of updates. + * + * <p>Additionally, it provides methods for initializing connections, managing transactions, and performing atomic batches + * of writes.</p> + */ +public class IgniteToPostgreSqlCdcConsumer implements CdcConsumer { + /** */ + public static final String EVTS_SENT_CNT = "EventsCount"; + + /** */ + public static final String EVTS_SENT_CNT_DESC = "Count of events applied to PostgreSQL"; + + /** */ + public static final String LAST_EVT_SENT_TIME = "LastEventTime"; + + /** */ + public static final String LAST_EVT_SENT_TIME_DESC = "Timestamp of last applied event to PostgreSQL"; + + /** */ + private static final boolean DFLT_IS_ONLY_PRIMARY = false; Review Comment: Let's in this patch enforce `onlyPrimary = true`. ########## modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcConsumer.java: ########## @@ -0,0 +1,265 @@ +package org.apache.ignite.cdc.postgresql; + +import java.sql.Connection; +import java.util.Collection; +import java.util.Iterator; +import java.util.Set; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import javax.sql.DataSource; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.binary.BinaryType; +import org.apache.ignite.cdc.CdcCacheEvent; +import org.apache.ignite.cdc.CdcConsumer; +import org.apache.ignite.cdc.CdcEvent; +import org.apache.ignite.cdc.TypeMapping; +import org.apache.ignite.internal.processors.metric.MetricRegistryImpl; +import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.metric.MetricRegistry; +import org.apache.ignite.resources.LoggerResource; + +/** + * This class represents a consumer component that replicates cache changes from Apache Ignite to PostgreSQL using + * Change Data Capture (CDC) mechanism. It applies events to PostgreSQL via batch-prepared SQL statements, ensuring + * efficient handling of large volumes of updates. 
+ *
+ * <p>Additionally, it provides methods for initializing connections, managing transactions, and performing atomic batches
+ * of writes.</p>
+ */
+public class IgniteToPostgreSqlCdcConsumer implements CdcConsumer {
+    /** */
+    public static final String EVTS_SENT_CNT = "EventsCount";
+
+    /** */
+    public static final String EVTS_SENT_CNT_DESC = "Count of events applied to PostgreSQL";
+
+    /** */
+    public static final String LAST_EVT_SENT_TIME = "LastEventTime";
+
+    /** */
+    public static final String LAST_EVT_SENT_TIME_DESC = "Timestamp of last applied event to PostgreSQL";
+
+    /** */
+    private static final boolean DFLT_IS_ONLY_PRIMARY = false;
+
+    /** */
+    private static final long DFLT_BATCH_SIZE = 1024;
+
+    /** */
+    private static final boolean DFLT_CREATE_TABLES = false;
+
+    /** */
+    private static final boolean DFLT_AUTO_COMMIT = false;
+
+    /** */
+    private DataSource dataSrc;
+
+    /** Collection of cache names which will be replicated to PostgreSQL. */
+    private Collection<String> caches;
+
+    /** */
+    private boolean onlyPrimary = DFLT_IS_ONLY_PRIMARY;
+
+    /** */
+    private long maxBatchSize = DFLT_BATCH_SIZE;
+
+    /** */
+    private boolean createTables = DFLT_CREATE_TABLES;
+
+    /** */
+    private boolean autoCommit = DFLT_AUTO_COMMIT;
+
+    /** Log. */
+    @LoggerResource
+    private IgniteLogger log;
+
+    /** Cache IDs. */
+    private Set<Integer> cachesIds;
+
+    /** Applier instance responsible for applying individual CDC events to PostgreSQL. */
+    private IgniteToPostgreSqlCdcApplier applier;
+
+    /** Count of events applied to PostgreSQL. */
+    private AtomicLongMetric evtsCnt;
+
+    /** Timestamp of last applied batch to PostgreSQL. */
+    private AtomicLongMetric lastEvtTs;
+
+    /** {@inheritDoc} */
+    @Override public void start(MetricRegistry reg) {
+        A.notNull(dataSrc, "dataSource");
+        A.notEmpty(caches, "caches");
+
+        cachesIds = caches.stream()
+            .map(CU::cacheId)
+            .collect(Collectors.toSet());
+
+        applier = new IgniteToPostgreSqlCdcApplier(maxBatchSize, log);
+
+        MetricRegistryImpl mreg = (MetricRegistryImpl)reg;
+
+        this.evtsCnt = mreg.longMetric(EVTS_SENT_CNT, EVTS_SENT_CNT_DESC);
+        this.lastEvtTs = mreg.longMetric(LAST_EVT_SENT_TIME, LAST_EVT_SENT_TIME_DESC);
+
+        if (log.isInfoEnabled())
+            log.info("CDC Ignite to PostgreSQL start-up [cacheIds=" + cachesIds + ']');
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onEvents(Iterator<CdcEvent> events) {
+        Iterator<CdcEvent> filtered = F.iterator(
+            events,
+            F.identity(),
+            true,
+            evt -> !onlyPrimary || evt.primary(),
+            evt -> cachesIds.contains(evt.cacheId()));
+
+        return withTx((conn) -> {

Review Comment: Why do you update Ignite metrics during the PG transaction?
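One way to address this, as a minimal sketch against the names used in this patch (it assumes `withTx` returns whether the commit succeeded, which the `return withTx(...)` above implies): keep the PG transaction limited to SQL work and update the Ignite metrics only after a successful commit.

```java
// Sketch only, not the patch code: metric updates move outside the PG
// transaction, so a rolled-back batch is never counted as applied.
@Override public boolean onEvents(Iterator<CdcEvent> events) {
    Iterator<CdcEvent> filtered = F.iterator(
        events,
        F.identity(),
        true,
        evt -> !onlyPrimary || evt.primary(),
        evt -> cachesIds.contains(evt.cacheId()));

    long[] sent = new long[1];

    boolean committed = withTx(conn -> sent[0] = applier.applyEvents(conn, filtered));

    if (committed && sent[0] > 0) {
        evtsCnt.add(sent[0]);
        lastEvtTs.value(System.currentTimeMillis());
    }

    return committed;
}
```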
########## modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcConsumer.java: ########## @@ -0,0 +1,265 @@ +package org.apache.ignite.cdc.postgresql; Review Comment: Missed license in all files ########## modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcConsumer.java: ########## @@ -0,0 +1,265 @@ +package org.apache.ignite.cdc.postgresql; + +import java.sql.Connection; +import java.util.Collection; +import java.util.Iterator; +import java.util.Set; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import javax.sql.DataSource; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.binary.BinaryType; +import org.apache.ignite.cdc.CdcCacheEvent; +import org.apache.ignite.cdc.CdcConsumer; +import org.apache.ignite.cdc.CdcEvent; +import org.apache.ignite.cdc.TypeMapping; +import org.apache.ignite.internal.processors.metric.MetricRegistryImpl; +import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.metric.MetricRegistry; +import org.apache.ignite.resources.LoggerResource; + +/** + * This class represents a consumer component that replicates cache changes from Apache Ignite to PostgreSQL using + * Change Data Capture (CDC) mechanism. It applies events to PostgreSQL via batch-prepared SQL statements, ensuring + * efficient handling of large volumes of updates. + * + * <p>Additionally, it provides methods for initializing connections, managing transactions, and performing atomic batches + * of writes.</p> + */ +public class IgniteToPostgreSqlCdcConsumer implements CdcConsumer { + /** */ + public static final String EVTS_SENT_CNT = "EventsCount"; + + /** */ + public static final String EVTS_SENT_CNT_DESC = "Count of events applied to PostgreSQL"; + + /** */ + public static final String LAST_EVT_SENT_TIME = "LastEventTime"; + + /** */ + public static final String LAST_EVT_SENT_TIME_DESC = "Timestamp of last applied event to PostgreSQL"; + + /** */ + private static final boolean DFLT_IS_ONLY_PRIMARY = false; + + /** */ + private static final long DFLT_BATCH_SIZE = 1024; + + /** */ + private static final boolean DFLT_CREATE_TABLES = false; + + /** */ + private static final boolean DFLT_AUTO_COMMIT = false; + + /** */ + private DataSource dataSrc; + + /** Collection of cache names which will be replicated to PostgreSQL. */ + private Collection<String> caches; + + /** */ + private boolean onlyPrimary = DFLT_IS_ONLY_PRIMARY; + + /** */ + private long maxBatchSize = DFLT_BATCH_SIZE; + + /** */ + private boolean createTables = DFLT_CREATE_TABLES; + + /** */ + private boolean autoCommit = DFLT_AUTO_COMMIT; + + /** Log. */ + @LoggerResource + private IgniteLogger log; + + /** Cache IDs. */ + private Set<Integer> cachesIds; + + /** Applier instance responsible for applying individual CDC events to PostgreSQL. */ + private IgniteToPostgreSqlCdcApplier applier; + + /** Count of events applied to PostgreSQL. */ + private AtomicLongMetric evtsCnt; + + /** Timestamp of last applied batch to PostgreSQL. 
*/ + private AtomicLongMetric lastEvtTs; + + /** {@inheritDoc} */ + @Override public void start(MetricRegistry reg) { + A.notNull(dataSrc, "dataSource"); + A.notEmpty(caches, "caches"); + + cachesIds = caches.stream() + .map(CU::cacheId) + .collect(Collectors.toSet()); + + applier = new IgniteToPostgreSqlCdcApplier(maxBatchSize, log); + + MetricRegistryImpl mreg = (MetricRegistryImpl)reg; + + this.evtsCnt = mreg.longMetric(EVTS_SENT_CNT, EVTS_SENT_CNT_DESC); + this.lastEvtTs = mreg.longMetric(LAST_EVT_SENT_TIME, LAST_EVT_SENT_TIME_DESC); + + if (log.isInfoEnabled()) + log.info("CDC Ignite to PostgreSQL start-up [cacheIds=" + cachesIds + ']'); + } + + /** {@inheritDoc} */ + @Override public boolean onEvents(Iterator<CdcEvent> events) { + Iterator<CdcEvent> filtered = F.iterator( + events, + F.identity(), + true, + evt -> !onlyPrimary || evt.primary(), + evt -> cachesIds.contains(evt.cacheId())); + + return withTx((conn) -> { + long evtsSent = applier.applyEvents(conn, filtered); + + if (evtsSent > 0) { + evtsCnt.add(evtsSent); + lastEvtTs.value(System.currentTimeMillis()); + + if (log.isInfoEnabled()) + log.info("Events applied [evtsApplied=" + evtsCnt.value() + ']'); + } + }); + } + + /** {@inheritDoc} */ + @Override public void onTypes(Iterator<BinaryType> types) { + types.forEachRemaining(e -> { + // Just skip. Handle of cache events not supported. + }); + } + + /** {@inheritDoc} */ + @Override public void onMappings(Iterator<TypeMapping> mappings) { + mappings.forEachRemaining(e -> { + // Just skip. Handle of cache events not supported. + }); + } + + /** {@inheritDoc} */ + @Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvents) { + Iterator<CdcCacheEvent> filtered = F.iterator( + cacheEvents, + F.identity(), + true, + evt -> cachesIds.contains(evt.cacheId())); + + withTx((conn) -> applier.applyCacheEvents(conn, filtered, createTables)); + } + + /** {@inheritDoc} */ + @Override public void onCacheDestroy(Iterator<Integer> caches) { + caches.forEachRemaining(e -> { + // Just skip. Handle of cache events not supported. + }); + } + + /** {@inheritDoc} */ + @Override public void stop() { + + } + + /** + * Executes the given operation inside a database transaction, providing a Connection instance. 
Review Comment: Looks like this is a responsibility of an applier ########## modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcApplier.java: ########## @@ -0,0 +1,575 @@ +package org.apache.ignite.cdc.postgresql; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.CacheEntryVersion; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cdc.CdcCacheEvent; +import org.apache.ignite.cdc.CdcEvent; +import org.apache.ignite.internal.util.typedef.F; + +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UNDEFINED_CACHE_ID; + +/** */ +class IgniteToPostgreSqlCdcApplier { + /** */ + public static final String DFLT_SQL_TYPE = "BYTEA"; + + /** */ + public static final Map<String, String> JAVA_TO_SQL_TYPES; + + static { + Map<String, String> map = new HashMap<>(); + + map.put("java.lang.String", "TEXT"); + map.put("java.lang.Integer", "INT"); + map.put("int", "INT"); + map.put("java.lang.Long", "BIGINT"); + map.put("long", "BIGINT"); + map.put("java.lang.Boolean", "BOOLEAN"); + map.put("boolean", "BOOLEAN"); + map.put("java.lang.Double", "DOUBLE PRECISION"); + map.put("double", "DOUBLE PRECISION"); + map.put("java.lang.Float", "REAL"); + map.put("float", "REAL"); + map.put("java.math.BigDecimal", "NUMERIC"); + map.put("java.lang.Short", "SMALLINT"); + map.put("short", "SMALLINT"); + map.put("java.lang.Byte", "SMALLINT"); + map.put("byte", "SMALLINT"); + map.put("java.util.Date", "DATE"); + map.put("java.sql.Date", "DATE"); + map.put("java.sql.Time", "TIME"); + map.put("java.sql.Timestamp", "TIMESTAMP"); + map.put("java.time.LocalDate", "DATE"); + map.put("java.time.LocalDateTime", "TIMESTAMP"); + map.put("java.util.UUID", "UUID"); + map.put("[B", "BYTEA"); + + JAVA_TO_SQL_TYPES = Collections.unmodifiableMap(map); + } + + /** */ + private final long maxBatchSize; + + /** */ + private final IgniteLogger log; + + /** */ + private final Map<Integer, String> cacheIdToUpsertQry = new HashMap<>(); + + /** */ + private final Map<Integer, String> cacheIdToDeleteQry = new HashMap<>(); + + /** */ + private final Map<Integer, Set<String>> cacheIdToPrimaryKeys = new HashMap<>(); + + /** */ + private final Map<Integer, Set<String>> cacheIdToFields = new HashMap<>(); + + /** */ + private final Set<Object> curKeys = new HashSet<>(); + + /** */ + private PreparedStatement curPrepStmt; + + /** + * @param maxBatchSize the maximum number of CDC events to include in a single batch + * @param log the {@link IgniteLogger} instance used for logging CDC processing events + */ + public IgniteToPostgreSqlCdcApplier(long maxBatchSize, IgniteLogger log) { + this.maxBatchSize = maxBatchSize; + this.log = log; + } + + /** + * @param conn the active JDBC {@link Connection} to the PostgreSQL database + * @param evts an {@link Iterator} of {@link CdcEvent} objects to be applied + * @return the total number of events successfully batched and executed + */ + public long applyEvents(Connection conn, Iterator<CdcEvent> evts) { + long evtsApplied = 0; + + int currCacheId 
= UNDEFINED_CACHE_ID; + boolean prevOpIsDelete = false; + + CdcEvent evt; + + while (evts.hasNext()) { + evt = evts.next(); + + if (log.isDebugEnabled()) + log.debug("Event received [evt=" + evt + ']'); + + if (currCacheId != evt.cacheId() || prevOpIsDelete ^ (evt.value() == null)) { + evtsApplied += executeBatch(); + + currCacheId = evt.cacheId(); + prevOpIsDelete = evt.value() == null; + + prepareStatement(conn, evt); + } + + if (curKeys.size() >= maxBatchSize || curKeys.contains(evt.key())) + evtsApplied += executeBatch(); + + addEvent(evt); + } + + if (currCacheId != UNDEFINED_CACHE_ID) + evtsApplied += executeBatch(); + + return evtsApplied; + } + + /** */ + private int executeBatch() { + if (curPrepStmt == null) + return 0; + + try { + curKeys.clear(); + + if (log.isDebugEnabled()) + log.debug("Applying batch " + curPrepStmt.toString()); + + return curPrepStmt.executeBatch().length; + } + catch (SQLException e) { + log.error(e.getMessage(), e); + + throw new IgniteException(e); + } + } + + /** */ + private void prepareStatement(Connection conn, CdcEvent evt) { + String sqlQry; + + if (evt.value() == null) + sqlQry = cacheIdToDeleteQry.get(evt.cacheId()); + else + sqlQry = cacheIdToUpsertQry.get(evt.cacheId()); + + if (log.isDebugEnabled()) + log.debug("Statement updated [cacheId=" + evt.cacheId() + ", sqlQry=" + sqlQry + ']'); + + try { + curPrepStmt = conn.prepareStatement(sqlQry); Review Comment: AFAIU, it's more efficient to store prepared statement for every table. Then it can be reused during CDC lifetime multiple times. ########## modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcApplier.java: ########## @@ -0,0 +1,575 @@ +package org.apache.ignite.cdc.postgresql; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.CacheEntryVersion; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cdc.CdcCacheEvent; +import org.apache.ignite.cdc.CdcEvent; +import org.apache.ignite.internal.util.typedef.F; + +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UNDEFINED_CACHE_ID; + +/** */ +class IgniteToPostgreSqlCdcApplier { + /** */ + public static final String DFLT_SQL_TYPE = "BYTEA"; + + /** */ + public static final Map<String, String> JAVA_TO_SQL_TYPES; + + static { + Map<String, String> map = new HashMap<>(); + + map.put("java.lang.String", "TEXT"); Review Comment: Ignite supports varchar(N). How does Ignite store the N? 
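On the VARCHAR(N) question: Ignite keeps the length apart from the type name, in `QueryEntity#getFieldsPrecision()` (scale lives in `getFieldsScale()`), so the DDL generator could consult it. A hedged sketch reusing this patch's `JAVA_TO_SQL_TYPES` and `DFLT_SQL_TYPE`:

```java
// Sketch only: emit a bounded VARCHAR(N) when the QueryEntity recorded a
// precision for a String column; BigDecimal could get NUMERIC(p, s) the same way.
private String sqlType(QueryEntity entity, String field) {
    String javaType = entity.getFields().get(field);

    Integer precision = entity.getFieldsPrecision().get(field);

    if ("java.lang.String".equals(javaType) && precision != null && precision > 0)
        return "VARCHAR(" + precision + ')';

    return JAVA_TO_SQL_TYPES.getOrDefault(javaType, DFLT_SQL_TYPE);
}
```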
##########
modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcApplier.java:
##########
@@ -0,0 +1,575 @@
+package org.apache.ignite.cdc.postgresql;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheEntryVersion;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cdc.CdcCacheEvent;
+import org.apache.ignite.cdc.CdcEvent;
+import org.apache.ignite.internal.util.typedef.F;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UNDEFINED_CACHE_ID;
+
+/** */
+class IgniteToPostgreSqlCdcApplier {
+    /** */
+    public static final String DFLT_SQL_TYPE = "BYTEA";
+
+    /** */
+    public static final Map<String, String> JAVA_TO_SQL_TYPES;
+
+    static {
+        Map<String, String> map = new HashMap<>();
+
+        map.put("java.lang.String", "TEXT");
+        map.put("java.lang.Integer", "INT");
+        map.put("int", "INT");
+        map.put("java.lang.Long", "BIGINT");
+        map.put("long", "BIGINT");
+        map.put("java.lang.Boolean", "BOOLEAN");
+        map.put("boolean", "BOOLEAN");
+        map.put("java.lang.Double", "DOUBLE PRECISION");
+        map.put("double", "DOUBLE PRECISION");
+        map.put("java.lang.Float", "REAL");
+        map.put("float", "REAL");
+        map.put("java.math.BigDecimal", "NUMERIC");
+        map.put("java.lang.Short", "SMALLINT");
+        map.put("short", "SMALLINT");
+        map.put("java.lang.Byte", "SMALLINT");
+        map.put("byte", "SMALLINT");

Review Comment: SMALLINT requires 2 bytes (a Java `byte` is only 1).
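Returning to the earlier suggestion to store a prepared statement per table: statements are bound to their connection, so if `withTx` hands out a fresh pooled connection per batch they can only be cached per connection (pgjdbc also keeps its own per-connection statement cache and switches to server-side prepares once a statement is re-executed a few times). A hypothetical sketch, assuming a long-lived connection and this patch's query maps:

```java
// Hypothetical helper: one reusable PreparedStatement per (cacheId, operation),
// re-prepared only when the cached one is closed or belongs to another connection.
private final Map<Integer, PreparedStatement> upsertStmts = new HashMap<>();
private final Map<Integer, PreparedStatement> deleteStmts = new HashMap<>();

private PreparedStatement statementFor(Connection conn, CdcEvent evt) throws SQLException {
    boolean delete = evt.value() == null;

    Map<Integer, PreparedStatement> cache = delete ? deleteStmts : upsertStmts;
    String qry = delete ? cacheIdToDeleteQry.get(evt.cacheId()) : cacheIdToUpsertQry.get(evt.cacheId());

    PreparedStatement stmt = cache.get(evt.cacheId());

    if (stmt == null || stmt.isClosed() || stmt.getConnection() != conn) {
        stmt = conn.prepareStatement(qry);

        cache.put(evt.cacheId(), stmt);
    }

    return stmt;
}
```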
########## modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcApplier.java: ########## @@ -0,0 +1,575 @@ +package org.apache.ignite.cdc.postgresql; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.CacheEntryVersion; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cdc.CdcCacheEvent; +import org.apache.ignite.cdc.CdcEvent; +import org.apache.ignite.internal.util.typedef.F; + +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UNDEFINED_CACHE_ID; + +/** */ +class IgniteToPostgreSqlCdcApplier { + /** */ + public static final String DFLT_SQL_TYPE = "BYTEA"; + + /** */ + public static final Map<String, String> JAVA_TO_SQL_TYPES; + + static { + Map<String, String> map = new HashMap<>(); + + map.put("java.lang.String", "TEXT"); + map.put("java.lang.Integer", "INT"); + map.put("int", "INT"); + map.put("java.lang.Long", "BIGINT"); + map.put("long", "BIGINT"); + map.put("java.lang.Boolean", "BOOLEAN"); + map.put("boolean", "BOOLEAN"); + map.put("java.lang.Double", "DOUBLE PRECISION"); + map.put("double", "DOUBLE PRECISION"); + map.put("java.lang.Float", "REAL"); + map.put("float", "REAL"); + map.put("java.math.BigDecimal", "NUMERIC"); + map.put("java.lang.Short", "SMALLINT"); + map.put("short", "SMALLINT"); + map.put("java.lang.Byte", "SMALLINT"); + map.put("byte", "SMALLINT"); + map.put("java.util.Date", "DATE"); + map.put("java.sql.Date", "DATE"); + map.put("java.sql.Time", "TIME"); + map.put("java.sql.Timestamp", "TIMESTAMP"); + map.put("java.time.LocalDate", "DATE"); + map.put("java.time.LocalDateTime", "TIMESTAMP"); + map.put("java.util.UUID", "UUID"); + map.put("[B", "BYTEA"); + + JAVA_TO_SQL_TYPES = Collections.unmodifiableMap(map); + } + + /** */ + private final long maxBatchSize; + + /** */ + private final IgniteLogger log; + + /** */ + private final Map<Integer, String> cacheIdToUpsertQry = new HashMap<>(); + + /** */ + private final Map<Integer, String> cacheIdToDeleteQry = new HashMap<>(); + + /** */ + private final Map<Integer, Set<String>> cacheIdToPrimaryKeys = new HashMap<>(); + + /** */ + private final Map<Integer, Set<String>> cacheIdToFields = new HashMap<>(); + + /** */ + private final Set<Object> curKeys = new HashSet<>(); + + /** */ + private PreparedStatement curPrepStmt; + + /** + * @param maxBatchSize the maximum number of CDC events to include in a single batch + * @param log the {@link IgniteLogger} instance used for logging CDC processing events + */ + public IgniteToPostgreSqlCdcApplier(long maxBatchSize, IgniteLogger log) { + this.maxBatchSize = maxBatchSize; + this.log = log; + } + + /** + * @param conn the active JDBC {@link Connection} to the PostgreSQL database + * @param evts an {@link Iterator} of {@link CdcEvent} objects to be applied + * @return the total number of events successfully batched and executed + */ + public long applyEvents(Connection conn, Iterator<CdcEvent> evts) { + long evtsApplied = 0; + + int currCacheId = UNDEFINED_CACHE_ID; + boolean prevOpIsDelete = false; + + 
CdcEvent evt; + + while (evts.hasNext()) { + evt = evts.next(); + + if (log.isDebugEnabled()) + log.debug("Event received [evt=" + evt + ']'); + + if (currCacheId != evt.cacheId() || prevOpIsDelete ^ (evt.value() == null)) { + evtsApplied += executeBatch(); + + currCacheId = evt.cacheId(); + prevOpIsDelete = evt.value() == null; + + prepareStatement(conn, evt); + } + + if (curKeys.size() >= maxBatchSize || curKeys.contains(evt.key())) + evtsApplied += executeBatch(); + + addEvent(evt); + } + + if (currCacheId != UNDEFINED_CACHE_ID) + evtsApplied += executeBatch(); + + return evtsApplied; + } + + /** */ + private int executeBatch() { + if (curPrepStmt == null) + return 0; + + try { + curKeys.clear(); + + if (log.isDebugEnabled()) + log.debug("Applying batch " + curPrepStmt.toString()); + + return curPrepStmt.executeBatch().length; + } + catch (SQLException e) { + log.error(e.getMessage(), e); + + throw new IgniteException(e); + } + } + + /** */ + private void prepareStatement(Connection conn, CdcEvent evt) { + String sqlQry; + + if (evt.value() == null) + sqlQry = cacheIdToDeleteQry.get(evt.cacheId()); + else + sqlQry = cacheIdToUpsertQry.get(evt.cacheId()); + + if (log.isDebugEnabled()) + log.debug("Statement updated [cacheId=" + evt.cacheId() + ", sqlQry=" + sqlQry + ']'); + + try { + curPrepStmt = conn.prepareStatement(sqlQry); + } + catch (SQLException e) { + log.error(e.getMessage(), e); + + throw new IgniteException(e); + } + } + + /** */ + private void addEvent(CdcEvent evt) { + try { + if (evt.value() == null) + addEvent(evt, true); + else { + int idx = addEvent(evt, false); + + curPrepStmt.setBytes(idx, encodeVersion(evt.version())); + } + + curPrepStmt.addBatch(); + } + catch (Throwable e) { + log.error(e.getMessage(), e); + + throw new IgniteException(e); + } + } + + /** */ Review Comment: Add javadoc for return value. I suppose it returns number of filled arguments for a statement? ########## modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcApplier.java: ########## @@ -0,0 +1,575 @@ +package org.apache.ignite.cdc.postgresql; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.CacheEntryVersion; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cdc.CdcCacheEvent; +import org.apache.ignite.cdc.CdcEvent; +import org.apache.ignite.internal.util.typedef.F; + +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UNDEFINED_CACHE_ID; + +/** */ +class IgniteToPostgreSqlCdcApplier { + /** */ + public static final String DFLT_SQL_TYPE = "BYTEA"; + + /** */ + public static final Map<String, String> JAVA_TO_SQL_TYPES; Review Comment: Can we reuse the mapping from any java PG libs? 
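As far as I know, pgjdbc does not expose a public Java-class-to-PostgreSQL-type table (its `TypeInfoCache` is internal), but standard JDBC metadata can report the driver's preferred type name per `java.sql.Types` code. A sketch; a Java-class-to-JDBC-type mapping step would still be needed on top of it:

```java
// Sketch: derive DDL type names from the driver instead of hardcoding them.
private Map<Integer, String> loadDbTypeNames(Connection conn) throws SQLException {
    Map<Integer, String> byJdbcType = new HashMap<>();

    try (java.sql.ResultSet rs = conn.getMetaData().getTypeInfo()) {
        while (rs.next()) {
            // getTypeInfo() is ordered by DATA_TYPE and then by closeness of
            // match, so the first name seen per code is the driver-preferred one.
            byJdbcType.putIfAbsent(rs.getInt("DATA_TYPE"), rs.getString("TYPE_NAME"));
        }
    }

    return byJdbcType;
}
```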
########## modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcApplier.java: ########## @@ -0,0 +1,575 @@ +package org.apache.ignite.cdc.postgresql; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.CacheEntryVersion; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cdc.CdcCacheEvent; +import org.apache.ignite.cdc.CdcEvent; +import org.apache.ignite.internal.util.typedef.F; + +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UNDEFINED_CACHE_ID; + +/** */ +class IgniteToPostgreSqlCdcApplier { + /** */ + public static final String DFLT_SQL_TYPE = "BYTEA"; + + /** */ + public static final Map<String, String> JAVA_TO_SQL_TYPES; + + static { + Map<String, String> map = new HashMap<>(); + + map.put("java.lang.String", "TEXT"); + map.put("java.lang.Integer", "INT"); + map.put("int", "INT"); + map.put("java.lang.Long", "BIGINT"); + map.put("long", "BIGINT"); + map.put("java.lang.Boolean", "BOOLEAN"); + map.put("boolean", "BOOLEAN"); + map.put("java.lang.Double", "DOUBLE PRECISION"); + map.put("double", "DOUBLE PRECISION"); + map.put("java.lang.Float", "REAL"); + map.put("float", "REAL"); + map.put("java.math.BigDecimal", "NUMERIC"); + map.put("java.lang.Short", "SMALLINT"); + map.put("short", "SMALLINT"); + map.put("java.lang.Byte", "SMALLINT"); + map.put("byte", "SMALLINT"); + map.put("java.util.Date", "DATE"); + map.put("java.sql.Date", "DATE"); + map.put("java.sql.Time", "TIME"); + map.put("java.sql.Timestamp", "TIMESTAMP"); + map.put("java.time.LocalDate", "DATE"); + map.put("java.time.LocalDateTime", "TIMESTAMP"); + map.put("java.util.UUID", "UUID"); + map.put("[B", "BYTEA"); + + JAVA_TO_SQL_TYPES = Collections.unmodifiableMap(map); + } + + /** */ + private final long maxBatchSize; + + /** */ + private final IgniteLogger log; + + /** */ + private final Map<Integer, String> cacheIdToUpsertQry = new HashMap<>(); + + /** */ + private final Map<Integer, String> cacheIdToDeleteQry = new HashMap<>(); + + /** */ + private final Map<Integer, Set<String>> cacheIdToPrimaryKeys = new HashMap<>(); + + /** */ + private final Map<Integer, Set<String>> cacheIdToFields = new HashMap<>(); + + /** */ + private final Set<Object> curKeys = new HashSet<>(); + + /** */ + private PreparedStatement curPrepStmt; + + /** + * @param maxBatchSize the maximum number of CDC events to include in a single batch + * @param log the {@link IgniteLogger} instance used for logging CDC processing events + */ + public IgniteToPostgreSqlCdcApplier(long maxBatchSize, IgniteLogger log) { + this.maxBatchSize = maxBatchSize; + this.log = log; + } + + /** + * @param conn the active JDBC {@link Connection} to the PostgreSQL database + * @param evts an {@link Iterator} of {@link CdcEvent} objects to be applied + * @return the total number of events successfully batched and executed + */ + public long applyEvents(Connection conn, Iterator<CdcEvent> evts) { + long evtsApplied = 0; + + int currCacheId = UNDEFINED_CACHE_ID; + boolean prevOpIsDelete = false; + + 
CdcEvent evt; + + while (evts.hasNext()) { + evt = evts.next(); + + if (log.isDebugEnabled()) + log.debug("Event received [evt=" + evt + ']'); + + if (currCacheId != evt.cacheId() || prevOpIsDelete ^ (evt.value() == null)) { + evtsApplied += executeBatch(); + + currCacheId = evt.cacheId(); + prevOpIsDelete = evt.value() == null; + + prepareStatement(conn, evt); + } + + if (curKeys.size() >= maxBatchSize || curKeys.contains(evt.key())) + evtsApplied += executeBatch(); + + addEvent(evt); + } + + if (currCacheId != UNDEFINED_CACHE_ID) + evtsApplied += executeBatch(); + + return evtsApplied; + } + + /** */ + private int executeBatch() { + if (curPrepStmt == null) + return 0; + + try { + curKeys.clear(); + + if (log.isDebugEnabled()) + log.debug("Applying batch " + curPrepStmt.toString()); + + return curPrepStmt.executeBatch().length; + } + catch (SQLException e) { + log.error(e.getMessage(), e); + + throw new IgniteException(e); + } + } + + /** */ + private void prepareStatement(Connection conn, CdcEvent evt) { + String sqlQry; + + if (evt.value() == null) + sqlQry = cacheIdToDeleteQry.get(evt.cacheId()); + else + sqlQry = cacheIdToUpsertQry.get(evt.cacheId()); + + if (log.isDebugEnabled()) + log.debug("Statement updated [cacheId=" + evt.cacheId() + ", sqlQry=" + sqlQry + ']'); + + try { + curPrepStmt = conn.prepareStatement(sqlQry); + } + catch (SQLException e) { + log.error(e.getMessage(), e); + + throw new IgniteException(e); + } + } + + /** */ + private void addEvent(CdcEvent evt) { + try { + if (evt.value() == null) + addEvent(evt, true); + else { + int idx = addEvent(evt, false); + + curPrepStmt.setBytes(idx, encodeVersion(evt.version())); + } + + curPrepStmt.addBatch(); + } + catch (Throwable e) { + log.error(e.getMessage(), e); + + throw new IgniteException(e); + } + } + + /** */ + private int addEvent(CdcEvent evt, boolean isDelete) throws SQLException { + Iterator<String> itFields = isDelete ? + cacheIdToPrimaryKeys.get(evt.cacheId()).iterator() : + cacheIdToFields.get(evt.cacheId()).iterator(); + + String field; + + BinaryObject keyObj = (evt.key() instanceof BinaryObject) ? (BinaryObject)evt.key() : null; + BinaryObject valObj = (evt.value() instanceof BinaryObject) ? (BinaryObject)evt.value() : null; + + int idx = 1; + Object obj; + + while (itFields.hasNext()) { + field = itFields.next(); + + if (cacheIdToPrimaryKeys.get(evt.cacheId()).contains(field)) + if (keyObj != null) + obj = keyObj.field(field); + else + obj = evt.key(); + else + if (valObj != null) + obj = valObj.field(field); + else + obj = evt.value(); + + addObject(idx, obj); + + idx++; + } + + return idx; //curPrepStmt.setString(idx, evt.value().toString()); + } + + /** + * Sets a value in the PreparedStatement at the given index using the appropriate setter + * based on the runtime type of the object. 
+ */ + private void addObject(int idx, Object obj) throws SQLException { + if (obj == null) { + curPrepStmt.setObject(idx, null); + + return; + } + + if (obj instanceof String) + curPrepStmt.setString(idx, (String)obj); + else if (obj instanceof Integer) + curPrepStmt.setInt(idx, (Integer)obj); + else if (obj instanceof Long) + curPrepStmt.setLong(idx, (Long)obj); + else if (obj instanceof Short) + curPrepStmt.setShort(idx, (Short)obj); + else if (obj instanceof Byte) + curPrepStmt.setByte(idx, (Byte)obj); + else if (obj instanceof Boolean) + curPrepStmt.setBoolean(idx, (Boolean)obj); + else if (obj instanceof Float) + curPrepStmt.setFloat(idx, (Float)obj); + else if (obj instanceof Double) + curPrepStmt.setDouble(idx, (Double)obj); + else if (obj instanceof BigDecimal) + curPrepStmt.setBigDecimal(idx, (BigDecimal)obj); + else if (obj instanceof UUID) + curPrepStmt.setObject(idx, obj, java.sql.Types.OTHER); // PostgreSQL expects UUID as OTHER + else if (obj instanceof byte[]) + curPrepStmt.setBytes(idx, (byte[])obj); + else if (obj instanceof java.sql.Date) + curPrepStmt.setDate(idx, (java.sql.Date)obj); + else if (obj instanceof java.sql.Time) + curPrepStmt.setTime(idx, (java.sql.Time)obj); + else if (obj instanceof java.sql.Timestamp) + curPrepStmt.setTimestamp(idx, (java.sql.Timestamp)obj); + else if (obj instanceof java.util.Date) + curPrepStmt.setTimestamp(idx, new java.sql.Timestamp(((java.util.Date)obj).getTime())); + else if (obj instanceof java.time.LocalDate) + curPrepStmt.setDate(idx, java.sql.Date.valueOf((java.time.LocalDate)obj)); + else if (obj instanceof java.time.LocalTime) + curPrepStmt.setTime(idx, java.sql.Time.valueOf((java.time.LocalTime)obj)); + else if (obj instanceof java.time.LocalDateTime) + curPrepStmt.setTimestamp(idx, java.sql.Timestamp.valueOf((java.time.LocalDateTime)obj)); + else if (obj instanceof java.time.OffsetDateTime) + curPrepStmt.setTimestamp(idx, java.sql.Timestamp.from(((java.time.OffsetDateTime)obj).toInstant())); + else if (obj instanceof java.time.ZonedDateTime) + curPrepStmt.setTimestamp(idx, java.sql.Timestamp.from(((java.time.ZonedDateTime)obj).toInstant())); + else + curPrepStmt.setObject(idx, obj); + } + + /** + * @param conn the JDBC {@link Connection} to the PostgreSQL database + * @param evts an {@link Iterator} of {@link CdcCacheEvent} objects to apply + * @param createTables tables creation flag. If true - attempt to create tables will be made. 
+ */ + public void applyCacheEvents(Connection conn, Iterator<CdcCacheEvent> evts, boolean createTables) { + CdcCacheEvent evt; + QueryEntity entity; + + while (evts.hasNext()) { + evt = evts.next(); + + if (evt.queryEntities().size() != 1) + throw new IgniteException("There should be exactly 1 QueryEntity for cacheId: " + evt.cacheId()); + + entity = evt.queryEntities().iterator().next(); + + if (createTables) + createTableIfNotExists(conn, entity); + + cacheIdToUpsertQry.put(evt.cacheId(), getUpsertSqlQry(entity)); + + cacheIdToDeleteQry.put(evt.cacheId(), getDeleteSqlQry(entity)); + + cacheIdToPrimaryKeys.put(evt.cacheId(), getPrimaryKeys(entity)); + + cacheIdToFields.put(evt.cacheId(), entity.getFields().keySet()); + + if (log.isInfoEnabled()) + log.info("Cache table created [tableName=" + entity.getTableName() + + ", columns=" + entity.getFields().keySet() + ']'); + } + } + + /** + * @param conn the JDBC {@link Connection} used to execute the DDL statement + * @param entity the {@link QueryEntity} describing the table schema to create + */ + private void createTableIfNotExists(Connection conn, QueryEntity entity) { + String createSqlStmt = getCreateTableSqlStatement(entity); + + try (Statement stmt = conn.createStatement()) { + stmt.execute(createSqlStmt); + } + catch (SQLException e) { + log.error(e.getMessage(), e); + + throw new IgniteException(e); + } + } + + /** + * Generates the SQL statement for creating a table. + * + * @param entity QueryEntity instance describing the cache structure. + * @return SQL statement for creating a table. + */ + public String getCreateTableSqlStatement(QueryEntity entity) { + StringBuilder ddl = new StringBuilder("CREATE TABLE IF NOT EXISTS ").append(entity.getTableName()).append(" ("); + + addFieldsAndTypes(entity, ddl); + + ddl.append(", version BYTEA NOT NULL"); + + ddl.append(", PRIMARY KEY ("); + + addPrimaryKeys(entity, ddl); + + ddl.append(')').append(')'); + + return ddl.toString(); + } + + /** + * Constructs DDL-compatible SQL fragment listing fields along with their mapped SQL types. + * + * @param entity QueryEntity instance describing the cache structure. + * @param sql Target StringBuilder where the result will be appended. + */ + private void addFieldsAndTypes(QueryEntity entity, StringBuilder sql) { + Iterator<Map.Entry<String, String>> iter = entity.getFields().entrySet().iterator(); + Map.Entry<String, String> field; + + while (iter.hasNext()) { + field = iter.next(); + + sql.append(field.getKey()).append(" ").append(JAVA_TO_SQL_TYPES.getOrDefault(field.getValue(), DFLT_SQL_TYPE)); + + if (iter.hasNext()) + sql.append(", "); + } + } + + /** + * Generates a parameterized SQL UPSERT (INSERT ... ON CONFLICT DO UPDATE) query + * for the given {@link QueryEntity}, including a version-based conflict resolution condition. + * <pre>{@code + * INSERT INTO my_table (id, name, version) VALUES (?, ?, ?) 
+ * ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name + * WHERE version < EXCLUDED.version + * }</pre> + * + * Notes: + * <ul> + * <li>The {@code version} field is added to support version-based upsert logic.</li> + * <li>Primary key fields are excluded from the {@code DO UPDATE SET} clause.</li> + * <li>All fields are assigned {@code ?} placeholders for use with {@link java.sql.PreparedStatement}.</li> + * </ul> + * + * @param entity the {@link QueryEntity} describing the table, fields, and primary keys + * @return a SQL UPSERT query string with parameter placeholders and version conflict resolution + */ + public String getUpsertSqlQry(QueryEntity entity) { + StringBuilder sql = new StringBuilder("INSERT INTO ").append(entity.getTableName()).append(" ("); + + addFields(entity, sql); + + sql.append(", version) VALUES ("); + + for (int i = 0; i < entity.getFields().size() + 1; ++i) { // version field included + sql.append('?'); + + if (i < entity.getFields().size()) + sql.append(", "); + } + + sql.append(") ON CONFLICT ("); + + addPrimaryKeys(entity, sql); + + sql.append(") DO UPDATE SET "); + + addUpdateFields(entity, sql); + + sql.append(" WHERE ").append(entity.getTableName()).append(".version < EXCLUDED.version"); + + return sql.toString(); + } + + /** + * Builds a comma-separated list of field names extracted from the QueryEntity. + * + * @param entity QueryEntity instance describing the cache structure. + * @param sql Target StringBuilder where the result will be appended. + */ + private void addFields(QueryEntity entity, StringBuilder sql) { + Iterator<Map.Entry<String, String>> iter = entity.getFields().entrySet().iterator(); + Map.Entry<String, String> field; + + while (iter.hasNext()) { + field = iter.next(); + + sql.append(field.getKey()); + + if (iter.hasNext()) + sql.append(", "); + } + } + + /** + * Builds a SQL update clause excluding primary key fields, including version-specific fields. + * + * @param entity QueryEntity instance describing the cache structure. + * @param sql Target StringBuilder where the resulting SQL fragment will be appended. + */ + private void addUpdateFields(QueryEntity entity, StringBuilder sql) { + Set<String> primaryFields = getPrimaryKeys(entity); + + Iterator<String> itAllFields = F.concat(false, "version", entity.getFields().keySet()).iterator(); + + String field; + + while (itAllFields.hasNext()) { + field = itAllFields.next(); + + if (primaryFields.contains(field)) + continue; + + sql.append(field).append(" = EXCLUDED.").append(field); + + if (itAllFields.hasNext()) + sql.append(", "); + } + } + + /** + * Generates a parameterized SQL DELETE query for the given {@link QueryEntity}. + * Example: + * <pre>{@code + * // For a key: id + * DELETE FROM my_table WHERE (id = ?) + * }</pre> + * + * If the table has a composite primary key, all keys will be included with AND conditions: + * <pre>{@code + * // For a composite key: id1, id2 + * DELETE FROM my_table WHERE (id1 = ? AND id2 = ?) 
+ * }</pre> + * + * @param entity the {@link QueryEntity} describing the table and its primary keys + * @return a SQL DELETE query string with parameter placeholders for primary key values + */ + public String getDeleteSqlQry(QueryEntity entity) { + StringBuilder deleteQry = new StringBuilder("DELETE FROM ").append(entity.getTableName()).append(" WHERE ("); + + Iterator<String> itKeys = getPrimaryKeys(entity).iterator(); + String key; + + while (itKeys.hasNext()) { + key = itKeys.next(); + + deleteQry.append(key).append(" = ?"); + + if (itKeys.hasNext()) + deleteQry.append(" AND "); + } + + deleteQry.append(')'); + + return deleteQry.toString(); + } + + /** + * Generates a SQL fragment listing primary key fields for the given QueryEntity. + * + * @param entity QueryEntity instance describing the cache structure. + * @param sql Target StringBuilder where the resulting SQL fragment will be appended. + */ + private void addPrimaryKeys(QueryEntity entity, StringBuilder sql) { + Iterator<String> iterKeys = getPrimaryKeys(entity).iterator(); + + while (iterKeys.hasNext()) { + sql.append(iterKeys.next()); + + if (iterKeys.hasNext()) + sql.append(", "); + } + } + + /** + * Retrieves the primary key field names from the provided {@link QueryEntity}. + * If no primary keys are defined, it returns a set containing the first field from the table. Review Comment: > it returns a set containing the first field from the table Why do you think this is correct way to define primary key? AFAIU, we don't have such requirement to specify key field first. ########## modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcApplier.java: ########## @@ -0,0 +1,575 @@ +package org.apache.ignite.cdc.postgresql; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.CacheEntryVersion; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cdc.CdcCacheEvent; +import org.apache.ignite.cdc.CdcEvent; +import org.apache.ignite.internal.util.typedef.F; + +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UNDEFINED_CACHE_ID; + +/** */ +class IgniteToPostgreSqlCdcApplier { + /** */ + public static final String DFLT_SQL_TYPE = "BYTEA"; + + /** */ + public static final Map<String, String> JAVA_TO_SQL_TYPES; + + static { + Map<String, String> map = new HashMap<>(); + + map.put("java.lang.String", "TEXT"); + map.put("java.lang.Integer", "INT"); + map.put("int", "INT"); + map.put("java.lang.Long", "BIGINT"); + map.put("long", "BIGINT"); + map.put("java.lang.Boolean", "BOOLEAN"); + map.put("boolean", "BOOLEAN"); + map.put("java.lang.Double", "DOUBLE PRECISION"); + map.put("double", "DOUBLE PRECISION"); + map.put("java.lang.Float", "REAL"); + map.put("float", "REAL"); + map.put("java.math.BigDecimal", "NUMERIC"); + map.put("java.lang.Short", "SMALLINT"); + map.put("short", "SMALLINT"); + map.put("java.lang.Byte", "SMALLINT"); + map.put("byte", "SMALLINT"); + map.put("java.util.Date", "DATE"); + map.put("java.sql.Date", "DATE"); + map.put("java.sql.Time", "TIME"); + 
map.put("java.sql.Timestamp", "TIMESTAMP"); Review Comment: timezone? ########## modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcApplier.java: ########## @@ -0,0 +1,575 @@ +package org.apache.ignite.cdc.postgresql; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.CacheEntryVersion; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cdc.CdcCacheEvent; +import org.apache.ignite.cdc.CdcEvent; +import org.apache.ignite.internal.util.typedef.F; + +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UNDEFINED_CACHE_ID; + +/** */ +class IgniteToPostgreSqlCdcApplier { + /** */ + public static final String DFLT_SQL_TYPE = "BYTEA"; + + /** */ + public static final Map<String, String> JAVA_TO_SQL_TYPES; + + static { + Map<String, String> map = new HashMap<>(); + + map.put("java.lang.String", "TEXT"); + map.put("java.lang.Integer", "INT"); + map.put("int", "INT"); + map.put("java.lang.Long", "BIGINT"); + map.put("long", "BIGINT"); + map.put("java.lang.Boolean", "BOOLEAN"); + map.put("boolean", "BOOLEAN"); + map.put("java.lang.Double", "DOUBLE PRECISION"); + map.put("double", "DOUBLE PRECISION"); + map.put("java.lang.Float", "REAL"); + map.put("float", "REAL"); + map.put("java.math.BigDecimal", "NUMERIC"); + map.put("java.lang.Short", "SMALLINT"); + map.put("short", "SMALLINT"); + map.put("java.lang.Byte", "SMALLINT"); + map.put("byte", "SMALLINT"); + map.put("java.util.Date", "DATE"); + map.put("java.sql.Date", "DATE"); + map.put("java.sql.Time", "TIME"); + map.put("java.sql.Timestamp", "TIMESTAMP"); + map.put("java.time.LocalDate", "DATE"); + map.put("java.time.LocalDateTime", "TIMESTAMP"); + map.put("java.util.UUID", "UUID"); + map.put("[B", "BYTEA"); + + JAVA_TO_SQL_TYPES = Collections.unmodifiableMap(map); + } + + /** */ + private final long maxBatchSize; + + /** */ + private final IgniteLogger log; + + /** */ + private final Map<Integer, String> cacheIdToUpsertQry = new HashMap<>(); + + /** */ + private final Map<Integer, String> cacheIdToDeleteQry = new HashMap<>(); + + /** */ + private final Map<Integer, Set<String>> cacheIdToPrimaryKeys = new HashMap<>(); + + /** */ + private final Map<Integer, Set<String>> cacheIdToFields = new HashMap<>(); + + /** */ + private final Set<Object> curKeys = new HashSet<>(); + + /** */ + private PreparedStatement curPrepStmt; + + /** + * @param maxBatchSize the maximum number of CDC events to include in a single batch + * @param log the {@link IgniteLogger} instance used for logging CDC processing events + */ + public IgniteToPostgreSqlCdcApplier(long maxBatchSize, IgniteLogger log) { + this.maxBatchSize = maxBatchSize; + this.log = log; + } + + /** + * @param conn the active JDBC {@link Connection} to the PostgreSQL database + * @param evts an {@link Iterator} of {@link CdcEvent} objects to be applied + * @return the total number of events successfully batched and executed + */ + public long applyEvents(Connection conn, Iterator<CdcEvent> evts) { + long evtsApplied = 0; + + int 
currCacheId = UNDEFINED_CACHE_ID; + boolean prevOpIsDelete = false; + + CdcEvent evt; + + while (evts.hasNext()) { + evt = evts.next(); + + if (log.isDebugEnabled()) + log.debug("Event received [evt=" + evt + ']'); + + if (currCacheId != evt.cacheId() || prevOpIsDelete ^ (evt.value() == null)) { + evtsApplied += executeBatch(); + + currCacheId = evt.cacheId(); + prevOpIsDelete = evt.value() == null; + + prepareStatement(conn, evt); + } + + if (curKeys.size() >= maxBatchSize || curKeys.contains(evt.key())) + evtsApplied += executeBatch(); + + addEvent(evt); + } + + if (currCacheId != UNDEFINED_CACHE_ID) + evtsApplied += executeBatch(); + + return evtsApplied; + } + + /** */ + private int executeBatch() { + if (curPrepStmt == null) + return 0; + + try { + curKeys.clear(); + + if (log.isDebugEnabled()) + log.debug("Applying batch " + curPrepStmt.toString()); + + return curPrepStmt.executeBatch().length; + } + catch (SQLException e) { + log.error(e.getMessage(), e); + + throw new IgniteException(e); + } + } + + /** */ + private void prepareStatement(Connection conn, CdcEvent evt) { + String sqlQry; + + if (evt.value() == null) + sqlQry = cacheIdToDeleteQry.get(evt.cacheId()); + else + sqlQry = cacheIdToUpsertQry.get(evt.cacheId()); + + if (log.isDebugEnabled()) + log.debug("Statement updated [cacheId=" + evt.cacheId() + ", sqlQry=" + sqlQry + ']'); + + try { + curPrepStmt = conn.prepareStatement(sqlQry); + } + catch (SQLException e) { + log.error(e.getMessage(), e); + + throw new IgniteException(e); + } + } + + /** */ + private void addEvent(CdcEvent evt) { + try { + if (evt.value() == null) + addEvent(evt, true); + else { + int idx = addEvent(evt, false); + + curPrepStmt.setBytes(idx, encodeVersion(evt.version())); + } + + curPrepStmt.addBatch(); + } + catch (Throwable e) { + log.error(e.getMessage(), e); + + throw new IgniteException(e); + } + } + + /** */ + private int addEvent(CdcEvent evt, boolean isDelete) throws SQLException { + Iterator<String> itFields = isDelete ? + cacheIdToPrimaryKeys.get(evt.cacheId()).iterator() : + cacheIdToFields.get(evt.cacheId()).iterator(); + + String field; + + BinaryObject keyObj = (evt.key() instanceof BinaryObject) ? (BinaryObject)evt.key() : null; + BinaryObject valObj = (evt.value() instanceof BinaryObject) ? (BinaryObject)evt.value() : null; + + int idx = 1; + Object obj; + + while (itFields.hasNext()) { + field = itFields.next(); + + if (cacheIdToPrimaryKeys.get(evt.cacheId()).contains(field)) + if (keyObj != null) + obj = keyObj.field(field); + else + obj = evt.key(); + else + if (valObj != null) + obj = valObj.field(field); + else + obj = evt.value(); + + addObject(idx, obj); + + idx++; + } + + return idx; //curPrepStmt.setString(idx, evt.value().toString()); Review Comment: Please, make comment meaningful or remove it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org