Great! Thanks for reporting back :-)

Buffer timeout of 0ms is quite aggressive. You might sending buffers of (by
default) 32KB that just contain a single record.
Anyway, now you know the nobs to tune the latency.

Cheers, Fabian

2018-03-15 21:00 GMT+01:00 Yan Zhou [FDS Science] <yz...@coupang.com>:

> Hi Fabian,
>
>
> Thank you for the information.
>
> After setting the watermark interval to 10ms and buffer timeout to 0 ms,
> the end-to-end latency is reduced to 5ms. I am very happy with the result
> and will go from there.
>
>
>
> Best
>
> Yan
> ------------------------------
> *From:* Fabian Hueske <fhue...@gmail.com>
> *Sent:* Thursday, March 15, 2018 11:55:12 AM
>
> *To:* Yan Zhou [FDS Science]
> *Cc:* user@flink.apache.org
> *Subject:* Re: flink sql: "slow" performance on Over Window aggregation
> with time attribute set to event time
>
> I see...
> Another issue might be the frequency with which you emit watermarks (in
> case you use a periodic watermark assigner).
> You can set the interval with 
> StreamExecutionEnvironment.getConfig.setAutoWatermarkInterval()
> [1].
> However, keep in mind that each watermark is an additional record which
> might add overhead.
>
> You can also play with the buffer timeout [2] to reduce the latency.
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_
> timestamps_watermarks.html#with-periodic-watermarks
> [2] https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/dev/datastream_api.html#controlling-latency
>
> 2018-03-15 19:36 GMT+01:00 Yan Zhou [FDS Science] <yz...@coupang.com>:
>
> Hi Fabian,
>
>
> Yes, it is typically not a good idea to generate watermark based on system
> time. I was setting the watermark based on system time with very little
> delay to see how fast my application could process the data. All the
> servers are sync with ntp and only 1ms difference with each other. What i
> assume is that setting watermark in this way should have similar end-to-end
> latency with using process time.
>
>
> However, it's not the case. the end-to-end latency of application using
> process time is only has 1/3 of the other(50ms vs 150ms).  Why was that?
> Please help me to understand.
>
>
> Best
>
> Yan
>
> ------------------------------
> *From:* Fabian Hueske <fhue...@gmail.com>
> *Sent:* Wednesday, March 14, 2018 12:02:01 PM
>
> *To:* Yan Zhou [FDS Science]
> *Cc:* user@flink.apache.org
> *Subject:* Re: flink sql: "slow" performance on Over Window aggregation
> with time attribute set to event time
>
> Hi,
>
> It is typically not a good idea to generate watermarks based on system
> (machine) time. Watermarks should be data driven.
> As soon as the clock of one of your machines is 1 second behind the other
> machines, its watermarks will also be 1 second behind and hence the
> complete stream.
>
> Best, Fabian
>
> 2018-03-14 18:40 GMT+01:00 Yan Zhou [FDS Science] <yz...@coupang.com>:
>
> Hi Fabian,
>
>
> Thank you for answering the question. However, I think it doesn't explain
> my situation. the source tasks' watermark are set to 10 ms behind the
> system time. Assigners allowing a fixed amount of lateness
> <https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamp_extractors.html#assigners-allowing-a-fixed-amount-of-lateness>
> is used. So even the slowest source task is not that slow.
>
>
> Best
>
> Yan
>
>
>
>
>
> ------------------------------
> *From:* Fabian Hueske <fhue...@gmail.com>
> *Sent:* Wednesday, March 14, 2018 3:28 AM
> *To:* Yan Zhou [FDS Science]
> *Cc:* user@flink.apache.org
> *Subject:* Re: flink sql: "slow" performance on Over Window aggregation
> with time attribute set to event time
>
> Hi,
>
> Flink advances watermarks based on all parallel source tasks. If one of
> the source tasks lags behind the others, the event time progresses as
> determined by the "slowest" source task.
> Hence, records ingested from a faster task might have a higher processing
> latency.
>
> Best, Fabian
>
> 2018-03-14 1:36 GMT+01:00 Yan Zhou [FDS Science] <yz...@coupang.com>:
>
> Hi,
>
> I am using flink sql in my application. It simply reads records from kafka
> source, converts to table, then runs an query to have over window
> aggregation for each record. Time lag watermark assigner with 10ms time lag
> is used.
>
> The performance is not ideal. the end-to-end latency, which is the
> difference between the time an record arrives in flink source and the time
> the record arrives in flink sink, is around 250ms (median). Please note
> that my query, which is over window aggregation, will generate one result
> for each input record. I was expecting it to be less then 100ms. I increase
> the number of query to 100 times and still have same median end-to-end
> latency with plenty of CPU and memory available. It seems to me that
> something is holding my application back.
>
> However, When I use process time as time attribute without changing
> anything else, the latency is reduced to 50ms. I understand that in general
> using process time should be faster. But for my test using event time, the
> time lag is set to only 10ms, which should mean the operators will almost
> immediately process the events after they arrives. And the classes which
> calculate over window aggregation(ProcTimeBoundedRangeOve,
> RowTimeBoundedRowsOver and etc...) basically have same logic. Why does
> using process_time or event_time could bring such big difference in
> end-to-end latency? And what is hold my application back if time attribute
> is set event time?
>
> Below is my cluster and application setup and thank you for your time.
>
>
> *The cluster:*
>
> *The cluster runs in standalone mode with 7 servers. Each server has 24
> cores, 240 GB memory. There are 1 job manager and 6 task managers. Each
> task manager is allocated with 12 cores, 120 GB memory and 6 taskmanager
> slots. Running hdfs over ssd on these servers as well. *
>
>
> *The application:*
>
> *When the event arrives flink from kafka, an ingestionTs is set for the
> event by the application. When the event arrives sink, the process latency
> is calculated as System.currentTimeMillis() - ingestionTs. The value is
> consider the end-to-end latency and recorded with histogram metric and can
> be view in flink web portal. RocksDB state backend is used. Time lag water
> assigner with time lag of 10ms is used. *
>
>
> *Custom Source *
> *-> Flat Map *
> *-> Timestamps/Watermarks *
> *-> (from: (id, ip, type, ingestionTs, eventTs) -> select: (id, ip, type,
> ingestionTs, eventTs))*
> *--HASH-->*
> *over:( PARTITION BY: ip,*
>
> *ORDER BY: eventTs, *
>
> *RANGEBETWEEN 86400000 PRECEDING AND CURRENT ROW, *
>
> *select: (id, ip, eventTs, COUNT(*) AS w0$o0), ingestionTs) *
>
> *-> select: (id, eventTs, w0$o0 AS CNT), ingestionTs) *
> *-> to: Tuple2 *
> *-> Sink: Unnamed*
>
> *select id, eventTs, count(*) over (partition by id order by eventTs
> ranges between interval '24' hour preceding and current row) as cnt1 from
> myTable.*
>
>
>
>
>
>
>

Reply via email to