Great! Thanks for reporting back :-)

A buffer timeout of 0 ms is quite aggressive. You might be sending buffers of (by default) 32 KB that each contain just a single record. Anyway, now you know the knobs to tune the latency.
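The buffer-timeout trade-off can be illustrated with a simplified, hypothetical model: a sender appends records to a fixed-size buffer and ships it either when it is full or when a periodic flusher fires every `timeoutMs`, and a timeout of 0 ships after every record. The class and method names below are made up for illustration, and this is not Flink's network stack. In the DataStream API the actual knob is `StreamExecutionEnvironment#setBufferTimeout(long)`.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of an output buffer (NOT Flink's code):
 * records are appended until the buffer is full or a periodic flusher fires.
 * A timeout of 0 means "flush after every record".
 */
final class BufferTimeoutSketch {

    /** Returns how many records each shipped buffer carried, in shipping order. */
    static List<Integer> recordsPerBuffer(int recordCount, long gapMs, int recordBytes,
                                          int capacityBytes, long timeoutMs) {
        List<Integer> sent = new ArrayList<>();
        int filledBytes = 0;
        int recordsInBuffer = 0;
        long nextFlushAt = timeoutMs; // flusher fires at timeoutMs, 2 * timeoutMs, ...
        for (int i = 0; i < recordCount; i++) {
            long now = i * gapMs; // records arrive every gapMs milliseconds
            // Let the periodic flusher fire for every tick that passed before this record.
            if (timeoutMs > 0) {
                while (nextFlushAt <= now) {
                    if (recordsInBuffer > 0) {
                        sent.add(recordsInBuffer);
                        filledBytes = 0;
                        recordsInBuffer = 0;
                    }
                    nextFlushAt += timeoutMs;
                }
            }
            filledBytes += recordBytes;
            recordsInBuffer++;
            // Ship when no further record fits, or immediately if the timeout is 0.
            if (filledBytes + recordBytes > capacityBytes || timeoutMs == 0) {
                sent.add(recordsInBuffer);
                filledBytes = 0;
                recordsInBuffer = 0;
            }
        }
        if (recordsInBuffer > 0) {
            sent.add(recordsInBuffer); // final flush at the end of the input
        }
        return sent;
    }

    public static void main(String[] args) {
        int capacity = 32 * 1024; // 32 KB, the default buffer size mentioned above
        for (long timeout : new long[] {0, 10, 100}) {
            List<Integer> buffers = recordsPerBuffer(100, 1, 100, capacity, timeout);
            System.out.println("timeout=" + timeout + " ms -> " + buffers.size() + " buffers sent");
        }
    }
}
```

With 100 small records arriving 1 ms apart, this model ships 100 one-record buffers at a 0 ms timeout, 10 buffers at 10 ms, and a single buffer at 100 ms: lower latency is paid for with many nearly empty buffers.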
Cheers,
Fabian

2018-03-15 21:00 GMT+01:00 Yan Zhou [FDS Science] <yz...@coupang.com>:

> Hi Fabian,
>
> Thank you for the information.
>
> After setting the watermark interval to 10 ms and the buffer timeout to 0 ms, the end-to-end latency is reduced to 5 ms. I am very happy with the result and will go from there.
>
> Best
> Yan
> ------------------------------
> *From:* Fabian Hueske <fhue...@gmail.com>
> *Sent:* Thursday, March 15, 2018 11:55:12 AM
> *To:* Yan Zhou [FDS Science]
> *Cc:* user@flink.apache.org
> *Subject:* Re: flink sql: "slow" performance on Over Window aggregation with time attribute set to event time
>
> I see...
> Another issue might be the frequency with which you emit watermarks (in case you use a periodic watermark assigner).
> You can set the interval with StreamExecutionEnvironment.getConfig.setAutoWatermarkInterval() [1].
> However, keep in mind that each watermark is an additional record which might add overhead.
>
> You can also play with the buffer timeout [2] to reduce the latency.
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html#with-periodic-watermarks
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/datastream_api.html#controlling-latency
>
> 2018-03-15 19:36 GMT+01:00 Yan Zhou [FDS Science] <yz...@coupang.com>:
>
> Hi Fabian,
>
> Yes, it is typically not a good idea to generate watermarks based on system time. I was setting the watermark based on system time with very little delay to see how fast my application could process the data. All the servers are synced with NTP and differ from each other by only 1 ms. My assumption was that setting the watermark this way should give an end-to-end latency similar to using processing time.
>
> However, that's not the case. The end-to-end latency of the application using processing time is only 1/3 of the other (50 ms vs. 150 ms). Why is that? Please help me understand.
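Why the watermark interval matters can be sketched with a small, hypothetical model. An event-time over aggregation holds a record until a watermark at or beyond its timestamp arrives, whereas a processing-time one does not wait for watermarks. The model assumes that a record's event timestamp equals its arrival time (close to the system-time setup described above), that the periodic assigner emits watermark `now - lagMs` at every multiple of `intervalMs`, and that network and processing costs are zero. The 200 ms interval is an assumption: as far as I know it is Flink 1.4's default once event time is enabled, but check your configuration.

```java
/**
 * Hypothetical model of the extra wait an event-time operator adds while it
 * waits for a periodic watermark. Assumes eventTs == arrival time, watermark
 * (now - lagMs) emitted at multiples of intervalMs, and zero transport cost.
 */
final class WatermarkWaitSketch {

    /** Emission time of the first watermark that is >= eventTs. */
    static long releaseTime(long eventTs, long lagMs, long intervalMs) {
        long needed = eventTs + lagMs; // earliest emission time with now - lagMs >= eventTs
        long periods = (needed + intervalMs - 1) / intervalMs; // ceiling division
        return periods * intervalMs;
    }

    /** Average wait for records arriving at every millisecond in [0, horizonMs). */
    static double averageWaitMs(long lagMs, long intervalMs, long horizonMs) {
        long total = 0;
        for (long t = 0; t < horizonMs; t++) {
            total += releaseTime(t, lagMs, intervalMs) - t;
        }
        return (double) total / horizonMs;
    }

    public static void main(String[] args) {
        for (long interval : new long[] {200, 10}) {
            System.out.printf("interval=%d ms -> average wait %.1f ms%n",
                    interval, averageWaitMs(10, interval, 10_000));
        }
    }
}
```

Under these assumptions, a 10 ms lag adds about 109.5 ms on average with a 200 ms interval, but only about 14.5 ms with a 10 ms interval. That is roughly "lag + interval / 2", which is in the same range as the gap reported above, although real latencies include more than this one effect.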
>
> Best
> Yan
> ------------------------------
> *From:* Fabian Hueske <fhue...@gmail.com>
> *Sent:* Wednesday, March 14, 2018 12:02:01 PM
> *To:* Yan Zhou [FDS Science]
> *Cc:* user@flink.apache.org
> *Subject:* Re: flink sql: "slow" performance on Over Window aggregation with time attribute set to event time
>
> Hi,
>
> It is typically not a good idea to generate watermarks based on system (machine) time. Watermarks should be data driven.
> As soon as the clock of one of your machines is 1 second behind the other machines, its watermarks will also be 1 second behind, and hence so will the complete stream.
>
> Best, Fabian
>
> 2018-03-14 18:40 GMT+01:00 Yan Zhou [FDS Science] <yz...@coupang.com>:
>
> Hi Fabian,
>
> Thank you for answering the question. However, I don't think it explains my situation. The source tasks' watermarks are set to 10 ms behind the system time, using an assigner allowing a fixed amount of lateness <https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamp_extractors.html#assigners-allowing-a-fixed-amount-of-lateness>. So even the slowest source task is not that slow.
>
> Best
> Yan
> ------------------------------
> *From:* Fabian Hueske <fhue...@gmail.com>
> *Sent:* Wednesday, March 14, 2018 3:28 AM
> *To:* Yan Zhou [FDS Science]
> *Cc:* user@flink.apache.org
> *Subject:* Re: flink sql: "slow" performance on Over Window aggregation with time attribute set to event time
>
> Hi,
>
> Flink advances watermarks based on all parallel source tasks. If one of the source tasks lags behind the others, the event time progresses as determined by the "slowest" source task.
> Hence, records ingested from a faster task might have a higher processing latency.
>
> Best, Fabian
>
> 2018-03-14 1:36 GMT+01:00 Yan Zhou [FDS Science] <yz...@coupang.com>:
>
> Hi,
>
> I am using Flink SQL in my application.
> It simply reads records from a Kafka source, converts them to a table, and then runs a query that performs an over window aggregation for each record. A time-lag watermark assigner with a 10 ms time lag is used.
>
> The performance is not ideal. The end-to-end latency, which is the difference between the time a record arrives at the Flink source and the time the record arrives at the Flink sink, is around 250 ms (median). Please note that my query, which is an over window aggregation, generates one result for each input record. I was expecting it to be less than 100 ms. I increased the number of queries by a factor of 100 and still got the same median end-to-end latency, with plenty of CPU and memory available. It seems to me that something is holding my application back.
>
> However, when I use processing time as the time attribute without changing anything else, the latency is reduced to 50 ms. I understand that in general using processing time should be faster. But in my event-time test, the time lag is set to only 10 ms, which should mean the operators process the events almost immediately after they arrive. And the classes which calculate over window aggregations (ProcTimeBoundedRangeOver, RowTimeBoundedRowsOver, etc.) basically have the same logic. Why does using processing time or event time make such a big difference in end-to-end latency? And what is holding my application back if the time attribute is set to event time?
>
> Below are my cluster and application setup. Thank you for your time.
>
> *The cluster:*
>
> *The cluster runs in standalone mode with 7 servers. Each server has 24 cores and 240 GB memory. The cluster has 1 job manager and 6 task managers. Each task manager is allocated 12 cores, 120 GB memory, and 6 task manager slots. HDFS over SSD runs on these servers as well.*
>
> *The application:*
>
> *When an event arrives in Flink from Kafka, the application sets an ingestionTs on the event.
> When the event arrives at the sink, the processing latency is calculated as System.currentTimeMillis() - ingestionTs. This value is considered the end-to-end latency; it is recorded with a histogram metric and can be viewed in the Flink web UI. The RocksDB state backend is used. A time-lag watermark assigner with a time lag of 10 ms is used.*
>
> *Custom Source*
> *-> Flat Map*
> *-> Timestamps/Watermarks*
> *-> (from: (id, ip, type, ingestionTs, eventTs) -> select: (id, ip, type, ingestionTs, eventTs))*
> *--HASH-->*
> *over:( PARTITION BY: ip,*
> *ORDER BY: eventTs,*
> *RANGEBETWEEN 86400000 PRECEDING AND CURRENT ROW,*
> *select: (id, ip, eventTs, COUNT(*) AS w0$o0), ingestionTs)*
> *-> select: (id, eventTs, w0$o0 AS CNT), ingestionTs)*
> *-> to: Tuple2*
> *-> Sink: Unnamed*
>
> *select id, eventTs, count(*) over (partition by id order by eventTs range between interval '24' hour preceding and current row) as cnt1 from myTable.*
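The semantics of the over window in the query above (for each row, count the rows of the same partition key whose eventTs lies in [eventTs - 24 h, eventTs]) can be sketched in plain Java. This is an illustration, not Flink's `RowTimeBoundedRangeOver`. It assumes the rows are already sorted by eventTs, which is roughly what an event-time operator achieves by buffering rows until the watermark passes them. The `Row` type and `countOver` method are hypothetical names. Rows with an identical timestamp (peers) all count each other, as RANGE frames require.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of COUNT(*) OVER (PARTITION BY key ORDER BY eventTs RANGE ... PRECEDING). */
final class OverWindowCountSketch {

    /** Hypothetical input row: partition key plus event timestamp in milliseconds. */
    static final class Row {
        final String key;
        final long eventTs;

        Row(String key, long eventTs) {
            this.key = key;
            this.eventTs = eventTs;
        }
    }

    /** Returns one count per input row; rows must be sorted by eventTs. */
    static long[] countOver(List<Row> rowsSortedByTs, long rangeMs) {
        Map<String, Deque<Long>> windows = new HashMap<>(); // per-key timestamps still in range
        long[] counts = new long[rowsSortedByTs.size()];
        int i = 0;
        while (i < rowsSortedByTs.size()) {
            long ts = rowsSortedByTs.get(i).eventTs;
            int j = i;
            // Add all peers (rows with the same timestamp) first: RANGE frames include them.
            while (j < rowsSortedByTs.size() && rowsSortedByTs.get(j).eventTs == ts) {
                Row r = rowsSortedByTs.get(j);
                windows.computeIfAbsent(r.key, unused -> new ArrayDeque<>()).addLast(ts);
                j++;
            }
            for (int k = i; k < j; k++) {
                Deque<Long> window = windows.get(rowsSortedByTs.get(k).key);
                // Evict timestamps older than ts - rangeMs; the lower bound itself is kept.
                while (!window.isEmpty() && window.peekFirst() < ts - rangeMs) {
                    window.pollFirst();
                }
                counts[k] = window.size();
            }
            i = j;
        }
        return counts;
    }

    public static void main(String[] args) {
        long day = 86_400_000L; // RANGE BETWEEN 86400000 PRECEDING, i.e. 24 hours
        List<Row> rows = Arrays.asList(
                new Row("a", 0L), new Row("b", 1_000L), new Row("a", 1_000L),
                new Row("a", day), new Row("a", day + 1));
        System.out.println(Arrays.toString(countOver(rows, day)));
    }
}
```

Running `main` prints `[1, 1, 2, 3, 3]`: the row at exactly 24 h still sees the row at time 0, while the row one millisecond later no longer does.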