[ https://issues.apache.org/jira/browse/KAFKA-4161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15536415#comment-15536415 ]

Dean Arnold commented on KAFKA-4161:
------------------------------------

I'm the requester for this feature. My particular need is that the sink system
must commit a fixed number of records per batch (or as close to it as possible)
for optimal storage management, reads, and analytics. The same notion applies
elsewhere, e.g. filling large-ish buffers to optimize compression, or filling an
HDFS chunk before writing it out. The feature also gives sink systems a way to
adjust their commit frequency dynamically, independent of the flush interval.
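
A minimal sketch of the fixed-size batching described above, written in Java
because Kafka Connect is a Java API. `FixedSizeBatchingSink` and its methods
are hypothetical names, not part of Kafka Connect. Records are reduced to their
offsets, and the write to the external system is simulated by recording batch
sizes. The sink reports a committed offset each time it completes a full batch.
A forced `flush()` instead writes out whatever is buffered, however small.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sink-side buffer, NOT part of the Kafka Connect API.
// Writes to the (simulated) external system only in batches of batchSize records
// and reports the offset up to which data is durably stored.
class FixedSizeBatchingSink {
    private final int batchSize;
    private final List<Long> buffer = new ArrayList<>();
    // Sizes of the batches written to the simulated external system.
    private final List<Integer> writtenBatchSizes = new ArrayList<>();
    // Next offset to consume after the last written record; -1 means nothing written yet.
    private long committedOffset = -1L;

    FixedSizeBatchingSink(int batchSize) {
        if (batchSize <= 0) throw new IllegalArgumentException("batchSize must be positive");
        this.batchSize = batchSize;
    }

    // Buffers one record (identified by its Kafka offset); writes a batch once it is full.
    void put(long offset) {
        buffer.add(offset);
        if (buffer.size() == batchSize) {
            writeBuffer();
        }
    }

    // flush() contract from the comment: write whatever is buffered, even a tiny batch.
    void flush() {
        if (!buffer.isEmpty()) {
            writeBuffer();
        }
    }

    // Offset the sink reports as safe to commit (Kafka convention: last written + 1).
    long committedOffset() {
        return committedOffset;
    }

    List<Integer> writtenBatchSizes() {
        return writtenBatchSizes;
    }

    private void writeBuffer() {
        // Stand-in for the real write to the external system.
        writtenBatchSizes.add(buffer.size());
        committedOffset = buffer.get(buffer.size() - 1) + 1;
        buffer.clear();
    }
}
```

With a batch size of 4 and offsets 0 through 9, the sink writes two full
batches and can already report offset 8 as committed. Only a forced `flush()`
produces the short third batch of 2 records.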

My primary concern is the contract of flush(): the sink connector must commit
whatever data it has accumulated, even if it's just a few records, since
flush() implies that Kafka is free to commit/compact all records accumulated up
to that flush(). If the sink connector can instead report what it has already
committed, the flush interval can be set fairly large to minimize these
small-buffer flushes, while Kafka still commits/compacts at reasonable
intervals based on the sink's reported committed offsets. Kafka might then
never (or only rarely) need to invoke flush() explicitly on the sink, provided
it resets the flush timer to start at the delivery time of the oldest
uncommitted offset: a sliding flush window based on the sink's reported
commits.
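
The sliding flush window could be tracked on the runtime side roughly as
follows. This is a sketch under stated assumptions: `SlidingFlushWindow`,
`delivered`, `sinkReportedCommitted`, and `flushDue` are hypothetical names,
times are plain milliseconds supplied by the caller, and the reported offset
follows Kafka's convention of naming the next offset to consume.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.OptionalLong;

// Hypothetical runtime-side tracker, NOT part of the Kafka Connect API.
// The flush deadline is measured from the delivery time of the oldest record
// the sink has not yet reported as committed, not from the last flush.
class SlidingFlushWindow {
    private final long flushIntervalMs;
    // {offset, deliveryTimeMs} for delivered-but-uncommitted records, in offset order.
    private final Deque<long[]> uncommitted = new ArrayDeque<>();
    // Highest next-offset reported by the sink, i.e. what the runtime may commit to Kafka.
    private long committedOffset = -1L;

    SlidingFlushWindow(long flushIntervalMs) {
        this.flushIntervalMs = flushIntervalMs;
    }

    // Called when a record is handed to the sink task.
    void delivered(long offset, long nowMs) {
        uncommitted.addLast(new long[] {offset, nowMs});
    }

    // Called with the sink's reported committed offset (next offset to consume).
    // The runtime can commit this offset without invoking flush().
    void sinkReportedCommitted(long nextOffset) {
        while (!uncommitted.isEmpty() && uncommitted.peekFirst()[0] < nextOffset) {
            uncommitted.removeFirst();
        }
        committedOffset = Math.max(committedOffset, nextOffset);
    }

    // Deadline for a forced flush, or empty if everything delivered is committed.
    OptionalLong flushDeadlineMs() {
        if (uncommitted.isEmpty()) return OptionalLong.empty();
        return OptionalLong.of(uncommitted.peekFirst()[1] + flushIntervalMs);
    }

    // True only once the oldest uncommitted record has waited a full flush interval.
    boolean flushDue(long nowMs) {
        OptionalLong deadline = flushDeadlineMs();
        return deadline.isPresent() && nowMs >= deadline.getAsLong();
    }

    long committedOffset() {
        return committedOffset;
    }
}
```

Because the pending records are kept in offset order, a single reported offset
retires a prefix of them, and the deadline slides forward to the next oldest
record. If the sink keeps reporting commits, a forced flush never becomes due.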

> Decouple flush and offset commits
> ---------------------------------
>
>                 Key: KAFKA-4161
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4161
>             Project: Kafka
>          Issue Type: New Feature
>          Components: KafkaConnect
>            Reporter: Shikhar Bhushan
>            Assignee: Ewen Cheslack-Postava
>              Labels: needs-kip
>
> It is desirable to have, in addition to the time-based flush interval, volume-
> or size-based commits. For example, a sink connector that buffers in terms of
> number of records may want to request a flush when the buffer is full, or
> when a sufficient amount of data has been buffered in a file.
> Having a method like, say, {{requestFlush()}} on the {{SinkTaskContext}} would
> allow connectors to have flexible flush policies. This would be in addition
> to the time-interval-based flushes controlled by
> {{offset.flush.interval.ms}}, whose clock should be reset whenever any kind
> of flush happens.
> We should probably also support requesting flushes via the
> {{SourceTaskContext}} for consistency, though a use-case doesn't come to mind
> off the bat.
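
The proposal in the issue can be illustrated with a hypothetical worker-side
scheduler. `FlushScheduler` and `maybeFlush` are invented names, and
`flushIntervalMs` stands in for `offset.flush.interval.ms`. The sketch shows the
two triggers (a task request and an elapsed interval) and the rule that any
flush resets the clock. It is not Kafka's implementation. Later Kafka Connect
releases added `SinkTaskContext.requestCommit()` and `SinkTask.preCommit(...)`
for this purpose, and this sketch does not model them.

```java
// Hypothetical flush scheduler, NOT part of the Kafka Connect API.
// A flush happens when the task calls requestFlush() or when flushIntervalMs
// has elapsed since the last flush of any kind; every flush resets the clock.
class FlushScheduler {
    private final long flushIntervalMs; // stands in for offset.flush.interval.ms
    private long lastFlushMs;
    private boolean flushRequested = false;

    FlushScheduler(long flushIntervalMs, long startMs) {
        this.flushIntervalMs = flushIntervalMs;
        this.lastFlushMs = startMs;
    }

    // Called by the task, e.g. when its record buffer is full (the proposed requestFlush()).
    void requestFlush() {
        flushRequested = true;
    }

    // Called periodically by the worker loop; returns true if a flush happened.
    boolean maybeFlush(long nowMs) {
        boolean intervalElapsed = nowMs - lastFlushMs >= flushIntervalMs;
        if (!flushRequested && !intervalElapsed) {
            return false;
        }
        // A real worker would flush the task and commit offsets here.
        flushRequested = false;
        lastFlushMs = nowMs; // any kind of flush resets the interval clock
        return true;
    }
}
```

For example, with a 60-second interval, a requested flush at t=10s moves the
next time-based flush from t=60s to t=70s.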



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
