tikimims commented on code in PR #12941: URL: https://github.com/apache/kafka/pull/12941#discussion_r1042944441
##########
docs/connect.html:
##########
@@ -593,6 +701,107 @@ <h5><a id="connect_resuming" href="#connect_resuming">Resuming from Previous Off
     <p>Of course, you might need to read many keys for each of the input streams. The <code>OffsetStorageReader</code> interface also allows you to issue bulk reads to efficiently load all offsets, then apply them by seeking each input stream to the appropriate position.</p>
 
+    <h5><a id="connect_exactlyoncesourceconnectors" href="#connect_exactlyoncesourceconnectors">Exactly-once source connectors</a></h5>
+
+    <h6>Supporting exactly-once</h6>
+
+    <p>With the passing of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors">KIP-618</a>, Kafka Connect supports exactly-once source connectors as of version 3.3.0. In order for a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages.</p>
+
+    <h6>Defining transaction boundaries</h6>
+
+    <p>By default, the Kafka Connect framework will create and commit a new Kafka transaction for each batch of records that a source task returns from its <code>poll</code> method.
However, connectors can also define their own transaction boundaries, which users can enable by setting the <code>transaction.boundary</code> property to <code>connector</code> in the config for the connector.</p>
+
+    <p>If enabled, the connector's tasks will have access to a <code>TransactionContext</code> from their <code>SourceTaskContext</code>, which they can use to control when transactions are aborted and committed.</p>
+
+    <p>For example, to commit a transaction at least every ten records:</p>
+
+<pre class="brush: java;">
+private int recordsSent;
+
+@Override
+public void start(Map<String, String> props) {
+    this.recordsSent = 0;
+}
+
+@Override
+public List<SourceRecord> poll() {
+    List<SourceRecord> records = fetchRecords();
+    boolean shouldCommit = false;
+    for (SourceRecord record : records) {
+        if (++this.recordsSent >= 10) {
+            shouldCommit = true;
+        }
+    }
+    if (shouldCommit) {
+        this.recordsSent = 0;
+        // Request a commit once the batch returned from this poll() call has been processed
+        this.context.transactionContext().commitTransaction();
+    }
+    return records;
+}
+</pre>
+
+    <p>Or to commit a transaction for exactly every tenth record:</p>
+
+<pre class="brush: java;">
+private int recordsSent;
+
+@Override
+public void start(Map<String, String> props) {
+    this.recordsSent = 0;
+}
+
+@Override
+public List<SourceRecord> poll() {
+    List<SourceRecord> records = fetchRecords();
+    for (SourceRecord record : records) {
+        if (++this.recordsSent % 10 == 0) {
+            // Request a commit with this record as the last one in the transaction
+            this.context.transactionContext().commitTransaction(record);
+        }
+    }
+    return records;
+}
+</pre>
+
+    <p>Most connectors do not need to define their own transaction boundaries. However, doing so may be useful if files or objects in the source system are broken up into multiple source records but should be delivered atomically.
Additionally, it may be useful when it is impossible to give each source record a unique source offset, provided that every record with a given offset is delivered within a single transaction.</p>
+
+    <p>Note that if the user has not enabled connector-defined transaction boundaries in the connector config, the <code>TransactionContext</code> returned by <code>context.transactionContext()</code> will be <code>null</code>.</p>
+
+    <h6>Validation APIs</h6>
+
+    <p>A few additional preflight validation APIs can be implemented by source connector developers.</p>
+
+    <p>Some users may require exactly-once delivery guarantees from a connector. In this case, they may set the <code>exactly.once.support</code> property to <code>required</code> in the configuration for the connector. When this happens, the Kafka Connect framework will ask the connector whether it can provide exactly-once delivery guarantees with the specified configuration. This is done by invoking the <code>exactlyOnceSupport</code> method on the connector.</p>
+
+    <p>If a connector doesn't support exactly-once, it should still implement this method, to let users know for certain that it cannot provide exactly-once delivery guarantees:</p>
+
+<pre class="brush: java;">
+@Override
+public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
+    // This connector cannot provide exactly-once delivery guarantees under any conditions
+    return ExactlyOnceSupport.UNSUPPORTED;
+}
+</pre>
+
+    <p>Otherwise, a connector should examine the configuration, and return <code>ExactlyOnceSupport.SUPPORTED</code> if it can provide exactly-once delivery guarantees:</p>
+
+<pre class="brush: java;">
+@Override
+public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
+    // This connector can always provide exactly-once delivery guarantees
+    return ExactlyOnceSupport.SUPPORTED;
+}
+</pre>
+
+    <p>Additionally, if the user has configured the connector to define its own transaction boundaries, the Kafka Connect
framework will ask the connector whether it can define its own transaction boundaries with the specified configuration, via the <code>canDefineTransactionBoundaries</code> method:</p>

Review Comment:
```suggestion
    <p>Additionally, if the user has configured the connector to define its own transaction boundaries, the Kafka Connect framework will ask the connector whether it can define its own transaction boundaries with the specified configuration, using the <code>canDefineTransactionBoundaries</code> method:</p>
```
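
For context, since the quoted hunk stops at the colon: a rough sketch of what an implementation of <code>canDefineTransactionBoundaries</code> might look like. This is an illustration under stated assumptions, not the snippet from the PR. The <code>ConnectorTransactionBoundaries</code> enum is declared locally as a stand-in for the Connect API type so the example compiles on its own, and the class name <code>FileSourceConnectorSketch</code> and the <code>atomic.files</code> property are hypothetical.

```java
import java.util.Map;

// Local stand-in for org.apache.kafka.connect.source.ConnectorTransactionBoundaries,
// declared here only so the sketch compiles without the Connect API on the classpath.
enum ConnectorTransactionBoundaries { SUPPORTED, UNSUPPORTED }

// Hypothetical connector; in a real connector this method would be an
// @Override on a SourceConnector subclass.
class FileSourceConnectorSketch {
    // Hypothetical config key, used purely for illustration.
    static final String ATOMIC_FILES_CONFIG = "atomic.files";

    public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map<String, String> props) {
        // Only claim support when the (hypothetical) setting that makes
        // connector-defined boundaries meaningful is turned on.
        // Boolean.parseBoolean returns false for a missing (null) value.
        boolean atomicFiles = Boolean.parseBoolean(props.get(ATOMIC_FILES_CONFIG));
        return atomicFiles
                ? ConnectorTransactionBoundaries.SUPPORTED
                : ConnectorTransactionBoundaries.UNSUPPORTED;
    }
}
```

The point of the sketch is that the answer depends on the supplied configuration rather than being a constant, which matches the "with the specified configuration" wording in the paragraph under review.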