maksaska commented on code in PR #311: URL: https://github.com/apache/ignite-extensions/pull/311#discussion_r2183700005
########## modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcApplier.java: ########## @@ -0,0 +1,575 @@ +package org.apache.ignite.cdc.postgresql; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.CacheEntryVersion; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cdc.CdcCacheEvent; +import org.apache.ignite.cdc.CdcEvent; +import org.apache.ignite.internal.util.typedef.F; + +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UNDEFINED_CACHE_ID; + +/** */ +class IgniteToPostgreSqlCdcApplier { + /** */ + public static final String DFLT_SQL_TYPE = "BYTEA"; + + /** */ + public static final Map<String, String> JAVA_TO_SQL_TYPES; + + static { + Map<String, String> map = new HashMap<>(); + + map.put("java.lang.String", "TEXT"); + map.put("java.lang.Integer", "INT"); + map.put("int", "INT"); + map.put("java.lang.Long", "BIGINT"); + map.put("long", "BIGINT"); + map.put("java.lang.Boolean", "BOOLEAN"); + map.put("boolean", "BOOLEAN"); + map.put("java.lang.Double", "DOUBLE PRECISION"); + map.put("double", "DOUBLE PRECISION"); + map.put("java.lang.Float", "REAL"); + map.put("float", "REAL"); + map.put("java.math.BigDecimal", "NUMERIC"); + map.put("java.lang.Short", "SMALLINT"); + map.put("short", "SMALLINT"); + map.put("java.lang.Byte", "SMALLINT"); + map.put("byte", "SMALLINT"); + map.put("java.util.Date", "DATE"); + map.put("java.sql.Date", "DATE"); + map.put("java.sql.Time", "TIME"); + map.put("java.sql.Timestamp", "TIMESTAMP"); + map.put("java.time.LocalDate", "DATE"); + map.put("java.time.LocalDateTime", "TIMESTAMP"); + map.put("java.util.UUID", "UUID"); + map.put("[B", "BYTEA"); + + JAVA_TO_SQL_TYPES = Collections.unmodifiableMap(map); + } + + /** */ + private final long maxBatchSize; + + /** */ + private final IgniteLogger log; + + /** */ + private final Map<Integer, String> cacheIdToUpsertQry = new HashMap<>(); + + /** */ + private final Map<Integer, String> cacheIdToDeleteQry = new HashMap<>(); + + /** */ + private final Map<Integer, Set<String>> cacheIdToPrimaryKeys = new HashMap<>(); + + /** */ + private final Map<Integer, Set<String>> cacheIdToFields = new HashMap<>(); + + /** */ + private final Set<Object> curKeys = new HashSet<>(); + + /** */ + private PreparedStatement curPrepStmt; + + /** + * @param maxBatchSize the maximum number of CDC events to include in a single batch + * @param log the {@link IgniteLogger} instance used for logging CDC processing events + */ + public IgniteToPostgreSqlCdcApplier(long maxBatchSize, IgniteLogger log) { + this.maxBatchSize = maxBatchSize; + this.log = log; + } + + /** + * @param conn the active JDBC {@link Connection} to the PostgreSQL database + * @param evts an {@link Iterator} of {@link CdcEvent} objects to be applied + * @return the total number of events successfully batched and executed + */ + public long applyEvents(Connection conn, Iterator<CdcEvent> evts) { + long evtsApplied = 0; + + int currCacheId = UNDEFINED_CACHE_ID; + boolean prevOpIsDelete = false; + + CdcEvent evt; + + while (evts.hasNext()) { + evt = evts.next(); + + if (log.isDebugEnabled()) + log.debug("Event received [evt=" + evt + ']'); + + if (currCacheId != evt.cacheId() || prevOpIsDelete ^ (evt.value() == null)) { + evtsApplied += executeBatch(); + + currCacheId = evt.cacheId(); + prevOpIsDelete = evt.value() == null; + + prepareStatement(conn, evt); + } + + if (curKeys.size() >= maxBatchSize || curKeys.contains(evt.key())) + evtsApplied += executeBatch(); + + addEvent(evt); + } + + if (currCacheId != UNDEFINED_CACHE_ID) + evtsApplied += executeBatch(); + + return evtsApplied; + } + + /** */ + private int executeBatch() { + if (curPrepStmt == null) + return 0; + + try { + curKeys.clear(); + + if (log.isDebugEnabled()) + log.debug("Applying batch " + curPrepStmt.toString()); + + return curPrepStmt.executeBatch().length; + } + catch (SQLException e) { + log.error(e.getMessage(), e); + + throw new IgniteException(e); + } + } + + /** */ + private void prepareStatement(Connection conn, CdcEvent evt) { + String sqlQry; + + if (evt.value() == null) + sqlQry = cacheIdToDeleteQry.get(evt.cacheId()); + else + sqlQry = cacheIdToUpsertQry.get(evt.cacheId()); + + if (log.isDebugEnabled()) + log.debug("Statement updated [cacheId=" + evt.cacheId() + ", sqlQry=" + sqlQry + ']'); + + try { + curPrepStmt = conn.prepareStatement(sqlQry); + } + catch (SQLException e) { + log.error(e.getMessage(), e); + + throw new IgniteException(e); + } + } + + /** */ + private void addEvent(CdcEvent evt) { + try { + if (evt.value() == null) + addEvent(evt, true); + else { + int idx = addEvent(evt, false); + + curPrepStmt.setBytes(idx, encodeVersion(evt.version())); + } + + curPrepStmt.addBatch(); + } + catch (Throwable e) { + log.error(e.getMessage(), e); + + throw new IgniteException(e); + } + } + + /** */ Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org