Jason Gustafson created KAFKA-3092:
--------------------------------------
Summary: Rename SinkTask.onPartitionsAssigned/onPartitionsRevoked and Clarify Contract
Key: KAFKA-3092
URL: https://issues.apache.org/jira/browse/KAFKA-3092
Project: Kafka
Issue Type: Improvement
Components: copycat
Reporter: Jason Gustafson
Assignee: Jason Gustafson
The purpose of the onPartitionsRevoked() and onPartitionsAssigned() methods
exposed in Kafka Connect's SinkTask interface seems a little unclear and too
closely tied to consumer semantics. From the javadoc, these APIs are used to
open/close per-partition resources, but that would suggest that we should
always get one call to onPartitionsAssigned() before writing any records for
the corresponding partitions and one call to onPartitionsRevoked() when we have
finished with them. However, the same methods on the consumer are used to
indicate phases of the rebalance operation: onPartitionsRevoked() is called
before the rebalance begins and onPartitionsAssigned() is called after it
completes. In particular, the consumer does not guarantee a final call to
onPartitionsRevoked().
This mismatch makes the contract of these methods unclear. In fact, the
WorkerSinkTask currently does not guarantee the initial call to
onPartitionsAssigned(), nor the final call to onPartitionsRevoked(). Instead,
the task implementation must pull the initial assignment from the
SinkTaskContext. Adding to the confusion, the offset commit that follows
onPartitionsRevoked() triggers a flush() on partitions that have already been
revoked. All of this makes it difficult to use this API as suggested in the
javadocs.
To fix this, we should clarify the behavior of these methods and consider
renaming them to avoid confusion with the same methods in the consumer API. If
onPartitionsAssigned() is meant for opening resources, maybe we can rename it
to open(). Similarly, onPartitionsRevoked() can be renamed to close(). We can
then fix the code to ensure that a typical open/close contract is enforced.
This would also mean removing the need to pass the initial assignment in the
SinkTaskContext. This would give the following API:
{code}
void open(Collection<TopicPartition> partitions);
void close(Collection<TopicPartition> partitions);
{code}
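To illustrate how a task might use this contract, here is a minimal sketch. It is not taken from the Connect code base: PerPartitionSinkTask, its put() signature, and openCount() are hypothetical, and TopicPartition is a local stand-in for org.apache.kafka.common.TopicPartition so the example compiles on its own. It assumes the framework calls open() before delivering any records for a partition and close() once the task is finished with it.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Local stand-in for org.apache.kafka.common.TopicPartition (assumption, for self-containment).
final class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartition)) return false;
        TopicPartition other = (TopicPartition) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

// Hypothetical task written against the proposed open()/close() contract.
class PerPartitionSinkTask {
    // One in-memory buffer per open partition; a real connector might hold a file or connection.
    private final Map<TopicPartition, List<String>> writers = new HashMap<>();

    // Proposed contract: called before any records arrive for these partitions.
    void open(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            writers.put(tp, new ArrayList<>());
        }
    }

    // Simplified put(): rejects writes to partitions that were never opened or are already closed.
    void put(TopicPartition tp, String value) {
        List<String> writer = writers.get(tp);
        if (writer == null) {
            throw new IllegalStateException("Partition not open: " + tp);
        }
        writer.add(value);
    }

    // Proposed contract: called when the task is finished with these partitions.
    void close(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            writers.remove(tp);
        }
    }

    // Number of partitions that currently hold an open resource.
    int openCount() {
        return writers.size();
    }
}
```

With a strict open/close contract, a write to a partition that has not been opened (or has already been closed) becomes a detectable error rather than something the task has to work around.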
We could also consider going a little further. Instead of depending on
onPartitionsAssigned() to open resources, tasks could open partition resources
on demand as records are received. In general, connectors will need some way to
close partition-specific resources, but there might not be any need to pass the
full list of partitions to close since the only open resources should be those
that have received writes since the last rebalance. In that case, we would need
only a single method:
{code}
void close();
{code}
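A rough sketch of the on-demand variant follows, under the same caveats: LazyOpenSinkTask, its put() signature, and openCount() are hypothetical, and TopicPartition is again a local stand-in for org.apache.kafka.common.TopicPartition. It assumes the framework would call close() once around each rebalance, which is not something the current code guarantees.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Local stand-in for org.apache.kafka.common.TopicPartition (assumption, for self-containment).
final class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartition)) return false;
        TopicPartition other = (TopicPartition) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Hypothetical task that opens partition resources lazily and releases them all in close().
class LazyOpenSinkTask {
    // Only partitions that have received writes since the last close() appear here.
    private final Map<TopicPartition, List<String>> writers = new HashMap<>();

    // Opens the per-partition resource on first write, then appends the record.
    void put(TopicPartition tp, String value) {
        writers.computeIfAbsent(tp, k -> new ArrayList<>()).add(value);
    }

    // No partition list is needed: everything that is open was opened by a write.
    void close() {
        writers.clear();
    }

    // Number of partitions that currently hold an open resource.
    int openCount() {
        return writers.size();
    }
}
```

The task never needs to know its assignment up front, which is what would let the partition argument be dropped from close().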
The downside to this is that the difference between close() and stop() then
becomes a little unclear.
These changes are not backward compatible, so existing connectors would have to
be updated.