[ https://issues.apache.org/jira/browse/KAFKA-3092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15095314#comment-15095314 ]
Ewen Cheslack-Postava commented on KAFKA-3092:
----------------------------------------------

The change to rename the methods to open/close and fix up the guarantees about when they are invoked makes sense. I don't think there's actually a good use case for passing in the partition assignment info anyway, except occasionally to save a connector from tracking a bit of information itself.

I don't think close() vs stop() is necessarily that confusing. Some connectors may have shared context that isn't tied to a particular partition (e.g. the connection to the other system), and stop() is the only place to clean that up that wouldn't be wasteful: using close() for that would mean restarting the connection on every rebalance. Connectors that don't need to close per-partition resources on rebalance just won't implement close(). (Or will they need to? In the updated semantics, is flush() called during a rebalance, or only close() followed by the framework committing offsets?)

I'm not sure we can move fully to implicit allocation of resources by connectors when they first see records for a topic partition. That is basically what we required until we added onPartitionsAssigned/onPartitionsRevoked, but I don't think it works for connectors that manage their own offsets. For example, the HDFS connector needs to know it has been assigned a partition so it can list the files in HDFS, figure out the last committed offset (processing the write-ahead log if a commit wasn't properly completed), and then tell the framework where to start consuming from.

Re: compatibility, I'm sure others will have input on this too, but we kept things marked unstable for a reason. Unlike the consumer, where there was a lot of experience to inform the updated API (and where there's still some doubt around the design), we knew this API, despite our best efforts, was unlikely to be perfect the first time around.
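A minimal sketch of the split described in this comment, assuming the proposed open()/close() API. Shared state (a boolean standing in for the connection to the other system) lives in start()/stop(), so a rebalance does not reconnect. open() both allocates per-partition state and, in the style of the HDFS connector, looks up the last offset already written externally and asks the framework to resume right after it. TopicPartition, TaskContext, OffsetStore, and ExampleSinkTask are hypothetical stand-ins defined here, not the real Kafka Connect classes, and the "resume at last + 1" convention is also an assumption.

```java
import java.util.*;

// Hypothetical stand-ins, NOT the real org.apache.kafka classes.
record TopicPartition(String topic, int partition) {}

interface TaskContext {
    // Assumed hook: tell the framework which offset to consume next for a partition.
    void offset(TopicPartition tp, long offset);
}

interface OffsetStore {
    // Assumed: last offset durably written to the external system (e.g. derived from files in HDFS).
    Optional<Long> lastCommitted(TopicPartition tp);
}

class ExampleSinkTask {
    private final TaskContext context;
    private final OffsetStore store;
    private boolean connected = false;   // shared context, not tied to any partition
    private int connectionsOpened = 0;
    // Per-partition resources, keyed by the partition they belong to.
    private final Map<TopicPartition, List<String>> buffers = new HashMap<>();

    ExampleSinkTask(TaskContext context, OffsetStore store) {
        this.context = context;
        this.store = store;
    }

    // Shared resources: opened once per task, not once per rebalance.
    void start() {
        connected = true;
        connectionsOpened++;
    }

    // Per-partition resources, plus offset recovery for connectors that manage their own offsets.
    void open(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            buffers.put(tp, new ArrayList<>());
            // Resume right after the last offset the external system already holds, if any.
            store.lastCommitted(tp).ifPresent(last -> context.offset(tp, last + 1));
        }
    }

    void put(TopicPartition tp, String value) {
        List<String> buffer = buffers.get(tp);
        if (buffer == null) {
            throw new IllegalStateException("put() for a partition that is not open: " + tp);
        }
        buffer.add(value);
    }

    // Releases only per-partition state; the shared connection survives the rebalance.
    void close(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            buffers.remove(tp);
        }
    }

    // Task shutdown: the only place the shared connection is torn down.
    void stop() {
        buffers.clear();
        connected = false;
    }

    boolean isConnected() { return connected; }
    int connectionsOpened() { return connectionsOpened; }
    Set<TopicPartition> openPartitions() { return Set.copyOf(buffers.keySet()); }
}
```

A rebalance then becomes close() on the revoked partitions followed by open() on the new ones, and connectionsOpened() stays at 1 throughout. Putting the recovery in open() is what lets the task rewind before any record for that partition arrives.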

At this point, the impact will be pretty low: you'll have to run connectors with the right version of Kafka Connect/Kafka, but that's not a big deal, and the changes this introduces are mainly a rearrangement of code in connectors, not a major change to their implementations.

> Rename SinkTask.onPartitionsAssigned/onPartitionsRevoked and Clarify Contract
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-3092
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3092
>             Project: Kafka
>          Issue Type: Improvement
>          Components: copycat
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>
> The purpose of the onPartitionsRevoked() and onPartitionsAssigned() methods
> exposed in Kafka Connect's SinkTask interface seems a little unclear and too
> closely tied to consumer semantics. From the javadoc, these APIs are used to
> open/close per-partition resources, but that would suggest that we should
> always get one call to onPartitionsAssigned() before writing any records for
> the corresponding partitions and one call to onPartitionsRevoked() when we
> have finished with them. However, the same methods on the consumer are used
> to indicate phases of the rebalance operation: onPartitionsRevoked() is
> called before the rebalance begins and onPartitionsAssigned() is called after
> it completes. In particular, the consumer does not guarantee a final call to
> onPartitionsRevoked().
> This mismatch makes the contract of these methods unclear. In fact, the
> WorkerSinkTask currently does not guarantee the initial call to
> onPartitionsAssigned(), nor the final call to onPartitionsRevoked(). Instead,
> the task implementation must pull the initial assignment from the
> SinkTaskContext. To make it more confusing, the call to commit offsets
> following onPartitionsRevoked() causes a flush() on a partition which had
> already been revoked. All of this makes it difficult to use this API as
> suggested in the javadocs.
> To fix this, we should clarify the behavior of these methods and consider
> renaming them to avoid confusion with the same methods in the consumer API.
> If onPartitionsAssigned() is meant for opening resources, maybe we can rename
> it to open(). Similarly, onPartitionsRevoked() can be renamed to close(). We
> can then fix the code to ensure that a typical open/close contract is
> enforced. This would also mean removing the need to pass the initial
> assignment in the SinkTaskContext. This would give the following API:
> {code}
> void open(Collection<TopicPartition> partitions);
> void close(Collection<TopicPartition> partitions);
> {code}
> We could also consider going a little further. Instead of depending on
> onPartitionsAssigned() to open resources, tasks could open partition
> resources on demand as records are received. In general, connectors will need
> some way to close partition-specific resources, but there might not be any
> need to pass the full list of partitions to close since the only open
> resources should be those that have received writes since the last rebalance.
> In this case, we just have a single method:
> {code}
> void close();
> {code}
> The downside to this is that the difference between close() and stop() then
> becomes a little unclear.
> Obviously these are not compatible changes and connectors would have to be
> updated.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
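One way the framework side could enforce the open/close contract proposed in the description, sketched under assumptions. The driver tracks which partitions the task currently has open and translates the consumer's rebalance-phase callbacks into matched open()/close() calls. It also flushes revoked partitions before closing them, so no flush ever targets an already-closed partition, and it closes whatever is still open at shutdown, since the consumer gives no final revocation. PartitionSink, its per-partition flush(), and ContractEnforcingDriver are hypothetical names. Partitions are plain strings to keep the sketch self-contained, and this is not the actual WorkerSinkTask code.

```java
import java.util.*;

// Hypothetical task-facing API mirroring the proposed open()/close() contract.
interface PartitionSink {
    void open(Collection<String> partitions);
    void flush(Collection<String> partitions);   // assumed per-partition flush before commit
    void close(Collection<String> partitions);
}

// Framework-side adapter: consumer rebalance callbacks in, strict open/close pairs out.
class ContractEnforcingDriver {
    private final PartitionSink task;
    private final Set<String> open = new LinkedHashSet<>();

    ContractEnforcingDriver(PartitionSink task) {
        this.task = task;
    }

    // Called before a rebalance; only partitions that are actually open get closed.
    void onPartitionsRevoked(Collection<String> revoked) {
        List<String> toClose = new ArrayList<>();
        for (String p : revoked) {
            if (open.contains(p)) toClose.add(p);
        }
        if (toClose.isEmpty()) return;
        task.flush(toClose);   // flush/commit while the partitions are still open...
        task.close(toClose);   // ...then close, so nothing is flushed after close
        open.removeAll(toClose);
    }

    // Called after a rebalance, including the very first assignment.
    void onPartitionsAssigned(Collection<String> assigned) {
        List<String> toOpen = new ArrayList<>();
        for (String p : assigned) {
            if (!open.contains(p)) toOpen.add(p);
        }
        if (toOpen.isEmpty()) return;
        task.open(toOpen);
        open.addAll(toOpen);
    }

    // The consumer does not guarantee a final revocation, so the driver supplies one.
    void shutdown() {
        onPartitionsRevoked(new ArrayList<>(open));
    }

    Set<String> openPartitions() {
        return Collections.unmodifiableSet(open);
    }
}
```

With this in place, every partition a task sees gets exactly one open() before its records and exactly one close() afterwards. The task no longer needs to pull the initial assignment from its context.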
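The "going a little further" variant from the description can be sketched too, assuming each record carries its partition and that a partition's resources are created lazily on its first write. The single no-argument close() then only handles what was opened since the last rebalance. stop() does the same work plus releasing shared state, which is exactly where the close()/stop() overlap shows up. LazySinkTask, its string-keyed partitions, and the in-memory `external` map standing in for the destination system are all hypothetical.

```java
import java.util.*;

class LazySinkTask {
    // Stand-in for the destination system: partition -> data written so far.
    final Map<String, String> external = new HashMap<>();
    // Per-partition resources, created only when a partition first receives a record.
    private final Map<String, StringBuilder> buffers = new LinkedHashMap<>();
    private boolean connected = false;   // shared context, released only in stop()

    void start() { connected = true; }

    void put(String partition, String value) {
        if (!connected) throw new IllegalStateException("task not started");
        // Open on demand: no open() call or assignment list is needed.
        buffers.computeIfAbsent(partition, p -> new StringBuilder()).append(value).append('\n');
    }

    // Single close(): write out and release every partition written since the last rebalance.
    void close() {
        for (Map.Entry<String, StringBuilder> e : buffers.entrySet()) {
            external.merge(e.getKey(), e.getValue().toString(), String::concat);
        }
        buffers.clear();
    }

    // stop() does everything close() does and also releases the shared state.
    void stop() {
        close();
        connected = false;
    }

    Set<String> openPartitions() { return Collections.unmodifiableSet(buffers.keySet()); }
}
```

One consequence, matching the concern in the comment above: a partition that is assigned but never written is never opened. This style therefore offers no hook for connectors such as the HDFS one that must recover offsets at assignment time.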