[ 
https://issues.apache.org/jira/browse/FLINK-5018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15643615#comment-15643615
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-5018 at 11/7/16 9:28 AM:
---------------------------------------------------------------------

Some updates after thinking about this:

I'm not sure whether we should implement a strict "idle timeout", because 
that would require a {{timerService.getCurrentProcessingTime()}} call on every 
collected record. The sources already do this when programs are set to 
ingestion time, but a strict "idle timeout" would need the same call under the 
event time characteristic as well. It seems the only processing time service 
implementation we have now is backed by system time calls, and 
{{System.currentTimeMillis()}} is a relatively expensive call to make per record.
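
To make the cost concrete, here is a minimal sketch of what a strict "idle timeout" would look like. It is not Flink code: {{ProcessingClock}} and {{StrictIdleTimeoutTracker}} are hypothetical names, and the clock interface only stands in for whatever processing time service the source would use.

```java
// Hypothetical stand-in for the processing time service; not a Flink interface.
interface ProcessingClock {
    long currentTimeMillis();
}

// Strict "idle timeout": remembers the time of the last collected record.
final class StrictIdleTimeoutTracker {
    private final ProcessingClock clock;
    private final long idleTimeoutMillis;
    private long lastRecordTime;

    StrictIdleTimeoutTracker(ProcessingClock clock, long idleTimeoutMillis) {
        this.clock = clock;
        this.idleTimeoutMillis = idleTimeoutMillis;
        // Start counting from construction so a source with no records can time out.
        this.lastRecordTime = clock.currentTimeMillis();
    }

    // Called for every collected record: this is the per-record clock read.
    void onRecord() {
        lastRecordTime = clock.currentTimeMillis();
    }

    // Called from a periodic timer; true once the timeout has fully elapsed.
    boolean isIdle() {
        return clock.currentTimeMillis() - lastRecordTime >= idleTimeoutMillis;
    }
}
```

The precision is good (idleness is measured from the actual last record), but {{onRecord()}} reads the clock on the hot path, which is the concern above.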

Perhaps an "idle interval" setting would work too, where we consider a source 
idle if no new records have been emitted between two consecutive checks. 
We won't need to keep track of the last emitted record's time for this. The 
idle status propagation will be a lot less precise for longer "idle intervals", 
though.
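
A rough sketch of the "idle interval" idea, again with a hypothetical class name ({{IdleIntervalDetector}}) rather than anything that exists in Flink. It assumes {{onRecord()}} and {{checkIdle()}} are called under the same lock, and that a timer invokes {{checkIdle()}} once per interval.

```java
// "Idle interval": no clock reads per record, only a flag.
final class IdleIntervalDetector {
    // Starts as true so the first check only establishes a baseline.
    private boolean recordSeenSinceLastCheck = true;

    // Called for every collected record; just sets a flag.
    void onRecord() {
        recordSeenSinceLastCheck = true;
    }

    // Called once per idle interval by a timer.
    // Returns true if no record arrived since the previous check.
    boolean checkIdle() {
        boolean idle = !recordSeenSinceLastCheck;
        recordSeenSinceLastCheck = false;
        return idle;
    }
}
```

With this scheme a source that stops emitting is reported idle somewhere between one and two intervals after its last record, depending on where in the interval that record fell. That is the loss of precision mentioned above, and it grows with the interval length.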

I'm a bit unsure about this one. [~aljoscha] any thoughts?

Either way, I think the default for this setting should be 0 (i.e., Flink does 
not check for idleness and does not automatically emit an idle 
{{WatermarkStatus}} from sources), since in most serious use cases all Kafka 
partitions should have data. We can document clearly that users who expect any 
of their partitions to stop receiving data should set a reasonable value.


> User configurable source idle timeout to work with WatermarkStatus emitting
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-5018
>                 URL: https://issues.apache.org/jira/browse/FLINK-5018
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming
>            Reporter: Tzu-Li (Gordon) Tai
>             Fix For: 1.2.0
>
>
> There are 2 cases where sources are considered idle and should emit an idle 
> {{WatermarkStatus}} downstream, taking the Kafka consumer as an example:
> - The source instance was not assigned any partitions.
> - The source instance was assigned partitions, but they currently don't have 
> any data.
> For the second case, we can only consider the source idle after a timeout 
> threshold. It would be good to make this timeout user-configurable, in 
> addition to providing a default value.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
