[ https://issues.apache.org/jira/browse/KAFKA-4161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15536415#comment-15536415 ]
Dean Arnold commented on KAFKA-4161:
------------------------------------

I'm the requester for this feature. My particular need is that the sink system has to commit in batches of a fixed number of records (or as close to that as possible) for optimal storage management, reads, and analytics. The same notion applies elsewhere (e.g., optimizing compression for large-ish buffers, or filling an HDFS chunk before writing it out). The feature also gives sink systems a way to dynamically adjust the frequency of commits, independent of the flush interval.

My primary concern is the contract of flush(): the sink connector must commit whatever data it has accumulated, even if it's just a few records, since flush() implies that Kafka is free to commit/compact all records accumulated up to that flush(). If the sink connector can report what it has already committed, the flush interval can be made fairly large to minimize small-buffer flushes, while Kafka still commits/compacts at reasonable intervals based on the sink's reported committed offsets. Kafka could then invoke flush() on the sink never, or only rarely, provided it resets the flush timer to start at the delivery time of the oldest uncommitted offset: in effect, a sliding flush window based on the sink's reported commits.

> Decouple flush and offset commits
> ---------------------------------
>
> Key: KAFKA-4161
> URL: https://issues.apache.org/jira/browse/KAFKA-4161
> Project: Kafka
> Issue Type: New Feature
> Components: KafkaConnect
> Reporter: Shikhar Bhushan
> Assignee: Ewen Cheslack-Postava
> Labels: needs-kip
>
> It is desirable to have, in addition to the time-based flush interval,
> volume- or size-based commits. E.g., a sink connector that buffers in terms
> of number of records may want to request a flush when the buffer is full, or
> when a sufficient amount of data has been buffered in a file.
>
> Having a method like, say, {{requestFlush()}} on the {{SinkTaskContext}} would
> allow connectors to have flexible policies around flushes. This would be in
> addition to the time-interval-based flushes that are controlled with
> {{offset.flush.interval.ms}}, for which the clock should be reset when any
> kind of flush happens.
>
> We should probably also support requesting flushes via the
> {{SourceTaskContext}} for consistency, though a use case doesn't come to mind
> off the bat.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
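The sliding flush window described in Dean Arnold's comment can be sketched as a small, self-contained Java model. This is an illustration under stated assumptions, not the Kafka Connect API: the names Rec, BatchingSink, SlidingFlushPolicy, reportedCommits() and oldestUncommittedMs() are hypothetical, partitions are plain integer ids, and records are assumed to arrive in delivery-time order. The sink writes out a batch on its own whenever it reaches a fixed record count and reports the highest offset it has committed per partition; the framework side forces flush() only when the oldest still-buffered record has waited at least the flush interval. Because the timer is measured from the oldest uncommitted record, every batch the sink commits on its own effectively resets the window.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the proposal; none of these types exist in Kafka Connect.
public class FlushWindowSketch {

    /** A consumed record: partition id, offset, and when it was delivered to the task. */
    public record Rec(int partition, long offset, long deliveredAtMs) {}

    /** Sink side: buffers records and writes them out only when a fixed-size batch is full. */
    public static class BatchingSink {
        private final int batchSize;
        private final List<Rec> buffer = new ArrayList<>();
        // Highest offset per partition already durably written to the sink system.
        private final Map<Integer, Long> committed = new HashMap<>();

        public BatchingSink(int batchSize) { this.batchSize = batchSize; }

        public void put(Rec r) {
            buffer.add(r);
            if (buffer.size() >= batchSize) writeBatch(); // sink commits on its own schedule
        }

        /** Explicit flush: per the flush() contract, write whatever is buffered, even a small batch. */
        public void flush() { if (!buffer.isEmpty()) writeBatch(); }

        private void writeBatch() {
            // A real sink would write to its storage here; this model only tracks offsets.
            for (Rec r : buffer) committed.merge(r.partition(), r.offset(), Long::max);
            buffer.clear();
        }

        /** Offsets the sink reports as committed, which the framework could then commit to Kafka. */
        public Map<Integer, Long> reportedCommits() { return Map.copyOf(committed); }

        /** Delivery time of the oldest record not yet committed by the sink, or -1 if none. */
        public long oldestUncommittedMs() {
            return buffer.isEmpty() ? -1 : buffer.get(0).deliveredAtMs(); // assumes in-order delivery
        }
    }

    /** Framework side: forces flush() only when the oldest uncommitted record has waited too long. */
    public static class SlidingFlushPolicy {
        private final long flushIntervalMs;

        public SlidingFlushPolicy(long flushIntervalMs) { this.flushIntervalMs = flushIntervalMs; }

        public boolean shouldFlush(BatchingSink sink, long nowMs) {
            long oldest = sink.oldestUncommittedMs();
            return oldest >= 0 && nowMs - oldest >= flushIntervalMs;
        }
    }

    public static void main(String[] args) {
        BatchingSink sink = new BatchingSink(3);
        SlidingFlushPolicy policy = new SlidingFlushPolicy(60_000);
        sink.put(new Rec(0, 10, 0));
        sink.put(new Rec(0, 11, 1_000));
        System.out.println(sink.reportedCommits());           // {} : batch not full yet
        sink.put(new Rec(0, 12, 2_000));                      // batch full, written by the sink itself
        System.out.println(sink.reportedCommits());           // {0=12}
        sink.put(new Rec(0, 13, 5_000));
        System.out.println(policy.shouldFlush(sink, 30_000)); // false: oldest record waited 25 s
        System.out.println(policy.shouldFlush(sink, 65_000)); // true: oldest record waited 60 s
        sink.flush();
        System.out.println(sink.reportedCommits());           // {0=13}
    }
}
```

With the issue's proposed requestFlush() alternative, the sink would instead ask the framework to call flush() when its batch is full; the reporting approach sketched here lets the sink commit on its own and only tells the framework which offsets are safe to commit.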