maksaska commented on code in PR #311:
URL: https://github.com/apache/ignite-extensions/pull/311#discussion_r2183693141


##########
modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcConsumer.java:
##########
@@ -0,0 +1,265 @@
+package org.apache.ignite.cdc.postgresql;
+
+import java.sql.Connection;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import javax.sql.DataSource;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cdc.CdcCacheEvent;
+import org.apache.ignite.cdc.CdcConsumer;
+import org.apache.ignite.cdc.CdcEvent;
+import org.apache.ignite.cdc.TypeMapping;
+import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
+import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.metric.MetricRegistry;
+import org.apache.ignite.resources.LoggerResource;
+
+/**
+ * This class represents a consumer component that replicates cache changes 
from Apache Ignite to PostgreSQL using
+ * Change Data Capture (CDC) mechanism. It applies events to PostgreSQL via 
batch-prepared SQL statements, ensuring
+ * efficient handling of large volumes of updates.
+ *
+ * <p>Additionally, it provides methods for initializing connections, managing 
transactions, and performing atomic batches
+ * of writes.</p>
+ */
+public class IgniteToPostgreSqlCdcConsumer implements CdcConsumer {
+    /** */
+    public static final String EVTS_SENT_CNT = "EventsCount";
+
+    /** */
+    public static final String EVTS_SENT_CNT_DESC = "Count of events applied 
to PostgreSQL";
+
+    /** */
+    public static final String LAST_EVT_SENT_TIME = "LastEventTime";
+
+    /** */
+    public static final String LAST_EVT_SENT_TIME_DESC = "Timestamp of last 
applied event to PostgreSQL";
+
+    /** */
+    private static final boolean DFLT_IS_ONLY_PRIMARY = false;
+
+    /** */
+    private static final long DFLT_BATCH_SIZE = 1024;
+
+    /** */
+    private static final boolean DFLT_CREATE_TABLES = false;
+
+    /** */
+    private static final boolean DFLT_AUTO_COMMIT = false;
+
+    /** */
+    private DataSource dataSrc;
+
+    /** Collection of cache names which will be replicated to PostgreSQL. */
+    private Collection<String> caches;
+
+    /** */
+    private boolean onlyPrimary = DFLT_IS_ONLY_PRIMARY;
+
+    /** */
+    private long maxBatchSize = DFLT_BATCH_SIZE;
+
+    /** */
+    private boolean createTables = DFLT_CREATE_TABLES;
+
+    /** */
+    private boolean autoCommit = DFLT_AUTO_COMMIT;
+
+    /** Log. */
+    @LoggerResource
+    private IgniteLogger log;
+
+    /** Cache IDs. */
+    private Set<Integer> cachesIds;
+
+    /** Applier instance responsible for applying individual CDC events to 
PostgreSQL. */
+    private IgniteToPostgreSqlCdcApplier applier;
+
+    /** Count of events applied to PostgreSQL. */
+    private AtomicLongMetric evtsCnt;
+
+    /** Timestamp of last applied batch to PostgreSQL. */
+    private AtomicLongMetric lastEvtTs;
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Checks the {@code dataSrc} and {@code caches} settings, converts the configured cache names to
+     * cache IDs via {@code CU.cacheId}, creates the {@link IgniteToPostgreSqlCdcApplier} with the configured
+     * maximum batch size and registers the {@link #EVTS_SENT_CNT} and {@link #LAST_EVT_SENT_TIME} metrics.</p>
+     *
+     * @param reg Metric registry. It is cast to {@link MetricRegistryImpl}, so passing any other
+     *      implementation results in a {@link ClassCastException}.
+     */
+    @Override public void start(MetricRegistry reg) {
+        A.notNull(dataSrc, "dataSource");
+        A.notEmpty(caches, "caches");
+
+        cachesIds = caches.stream()
+            .map(CU::cacheId)
+            .collect(Collectors.toSet()); // onEvents() filters incoming events against this ID set.
+
+        applier = new IgniteToPostgreSqlCdcApplier(maxBatchSize, log);
+
+        MetricRegistryImpl mreg = (MetricRegistryImpl)reg; // longMetric(...) is invoked on the implementation type.
+
+        this.evtsCnt = mreg.longMetric(EVTS_SENT_CNT, EVTS_SENT_CNT_DESC);
+        this.lastEvtTs = mreg.longMetric(LAST_EVT_SENT_TIME, 
LAST_EVT_SENT_TIME_DESC);
+
+        if (log.isInfoEnabled())
+            log.info("CDC Ignite to PostgreSQL start-up [cacheIds=" + 
cachesIds + ']');
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onEvents(Iterator<CdcEvent> events) {
+        Iterator<CdcEvent> filtered = F.iterator(
+            events,
+            F.identity(),
+            true,
+            evt -> !onlyPrimary || evt.primary(),
+            evt -> cachesIds.contains(evt.cacheId()));
+
+        return withTx((conn) -> {

Review Comment:
   Indeed, it's not necessary. Fixed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to