Jason Gustafson created KAFKA-3092:
--------------------------------------

             Summary: Rename SinkTask.onPartitionsAssigned/onPartitionsRevoked 
and Clarify Contract
                 Key: KAFKA-3092
                 URL: https://issues.apache.org/jira/browse/KAFKA-3092
             Project: Kafka
          Issue Type: Improvement
          Components: copycat
            Reporter: Jason Gustafson
            Assignee: Jason Gustafson


The purpose of the onPartitionsRevoked() and onPartitionsAssigned() methods 
exposed in Kafka Connect's SinkTask interface is unclear and too closely tied 
to consumer semantics. According to the javadoc, these methods are used to 
open and close per-partition resources, which suggests that a task should 
always get one call to onPartitionsAssigned() before writing any records for 
the corresponding partitions and one call to onPartitionsRevoked() when it has 
finished with them. However, the same methods on the consumer are used to 
indicate phases of the rebalance operation: onPartitionsRevoked() is called 
before the rebalance begins and onPartitionsAssigned() is called after it 
completes. In particular, the consumer does not guarantee a final call to 
onPartitionsRevoked(). 

This mismatch makes the contract of these methods unclear. In fact, the 
WorkerSinkTask currently does not guarantee the initial call to 
onPartitionsAssigned(), nor the final call to onPartitionsRevoked(). Instead, 
the task implementation must pull the initial assignment from the 
SinkTaskContext. To add to the confusion, the offset commit that follows 
onPartitionsRevoked() causes a flush() on partitions that have already been 
revoked. All of this makes it difficult to use this API as suggested in the 
javadocs.

To fix this, we should clarify the behavior of these methods and consider 
renaming them to avoid confusion with the same methods in the consumer API. If 
onPartitionsAssigned() is meant for opening resources, maybe we can rename it 
to open(). Similarly, onPartitionsRevoked() can be renamed to close(). We can 
then fix the code to ensure that a typical open/close contract is enforced. 
This would also remove the need to pass the initial assignment through the 
SinkTaskContext. This would give the following API:

{code}
void open(Collection<TopicPartition> partitions);
void close(Collection<TopicPartition> partitions);
{code}
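
As a rough illustration of the open/close contract proposed above, here is a minimal, self-contained sketch. It is not Kafka Connect code: TopicPartitionStub is a hypothetical stand-in for TopicPartition, and OpenCloseTaskSketch, put(), and openCount() are names invented for this example. It assumes the framework calls open() before any write and close() once the task is done with a partition.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Kafka's TopicPartition, so the sketch compiles on its own.
final class TopicPartitionStub {
    final String topic;
    final int partition;

    TopicPartitionStub(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartitionStub)) return false;
        TopicPartitionStub other = (TopicPartitionStub) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return 31 * topic.hashCode() + partition;
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

// Hypothetical task that keeps one in-memory buffer per open partition.
class OpenCloseTaskSketch {
    private final Map<TopicPartitionStub, StringBuilder> writers = new HashMap<>();

    // Proposed open(): assumed to be called before any record for these partitions is written.
    void open(Collection<TopicPartitionStub> partitions) {
        for (TopicPartitionStub tp : partitions) {
            writers.put(tp, new StringBuilder());
        }
    }

    // A write to a partition that was never opened (or already closed) violates the contract.
    void put(TopicPartitionStub tp, String value) {
        StringBuilder writer = writers.get(tp);
        if (writer == null) {
            throw new IllegalStateException("No open() call seen for " + tp);
        }
        writer.append(value).append('\n');
    }

    // Proposed close(): assumed to be called once the task is finished with these partitions.
    void close(Collection<TopicPartitionStub> partitions) {
        for (TopicPartitionStub tp : partitions) {
            writers.remove(tp);
        }
    }

    // Number of partitions that currently hold an open resource.
    int openCount() {
        return writers.size();
    }
}
```

With a strict contract like this, the task never needs to consult the SinkTaskContext for its initial assignment, since every partition it writes to has been announced through open() first.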

We could also consider going a little further. Instead of depending on 
onPartitionsAssigned() to open resources, tasks could open partition resources 
on demand as records are received. In general, connectors will need some way to 
close partition-specific resources, but there might not be any need to pass the 
full list of partitions to close since the only open resources should be those 
that have received writes since the last rebalance. In that case, we would 
need just a single method:

{code}
void close();
{code}
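
The on-demand variant could look roughly like the sketch below. Again, this is only an illustration under assumptions: LazyOpenTaskSketch, put(), and openCount() are hypothetical names, and the "topic-partition" string key stands in for a real TopicPartition. Resources are opened on the first write to a partition, and the no-argument close() releases whatever has been opened since the last rebalance.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical task that opens per-partition resources lazily and closes them all at once.
class LazyOpenTaskSketch {
    // Keyed by "topic-partition"; a real task would presumably key by TopicPartition.
    private final Map<String, List<String>> buffers = new HashMap<>();

    // Opens the partition's buffer on first write instead of relying on an open() callback.
    void put(String topic, int partition, String value) {
        buffers.computeIfAbsent(topic + "-" + partition, key -> new ArrayList<>()).add(value);
    }

    // Proposed no-argument close(): releases every resource opened since the last rebalance.
    void close() {
        buffers.clear();
    }

    // Number of partitions that currently hold an open resource.
    int openCount() {
        return buffers.size();
    }
}
```

Because the task itself tracks which partitions have open resources, close() does not need a partition list.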

The downside to this is that the difference between close() and stop() then 
becomes a little unclear.

These changes are not backward compatible, so existing connectors would have 
to be updated.






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
