Hi Fabian,

Thank you for answering the question. However, I don't think it explains my 
situation. The source tasks' watermarks are set to only 10 ms behind the system 
time. An assigner allowing a fixed amount of 
lateness<https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamp_extractors.html#assigners-allowing-a-fixed-amount-of-lateness>
 is used, so even the slowest source task should not be that far behind.
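
For reference, the bounded-lateness assigner boils down to emitting a watermark 
that trails the largest event timestamp seen so far by the configured lag. A 
small Python sketch of that logic (illustrative only; Flink's actual 
implementation is in Java, and the class/variable names here are mine):

```python
# Sketch of bounded-out-of-orderness watermarking: the watermark
# trails the largest event timestamp seen so far by a fixed lag.
class BoundedLatenessWatermarker:
    def __init__(self, max_lateness_ms):
        self.max_lateness_ms = max_lateness_ms
        self.max_ts = float("-inf")

    def on_event(self, event_ts_ms):
        # Track the highest event timestamp observed so far.
        self.max_ts = max(self.max_ts, event_ts_ms)

    def current_watermark(self):
        # Watermark = max observed timestamp minus the allowed lateness.
        return self.max_ts - self.max_lateness_ms

wm = BoundedLatenessWatermarker(max_lateness_ms=10)
for ts in [100, 105, 103, 120]:
    wm.on_event(ts)
print(wm.current_watermark())  # 110
```

So with a 10 ms lag, the watermark is never more than 10 ms behind the newest 
event seen by that source task.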


Best

Yan




________________________________
From: Fabian Hueske <fhue...@gmail.com>
Sent: Wednesday, March 14, 2018 3:28 AM
To: Yan Zhou [FDS Science]
Cc: user@flink.apache.org
Subject: Re: flink sql: "slow" performance on Over Window aggregation with time 
attribute set to event time

Hi,

Flink advances watermarks based on all parallel source tasks. If one of the 
source tasks lags behind the others, the event time progresses as determined by 
the "slowest" source task.
Hence, records ingested from a faster task might have a higher processing 
latency.
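
As an illustration, a downstream operator's event-time clock advances to the 
minimum of the watermarks of all its parallel inputs, so a single lagging 
source task holds everything back. A rough sketch (the function name is mine, 
not a Flink API):

```python
# An operator's event-time clock advances to the minimum watermark
# across all parallel inputs, so the slowest input dominates.
def combined_watermark(per_task_watermarks):
    return min(per_task_watermarks)

# Three source tasks; the third one lags behind the others,
# so the operator's watermark is stuck at the laggard's value.
print(combined_watermark([1000, 980, 650]))  # 650
```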

Best, Fabian

2018-03-14 1:36 GMT+01:00 Yan Zhou [FDS Science] 
<yz...@coupang.com<mailto:yz...@coupang.com>>:

Hi,

I am using Flink SQL in my application. It simply reads records from a Kafka 
source, converts them to a table, then runs a query that computes an over 
window aggregation for each record. A time-lag watermark assigner with a 10 ms 
lag is used.

The performance is not ideal. The end-to-end latency, which is the difference 
between the time a record arrives at the Flink source and the time it arrives 
at the Flink sink, is around 250 ms (median). Please note that my query, which 
is an over window aggregation, generates one result for each input record. I 
was expecting it to be less than 100 ms. I increased the number of queries 
100-fold and still see the same median end-to-end latency, with plenty of CPU 
and memory available. It seems that something is holding my application back.

However, when I use processing time as the time attribute without changing 
anything else, the latency drops to 50 ms. I understand that in general using 
processing time should be faster. But in my event-time test the time lag is set 
to only 10 ms, which should mean the operators process events almost 
immediately after they arrive. And the classes that calculate the over window 
aggregation (ProcTimeBoundedRangeOver, RowTimeBoundedRowsOver, etc.) have 
basically the same logic. Why does using processing time vs. event time make 
such a big difference in end-to-end latency? And what is holding my application 
back when the time attribute is set to event time?
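
One thing I tried to reason about with a toy model: an event-time timer only 
fires once a watermark at or past the row's timestamp arrives, and periodic 
watermarks are emitted at a fixed interval, so the emission latency could be 
bounded below by that interval rather than by the 10 ms lag. This is purely a 
hypothetical illustration (it assumes the wall clock and event time coincide at 
the source), not a measurement of Flink itself:

```python
# Toy model: with periodic watermarks emitted every `interval_ms`,
# a row with timestamp `event_ts` is released at the first watermark
# tick whose value (tick_time - lag_ms) reaches event_ts. The wait is
# then dominated by the watermark interval, not the small lateness lag.
def emission_delay(event_ts, lag_ms, interval_ms):
    tick = 0
    while tick - lag_ms < event_ts:
        tick += interval_ms  # wait for the next periodic watermark
    return tick - event_ts   # how long after the event it is released

# With a 10 ms lag but a 200 ms watermark interval, the delay is
# roughly the interval; with a 10 ms interval it is much smaller.
print(emission_delay(event_ts=5, lag_ms=10, interval_ms=200))  # 195
print(emission_delay(event_ts=5, lag_ms=10, interval_ms=10))   # 15
```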

Below is my cluster and application setup and thank you for your time.


The cluster:

The cluster runs in standalone mode with 7 servers. Each server has 24 cores 
and 240 GB of memory. There are 1 job manager and 6 task managers. Each task 
manager is allocated 12 cores, 120 GB of memory, and 6 task manager slots. HDFS 
runs over SSD on these servers as well.


The application:

When an event arrives in Flink from Kafka, the application sets an ingestionTs 
on it. When the event arrives at the sink, the processing latency is calculated 
as System.currentTimeMillis() - ingestionTs. This value is considered the 
end-to-end latency and is recorded with a histogram metric, which can be viewed 
in the Flink web portal. The RocksDB state backend is used. A time-lag 
watermark assigner with a lag of 10 ms is used.
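
The latency bookkeeping described above amounts to stamping each record at the 
source and subtracting at the sink. A minimal sketch (the helper names are 
mine, not the actual application code):

```python
import time

def stamp_ingestion(record):
    # At the source: attach the ingestion timestamp in milliseconds.
    record["ingestionTs"] = int(time.time() * 1000)
    return record

def end_to_end_latency_ms(record):
    # At the sink: latency = now - ingestionTs, fed into a histogram.
    return int(time.time() * 1000) - record["ingestionTs"]

r = stamp_ingestion({"id": 1})
latency = end_to_end_latency_ms(r)
print(latency >= 0)  # True
```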


Custom Source
-> Flat Map
-> Timestamps/Watermarks
-> (from: (id, ip, type, ingestionTs, eventTs) -> select: (id, ip, type, 
ingestionTs, eventTs))
--HASH-->
over:( PARTITION BY: ip,
ORDER BY: eventTs,
RANGEBETWEEN 86400000 PRECEDING AND CURRENT ROW,
select: (id, ip, eventTs, COUNT(*) AS w0$o0), ingestionTs)
-> select: (id, eventTs, w0$o0 AS CNT), ingestionTs)
-> to: Tuple2
-> Sink: Unnamed

select id, eventTs, count(*) over (partition by id order by eventTs range 
between interval '24' hour preceding and current row) as cnt1 from myTable


