Re: Flink JMX Metrics

2018-09-10 Thread Yee-Ning Cheng
Might be related to this https://issues.apache.org/jira/browse/FLINK-10135 -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

KafkaTopicPartition internal class treated as generic type serialization

2018-09-02 Thread Yee-Ning Cheng
I disabled generic type serialization via env.getConfig.disableGenericTypes() and got the following exception when running my job on a standalone cluster. Caused by: java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type org.apache.flink.stream

Re: WriteTimeoutException in Cassandra sink kill the job

2018-09-01 Thread Yee-Ning Cheng
I have never used the Flink Cassandra Sink so this may or may not work, but have you tried creating your own custom retry policy? https://docs.datastax.com/en/developer/java-driver/3.4/manual/retries/ Returning an ignore when onWriteTimeout

Re: ODP: API for delayed/scheduled interval input source for integrationtests

2018-09-01 Thread Yee-Ning Cheng
I haven't tried it yet, but I saw flink-spector, which seems to actually do what I want. https://github.com/ottogroup/flink-spector/wiki/Defining-Input Although having it as part of the normal API would be nice, as that library is already out of date (1.5.0)

Re: API for delayed/scheduled interval input source for integration tests

2018-08-31 Thread Yee-Ning Cheng
I was able to use the AbstractStreamOperatorTestHarness to write more of a unit test for windowing operators. However, I'm still trying to figure out a way to have a "delayed iterator". I tried implementing an iterator that Thread.sleeps for the interval and passed it to the stream as an input, b
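
For readers following the thread, the "delayed iterator" idea mentioned above might look roughly like the sketch below. It only illustrates an iterator that sleeps before handing out each element; the names DelayedIterator, elements and interval are hypothetical and not part of Flink, and the sketch makes no claim about how such an iterator behaves once it is passed to a Flink source.

```scala
import scala.concurrent.duration._

// Hypothetical helper (not a Flink class): wraps a sequence and sleeps for a
// fixed interval before returning each element.
class DelayedIterator[T](elements: Seq[T], interval: FiniteDuration)
    extends Iterator[T] with Serializable {

  // Position of the next element to hand out.
  private var index = 0

  override def hasNext: Boolean = index < elements.length

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("no more elements")
    // Blocks the calling thread for the configured interval.
    Thread.sleep(interval.toMillis)
    val element = elements(index)
    index += 1
    element
  }
}

object DelayedIteratorDemo {
  def main(args: Array[String]): Unit = {
    val it = new DelayedIterator(Seq("a", "b", "c"), 100.millis)
    val start = System.nanoTime()
    // Prints each element together with the elapsed wall-clock time.
    it.foreach { e =>
      println(s"${(System.nanoTime() - start) / 1000000} ms: $e")
    }
  }
}
```

The delay here is wall-clock time spent in Thread.sleep on whichever thread calls next(), so it is independent of any event-time or processing-time clock a test harness may control.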

API for delayed/scheduled interval input source for integration tests

2018-08-31 Thread Yee-Ning Cheng
Hi, I've been trying to write an integration test for my Flink application that has managed state with TTL expiration. However, I can't seem to find a good way to create a stream of elements that waits X amount of time before each element is sent in. I'm using the simple API: val stream = env.f