[ https://issues.apache.org/jira/browse/KAFKA-4161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15491296#comment-15491296 ]
Shikhar Bhushan commented on KAFKA-4161:
----------------------------------------

bq. Probably worth clarifying whether we're really talking about just flush here or offset commit as well. Flush really only exists in order to support offset commit (from the framework's perspective), but since you mention full buffers I think you might be getting at a slightly different use case for connectors.

Sorry I wasn't clear: flushing data and offset commit are currently coupled, as you pointed out. If we want to avoid unnecessary redelivery of records, it is best to commit offsets with the most current knowledge of them, which we currently have after calling {{flush()}}.

bq. In general, I think it'd actually be even better to just get rid of the idea of having to flush as a common operation, as it hurts throughput to have to flush entirely to commit offsets (we are flushing the pipeline, which is never good). Ideally we could do what the framework does with source connectors and just track which data has been successfully delivered and use that for the majority of offset commits. We'd still need it for cases like shutdown where we want to make sure all data has been sent, but since the framework controls delivery of data, maybe it's even better just to wait for that data to be written.

Good points. I agree it would be better to make it so {{flush()}} is not routine, since it can hurt throughput. I think we can deprecate it altogether. As a proposal:

{noformat}
abstract class SinkTask {
    ..

    // New method
    public Map<TopicPartition, OffsetAndMetadata> flushedOffsets() {
        throw new UnsupportedOperationException();
    }

    @Deprecated
    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }

    ..
}
{noformat}

The periodic offset-commit logic would then consult {{flushedOffsets()}}, and if that is not implemented, fall back to calling {{flush()}} as it does currently, so it can commit the offset state as of the last {{put()}} call.

I don't think {{flush()}} is needed even at shutdown.
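The fallback behavior described above could look roughly like the sketch below. All names here ({{CommitSketch}}, {{offsetsToCommit}}) are hypothetical, the stand-in {{TopicPartition}} and {{SinkTask}} classes are simplified stubs rather than the real Kafka Connect types, and offsets are modeled as plain {{Long}} values instead of {{OffsetAndMetadata}}.

```java
import java.util.Map;

// Simplified stand-in for org.apache.kafka.common.TopicPartition.
class TopicPartition {
    final String topic;
    final int partition;
    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
    @Override public String toString() { return topic + "-" + partition; }
}

// Simplified stand-in for the proposed SinkTask API.
abstract class SinkTask {
    // Proposed new method: tasks report offsets they have durably written.
    public Map<TopicPartition, Long> flushedOffsets() {
        throw new UnsupportedOperationException();
    }

    // Deprecated path: the framework forces a full flush before committing.
    @Deprecated
    public void flush(Map<TopicPartition, Long> currentOffsets) { }
}

// Hypothetical framework-side commit logic: prefer flushedOffsets(), and
// only for tasks that have not implemented it, fall back to flush().
class CommitSketch {
    static Map<TopicPartition, Long> offsetsToCommit(SinkTask task,
            Map<TopicPartition, Long> currentOffsets) {
        try {
            return task.flushedOffsets();   // new, flush-free path
        } catch (UnsupportedOperationException e) {
            task.flush(currentOffsets);     // legacy path: flush the pipeline
            return currentOffsets;          // then all consumed offsets are safe
        }
    }
}
```

Note that on the new path the committed offsets may lag the consumed offsets, which is exactly what avoids flushing the pipeline on every commit.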
Tasks are already being advised via {{close()}} and can choose to flush any buffered data from there. We can do a final offset commit based on {{flushedOffsets()}} after {{close()}} (though this does imply a quirk: even after a {{TopicPartition}} is closed, we expect tasks to keep offset state around in the map returned by {{flushedOffsets()}}).

Additionally, it would be good to have a {{context.requestCommit()}} in the spirit of the {{context.requestFlush()}} I was originally proposing. The motivation is that connectors can optimize to avoid unnecessary redelivery when recovering from failures. Connectors can choose whatever policies are best, such as number-of-records or size-based batching/buffering for writing to the destination system as part of the normal flow of calls to {{put()}}, and request a commit once they have actually written data to the destination system. There need not be a strong guarantee about whether offset committing actually happens after such a request, so we don't commit offsets too often and can choose to only do it after some minimum interval, e.g. in case a connector always requests a commit after a put.

bq. The main reason I think we even need the explicit flush() is that some connectors may have very long delays between flushes (e.g. any object stores like S3) such that they need to be told directly that they need to write all their data (or discard it).

I don't believe it is currently possible for a connector to communicate that it wants to discard data rather than write it out when {{flush()}} is called (aside from, I guess, throwing an exception...). With the above proposal, the decision of when and whether to write data would be completely up to connectors.

bq. Was there a specific connector & scenario you were thinking about here?
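A connector using the proposed hint might look like the sketch below. Everything here is hypothetical: {{requestCommit()}} is the method proposed in this comment, not an existing Kafka Connect API, and the buffering task, its names, and the use of {{String}} records in place of {{SinkRecord}} are illustrative simplifications.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical context interface carrying the proposed requestCommit() hint.
interface SinkTaskContextSketch {
    // Non-binding hint: the framework may rate-limit actual commits.
    void requestCommit();
}

// Sketch of a size-based buffering task: write a batch to the destination
// system once the buffer fills, then hint that offsets are safe to commit.
class BufferingSinkTaskSketch {
    private final SinkTaskContextSketch context;
    private final List<String> buffer = new ArrayList<>();
    private final int maxBufferedRecords;
    int batchesWritten = 0; // exposed for the usage example below

    BufferingSinkTaskSketch(SinkTaskContextSketch context, int maxBufferedRecords) {
        this.context = context;
        this.maxBufferedRecords = maxBufferedRecords;
    }

    public void put(Collection<String> records) {
        buffer.addAll(records);
        if (buffer.size() >= maxBufferedRecords) {
            writeBatchToDestination(buffer);
            buffer.clear();
            // Data is now durable in the destination system, so committing
            // offsets here minimizes redelivery after a failure.
            context.requestCommit();
        }
    }

    private void writeBatchToDestination(List<String> batch) {
        batchesWritten++; // placeholder for a real write, e.g. to S3
    }
}
```

The key property is that the commit request follows the write, so recovery replays at most one partial batch rather than everything since the last timed flush.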
This came up in a thread on the user list ('Sink Connector feature request: SinkTask.putAndReport()')

> Allow connectors to request flush via the context
> -------------------------------------------------
>
>                 Key: KAFKA-4161
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4161
>             Project: Kafka
>          Issue Type: New Feature
>          Components: KafkaConnect
>            Reporter: Shikhar Bhushan
>            Assignee: Ewen Cheslack-Postava
>              Labels: needs-kip
>
> It is desirable to have, in addition to the time-based flush interval, volume- or size-based commits. E.g. a sink connector which is buffering in terms of number of records may want to request a flush when the buffer is full, or when a sufficient amount of data has been buffered in a file.
> Having a method like, say, {{requestFlush()}} on the {{SinkTaskContext}} would allow connectors to have flexible policies around flushes. This would be in addition to the time-interval-based flushes that are controlled with {{offset.flush.interval.ms}}, for which the clock should be reset when any kind of flush happens.
> We should probably also support requesting flushes via the {{SourceTaskContext}} for consistency, though a use-case doesn't come to mind off the bat.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)