[ https://issues.apache.org/jira/browse/KAFKA-14021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860154#comment-17860154 ]
Pritam Kumar commented on KAFKA-14021:
--------------------------------------

[~ChrisEgerton] Although we have implemented the following in the source connector:

{code:java}
@Override
public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
    return consumerUsesReadCommitted(props)
            ? ExactlyOnceSupport.SUPPORTED
            : ExactlyOnceSupport.UNSUPPORTED;
}
{code}

one thing we should make sure of is that, before polling, the consumer in the source connector syncs the offsets committed to the offset.storage.topic with those in the __consumer_offsets topic that a plain consumer uses. That sync happens in the task.commit() call, which is invoked outside the transaction:

{code:java}
private void commitTransaction() {
    log.debug("{} Committing offsets", this);
    long started = time.milliseconds();
    AtomicReference<Throwable> flushError = new AtomicReference<>();
    boolean shouldFlush = false;
    try {
        // Begin the flush without waiting, as there should not be any concurrent flushes.
        // This is because commitTransaction is always called on the same thread, and should always block until
        // the flush is complete, or cancel the flush if an error occurs.
        shouldFlush = offsetWriter.beginFlush();
    } catch (Throwable e) {
        flushError.compareAndSet(null, e);
    }
    if (flushError.get() == null && !transactionOpen && !shouldFlush) {
        // There is no contents on the framework side to commit, so skip the offset flush and producer commit
        long durationMillis = time.milliseconds() - started;
        recordCommitSuccess(durationMillis);
        log.debug("{} Finished commitOffsets successfully in {} ms", this, durationMillis);
        commitSourceTask();
        return;
    }
    // We might have just aborted a transaction, in which case we'll have to begin a new one
    // in order to commit offsets
    maybeBeginTransaction();
    if (shouldFlush) {
        // Now we can actually write the offsets to the internal topic.
        // No need to track the flush future here since it's guaranteed to complete by the time
        // Producer::commitTransaction completes
        // We do have to track failures for that callback though, since they may originate from outside
        // the producer (i.e., the offset writer or the backing offset store), and would not cause
        // Producer::commitTransaction to fail
        offsetWriter.doFlush((error, result) -> {
            if (error != null) {
                log.error("{} Failed to flush offsets to storage: ", ExactlyOnceWorkerSourceTask.this, error);
                flushError.compareAndSet(null, error);
            } else {
                log.trace("{} Finished flushing offsets to storage", ExactlyOnceWorkerSourceTask.this);
            }
        });
    }
    // Only commit the transaction if we were able to serialize the offsets.
    // Otherwise, we may commit source records without committing their offsets
    Throwable error = flushError.get();
    if (error == null) {
        try {
            // Commit the transaction
            // Blocks until all outstanding records have been sent and ack'd
            producer.commitTransaction();   // <-- transaction is committed here
        } catch (Throwable t) {
            log.error("{} Failed to commit producer transaction", ExactlyOnceWorkerSourceTask.this, t);
            flushError.compareAndSet(null, t);
        }
        transactionOpen = false;
    }
    error = flushError.get();
    if (error != null) {
        recordCommitFailure(time.milliseconds() - started, null);
        offsetWriter.cancelFlush();
        throw maybeWrapProducerSendException(
                "Failed to flush offsets and/or records for task " + id,
                error
        );
    }
    transactionMetrics.commitTransaction();
    long durationMillis = time.milliseconds() - started;
    recordCommitSuccess(durationMillis);
    log.debug("{} Finished commitOffsets successfully in {} ms", this, durationMillis);
    // Synchronize in order to guarantee that writes on other threads are picked up by this one
    synchronized (committableRecords) {
        committableRecords.forEach(this::commitTaskRecord);
        committableRecords.clear();
    }
    commitSourceTask();   // <-- task.commit() hook, invoked outside the transaction
}
{code}

This could still result in duplicate records. Can you please help me out here?
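For reference, the read_committed check that the exactlyOnceSupport snippet above relies on can be sketched roughly as below. This is a standalone illustration, not the actual MirrorSourceConnector code: the ExactlyOnceCheck class, the consumerUsesReadCommitted helper body, and the "source.consumer.isolation.level" property key are assumptions chosen for the example.

{code:java}
import java.util.Map;

public class ExactlyOnceCheck {

    enum ExactlyOnceSupport { SUPPORTED, UNSUPPORTED }

    // Hypothetical helper: exactly-once delivery is only safe when the
    // connector's consumer reads with isolation.level=read_committed,
    // so it never sees records from aborted transactions.
    // Kafka consumers default to read_uncommitted when the property is absent.
    static boolean consumerUsesReadCommitted(Map<String, String> props) {
        return "read_committed".equals(
                props.getOrDefault("source.consumer.isolation.level", "read_uncommitted"));
    }

    static ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
        return consumerUsesReadCommitted(props)
                ? ExactlyOnceSupport.SUPPORTED
                : ExactlyOnceSupport.UNSUPPORTED;
    }

    public static void main(String[] args) {
        // Connector configured for read_committed reports SUPPORTED
        System.out.println(exactlyOnceSupport(
                Map.of("source.consumer.isolation.level", "read_committed")));
        // Default (read_uncommitted) reports UNSUPPORTED
        System.out.println(exactlyOnceSupport(Map.of()));
    }
}
{code}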
> MirrorMaker 2 should implement KIP-618 APIs
> -------------------------------------------
>
>                 Key: KAFKA-14021
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14021
>             Project: Kafka
>          Issue Type: Improvement
>          Components: connect, mirrormaker
>            Reporter: Chris Egerton
>            Assignee: Chris Egerton
>            Priority: Major
>             Fix For: 3.5.0
>
>
> The {{MirrorSourceConnector}} class should implement the new APIs added by KIP-618.
> This includes the
> [SourceConnector::exactlyOnceSupport|https://github.com/apache/kafka/blob/025e47b8334eb7125c7fdd2f725a2fef3c98344c/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java#L34-L54]
> method and, potentially, the
> [SourceConnector::canDefineTransactionBoundaries|https://github.com/apache/kafka/blob/025e47b8334eb7125c7fdd2f725a2fef3c98344c/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java#L56-L73]
> method.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)