keyBy called twice. Second time, INetAddress and Array[Byte] are empty

2017-01-07 Thread Jonas
Hi! I have two streams that I connect and then call keyBy on. I put some debugging code in the KeySelector. It turns out that it gets called twice from different places. Stack trace here: https://gist.github.com/JonasGroeger/8ce218ee1c19f0639fa990f43b5f9e2b It contains 1 package which gets keyed twice fo

Re: HashMap/HashSet Serialization Issue

2017-01-07 Thread Charith Wickramarachchi
Thanks very much. Regards, Charith On Sat, Jan 7, 2017 at 3:42 AM, Jark Wu wrote: > Hi Charith, > > The LOG message is nothing to worry about. The HashMap and HashSet > members in your POJO are not PojoTypes, > so they will be handled as GenericType, i.e. will be > serialized/deserialized by K

Re: Increasing parallelism skews/increases overall job processing time linearly

2017-01-07 Thread Chakravarthy varaga
Hi Stephan. The Kafka version is 0.9.0.1 and the connector is flinkconsumer09. The flatMap and coFlatMap are connected by keyBy. No data is broadcast, and the data is not exploded based on the parallelism. Cvp On 6 Jan 2017 20:16, "Stephan Ewen" wrote: > Hi! > > You are right, parallelism 2 should b

window function outputs two different values

2017-01-07 Thread tao xiao
Hi team, I need to output two different values from a time window reduce function. Here is the basic workflow: 1. Fetch data from Kafka. 2. Flow the data into an event session window: Kafka source -> keyBy -> session window -> reduce. 3. Inside the reduce function, count the number

Re: HashMap/HashSet Serialization Issue

2017-01-07 Thread Jark Wu
Hi Charith, The LOG message is nothing to worry about. The HashMap and HashSet members in your POJO are not PojoTypes, so they will be handled as GenericType, i.e. will be serialized/deserialized by Kryo. But your custom POJO type will still be recognized as PojoType, i.e. will be serialized/des

Re: Cannot run using a savepoint with the same jar

2017-01-07 Thread Rami Al-Isawi
Hi Stephan, I have not changed the parallelism, the names, or anything else in my program. It is the exact same jar file, unmodified. I have tried uid, but I faced this: "UnsupportedOperationException: Cannot assign user-specified hash to intermediate node in chain. This will be supported in future