maksaska commented on code in PR #311:
URL: https://github.com/apache/ignite-extensions/pull/311#discussion_r2183699013


##########
modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcApplier.java:
##########
@@ -0,0 +1,575 @@
+package org.apache.ignite.cdc.postgresql;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheEntryVersion;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cdc.CdcCacheEvent;
+import org.apache.ignite.cdc.CdcEvent;
+import org.apache.ignite.internal.util.typedef.F;
+
+import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.UNDEFINED_CACHE_ID;
+
+/** */
+class IgniteToPostgreSqlCdcApplier {
+    /** SQL column type used for fields whose Java type has no entry in {@link #JAVA_TO_SQL_TYPES}. */
+    public static final String DFLT_SQL_TYPE = "BYTEA";
+
+    /** Unmodifiable mapping from Java type names to PostgreSQL column types, used when generating DDL. */
+    public static final Map<String, String> JAVA_TO_SQL_TYPES;
+
+    static {
+        // Keys are the Java type names found in QueryEntity field definitions,
+        // values are the PostgreSQL column types used in the generated DDL.
+        Map<String, String> map = new HashMap<>();
+
+        map.put("java.lang.String", "TEXT");
+        map.put("java.lang.Integer", "INT");
+        map.put("int", "INT");
+        map.put("java.lang.Long", "BIGINT");
+        map.put("long", "BIGINT");
+        map.put("java.lang.Boolean", "BOOLEAN");
+        map.put("boolean", "BOOLEAN");
+        map.put("java.lang.Double", "DOUBLE PRECISION");
+        map.put("double", "DOUBLE PRECISION");
+        map.put("java.lang.Float", "REAL");
+        map.put("float", "REAL");
+        map.put("java.math.BigDecimal", "NUMERIC");
+        map.put("java.lang.Short", "SMALLINT");
+        map.put("short", "SMALLINT");
+        map.put("java.lang.Byte", "SMALLINT");
+        map.put("byte", "SMALLINT");
+        // NOTE(review): java.util.Date values are bound with setTimestamp() in addObject(),
+        // so a DATE column drops the time of day - confirm that this is intended.
+        map.put("java.util.Date", "DATE");
+        map.put("java.sql.Date", "DATE");
+        map.put("java.sql.Time", "TIME");
+        map.put("java.sql.Timestamp", "TIMESTAMP");
+        map.put("java.time.LocalDate", "DATE");
+        map.put("java.time.LocalDateTime", "TIMESTAMP");
+        // The java.time types below are bound as TIME/TIMESTAMP in addObject(); without an entry here
+        // their columns would be created with the default BYTEA type, which those values cannot be stored in.
+        map.put("java.time.LocalTime", "TIME");
+        map.put("java.time.OffsetDateTime", "TIMESTAMP WITH TIME ZONE");
+        map.put("java.time.ZonedDateTime", "TIMESTAMP WITH TIME ZONE");
+        map.put("java.util.UUID", "UUID");
+        map.put("[B", "BYTEA");
+
+        JAVA_TO_SQL_TYPES = Collections.unmodifiableMap(map);
+    }
+
+    /** Number of tracked keys at which {@code applyEvents} executes the pending batch. */
+    private final long maxBatchSize;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Upsert SQL text by cache ID; filled by {@code applyCacheEvents}. */
+    private final Map<Integer, String> cacheIdToUpsertQry = new HashMap<>();
+
+    /** Delete SQL text by cache ID; filled by {@code applyCacheEvents}. */
+    private final Map<Integer, String> cacheIdToDeleteQry = new HashMap<>();
+
+    /** Primary key field names by cache ID; filled by {@code applyCacheEvents}. */
+    private final Map<Integer, Set<String>> cacheIdToPrimaryKeys = new 
HashMap<>();
+
+    /** All field names of the query entity by cache ID; filled by {@code applyCacheEvents}. */
+    private final Map<Integer, Set<String>> cacheIdToFields = new HashMap<>();
+
+    /** Keys tracked for the pending batch: checked in {@code applyEvents}, cleared in {@code executeBatch}. */
+    private final Set<Object> curKeys = new HashSet<>();
+
+    /** Statement the pending batch is accumulated in; {@code null} until a statement has been prepared. */
+    private PreparedStatement curPrepStmt;
+
+    /**
+     * Creates an applier that has no caches registered yet; SQL queries are registered by {@code applyCacheEvents}.
+     *
+     * @param maxBatchSize the maximum number of CDC events to include in a single batch
+     * @param log the {@link IgniteLogger} instance used for logging CDC processing events
+     */
+    public IgniteToPostgreSqlCdcApplier(long maxBatchSize, IgniteLogger log) {
+        this.maxBatchSize = maxBatchSize;
+        this.log = log;
+    }
+
+    /**
+     * Applies data events to PostgreSQL in JDBC batches.
+     * <p>
+     * Events are accumulated in the current prepared statement. The pending batch is executed, and a new
+     * statement is prepared, whenever the cache ID changes or the operation kind switches between upsert
+     * (non-null value) and delete (null value). The pending batch is also executed when the number of
+     * tracked keys reaches {@code maxBatchSize} or when the key of the event is already tracked for the
+     * pending batch. Whatever is still pending after the last event is executed before returning.
+     *
+     * @param conn the active JDBC {@link Connection} to the PostgreSQL database
+     * @param evts an {@link Iterator} of {@link CdcEvent} objects to be applied
+     * @return the sum of the sizes of all batches executed by this call
+     */
+    public long applyEvents(Connection conn, Iterator<CdcEvent> evts) {
+        long evtsApplied = 0;
+
+        int currCacheId = UNDEFINED_CACHE_ID;
+        boolean prevOpIsDelete = false;
+
+        CdcEvent evt;
+
+        while (evts.hasNext()) {
+            evt = evts.next();
+
+            if (log.isDebugEnabled())
+                log.debug("Event received [evt=" + evt + ']');
+
+            if (currCacheId != evt.cacheId() || prevOpIsDelete ^ (evt.value() 
== null)) {
+                evtsApplied += executeBatch();
+
+                currCacheId = evt.cacheId();
+                prevOpIsDelete = evt.value() == null;
+
+                prepareStatement(conn, evt);
+            }
+
+            if (curKeys.size() >= maxBatchSize || curKeys.contains(evt.key()))
+                evtsApplied += executeBatch();
+
+            addEvent(evt);
+        }
+
+        if (currCacheId != UNDEFINED_CACHE_ID)
+            evtsApplied += executeBatch();
+
+        return evtsApplied;
+    }
+
+    /**
+     * Executes the batch accumulated in the current statement and forgets the tracked keys.
+     *
+     * @return the number of entries in the executed batch, or {@code 0} if no statement has been prepared yet
+     * @throws IgniteException if the batch execution fails
+     */
+    private int executeBatch() {
+        if (curPrepStmt == null)
+            return 0;
+
+        curKeys.clear();
+
+        if (log.isDebugEnabled())
+            log.debug("Applying batch " + curPrepStmt);
+
+        try {
+            int[] res = curPrepStmt.executeBatch();
+
+            return res.length;
+        }
+        catch (SQLException ex) {
+            log.error(ex.getMessage(), ex);
+
+            throw new IgniteException(ex);
+        }
+    }
+
+    /**
+     * Replaces the current statement with one prepared from the query registered for the cache of the event:
+     * the delete query when the event value is {@code null}, the upsert query otherwise.
+     * The statement that was current before the call is closed.
+     *
+     * @param conn the JDBC {@link Connection} used to prepare the statement
+     * @param evt the event that determines the cache and the kind of operation
+     * @throws IgniteException if no query is registered for the cache or if a JDBC call fails
+     */
+    private void prepareStatement(Connection conn, CdcEvent evt) {
+        boolean isDelete = evt.value() == null;
+
+        String sqlQry = (isDelete ? cacheIdToDeleteQry : cacheIdToUpsertQry).get(evt.cacheId());
+
+        // Fail with a clear message instead of handing a null query to the JDBC driver.
+        if (sqlQry == null)
+            throw new IgniteException("No SQL query is registered for cache [cacheId=" + evt.cacheId() + ']');
+
+        if (log.isDebugEnabled())
+            log.debug("Statement updated [cacheId=" + evt.cacheId() + ", sqlQry=" + sqlQry + ']');
+
+        try {
+            // The caller executes the pending batch before switching statements,
+            // so the previous statement can be released here instead of being leaked.
+            if (curPrepStmt != null)
+                curPrepStmt.close();
+
+            curPrepStmt = conn.prepareStatement(sqlQry);
+        }
+        catch (SQLException e) {
+            log.error(e.getMessage(), e);
+
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * Binds the event to the current statement, appends it to the JDBC batch and records its key as pending.
+     * <p>
+     * For an upsert (non-null value) the entry version is bound as the last parameter, right after the
+     * field values; for a delete (null value) only the primary key fields are bound.
+     *
+     * @param evt the CDC event to add to the pending batch
+     * @throws IgniteException if binding the parameters or adding to the batch fails
+     */
+    private void addEvent(CdcEvent evt) {
+        try {
+            if (evt.value() == null)
+                addEvent(evt, true);
+            else {
+                int idx = addEvent(evt, false);
+
+                curPrepStmt.setBytes(idx, encodeVersion(evt.version()));
+            }
+
+            curPrepStmt.addBatch();
+
+            // Track the key: applyEvents() relies on curKeys to cap the batch size and to execute
+            // the pending batch before the same key is batched a second time.
+            curKeys.add(evt.key());
+        }
+        catch (Throwable e) {
+            log.error(e.getMessage(), e);
+
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * Binds the field values of the event to the current statement, starting at parameter index 1.
+     * <p>
+     * A delete binds the primary key fields only, an upsert binds every field of the cache. Primary key
+     * fields are read from the event key, all other fields from the event value. When the key (or value)
+     * is not a {@link BinaryObject}, the key (or value) itself is bound for each of its fields.
+     *
+     * @param evt the CDC event whose fields are bound
+     * @param isDelete {@code true} to bind the primary key fields only
+     * @return the parameter index following the last bound field
+     * @throws SQLException if a parameter cannot be set
+     */
+    private int addEvent(CdcEvent evt, boolean isDelete) throws SQLException {
+        Set<String> primaryKeys = cacheIdToPrimaryKeys.get(evt.cacheId());
+        Set<String> fields = isDelete ? primaryKeys : cacheIdToFields.get(evt.cacheId());
+
+        BinaryObject keyObj = (evt.key() instanceof BinaryObject) ? (BinaryObject)evt.key() : null;
+        BinaryObject valObj = (evt.value() instanceof BinaryObject) ? (BinaryObject)evt.value() : null;
+
+        int idx = 1;
+
+        for (String field : fields) {
+            Object obj;
+
+            if (primaryKeys.contains(field))
+                obj = keyObj != null ? keyObj.field(field) : evt.key();
+            else
+                obj = valObj != null ? valObj.field(field) : evt.value();
+
+            addObject(idx, obj);
+
+            idx++;
+        }
+
+        return idx;
+    }
+
+    /**
+     * Sets a value in the current PreparedStatement at the given index using the setter that matches
+     * the runtime type of the object.
+     * <p>
+     * A {@code null} value is bound with {@code setObject(idx, null)}. The {@code java.sql} date/time types
+     * are checked before {@code java.util.Date} because they are its subclasses; a plain {@code java.util.Date}
+     * is bound as a timestamp. {@code java.time} values are converted to the corresponding {@code java.sql}
+     * type first. Any type that is not listed is passed to {@code setObject(idx, obj)} as is.
+     *
+     * @param idx the 1-based parameter index
+     * @param obj the value to bind, may be {@code null}
+     * @throws SQLException if the parameter cannot be set
+     */
+    private void addObject(int idx, Object obj) throws SQLException {
+        if (obj == null) {
+            curPrepStmt.setObject(idx, null);
+
+            return;
+        }
+
+        if (obj instanceof String)
+            curPrepStmt.setString(idx, (String)obj);
+        else if (obj instanceof Integer)
+            curPrepStmt.setInt(idx, (Integer)obj);
+        else if (obj instanceof Long)
+            curPrepStmt.setLong(idx, (Long)obj);
+        else if (obj instanceof Short)
+            curPrepStmt.setShort(idx, (Short)obj);
+        else if (obj instanceof Byte)
+            curPrepStmt.setByte(idx, (Byte)obj);
+        else if (obj instanceof Boolean)
+            curPrepStmt.setBoolean(idx, (Boolean)obj);
+        else if (obj instanceof Float)
+            curPrepStmt.setFloat(idx, (Float)obj);
+        else if (obj instanceof Double)
+            curPrepStmt.setDouble(idx, (Double)obj);
+        else if (obj instanceof BigDecimal)
+            curPrepStmt.setBigDecimal(idx, (BigDecimal)obj);
+        else if (obj instanceof UUID)
+            curPrepStmt.setObject(idx, obj, java.sql.Types.OTHER); // 
PostgreSQL expects UUID as OTHER
+        else if (obj instanceof byte[])
+            curPrepStmt.setBytes(idx, (byte[])obj);
+        else if (obj instanceof java.sql.Date)
+            curPrepStmt.setDate(idx, (java.sql.Date)obj);
+        else if (obj instanceof java.sql.Time)
+            curPrepStmt.setTime(idx, (java.sql.Time)obj);
+        else if (obj instanceof java.sql.Timestamp)
+            curPrepStmt.setTimestamp(idx, (java.sql.Timestamp)obj);
+        else if (obj instanceof java.util.Date)
+            curPrepStmt.setTimestamp(idx, new 
java.sql.Timestamp(((java.util.Date)obj).getTime()));
+        else if (obj instanceof java.time.LocalDate)
+            curPrepStmt.setDate(idx, 
java.sql.Date.valueOf((java.time.LocalDate)obj));
+        else if (obj instanceof java.time.LocalTime)
+            curPrepStmt.setTime(idx, 
java.sql.Time.valueOf((java.time.LocalTime)obj));
+        else if (obj instanceof java.time.LocalDateTime)
+            curPrepStmt.setTimestamp(idx, 
java.sql.Timestamp.valueOf((java.time.LocalDateTime)obj));
+        else if (obj instanceof java.time.OffsetDateTime)
+            curPrepStmt.setTimestamp(idx, 
java.sql.Timestamp.from(((java.time.OffsetDateTime)obj).toInstant()));
+        else if (obj instanceof java.time.ZonedDateTime)
+            curPrepStmt.setTimestamp(idx, 
java.sql.Timestamp.from(((java.time.ZonedDateTime)obj).toInstant()));
+        else
+            curPrepStmt.setObject(idx, obj);
+    }
+
+    /**
+     * @param conn the JDBC {@link Connection} to the PostgreSQL database
+     * @param evts an {@link Iterator} of {@link CdcCacheEvent} objects to 
apply
+     * @param createTables tables creation flag. If true - attempt to create 
tables will be made.
+     */
+    public void applyCacheEvents(Connection conn, Iterator<CdcCacheEvent> 
evts, boolean createTables) {
+        CdcCacheEvent evt;
+        QueryEntity entity;
+
+        while (evts.hasNext()) {
+            evt = evts.next();
+
+            if (evt.queryEntities().size() != 1)
+                throw new IgniteException("There should be exactly 1 
QueryEntity for cacheId: " + evt.cacheId());
+
+            entity = evt.queryEntities().iterator().next();
+
+            if (createTables)
+                createTableIfNotExists(conn, entity);
+
+            cacheIdToUpsertQry.put(evt.cacheId(), getUpsertSqlQry(entity));
+
+            cacheIdToDeleteQry.put(evt.cacheId(), getDeleteSqlQry(entity));
+
+            cacheIdToPrimaryKeys.put(evt.cacheId(), getPrimaryKeys(entity));
+
+            cacheIdToFields.put(evt.cacheId(), entity.getFields().keySet());
+
+            if (log.isInfoEnabled())
+                log.info("Cache table created [tableName=" + 
entity.getTableName() +
+                    ", columns=" + entity.getFields().keySet() + ']');
+        }
+    }
+
+    /**
+     * @param conn the JDBC {@link Connection} used to execute the DDL 
statement
+     * @param entity the {@link QueryEntity} describing the table schema to 
create
+     */
+    private void createTableIfNotExists(Connection conn, QueryEntity entity) {
+        String createSqlStmt = getCreateTableSqlStatement(entity);
+
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(createSqlStmt);
+        }
+        catch (SQLException e) {
+            log.error(e.getMessage(), e);
+
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * Generates the SQL statement for creating a table.
+     *
+     * @param entity QueryEntity instance describing the cache structure.
+     * @return SQL statement for creating a table.
+     */
+    public String getCreateTableSqlStatement(QueryEntity entity) {
+        StringBuilder ddl = new StringBuilder("CREATE TABLE IF NOT EXISTS 
").append(entity.getTableName()).append(" (");
+
+        addFieldsAndTypes(entity, ddl);
+
+        ddl.append(", version BYTEA NOT NULL");
+
+        ddl.append(", PRIMARY KEY (");
+
+        addPrimaryKeys(entity, ddl);
+
+        ddl.append(')').append(')');
+
+        return ddl.toString();
+    }
+
+    /**
+     * Appends a comma-separated list of {@code <field> <SQL type>} pairs for every field of the entity.
+     * Java types without a mapping get the default SQL type.
+     *
+     * @param entity QueryEntity instance describing the cache structure.
+     * @param sql Target StringBuilder where the result will be appended.
+     */
+    private void addFieldsAndTypes(QueryEntity entity, StringBuilder sql) {
+        String delim = "";
+
+        for (Map.Entry<String, String> col : entity.getFields().entrySet()) {
+            String sqlType = JAVA_TO_SQL_TYPES.getOrDefault(col.getValue(), DFLT_SQL_TYPE);
+
+            sql.append(delim).append(col.getKey()).append(" ").append(sqlType);
+
+            delim = ", ";
+        }
+    }
+
+    /**
+     * Generates a parameterized SQL UPSERT (INSERT ... ON CONFLICT DO UPDATE) query
+     * for the given {@link QueryEntity}, including a version-based conflict resolution condition.
+     * The query is produced as a single line; it is wrapped here for readability:
+     * <pre>{@code
+     * INSERT INTO my_table (id, name, version) VALUES (?, ?, ?)
+     * ON CONFLICT (id) DO UPDATE SET version = EXCLUDED.version, name = EXCLUDED.name
+     * WHERE my_table.version < EXCLUDED.version
+     * }</pre>
+     *
+     * Notes:
+     * <ul>
+     *   <li>The {@code version} column is appended after the entity fields and is also updated on conflict.</li>
+     *   <li>An existing row is updated only if its stored version is less than the incoming one.</li>
+     *   <li>Primary key fields are excluded from the {@code DO UPDATE SET} clause.</li>
+     *   <li>All fields are assigned {@code ?} placeholders for use with {@link java.sql.PreparedStatement}.</li>
+     * </ul>
+     *
+     * @param entity the {@link QueryEntity} describing the table, fields, and primary keys
+     * @return a SQL UPSERT query string with parameter placeholders and version conflict resolution
+     */
+    public String getUpsertSqlQry(QueryEntity entity) {
+        StringBuilder sql = new StringBuilder("INSERT INTO ").append(entity.getTableName()).append(" (");
+
+        addFields(entity, sql);
+
+        sql.append(", version) VALUES (");
+
+        // One placeholder per entity field plus one for the version column.
+        sql.append(String.join(", ", Collections.nCopies(entity.getFields().size() + 1, "?")));
+
+        sql.append(") ON CONFLICT (");
+
+        addPrimaryKeys(entity, sql);
+
+        sql.append(") DO UPDATE SET ");
+
+        addUpdateFields(entity, sql);
+
+        sql.append(" WHERE ").append(entity.getTableName()).append(".version < EXCLUDED.version");
+
+        return sql.toString();
+    }
+
+    /**
+     * Appends the field names of the entity, separated by {@code ", "}.
+     *
+     * @param entity QueryEntity instance describing the cache structure.
+     * @param sql Target StringBuilder where the result will be appended.
+     */
+    private void addFields(QueryEntity entity, StringBuilder sql) {
+        sql.append(String.join(", ", entity.getFields().keySet()));
+    }
+
+    /**
+     * Appends the assignments of the {@code DO UPDATE SET} clause: {@code <field> = EXCLUDED.<field>} for the
+     * {@code version} column and for every entity field that is not part of the primary key.
+     * <p>
+     * The separator is written in front of each assignment except the first one, so the clause never ends
+     * with a dangling comma, even when the last field of the entity is a primary key field.
+     *
+     * @param entity QueryEntity instance describing the cache structure.
+     * @param sql Target StringBuilder where the resulting SQL fragment will be appended.
+     */
+    private void addUpdateFields(QueryEntity entity, StringBuilder sql) {
+        Set<String> primaryFields = getPrimaryKeys(entity);
+
+        String delim = "";
+
+        for (String field : F.concat(false, "version", entity.getFields().keySet())) {
+            if (primaryFields.contains(field))
+                continue;
+
+            sql.append(delim).append(field).append(" = EXCLUDED.").append(field);
+
+            delim = ", ";
+        }
+    }
+
+    /**
+     * Builds the parameterized SQL DELETE query for the given {@link QueryEntity}.
+     * With a single key column:
+     * <pre>{@code
+     * // For a key: id
+     * DELETE FROM my_table WHERE (id = ?)
+     * }</pre>
+     *
+     * With a composite primary key every key column gets its own condition, joined with AND:
+     * <pre>{@code
+     * // For a composite key: id1, id2
+     * DELETE FROM my_table WHERE (id1 = ? AND id2 = ?)
+     * }</pre>
+     *
+     * @param entity the {@link QueryEntity} describing the table and its primary keys
+     * @return a SQL DELETE query string with parameter placeholders for primary key values
+     */
+    public String getDeleteSqlQry(QueryEntity entity) {
+        StringBuilder qry = new StringBuilder("DELETE FROM ");
+
+        qry.append(entity.getTableName()).append(" WHERE (");
+
+        String delim = "";
+
+        for (String pk : getPrimaryKeys(entity)) {
+            qry.append(delim).append(pk).append(" = ?");
+
+            delim = " AND ";
+        }
+
+        return qry.append(')').toString();
+    }
+
+    /**
+     * Appends the primary key field names of the entity, separated by {@code ", "}.
+     *
+     * @param entity QueryEntity instance describing the cache structure.
+     * @param sql Target StringBuilder where the resulting SQL fragment will be appended.
+     */
+    private void addPrimaryKeys(QueryEntity entity, StringBuilder sql) {
+        sql.append(String.join(", ", getPrimaryKeys(entity)));
+    }
+
+    /**
+     * Retrieves the primary key field names from the provided {@link 
QueryEntity}.
+     * If no primary keys are defined, it returns a set containing the first 
field from the table.

Review Comment:
   No, it's not. Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to