tikimims commented on code in PR #12941:
URL: https://github.com/apache/kafka/pull/12941#discussion_r1042944441


##########
docs/connect.html:
##########
@@ -593,6 +701,107 @@ <h5><a id="connect_resuming" 
href="#connect_resuming">Resuming from Previous Off
 
     <p>Of course, you might need to read many keys for each of the input 
streams. The <code>OffsetStorageReader</code> interface also allows you to 
issue bulk reads to efficiently load all offsets, then apply them by seeking 
each input stream to the appropriate position.</p>
 
+    <h5><a id="connect_exactlyoncesourceconnectors" 
href="#connect_exactlyoncesourceconnectors">Exactly-once source 
connectors</a></h5>
+
+    <h6>Supporting exactly-once</h6>
+
+    <p>With the passing of <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors">KIP-618</a>,
 Kafka Connect supports exactly-once source connectors as of version 3.3.0. In 
order for a source connector to take advantage of this support, it must be able 
to provide meaningful source offsets for each record that it emits, and resume 
consumption from the external system at the exact position corresponding to any 
of those offsets without dropping or duplicating messages.</p>
+
+    <h6>Defining transaction boundaries</h6>
+
+    <p>By default, the Kafka Connect framework will create and commit a new 
Kafka transaction for each batch of records that a source task returns from its 
<code>poll</code> method. However, connectors can also define their own 
transaction boundaries, which can be enabled by users by setting the 
<code>transaction.boundary</code> property to <code>connector</code> in the 
config for the connector.</p>
+
+    <p>If enabled, the connector's tasks will have access to a 
<code>TransactionContext</code> from their <code>SourceTaskContext</code>, 
which they can use to control when transactions are aborted and committed.</p>
+
+    <p>For example, to commit a transaction at least every ten records:</p>
+
+<pre class="brush: java;">
+private int recordsSent;
+
+@Override
+public void start(Map&lt;String, String&gt; props) {
+    this.recordsSent = 0;
+}
+
+@Override
+public List&lt;SourceRecord&gt; poll() {
+    List&lt;SourceRecord&gt; records = fetchRecords();
+    boolean shouldCommit = false;
+    for (SourceRecord record : records) {
+        if (++this.recordsSent >= 10) {
+            shouldCommit = true;
+        }
+    }
+    if (shouldCommit) {
+        this.recordsSent = 0;
+        this.context.transactionContext().commitTransaction();
+    }
+    return records;
+}
+</pre>
+
+    <p>Or to commit a transaction for exactly every tenth record:</p>
+
+<pre class="brush: java;">
+private int recordsSent;
+
+@Override
+public void start(Map&lt;String, String&gt; props) {
+    this.recordsSent = 0;
+}
+
+@Override
+public List&lt;SourceRecord&gt; poll() {
+    List&lt;SourceRecord&gt; records = fetchRecords();
+    for (SourceRecord record : records) {
+        if (++this.recordsSent % 10 == 0) {
+            this.context.transactionContext().commitTransaction(record);
+        }
+    }
+    return records;
+}
+</pre>
+
+    <p>Most connectors do not need to define their own transaction boundaries. 
However, doing so can be useful when files or objects in the source system are 
broken up into multiple source records but should be delivered atomically. It 
can also be useful when it is impossible to give each source record a unique 
source offset, provided that every record with a given offset is delivered 
within a single transaction.</p>
+
+    <p>Note that if the user has not enabled connector-defined transaction 
boundaries in the connector config, the <code>TransactionContext</code> 
returned by <code>context.transactionContext()</code> will be 
<code>null</code>.</p>
+
+    <h6>Validation APIs</h6>
+
+    <p>A few additional preflight validation APIs can be implemented by source 
connector developers.</p>
+
+    <p>Some users may require exactly-once delivery guarantees from a 
connector. In this case, they may set the <code>exactly.once.support</code> 
property to <code>required</code> in the configuration for the connector. When 
this happens, the Kafka Connect framework will ask the connector whether it can 
provide exactly-once delivery guarantees with the specified configuration. This 
is done by invoking the <code>exactlyOnceSupport</code> method on the 
connector.</p>
+
+    <p>If a connector doesn't support exactly-once, it should still implement 
this method, to let users know for certain that it cannot provide exactly-once 
delivery guarantees:</p>
+
+<pre class="brush: java;">
+@Override
+public ExactlyOnceSupport exactlyOnceSupport(Map&lt;String, String&gt; props) {
+    // This connector cannot provide exactly-once delivery guarantees under any conditions
+    return ExactlyOnceSupport.UNSUPPORTED;
+}
+</pre>
+
+    <p>Otherwise, a connector should examine the configuration, and return 
<code>ExactlyOnceSupport.SUPPORTED</code> if it can provide exactly-once 
delivery guarantees:</p>
+
+<pre class="brush: java;">
+@Override
+public ExactlyOnceSupport exactlyOnceSupport(Map&lt;String, String&gt; props) {
+    // This connector can always provide exactly-once delivery guarantees
+    return ExactlyOnceSupport.SUPPORTED;
+}
+</pre>
+
+    <p>Additionally, if the user has configured the connector to define its 
own transaction boundaries, the Kafka Connect framework will ask the connector 
whether it can define its own transaction boundaries with the specified 
configuration, via the <code>canDefineTransactionBoundaries</code> method:</p>

Review Comment:
   ```suggestion
       <p>Additionally, if the user has configured the connector to define its 
own transaction boundaries, the Kafka Connect framework will ask the connector 
whether it can define its own transaction boundaries with the specified 
configuration, using the <code>canDefineTransactionBoundaries</code> method:</p>
   ```
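
For context, since the hunk ends at the colon, here is a rough sketch of the kind of check that paragraph leads into. It is an illustration under stated assumptions, not the example from the PR: the enum is a local stand-in for `org.apache.kafka.connect.source.ConnectorTransactionBoundaries` so that the snippet compiles without the Kafka Connect jar, the `example.atomic.files` property is hypothetical, and in a real connector the method would be an `@Override` of the instance method on `SourceConnector` rather than a static helper.

```java
import java.util.Map;

class TransactionBoundaryCheck {
    // Local stand-in for org.apache.kafka.connect.source.ConnectorTransactionBoundaries,
    // declared here only so the sketch compiles on its own.
    enum ConnectorTransactionBoundaries { SUPPORTED, UNSUPPORTED }

    // Hypothetical connector property; not part of Kafka Connect.
    static final String ATOMIC_FILES_CONFIG = "example.atomic.files";

    // Mirrors the shape of SourceConnector#canDefineTransactionBoundaries:
    // inspect the proposed connector config and report whether this connector
    // can define its own transaction boundaries with it.
    static ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map<String, String> props) {
        // Assumption for illustration: the connector only manages its own
        // transactions when the hypothetical property is set to "true".
        boolean atomicFiles = Boolean.parseBoolean(props.getOrDefault(ATOMIC_FILES_CONFIG, "false"));
        return atomicFiles
                ? ConnectorTransactionBoundaries.SUPPORTED
                : ConnectorTransactionBoundaries.UNSUPPORTED;
    }
}
```

Returning `UNSUPPORTED` when the property is absent keeps the check conservative: a connector that cannot honor `transaction.boundary=connector` says so during preflight validation instead of failing later.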


