maksaska commented on code in PR #311: URL: https://github.com/apache/ignite-extensions/pull/311#discussion_r2183693141
########## modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcConsumer.java: ########## @@ -0,0 +1,265 @@ +package org.apache.ignite.cdc.postgresql; + +import java.sql.Connection; +import java.util.Collection; +import java.util.Iterator; +import java.util.Set; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import javax.sql.DataSource; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.binary.BinaryType; +import org.apache.ignite.cdc.CdcCacheEvent; +import org.apache.ignite.cdc.CdcConsumer; +import org.apache.ignite.cdc.CdcEvent; +import org.apache.ignite.cdc.TypeMapping; +import org.apache.ignite.internal.processors.metric.MetricRegistryImpl; +import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.metric.MetricRegistry; +import org.apache.ignite.resources.LoggerResource; + +/** + * This class represents a consumer component that replicates cache changes from Apache Ignite to PostgreSQL using + * Change Data Capture (CDC) mechanism. It applies events to PostgreSQL via batch-prepared SQL statements, ensuring + * efficient handling of large volumes of updates. + * + * <p>Additionally, it provides methods for initializing connections, managing transactions, and performing atomic batches + * of writes.</p> + */ +public class IgniteToPostgreSqlCdcConsumer implements CdcConsumer { + /** */ + public static final String EVTS_SENT_CNT = "EventsCount"; + + /** */ + public static final String EVTS_SENT_CNT_DESC = "Count of events applied to PostgreSQL"; + + /** */ + public static final String LAST_EVT_SENT_TIME = "LastEventTime"; + + /** */ + public static final String LAST_EVT_SENT_TIME_DESC = "Timestamp of last applied event to PostgreSQL"; + + /** */ + private static final boolean DFLT_IS_ONLY_PRIMARY = false; + + /** */ + private static final long DFLT_BATCH_SIZE = 1024; + + /** */ + private static final boolean DFLT_CREATE_TABLES = false; + + /** */ + private static final boolean DFLT_AUTO_COMMIT = false; + + /** */ + private DataSource dataSrc; + + /** Collection of cache names which will be replicated to PostgreSQL. */ + private Collection<String> caches; + + /** */ + private boolean onlyPrimary = DFLT_IS_ONLY_PRIMARY; + + /** */ + private long maxBatchSize = DFLT_BATCH_SIZE; + + /** */ + private boolean createTables = DFLT_CREATE_TABLES; + + /** */ + private boolean autoCommit = DFLT_AUTO_COMMIT; + + /** Log. */ + @LoggerResource + private IgniteLogger log; + + /** Cache IDs. */ + private Set<Integer> cachesIds; + + /** Applier instance responsible for applying individual CDC events to PostgreSQL. */ + private IgniteToPostgreSqlCdcApplier applier; + + /** Count of events applied to PostgreSQL. */ + private AtomicLongMetric evtsCnt; + + /** Timestamp of last applied batch to PostgreSQL. */ + private AtomicLongMetric lastEvtTs; + + /** {@inheritDoc} */ + @Override public void start(MetricRegistry reg) { + A.notNull(dataSrc, "dataSource"); + A.notEmpty(caches, "caches"); + + cachesIds = caches.stream() + .map(CU::cacheId) + .collect(Collectors.toSet()); + + applier = new IgniteToPostgreSqlCdcApplier(maxBatchSize, log); + + MetricRegistryImpl mreg = (MetricRegistryImpl)reg; + + this.evtsCnt = mreg.longMetric(EVTS_SENT_CNT, EVTS_SENT_CNT_DESC); + this.lastEvtTs = mreg.longMetric(LAST_EVT_SENT_TIME, LAST_EVT_SENT_TIME_DESC); + + if (log.isInfoEnabled()) + log.info("CDC Ignite to PostgreSQL start-up [cacheIds=" + cachesIds + ']'); + } + + /** {@inheritDoc} */ + @Override public boolean onEvents(Iterator<CdcEvent> events) { + Iterator<CdcEvent> filtered = F.iterator( + events, + F.identity(), + true, + evt -> !onlyPrimary || evt.primary(), + evt -> cachesIds.contains(evt.cacheId())); + + return withTx((conn) -> { Review Comment: Indeed It's not necessary. Fixed it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org