Re: Tumbling window with timestamp out-of-range events

2020-06-09 Thread Yu Yang
Please ignore this message. The issue was that a different timestamp extractor was used when the Kafka source was set up. On Tue, Jun 9, 2020 at 2:58 PM Yu Yang wrote: > Hi, > We implement a Flink application that uses TumblingWindow, and uses event
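
For reference, a minimal sketch of attaching a timestamp extractor directly to the Kafka source, which is the kind of setup the fix refers to; the Order event type, accessor, schema, properties, and env are assumptions, not the poster's code:

    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    FlinkKafkaConsumer<Order> consumer =
        new FlinkKafkaConsumer<>("orders", orderDeserializationSchema, kafkaProperties);

    // Attaching the extractor to the consumer gives per-partition watermarks; note that
    // assigning a different extractor later on the stream overrides these timestamps.
    consumer.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(Order order) {
                return order.getEventTimeMillis();  // assumed accessor
            }
        });

    DataStream<Order> orders = env.addSource(consumer);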

Tumbling window with timestamp out-of-range events

2020-06-09 Thread Yu Yang
Hi, We implement a Flink application that uses TumblingWindow, with event time as the time characteristic. In the TumblingWindow's process function, we have the implementation below that checks whether each event's timestamp is in the tumbling window's timestamp range. We expected that all events
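
A minimal sketch (assumed names, not the poster's code) of the event-time tumbling-window pipeline described here, assuming timestamps and watermarks are already assigned upstream:

    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    orders
        .keyBy(order -> order.getKey())
        .window(TumblingEventTimeWindows.of(Time.minutes(10)))
        .process(new CheckTimestampRangeFunction())  // range check sketched under the next thread
        .print();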

sanity checking in ProcessWindowFunction.process shows that event timestamps are out of tumbling window time range

2020-06-09 Thread Yu Yang
Hi all, We are writing an application that sets TimeCharacteristic.EventTime as the time characteristic. When implementing the ProcessWindowFunction for a TumblingWindow, we added the code below to check whether the timestamps of events fall in the tumbling window's time range. To our surprise, we found that the
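
A minimal sketch of that kind of sanity check, with an assumed Order event type keyed by String (not the original code):

    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public static class CheckTimestampRangeFunction
            extends ProcessWindowFunction<Order, Order, String, TimeWindow> {
        @Override
        public void process(String key, Context context, Iterable<Order> elements,
                            Collector<Order> out) {
            long start = context.window().getStart();
            long end = context.window().getEnd();  // exclusive
            for (Order order : elements) {
                long ts = order.getEventTimeMillis();  // assumed accessor
                if (ts < start || ts >= end) {
                    // Event timestamp falls outside [start, end) -- the situation reported here.
                    System.err.println("out-of-range event ts=" + ts
                            + " window=[" + start + "," + end + ")");
                }
                out.collect(order);
            }
        }
    }

Per the later reply in this digest, the out-of-range timestamps in this case came from a different timestamp extractor configured on the Kafka source.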

Re: best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer?

2020-06-01 Thread Yu Yang
> …NULL_TBASE, and the operator should be able to count the number of NULL_TBASE received. > Best, > Yun

best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer?

2020-05-31 Thread Yu Yang
Hi all, To deal with corrupted messages that occasionally leak into the data source, we implemented a custom DefaultKryoSerializer class, shown below, that catches exceptions. The custom serializer returns null from the read(...) method when it encounters an exception while reading. With this implementation,
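
A minimal sketch of that pattern, assuming a hypothetical MyEvent class and serializer name (not the poster's actual implementation):

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.serializers.FieldSerializer;

    // read(...) swallows deserialization failures and returns null, so downstream
    // operators must be prepared to filter out (or count) the nulls.
    public class TolerantFieldSerializer<T> extends FieldSerializer<T> {
        public TolerantFieldSerializer(Kryo kryo, Class<T> type) {
            super(kryo, type);
        }

        @Override
        public T read(Kryo kryo, Input input, Class<T> type) {
            try {
                return super.read(kryo, input, type);
            } catch (Exception e) {
                return null;  // corrupted record
            }
        }
    }

    // Registration on the job's execution config:
    env.getConfig().addDefaultKryoSerializer(MyEvent.class, TolerantFieldSerializer.class);

The reply above suggests a variant of this: return a sentinel value (e.g. a NULL_TBASE placeholder) instead of null so the downstream operator can count the corrupted records.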

checkpoint _metadata file has >20x difference in size among different checkpoints

2020-03-04 Thread Yu Yang
Hi all, We have a Flink job that checkpoints every 10 minutes. We noticed that the _metadata file size can vary a lot across the checkpoints of this job. In some checkpoints, we observed that the _metadata file size was >900MB, while in other checkpoints of the same job, the _metadata file

Re: best practices on getting flink job logs from Hadoop history server?

2019-09-05 Thread Yu Yang
> …will not be removed. > Best, > Yun Tang

best practices on getting flink job logs from Hadoop history server?

2019-08-30 Thread Yu Yang
Hi, We run Flink jobs through YARN on Hadoop clusters. One challenge we are facing is simplifying Flink job log access. The Flink job logs can be accessed using "yarn logs $application_id". That approach has a few limitations: 1. It is not straightforward to find the YARN application id

metrics for checking whether a broker throttles requests based on its quota limits?

2019-06-20 Thread Yu Yang
Hi, Recently we enabled Kafka quota management for our Kafka clusters. We are looking for Kafka metrics that can be used for alerting on whether a Kafka broker is throttling requests based on quota. There are a few throttle-related metrics in Kafka, but none of them can tell accurately whether the broker

Re: can flink sql handle udf-generated timestamp field

2019-06-06 Thread Yu Yang
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/common.html#mapping-of-data-types-to-table-schema [2] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion-1 On Thu, Jun 6, 2019 at 08:48,
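
The second link above covers the approach this thread converges on: declare the event-time attribute while registering the DataStream as a table, rather than producing it with a UDF. A rough sketch under assumed names and field layout (not code from the thread):

    // "orders" is a DataStream whose event timestamps/watermarks are already assigned;
    // the ".rowtime" suffix exposes that timestamp as a time attribute usable in windowed SQL.
    tableEnv.registerDataStream("orders", orders, "orderId, amount, ts.rowtime");

    Table result = tableEnv.sqlQuery(
        "SELECT orderId, COUNT(*) AS cnt " +
        "FROM orders " +
        "GROUP BY orderId, HOP(ts, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE)");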

Re: can flink sql handle udf-generated timestamp field

2019-06-05 Thread Yu Yang
…); FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer("customer_orders", deserializationSchema, m10n05Properties); tableEnv.registerDataStream("orders", kafkaConsumer); Regards, -Yu On Wed, Jun 5, 2019 at 11:15 PM JingsongLee wrote: > Hi @Yu Yang: > Time-based operations such as

Re: can flink sql handle udf-generated timestamp field

2019-06-05 Thread Yu Yang
+flink-user On Wed, Jun 5, 2019 at 9:58 AM Yu Yang wrote: > Thanks for the reply! In flink-table-planner, TimeIndicatorTypeInfo is an internal class that cannot be referenced from the application. I got a "cannot find symbol" error when I tried to use it.

can flink sql handle udf-generated timestamp field

2019-06-04 Thread Yu Yang
Hi, I am trying to use Flink SQL to do aggregation on a hopping window. In the data stream, we store the timestamp as a long. So I wrote a UDF 'FROM_UNIXTIME' to convert the long to the Timestamp type. public static class TimestampModifier extends ScalarFunction { public Timestamp eval(long t) {
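
The code in the preview is cut off; a sketch of how such a UDF typically looks with the legacy Table API (the eval body and the getResultType override are reconstructions, not the poster's code):

    import java.sql.Timestamp;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.functions.ScalarFunction;

    public static class TimestampModifier extends ScalarFunction {
        public Timestamp eval(long t) {
            return new Timestamp(t);  // epoch millis -> java.sql.Timestamp
        }

        @Override
        public TypeInformation<?> getResultType(Class<?>[] signature) {
            return Types.SQL_TIMESTAMP;
        }
    }

    // Registration (tableEnv assumed):
    tableEnv.registerFunction("FROM_UNIXTIME", new TimestampModifier());

As the replies above note, a UDF result like this is a plain SQL timestamp rather than a time attribute, which is why the window aggregation does not pick it up as event time.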

read a finite number of messages from Kafka using the Kafka connector without extending it?

2019-02-15 Thread Yu Yang
Hi, We are considering using Flink SQL for ad hoc data analytics on real-time Kafka data, and want to limit the queries to process only data from the past 5-10 minutes. To achieve that, one possible approach is to extend the current Kafka connector to have it only read messages in a given period of time
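
Not from the thread, but in later versions of the universal Kafka consumer the start position can at least be bounded by timestamp without extending the connector; bounding the end still needs custom handling (filtering or a modified source). A minimal sketch with assumed topic and properties:

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "adhoc-sql");

    FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props);

    // Start from offsets whose Kafka record timestamps are >= 10 minutes ago.
    consumer.setStartFromTimestamp(System.currentTimeMillis() - 10 * 60 * 1000L);

    DataStream<String> recent = env.addSource(consumer);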