[ 
https://issues.apache.org/jira/browse/KAFKA-4161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15489179#comment-15489179
 ] 

Ewen Cheslack-Postava commented on KAFKA-4161:
----------------------------------------------

[~shikhar] Probably worth clarifying whether we're really talking about just 
flush here or offset commit as well. Flush really only exists in order to 
support offset commit (from the framework's perspective), but since you mention 
full buffers I think you might be getting at a slightly different use case for 
connectors.

In general, I think it'd actually be even better to just get rid of the idea of 
having to flush as a common operation as it hurts throughput to have to flush 
entirely to commit offsets (we are flushing the pipeline, which is never good). 
Ideally we could do what the framework does with source connectors and just 
track which data has been successfully delivered and use that for the majority 
of offset commits. We'd still need it for cases like shutdown where we want to 
make sure all data has been sent, but since the framework controls delivery of 
data, maybe it's even better just to wait for that data to be written. The main 
reason I think we even need the explicit flush() is that some connectors may 
have *very* long delays between flushes (e.g. any object stores like S3) such 
that they need to be told directly that they need to write all their data (or 
discard it).
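
As a rough sketch of the "track what has been delivered" idea (this is not 
something the framework provides; DeliveredOffsetTracker and its methods are 
made-up names for illustration, and partitions are simplified to strings), 
the committable offset could be derived from acknowledgements instead of from 
a full flush:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical helper, NOT part of Kafka Connect: tracks which record offsets
// a sink has confirmed as written, so offsets can be committed without
// flushing the whole pipeline first.
final class DeliveredOffsetTracker {
    // Offsets handed to the sink but not yet acknowledged, per partition.
    private final Map<String, TreeSet<Long>> inFlight = new HashMap<>();
    // Highest offset handed to the sink so far, per partition.
    private final Map<String, Long> lastSent = new HashMap<>();

    // Record that the record at `offset` was handed to the sink.
    void sent(String partition, long offset) {
        inFlight.computeIfAbsent(partition, p -> new TreeSet<>()).add(offset);
        lastSent.merge(partition, offset, Math::max);
    }

    // Record that the sink confirmed the record at `offset` was written.
    void delivered(String partition, long offset) {
        TreeSet<Long> pending = inFlight.get(partition);
        if (pending != null) {
            pending.remove(offset);
        }
    }

    // Offset that is safe to commit, expressed as the next record to consume:
    // the oldest unacknowledged offset, or lastSent + 1 if nothing is pending.
    // Returns -1 when nothing has been sent for the partition.
    long committable(String partition) {
        Long last = lastSent.get(partition);
        if (last == null) {
            return -1L;
        }
        TreeSet<Long> pending = inFlight.get(partition);
        if (pending != null && !pending.isEmpty()) {
            return pending.first();
        }
        return last + 1;
    }
}
```

With something like this, a periodic offset commit would only read 
committable() for each partition. An explicit flush would then be needed only 
for cases like shutdown, or for sinks that otherwise hold data for a very 
long time.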

Was there a specific connector & scenario you were thinking about here?

> Allow connectors to request flush via the context
> -------------------------------------------------
>
>                 Key: KAFKA-4161
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4161
>             Project: Kafka
>          Issue Type: New Feature
>          Components: KafkaConnect
>            Reporter: Shikhar Bhushan
>            Assignee: Ewen Cheslack-Postava
>              Labels: needs-kip
>
> It is desirable to have, in addition to the time-based flush interval, volume 
> or size-based commits. E.g. a sink connector which is buffering in terms of 
> number of records may want to request a flush when the buffer is full, or 
> when a sufficient amount of data has been buffered in a file.
> Having a method like say {{requestFlush()}} on the {{SinkTaskContext}} would 
> allow for connectors to have flexible policies around flushes. This would be 
> in addition to the time interval based flushes that are controlled with 
> {{offset.flush.interval.ms}}, for which the clock should be reset when any 
> kind of flush happens.
> We should probably also support requesting flushes via the 
> {{SourceTaskContext}} for consistency though a use-case doesn't come to mind 
> off the bat.
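
The policy described in the issue (flush when a buffer fills, in addition to 
the time-based interval, with the clock reset on any flush) might look roughly 
like the following. This is only a sketch under assumptions: FlushPolicy is a 
hypothetical class, the proposed requestFlush() is not called here because it 
is only a proposal, and time is passed in explicitly to keep the example 
self-contained.

```java
// Hypothetical policy object, NOT part of Kafka Connect: decides when a sink
// task would want to ask for a flush, by record count or by elapsed time.
final class FlushPolicy {
    private final int maxBufferedRecords;
    private final long flushIntervalMs;
    private int buffered = 0;
    private long lastFlushMs;

    FlushPolicy(int maxBufferedRecords, long flushIntervalMs, long nowMs) {
        this.maxBufferedRecords = maxBufferedRecords;
        this.flushIntervalMs = flushIntervalMs;
        this.lastFlushMs = nowMs;
    }

    // Count one buffered record; returns true if a flush should be requested.
    boolean recordBuffered(long nowMs) {
        buffered++;
        return shouldFlush(nowMs);
    }

    // True when the buffer is full or the time interval has elapsed.
    boolean shouldFlush(long nowMs) {
        return buffered >= maxBufferedRecords
                || nowMs - lastFlushMs >= flushIntervalMs;
    }

    // Any flush, however it was triggered, empties the buffer count and
    // resets the clock used for the time-based interval.
    void flushed(long nowMs) {
        buffered = 0;
        lastFlushMs = nowMs;
    }
}
```

A task would call recordBuffered() for each record it buffers and, when it 
returns true, request a flush through whatever mechanism the context ends up 
exposing, then call flushed() once the flush completes.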



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
