[
https://issues.apache.org/jira/browse/KAFKA-6080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341287#comment-16341287
]
Randall Hauch commented on KAFKA-6080:
--------------------------------------
Here are some initial thoughts.
First, we want source connectors' tasks to be able to optionally define the
transaction boundaries. Since tasks are not able to coordinate or collaborate
directly (they may be in different processes), tasks cannot collaborate on a
single transaction.
Second, a source task can't call a separate API to demarcate the transaction
boundaries, since the source task is being asked to return messages via the
{{poll()}} method. So the transaction demarcation needs to be expressible within
the sequence of {{SourceRecord}} objects that are returned. Doing this will also
allow a single call to {{poll()}} to return messages in multiple transactions.
However, we may want to _recommend_ (or even require?) that, when possible,
tasks always begin and commit or roll back a transaction within the same set of
records returned by a single call to {{poll()}}. This likely won't be a hard
requirement, but the recommendation matters because Connect does not currently
make any guarantees about the timing of subsequent calls to {{poll()}}.
We could add three new subtypes of {{SourceRecord}} to the Kafka Connect API so
that source connectors can explicitly control the boundaries of these EOS
transactions:
* {{BeginTransaction}} represents the beginning of a transaction in the source
system
* {{CommitTransaction}} represents the successful completion of a transaction
in the source system
* {{RollbackTransaction}} represents the cancellation of a transaction in the
source system
These record types would include the source partition and source offset, but
would have a null value for the key, key schema, value, value schema, topic,
and partition number fields of {{SourceRecord}}.
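A minimal sketch of what these marker records might look like, and of a {{poll()}} result that uses them. It assumes a simplified stand-in class ({{SimpleSourceRecord}}, hypothetical) instead of the real {{SourceRecord}}, whose constructor also takes key and value schemas; the partition key {{table}} and offset key {{lsn}} are illustrative only.

```java
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for org.apache.kafka.connect.source.SourceRecord.
// Only the fields discussed above are modeled; key/value schemas are omitted.
class SimpleSourceRecord {
    final Map<String, ?> sourcePartition;
    final Map<String, ?> sourceOffset;
    final String topic;      // null for transaction markers
    final Integer partition; // null for transaction markers
    final Object key;        // null for transaction markers
    final Object value;      // null for transaction markers

    SimpleSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
                       String topic, Integer partition, Object key, Object value) {
        this.sourcePartition = sourcePartition;
        this.sourceOffset = sourceOffset;
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
    }
}

// The three proposed marker subtypes: they carry only the source partition and offset.
class BeginTransaction extends SimpleSourceRecord {
    BeginTransaction(Map<String, ?> partition, Map<String, ?> offset) {
        super(partition, offset, null, null, null, null);
    }
}

class CommitTransaction extends SimpleSourceRecord {
    CommitTransaction(Map<String, ?> partition, Map<String, ?> offset) {
        super(partition, offset, null, null, null, null);
    }
}

class RollbackTransaction extends SimpleSourceRecord {
    RollbackTransaction(Map<String, ?> partition, Map<String, ?> offset) {
        super(partition, offset, null, null, null, null);
    }
}

class ExampleTask {
    // One poll() batch containing two complete transactions. The second has no
    // data records, so it would only advance the source offset.
    static List<SimpleSourceRecord> poll() {
        Map<String, String> part = Map.of("table", "orders");
        return List.of(
                new BeginTransaction(part, Map.of("lsn", 100L)),
                new SimpleSourceRecord(part, Map.of("lsn", 101L), "orders", null, "k1", "v1"),
                new SimpleSourceRecord(part, Map.of("lsn", 102L), "orders", null, "k2", "v2"),
                new CommitTransaction(part, Map.of("lsn", 102L)),
                new BeginTransaction(part, Map.of("lsn", 103L)),
                new CommitTransaction(part, Map.of("lsn", 105L)));
    }
}
```

Making the markers subtypes keeps the {{List<SourceRecord>}} return type of {{poll()}} unchanged, so existing tasks that never emit markers would be unaffected.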
Source connectors would include these records alongside normal
{{SourceRecord}} objects and return them in the {{List<SourceRecord>}} result.
Each transaction is specified with a {{BeginTransaction}}, followed by zero or
more normal records, and ends with either a {{CommitTransaction}} or a
{{RollbackTransaction}}. A transaction sequence with no source records between
a {{BeginTransaction}} and a {{CommitTransaction}} would simply update the
source offsets for the source partition, allowing the connector to record that
it has made progress without producing new records (see KAFKA-3821). Any
transaction sequence that ends with a {{RollbackTransaction}} would correspond
to an aborted EOS transaction.
Any {{SourceRecord}} objects outside of an explicit transaction sequence would
also be written using Kafka's EOS feature, although the framework would be free
to determine the EOS transaction boundaries.
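To make these rules concrete, here is a sketch of how the framework might split one {{poll()}} batch into EOS transactions. The classes ({{Rec}}, {{Begin}}, {{Commit}}, {{Rollback}}, {{Txn}}, {{TxnGrouper}}) are hypothetical, offsets are omitted for brevity, and the sketch assumes the recommended case where every transaction is closed within the same batch. Grouping each run of loose records into one transaction is just one of the choices the framework would be free to make.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal record types; markers carry no value (offsets omitted here).
class Rec { final String value; Rec(String value) { this.value = value; } }
class Begin extends Rec { Begin() { super(null); } }
class Commit extends Rec { Commit() { super(null); } }
class Rollback extends Rec { Rollback() { super(null); } }

// One unit of work the framework would write as a single EOS transaction.
class Txn {
    enum Kind { COMMIT, ROLLBACK, FRAMEWORK_CHOSEN }
    final Kind kind;
    final List<Rec> records;
    Txn(Kind kind, List<Rec> records) { this.kind = kind; this.records = records; }
    // Begin immediately followed by Commit: only the source offsets advance.
    boolean offsetsOnly() { return kind == Kind.COMMIT && records.isEmpty(); }
}

class TxnGrouper {
    static List<Txn> group(List<Rec> batch) {
        List<Txn> out = new ArrayList<>();
        List<Rec> loose = new ArrayList<>(); // records outside any explicit transaction
        List<Rec> open = null;               // non-null between Begin and Commit/Rollback
        for (Rec r : batch) {
            if (r instanceof Begin) {
                if (open != null) throw new IllegalStateException("nested BeginTransaction");
                if (!loose.isEmpty()) { // flush earlier loose records to preserve order
                    out.add(new Txn(Txn.Kind.FRAMEWORK_CHOSEN, loose));
                    loose = new ArrayList<>();
                }
                open = new ArrayList<>();
            } else if (r instanceof Commit || r instanceof Rollback) {
                if (open == null) throw new IllegalStateException("end marker without BeginTransaction");
                out.add(new Txn(r instanceof Commit ? Txn.Kind.COMMIT : Txn.Kind.ROLLBACK, open));
                open = null;
            } else if (open != null) {
                open.add(r);
            } else {
                loose.add(r);
            }
        }
        if (!loose.isEmpty()) out.add(new Txn(Txn.Kind.FRAMEWORK_CHOSEN, loose));
        // Spanning poll() calls is not handled in this sketch; it is reported as an error.
        if (open != null) throw new IllegalStateException("transaction still open at end of batch");
        return out;
    }
}
```

For example, a batch of one loose record, a one-record committed transaction, an empty committed transaction, and a rolled-back transaction yields four groups: a framework-chosen one, a commit, an offsets-only commit, and an abort.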
Some things we still need to consider:
* Currently each source task is given its own {{Producer}} instance, which
should eliminate cross-talk between the transaction boundaries of different
tasks writing to the same topics.
* Should Connect always use EOS transactions, even when the source
connector/task does not use them?
* Should offsets be committed within the same EOS transactions? If so, what
happens when transactions are rolled back? Any offsets committed during that
transaction would also be rolled back, but is that actually what we want?
* Should connectors or tasks be required to declare that they may use
transactions? If so, how?
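The first and third questions can be illustrated with a sketch of the per-task write path. {{TxnProducer}} is a hypothetical interface whose method names mirror the transactional methods of {{KafkaProducer}} ({{initTransactions}}, {{beginTransaction}}, {{send}}, {{commitTransaction}}, {{abortTransaction}}); the real {{send()}} takes a {{ProducerRecord}}, not a topic and string. The {{transactional.id}} naming scheme and the topic names are assumptions. The sketch writes the offset update inside the same transaction, so an abort discards it along with the data, which is exactly the behavior the open question asks about.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical subset of producer operations; NOT org.apache.kafka.clients.producer.KafkaProducer.
interface TxnProducer {
    void initTransactions();
    void beginTransaction();
    void send(String topic, String value);
    void commitTransaction();
    void abortTransaction();
}

class TaskTxnWriter {
    // One producer per task with its own transactional.id, so transactions from
    // different tasks never interleave. The "<connector>-<taskId>" scheme is hypothetical.
    static Properties producerConfig(String connector, int taskId) {
        Properties props = new Properties();
        props.put("enable.idempotence", "true");
        props.put("transactional.id", connector + "-" + taskId);
        return props;
    }

    // Writes one source transaction, with the offset update in the same EOS transaction.
    static void write(TxnProducer producer, List<String> values, String offset, boolean rollback) {
        producer.beginTransaction();
        for (String v : values) producer.send("data-topic", v);
        producer.send("connect-offsets", offset); // discarded too if the transaction aborts
        if (rollback) producer.abortTransaction();
        else producer.commitTransaction();
    }
}

// In-memory stand-in that keeps only committed sends, to show the effect of an abort.
class RecordingProducer implements TxnProducer {
    final List<String> committed = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();
    private boolean inTxn = false;

    public void initTransactions() { }
    public void beginTransaction() {
        if (inTxn) throw new IllegalStateException("transaction already open");
        inTxn = true;
    }
    public void send(String topic, String value) {
        if (!inTxn) throw new IllegalStateException("send outside transaction");
        pending.add(topic + ":" + value);
    }
    public void commitTransaction() {
        if (!inTxn) throw new IllegalStateException("no open transaction");
        committed.addAll(pending);
        pending.clear();
        inTxn = false;
    }
    public void abortTransaction() {
        if (!inTxn) throw new IllegalStateException("no open transaction");
        pending.clear();
        inTxn = false;
    }
}
```

After one committed and one rolled-back call to {{write}}, only the first transaction's records and its offset update are visible; the second transaction's offset is lost along with its data.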
> Transactional EoS for source connectors
> ---------------------------------------
>
> Key: KAFKA-6080
> URL: https://issues.apache.org/jira/browse/KAFKA-6080
> Project: Kafka
> Issue Type: New Feature
> Components: KafkaConnect
> Reporter: Antony Stubbs
> Priority: Major
> Labels: needs-kip
>
> Exactly once (eos) message production for source connectors.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)