Hi Faye,
the problem lies in the flawed design of the JDK's java.sql.Timestamp. You
can also find a nice summary in the answer here [1]. java.sql.Timestamp
is timezone dependent. Internally, we subtract/normalize the timezone
and work with the UNIX timestamp. Starting with Flink 1.9, we use
the new Java time classes such as LocalDateTime. Until then, it would be
best to set the JVM's timezone to UTC or to remove the timezone offset
both in sources and sinks.
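
To illustrate the effect outside of Flink, here is a minimal sketch using only JDK classes. The class name TimestampZoneDemo and the helpers withDefaultZone and wallClockAsUtcMillis are made up for this example and are not part of Flink. It assumes a fixed GMT+1 offset for your machine. The last helper shows one possible way to strip the timezone on the sink side, by reading the wall-clock fields and reinterpreting them as UTC.

```java
import java.sql.Timestamp;
import java.time.ZoneOffset;
import java.util.TimeZone;
import java.util.function.Supplier;

public class TimestampZoneDemo {

    // Hypothetical helper: runs `body` with the given JVM default zone,
    // then restores the previous default.
    static <T> T withDefaultZone(String zoneId, Supplier<T> body) {
        TimeZone previous = TimeZone.getDefault();
        try {
            TimeZone.setDefault(TimeZone.getTimeZone(zoneId));
            return body.get();
        } finally {
            TimeZone.setDefault(previous);
        }
    }

    // Hypothetical sink-side normalization: take the wall-clock fields of
    // the Timestamp (as seen in the JVM default zone) and treat them as UTC.
    static long wallClockAsUtcMillis(Timestamp ts) {
        return ts.toLocalDateTime().toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        String windowEnd = "1970-01-01 00:00:59.999";

        // Same wall-clock string, different epoch value depending on the JVM zone.
        long rawPlusOne = withDefaultZone("GMT+1",
                () -> Timestamp.valueOf(windowEnd).getTime());
        long rawUtc = withDefaultZone("UTC",
                () -> Timestamp.valueOf(windowEnd).getTime());

        // Normalizing the wall-clock fields removes the zone dependence.
        long normalized = withDefaultZone("GMT+1",
                () -> wallClockAsUtcMillis(Timestamp.valueOf(windowEnd)));

        System.out.println(rawPlusOne);  // -3540001
        System.out.println(rawUtc);      // 59999
        System.out.println(normalized);  // 59999
    }
}
```

For a test run, the simpler route is probably to start the JVM with -Duser.timezone=UTC, so that getTime() already returns the UTC-based value.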
Regards,
Timo
[1] https://stackoverflow.com/a/43883203/806430
On 11.08.20 22:55, Faye Pressly wrote:
Hello,
I am having an issue with event timestamps and timezones in Flink 1.8
(1.8 because I need it to work on AWS Kinesis).
I have a very simple pipeline that reads events from a stream, transforms
them to a Table, does a small windowed (tumbling, 1 min) aggregation and
group-by, transforms the result back to a stream, and sinks it.
I have created a small Integration test where I pass a custom Source and
Custom Sink Collector so that I can verify the results.
I got inspired by this project to do the testing:
https://github.com/knaufk/flink-testing-pyramid/blob/master/src/test/java/com/github/knaufk/testing/java/StreamingJobIntegrationTest.java
This is a snippet from my integration test. 0L is the event timestamp
that will be used by the Flink job, so here I'm firing all the events at
1970-01-01 00:00:00.
    ParallelSourceFunction<List<ObjectNode>> source =
        new ParallelCollectionSource(
            Arrays.asList(
                new Tuple2<>(1L, new Event("1", "111", "impression", "A", 0L)),
                new Tuple2<>(1L, new Event("1", "111", "click", "A", 0L)),
                new Tuple2<>(1L, new Event("2", "111", "impression", "A", 0L)),
                new Tuple2<>(1L, new Event("2", "111", "click", "A", 0L)),
                new Tuple2<>(1L, new Event("3", "111", "impression", "A", 0L)),
                new Tuple2<>(1L, new Event("4", "111", "impression", "A", 0L)),
                new Tuple2<>(1L, new Event("4", "111", "click", "A", 0L))));
    CollectingSink sink = new CollectingSink();
    new Pipeline().execute(source, sink);
(https://github.com/knaufk/flink-testing-pyramid/blob/master/src/test/java/com/github/knaufk/testing/java/utils/ParallelCollectionSource.java)
My Flink pipeline uses a tumbling window of 1 minute, and I add the
window.rowTime to the objects (which have a java.sql.Timestamp field)
that are written to the sink.
When I check the results in sink.result, all the timestamp.getTime()
values are using my computer's timezone (GMT+1).
For example, the first window, which ends at 1970-01-01 00:00:59.999,
has a timestamp.getTime() of `-3540001`.
I expected it to be 59999, which would really correspond to 1970-01-01
00:00:59.999.
Is this a bug, or do I have to set up something in order for Flink to
treat all timestamps as UTC?
Thank you!