Hi folks,

We are working on a Flink job to process a large amount of data coming in from a Kafka stream.
We selected Flink because the data sometimes arrives out of order or late, and we need to roll it up into 30-minute event-time windows, after which we write it back out to an S3 bucket. We have hit a couple of issues:

1) The job works fine using processing time, but when we switch to event time, (almost) nothing seems to be written out. Our watermark code looks like this:

```
override def getCurrentWatermark(): Watermark = {
  new Watermark(System.currentTimeMillis() - maxLateness)
}
```

And we are doing this:

```
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
```

and this:

```
.assignTimestampsAndWatermarks(new TimestampAndWatermarkAssigner(Time.minutes(30).toMilliseconds))
```

However, even though we get millions of records per hour (the vast majority of which are no more than 30 minutes late), only about 2-10 records per hour are written out to the S3 bucket. We are using a custom BucketingFileSink Bucketer; if folks believe that is the issue, I would be happy to provide that code here as well. (An alternative assigner we have been considering is sketched in the P.S. below.)

2) On top of all this, we would really prefer to write the records directly to Aurora in RDS rather than to an intermediate S3 bucket, but it seems that a JDBC sink connector is unsupported / doesn't exist. If this is not the case, we would love to know. (The kind of hand-rolled sink we would otherwise fall back to is sketched in the second P.S. below.)

Thanks in advance for all the help / insight on this,

Max Walker
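P.S. In case it clarifies our setup, here is a minimal sketch of the kind of timestamp/watermark assigner we have been considering instead of the one above: it derives watermarks from the largest event timestamp seen so far rather than from the machine's wall clock. The Event case class and its eventTime field are just illustrative stand-ins for our actual record type.

```
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// Placeholder record type; eventTime is the event timestamp in epoch milliseconds.
case class Event(id: String, eventTime: Long)

// Watermarks trail the maximum event timestamp seen so far by maxLateness,
// instead of being computed from System.currentTimeMillis().
class EventTimeAssigner(maxLateness: Time)
    extends BoundedOutOfOrdernessTimestampExtractor[Event](maxLateness) {

  override def extractTimestamp(element: Event): Long = element.eventTime
}

// Usage:
// stream.assignTimestampsAndWatermarks(new EventTimeAssigner(Time.minutes(30)))
```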
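P.P.S. If there really is no ready-made JDBC sink, this is roughly the kind of sink function we would hand-roll ourselves; it is only a sketch, and the WindowResult type, the window_counts table, and the column names are placeholders for our rolled-up records.

```
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

// Placeholder for one rolled-up window result.
case class WindowResult(key: String, windowStart: Long, count: Long)

// Simple JDBC sink that opens a connection per parallel instance and
// inserts one row per incoming record.
class AuroraSink(jdbcUrl: String, user: String, password: String)
    extends RichSinkFunction[WindowResult] {

  @transient private var connection: Connection = _
  @transient private var statement: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    connection = DriverManager.getConnection(jdbcUrl, user, password)
    statement = connection.prepareStatement(
      "INSERT INTO window_counts (record_key, window_start, cnt) VALUES (?, ?, ?)")
  }

  override def invoke(value: WindowResult): Unit = {
    statement.setString(1, value.key)
    statement.setLong(2, value.windowStart)
    statement.setLong(3, value.count)
    statement.executeUpdate()
  }

  override def close(): Unit = {
    if (statement != null) statement.close()
    if (connection != null) connection.close()
  }
}

// Usage: resultStream.addSink(new AuroraSink(jdbcUrl, user, password))
```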