Re: How to stop FlinkKafkaConsumer and make job finished?

2017-12-28 Thread Eron Wright
I believe you can extend the `KeyedDeserializationSchema` that you pass to the consumer to check for end-of-stream markers. https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.html#isEndOfStream-T- Eron On
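A minimal sketch of this approach in Scala, assuming Flink 1.4 and a string-valued topic; the "END_OF_STREAM" marker value is an assumption for illustration. Returning true from isEndOfStream makes the consumer subtask finish, which lets the job reach a finished state:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema

// Sketch: a schema that signals end-of-stream on a poison-pill record.
class StoppableSchema extends KeyedDeserializationSchema[String] {

  override def deserialize(messageKey: Array[Byte], message: Array[Byte],
                           topic: String, partition: Int, offset: Long): String =
    new String(message, "UTF-8")

  // When this returns true, the consumer stops and the source finishes.
  override def isEndOfStream(nextElement: String): Boolean =
    nextElement == "END_OF_STREAM"

  override def getProducedType: TypeInformation[String] =
    createTypeInformation[String]
}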

Re: RichAsyncFunction in scala

2017-12-28 Thread Antoine Philippot
Hi Ufuk, I don't think it is possible, as I use this function as a parameter of AsyncDataStream (from the scala API), which is mandatory to use with the scala DataStream. On Thu, 28 Dec 2017 at 16:55, Ufuk Celebi wrote: > Hey Antoine, > > isn't it possible to use the Java RichAsyncFunction f
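For context, a sketch of the constraint Antoine describes, assuming Flink 1.4: the Scala AsyncDataStream helpers accept only the Scala AsyncFunction trait, which has no open()/close() lifecycle hooks:

import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

// The Scala trait offers only asyncInvoke; there is no open() to set up
// clients or register metrics, which is the limitation discussed here.
class Enricher extends AsyncFunction[Int, Int] {
  override def asyncInvoke(input: Int, resultFuture: ResultFuture[Int]): Unit =
    resultFuture.complete(Iterable(input * 2)) // placeholder async work
}

object Job {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.fromElements(1, 2, 3)
    val enriched =
      AsyncDataStream.unorderedWait(stream, new Enricher, 1000, TimeUnit.MILLISECONDS)
    enriched.print()
    env.execute("async sketch")
  }
}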

Re: RichAsyncFunction in scala

2017-12-28 Thread Ufuk Celebi
Hey Antoine, isn't it possible to use the Java RichAsyncFunction from Scala like this: class Test extends RichAsyncFunction[Int, Int] { override def open(parameters: Configuration): Unit = super.open(parameters) override def asyncInvoke(input: Int, resultFuture: functions.async.ResultFuture
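Completing the snippet above, a sketch that compiles against the Flink 1.4 Java API from Scala (the doubled-input body is a placeholder):

import java.util.Collections

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.async.{ResultFuture, RichAsyncFunction}

// Extending the Java RichAsyncFunction from Scala, as suggested above;
// open() is available here for metrics or client setup.
class Test extends RichAsyncFunction[Int, Int] {

  override def open(parameters: Configuration): Unit = super.open(parameters)

  override def asyncInvoke(input: Int, resultFuture: ResultFuture[Int]): Unit =
    resultFuture.complete(Collections.singleton(input * 2)) // placeholder result
}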

RichAsyncFunction in scala

2017-12-28 Thread Antoine Philippot
Hi, the scala API lacks a version of the RichAsyncFunction class, as well as the possibility to handle a class which extends AbstractRichFunction and implements AsyncFunction (from the scala API). I made a small dev on our current flink fork because we need to use the open method to add our custom metri

Re: org.apache.zookeeper.ClientCnxn, Client session timed out

2017-12-28 Thread Hao Sun
Ok, thanks for the clarification. On Thu, Dec 28, 2017 at 1:05 AM Ufuk Celebi wrote: > On Thu, Dec 28, 2017 at 12:11 AM, Hao Sun wrote: > > Thanks! Great to know I do not have to worry duplicates inside Flink. > > > > One more question, why this happens? Because TM and JM both check > leadershi

Re: How to apply patterns from a source onto another datastream?

2017-12-28 Thread Kostas Kloudas
Hi Jayant, As Dawid said, dynamically updating patterns is currently not supported. There is also this question raised in the dev mailing list with the subject "CEP: Dynamic Patterns". I will repeat my answer here so that we are on the same page: "To support this, we need 2 features
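For contrast, a sketch of what is supported today: a pattern fixed at job-submission time, using the Scala CEP API (the event type and predicates are made up for illustration):

import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

// Hypothetical event type for illustration.
case class LoginEvent(userId: String, failed: Boolean)

object StaticPatternJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val logins = env.fromElements(
      LoginEvent("u1", failed = true), LoginEvent("u1", failed = true))

    // The pattern is baked into the job graph; changing it means
    // resubmitting the job, hence the demand for dynamic patterns.
    val pattern = Pattern.begin[LoginEvent]("first").where(_.failed)
      .next("second").where(_.failed)

    val alerts = CEP.pattern(logins.keyBy(_.userId), pattern)
      .select(m => s"two failed logins for ${m("first").head.userId}")

    alerts.print()
    env.execute("static CEP pattern sketch")
  }
}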

Re: keyby() issue

2017-12-28 Thread Jinhua Luo
It's very strange: when I change the key selector to use a random key, the JVM reports OOM. .keyBy(new KeySelector<MyEvent, Integer>() { public Integer getKey(MyEvent ev) { return ThreadLocalRandom.current().nextInt(1, 100); } }) Caused by: java.lang.OutOfMemoryError: Java heap space at com.esoter

Re: keyby() issue

2017-12-28 Thread Jinhua Luo
Does keyBy() on a field generate the same number of keys as the number of unique values of the field? For example, if the field takes values in the range {"a", "b", "c"}, then the number of keys is 3, correct? The field in my case has half a million unique values (IP addresses), so keyBy() on the field followed with

Re: Cassandra POJO sink flink 1.4.0 in scala

2017-12-28 Thread shashank agarwal
Hi Micheal, Thanks for the response; actually I have solved the issue and was sharing how I solved it. For sinking scala classes like a Java POJO, we had to convert the stream to a Java stream first, but in 1.4 that is already done by the connector, so there is no need to do it ourselves. We have to write scala
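Roughly what such a solution looks like: a Scala class annotated for the DataStax mapper so the connector can treat it as a POJO. The keyspace, table, and fields here are assumptions, not the exact code from the thread:

import scala.annotation.meta.field
import com.datastax.driver.mapping.annotations.{Column, Table}

// A Scala class the Cassandra connector can persist like a Java POJO.
// The @field meta-annotation places @Column on the underlying field, and
// the no-arg constructor is required by the DataStax object mapper.
@Table(keyspace = "ks", name = "events")
class Event(
    @(Column @field)(name = "id") var id: String,
    @(Column @field)(name = "value") var value: Long) extends Serializable {

  def this() = this(null, 0L)

  override def toString: String = s"Event($id, $value)"
}

The stream of Event can then be wired up with something like CassandraSink.addSink(stream).setHost("127.0.0.1").build(), again as a sketch.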

Re: keyby() issue

2017-12-28 Thread Ufuk Celebi
Hey Jinhua, On Thu, Dec 28, 2017 at 9:57 AM, Jinhua Luo wrote: > The keyby() upon the field would generate unique key as the field > value, so if the number of the uniqueness is huge, flink would have > trouble both on cpu and memory. Is it considered in the design of > flink? Yes, keyBy hash pa
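To make the discussion concrete, a sketch of the keyed aggregation being discussed (the event type and field names are assumptions); keyBy itself only hash-partitions the stream, but every distinct key gets its own window state:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical event; with ~500k distinct IPs, expect ~500k window
// state entries per minute, which is where memory pressure comes from.
case class MyEvent(ip: String, bytes: Long)

object KeyedAggJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val events = env.fromElements(MyEvent("1.2.3.4", 10), MyEvent("1.2.3.4", 32))

    events
      .keyBy(_.ip)                  // hash-partitions by key
      .timeWindow(Time.minutes(1))  // one window state per key
      .sum("bytes")
      .print()

    env.execute("keyed aggregation sketch")
  }
}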

Re: org.apache.zookeeper.ClientCnxn, Client session timed out

2017-12-28 Thread Ufuk Celebi
On Thu, Dec 28, 2017 at 12:11 AM, Hao Sun wrote: > Thanks! Great to know I do not have to worry duplicates inside Flink. > > One more question, why this happens? Because TM and JM both check leadership > in different interval? Yes, it's not deterministic how this happens. There will also be cases

keyby() issue

2017-12-28 Thread Jinhua Luo
Hi All, I need to aggregate some field of the events. At first I used keyBy(), but I found that Flink performs very slowly (it even stops producing results) because the number of keys is around half a million per minute. So I use windowAll() instead, and Flink then works as expected. The keyby() upon the fi
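A sketch of the windowAll() fallback described above; note that a non-keyed window runs with parallelism 1, so all events are aggregated on a single operator instance (the event type is an assumption):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical event type for illustration.
case class MyEvent(ip: String, bytes: Long)

object AllWindowJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val events = env.fromElements(MyEvent("1.2.3.4", 10), MyEvent("5.6.7.8", 32))

    events
      .timeWindowAll(Time.minutes(1)) // non-keyed window: parallelism 1
      .sum("bytes")
      .print()

    env.execute("windowAll sketch")
  }
}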