[ https://issues.apache.org/jira/browse/KAFKA-14021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860154#comment-17860154 ]

Pritam Kumar commented on KAFKA-14021:
--------------------------------------

[~ChrisEgerton] Although we have implemented the

@Override
public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
    return consumerUsesReadCommitted(props)
            ? ExactlyOnceSupport.SUPPORTED
            : ExactlyOnceSupport.UNSUPPORTED;
}
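For context, here is a minimal, self-contained sketch of what a consumerUsesReadCommitted-style check could look like. The helper name and the source.consumer.isolation.level config key are assumptions for illustration, not necessarily the exact names used by MirrorMaker 2:

```java
import java.util.Map;

public class ReadCommittedCheck {
    // Hypothetical helper: returns true when the connector's embedded consumer
    // is configured with isolation.level=read_committed, i.e. it will only see
    // records from committed transactions. The config key is an assumption.
    static boolean consumerUsesReadCommitted(Map<String, String> props) {
        return "read_committed".equals(props.get("source.consumer.isolation.level"));
    }

    public static void main(String[] args) {
        System.out.println(consumerUsesReadCommitted(
                Map.of("source.consumer.isolation.level", "read_committed"))); // true
        System.out.println(consumerUsesReadCommitted(Map.of())); // false
    }
}
```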

in the source connector, one thing we should make sure of is that, before polling, the consumer in the source connector syncs the offsets committed in the offset.storage.topic with those in the __consumer_offsets topic that a plain consumer uses. This sync happens in the task.commit() call, which is invoked outside the transaction.
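Since the framework-side offsets in offset.storage.topic are committed inside the transaction, they are the source of truth after a restart. A minimal sketch of the alignment step described above (all names here are hypothetical; a real implementation would seek a KafkaConsumer per TopicPartition):

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetSyncSketch {
    // Hypothetical sketch: before polling, compute the offset the consumer
    // should seek to for each partition. Offsets recorded transactionally in
    // offset.storage.topic take precedence over the consumer's own committed
    // offsets in __consumer_offsets; partitions unknown to the framework fall
    // back to the consumer's committed position.
    static Map<String, Long> seekTargets(Map<String, Long> frameworkOffsets,
                                         Map<String, Long> consumerCommitted) {
        Map<String, Long> targets = new HashMap<>(consumerCommitted);
        targets.putAll(frameworkOffsets); // framework store wins when both exist
        return targets;
    }

    public static void main(String[] args) {
        Map<String, Long> targets = seekTargets(
                Map.of("topic-0", 42L),                  // committed in the transaction
                Map.of("topic-0", 40L, "topic-1", 7L));  // consumer's __consumer_offsets view
        System.out.println(targets); // topic-0 resumes at 42, not 40, avoiding re-delivery
    }
}
```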



private void commitTransaction() {
    log.debug("{} Committing offsets", this);

    long started = time.milliseconds();

    AtomicReference<Throwable> flushError = new AtomicReference<>();
    boolean shouldFlush = false;
    try {
        // Begin the flush without waiting, as there should not be any concurrent flushes.
        // This is because commitTransaction is always called on the same thread, and should
        // always block until the flush is complete, or cancel the flush if an error occurs.
        shouldFlush = offsetWriter.beginFlush();
    } catch (Throwable e) {
        flushError.compareAndSet(null, e);
    }
    if (flushError.get() == null && !transactionOpen && !shouldFlush) {
        // There is no contents on the framework side to commit, so skip the offset flush
        // and producer commit
        long durationMillis = time.milliseconds() - started;
        recordCommitSuccess(durationMillis);
        log.debug("{} Finished commitOffsets successfully in {} ms", this, durationMillis);

        commitSourceTask();
        return;
    }

    // We might have just aborted a transaction, in which case we'll have to begin a new one
    // in order to commit offsets
    maybeBeginTransaction();

    if (shouldFlush) {
        // Now we can actually write the offsets to the internal topic.
        // No need to track the flush future here since it's guaranteed to complete by the time
        // Producer::commitTransaction completes
        // We do have to track failures for that callback though, since they may originate from
        // outside the producer (i.e., the offset writer or the backing offset store), and would
        // not cause Producer::commitTransaction to fail
        offsetWriter.doFlush((error, result) -> {
            if (error != null) {
                log.error("{} Failed to flush offsets to storage: ", ExactlyOnceWorkerSourceTask.this, error);
                flushError.compareAndSet(null, error);
            } else {
                log.trace("{} Finished flushing offsets to storage", ExactlyOnceWorkerSourceTask.this);
            }
        });
    }

    // Only commit the transaction if we were able to serialize the offsets.
    // Otherwise, we may commit source records without committing their offsets
    Throwable error = flushError.get();
    if (error == null) {
        try {
            // Commit the transaction
            // Blocks until all outstanding records have been sent and ack'd
            producer.commitTransaction();
        } catch (Throwable t) {
            log.error("{} Failed to commit producer transaction", ExactlyOnceWorkerSourceTask.this, t);
            flushError.compareAndSet(null, t);
        }
        transactionOpen = false;
    }

    error = flushError.get();
    if (error != null) {
        recordCommitFailure(time.milliseconds() - started, null);
        offsetWriter.cancelFlush();
        throw maybeWrapProducerSendException(
                "Failed to flush offsets and/or records for task " + id,
                error
        );
    }

    transactionMetrics.commitTransaction();

    long durationMillis = time.milliseconds() - started;
    recordCommitSuccess(durationMillis);
    log.debug("{} Finished commitOffsets successfully in {} ms", this, durationMillis);

    // Synchronize in order to guarantee that writes on other threads are picked up by this one
    synchronized (committableRecords) {
        committableRecords.forEach(this::commitTaskRecord);
        committableRecords.clear();
    }
    commitSourceTask();
}

Note that commitSourceTask() runs after producer.commitTransaction(), which could still result in duplicate records.

Can you please help me out here?

> MirrorMaker 2 should implement KIP-618 APIs
> -------------------------------------------
>
>                 Key: KAFKA-14021
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14021
>             Project: Kafka
>          Issue Type: Improvement
>          Components: connect, mirrormaker
>            Reporter: Chris Egerton
>            Assignee: Chris Egerton
>            Priority: Major
>             Fix For: 3.5.0
>
>
> The {{MirrorSourceConnector}} class should implement the new APIs added by 
> KIP-618.
> This includes the 
> [SourceConnector::exactlyOnceSupport|https://github.com/apache/kafka/blob/025e47b8334eb7125c7fdd2f725a2fef3c98344c/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java#L34-L54]
>  method and, potentially, the 
> [SourceConnector::canDefineTransactionBoundaries|https://github.com/apache/kafka/blob/025e47b8334eb7125c7fdd2f725a2fef3c98344c/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java#L56-L73]
>  method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
