RE: Question about DataStream serialization

2015-12-08 Thread Radu Tudoran
Hi, Is the partitioning function used by ".keyBy(Object)" of the form: Object.hash % getNumberOfParallelSubtasks() ? Dr. Radu Tudoran Research Engineer IT R&D Division HUAWEI TECHNOLOGIES Duesseldorf GmbH European Research Center Riesstrasse 25, 80992 München E-mail: radu.tudo...@huaw
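Conceptually, keyBy routes a record by hashing its key and taking the result modulo the parallelism, much as the question suggests, though Flink additionally scrambles the raw hashCode (with a murmur-style mix) before the modulo, so the mapping is not literally Object.hashCode() % parallelism. A minimal JDK-only sketch of the hash-modulo idea (method names here are invented for illustration, not Flink's API):

```java
// Illustrative only: hash-modulo routing as asked in the thread.
// Not Flink's actual implementation; Flink scrambles the hash first.
public class KeyPartitionSketch {
    static int selectChannel(Object key, int parallelism) {
        // Mask the sign bit so the result is a valid non-negative index.
        return (key.hashCode() & Integer.MAX_VALUE) % parallelism;
    }

    public static void main(String[] args) {
        int p = 4;
        for (String k : new String[] {"alpha", "beta", "gamma"}) {
            System.out.println(k + " -> subtask " + selectChannel(k, p));
        }
    }
}
```

The important property is that the same key always lands on the same subtask; the exact scrambling only affects how evenly keys spread.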

Re: Using memory logging in Flink

2015-12-08 Thread Filip Łęczycki
Hi, Thank you for your reply! I have made sure I restarted the TaskManager after changing the config, but it didn't resolve the issue. The config is loaded, as I can see the following line in the log: 09:12:2015 00:00:21,894 DEBUG org.apache.flink.configuration.GlobalConfiguration- Loading

Re: Using Kryo for serializing lambda closures

2015-12-08 Thread Stephan Ewen
That is true. If you want to look into this, there are probably two places that need adjustment: 1) The UserCodeObjectWrapper would need to be adjusted to hold a serialized object (a byte[]) for shipping and serialize that object differently (say trying Java, then falling back to Kryo). 2) The C

Re: Using memory logging in Flink

2015-12-08 Thread Stephan Ewen
Hi! That is exactly the right way to do it. Logging has to be at least INFO and the parameter "taskmanager.debug.memory.startLogThread" set to true. The log output should be under "org.apache.flink.runtime.taskmanager.TaskManager". Do you see other outputs for that class in the log? Make sure yo
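For reference, the two settings Stephan mentions would look roughly like this (a config sketch; file names and the exact logger line are assumed typical, not quoted from the thread):

```yaml
# flink-conf.yaml
taskmanager.debug.memory.startLogThread: true
```

```
# log4j.properties – the logger must be at least INFO for the
# memory log thread's output to appear
log4j.logger.org.apache.flink.runtime.taskmanager.TaskManager=INFO
```

After changing either file, the TaskManager must be restarted for the settings to take effect, as discussed earlier in the thread.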

Re: Using Kryo for serializing lambda closures

2015-12-08 Thread Nick Dimiduk
The point is to provide a means for users to work around nonconforming APIs. Kryo at least is extensible in that you can register additional serializers. On Tue, Dec 8, 2015 at 11:40 AM, Stephan Ewen wrote: > Actually, this should be independent of Java 8 lambdas vs Java 7 anonymous > classes. >

Re: Using Kryo for serializing lambda closures

2015-12-08 Thread Stephan Ewen
Actually, this should be independent of Java 8 lambdas vs Java 7 anonymous classes. I have been using Java 8 lambdas quite a bit with Flink. The important thing is that no non-serializable objects are in the closure. As Fabian mentioned, lazy initialization helps. Serializability is also discusse

Re: Using Kryo for serializing lambda closures

2015-12-08 Thread Nick Dimiduk
Ah, very good. I've closed my issue as a duplicate. Thanks for the reference. On Tue, Dec 8, 2015 at 11:23 AM, Fabian Hueske wrote: > Hi Nick, > > thanks for pushing this and opening the JIRA issue. > > The issue came up a couple of times and is a known limitation (see > FLINK-1256). > So far the w

Re: Using Kryo for serializing lambda closures

2015-12-08 Thread Fabian Hueske
Hi Nick, thanks for pushing this and opening the JIRA issue. The issue came up a couple of times and is a known limitation (see FLINK-1256). So far the workaround of marking member variables as transient and initializing them in the open() method of a RichFunction has been good enough for all cases
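The workaround Fabian describes can be demonstrated without Flink: mark the non-serializable member transient so Java serialization skips it, then re-create it in an open()-style lifecycle method after deserialization. A self-contained JDK-only sketch (the class, the roundTrip helper, and the use of StringBuilder as a stand-in "helper" are all invented for illustration):

```java
import java.io.*;

// JDK-only demo of the transient-field workaround described above.
// open() plays the role of RichFunction.open(): it re-creates the
// helper after the function object has been deserialized on a worker.
public class TransientWorkaround implements Serializable {
    // Stand-in for a non-serializable member (e.g. a client or connection).
    transient StringBuilder helper;

    void open() {
        helper = new StringBuilder("initialized");
    }

    // Simulates shipping the function to the cluster via Java serialization.
    static TransientWorkaround roundTrip(TransientWorkaround f) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(f);
        ObjectInputStream in =
            new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        return (TransientWorkaround) in.readObject();
    }

    public static void main(String[] args) throws Exception {
        TransientWorkaround f = new TransientWorkaround();
        f.open();
        TransientWorkaround shipped = roundTrip(f);
        System.out.println("after deserialization: " + shipped.helper); // null
        shipped.open(); // the framework calls open() on the worker
        System.out.println("after open(): " + shipped.helper);
    }
}
```

The transient field arrives as null on the remote side, which is exactly why it must be re-initialized in open() rather than in the constructor.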

Re: Using Kryo for serializing lambda closures

2015-12-08 Thread Nick Dimiduk
That's what I feared. IMO this is very limiting when mixing in other projects where a user does not have control over those projects' APIs. At least falling back to an extensible serialization mechanism (like Kryo) allows users to register serializers external to the types they're consuming. I ope

Using memory logging in Flink

2015-12-08 Thread Filip Łęczycki
Hi, I am trying to enable logging of memory usage on flink 0.10.0 by adding: taskmanager.debug.memory.startLogThread: true to config.yaml and setting the log4j level to DEBUG, however in the logs after running the job I cannot see any info regarding memory usage. My job lasted 30s so it should catch f

Re: Flink Storm

2015-12-08 Thread Maximilian Michels
Hi Naveen, Turns out I had changed the pom.xml after I checked out your code while trying to get your example working. I have found the real issue of your problem. Please make sure you have the following dependency in your pom.xml (in addition to the storm modules). org.apache.flink flin

Re: Strange behaviour of windows

2015-12-08 Thread Dawid Wysakowicz
Thanks for the explanation. That was a really stupid mistake on my side. By the way, I really like the whole idea and API. Really good job! Regards Dawid 2015-12-08 12:30 GMT+01:00 Aljoscha Krettek : > Hi, > an important concept of the Flink API is that transformations do not > modify the origin

Re: Question about DataStream serialization

2015-12-08 Thread Aljoscha Krettek
Hi, it is not possible in an officially supported way. There is however a trick that you could use: You can cast the OperatorState to a KvState. This has a method setCurrentKey() that sets the key to be used when calling value() and update(). In this way you can trick the OperatorState into thin
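The trick above boils down to state that is addressed through a "current key": whichever key was last set is the one that value() and update() operate on. A JDK-only mock of that interface (the class is invented for illustration; the method names merely mirror the setCurrentKey/value/update calls mentioned above and are not Flink code):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of key-scoped state: value()/update() act on whichever key
// was last set via setCurrentKey(), mirroring the trick described above.
public class KeyedStateMock<K, V> {
    private final Map<K, V> store = new HashMap<>();
    private final V defaultValue;
    private K currentKey;

    public KeyedStateMock(V defaultValue) { this.defaultValue = defaultValue; }

    public void setCurrentKey(K key) { currentKey = key; }

    public V value() { return store.getOrDefault(currentKey, defaultValue); }

    public void update(V v) { store.put(currentKey, v); }

    public static void main(String[] args) {
        KeyedStateMock<Integer, Long> state = new KeyedStateMock<>(0L);
        // open()-style pre-loading: iterate the known keys and set each
        // one as current before writing its initial value.
        for (int k = 0; k < 3; k++) {
            state.setCurrentKey(k);
            state.update((long) (k * 10));
        }
        state.setCurrentKey(1);
        System.out.println(state.value()); // 10
    }
}
```

This also illustrates why the trick is unsupported: the caller has to manage the current key manually, which the framework normally does per incoming element.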

RE: Question about DataStream serialization

2015-12-08 Thread Radu Tudoran
Hi, The state that is being loaded can very well be partitioned by keys. Assuming this scenario, and that you would know that the keys go from 0 to N, is there some possibility to load and partition the initial data in the open function? Dr. Radu Tudoran Research Engineer IT R&D Division HUA

Re: Question about DataStream serialization

2015-12-08 Thread Aljoscha Krettek
Ah, I see what’s the problem. Operator state is scoped to the key of the incoming element. In the open() method, no element has been received yet, so the key of the incoming element is basically NULL. So the open() method initializes state for key NULL. In flatMap() we actually have a key of inc

RE: Question about DataStream serialization

2015-12-08 Thread Radu Tudoran
Hi, I attached below a function that shows the issue and that the OperatorState does not have the initialized value from the open function before flatMap is called. You can see this as the print will not show anything. If, on the other hand, you remove the initialization loop and put a non zero v

Re: Flink Storm

2015-12-08 Thread Maximilian Michels
Hi Naveen, In your code on GitHub, please remove the following from the WordCount file: OutputStream o; try { o = new FileOutputStream("/tmp/wordcount1.txt", true); o.write((word + " " + count.toString() + "\n").getBytes()); o.close(); } catch (IOException e) { e.printStackTrace(

Re: Using Flink with Scala 2.11 and Java 8

2015-12-08 Thread Maximilian Michels
Thanks for the stack trace, Cory. Looks like you were on the right path with the Spark issue. We will file an issue and correct it soon. Thanks, Max On Mon, Dec 7, 2015 at 8:20 PM, Stephan Ewen wrote: > Sorry, correcting myself: > > The ClosureCleaner uses Kryo's bundled ASM 4 without any reason

Re: Strange behaviour of windows

2015-12-08 Thread Aljoscha Krettek
Hi, an important concept of the Flink API is that transformations do not modify the original stream (or dataset) but return a new stream with the modifications in place. In your example the result of the extractTimestamps() call should be used for further processing. I attached your source code
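The pitfall Aljoscha describes can be reproduced with any immutable fluent API: a transformation returns a new object, so its result must be captured or it is silently lost. A JDK-only analogue (the toy ImmutablePipeline class is invented for illustration; it is not the Flink API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Toy immutable pipeline: map() returns a NEW pipeline and leaves the
// receiver untouched, just as Flink transformations return a new stream.
public class ImmutablePipeline {
    private final List<UnaryOperator<Integer>> steps;

    private ImmutablePipeline(List<UnaryOperator<Integer>> steps) {
        this.steps = steps;
    }

    public static ImmutablePipeline create() {
        return new ImmutablePipeline(new ArrayList<>());
    }

    public ImmutablePipeline map(UnaryOperator<Integer> step) {
        List<UnaryOperator<Integer>> next = new ArrayList<>(steps);
        next.add(step);
        return new ImmutablePipeline(next); // receiver is not modified
    }

    public int run(int input) {
        int v = input;
        for (UnaryOperator<Integer> s : steps) v = s.apply(v);
        return v;
    }

    public static void main(String[] args) {
        ImmutablePipeline p = ImmutablePipeline.create();
        p.map(x -> x + 1);             // WRONG: result discarded
        System.out.println(p.run(10)); // still 10
        p = p.map(x -> x + 1);         // RIGHT: use the returned pipeline
        System.out.println(p.run(10)); // 11
    }
}
```

The same shape applies to extractTimestamps(): the stream it returns, not the original one, must be used for the downstream operations.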

Re: Question about DataStream serialization

2015-12-08 Thread Aljoscha Krettek
Hi, if the open() method is indeed not called before the first flatMap() call then this would be a bug. Could you please verify that this is the case and maybe provide an example where this is observable? Cheers, Aljoscha > On 08 Dec 2015, at 10:41, Matthias J. Sax wrote: > > Hi, > > I think

Re: Question about DataStream serialization

2015-12-08 Thread Matthias J. Sax
Hi, I think (but please someone verify) that an OperatorState is actually not required -- I think that "open()" is called after a failure and recovery, too. So you can use a regular member variable to store the data instead of an OperatorState. In case of failure, you just re-read the data as on r

Re: Using Kryo for serializing lambda closures

2015-12-08 Thread Till Rohrmann
Hi Nick, at the moment Flink uses Java serialization to ship the UDFs to the cluster. Therefore, the closures must only contain Serializable objects. The serializer registration only applies to the data which is processed by the Flink job. Thus, for the moment I would try to get rid of the ColumnI

RE: Question about DataStream serialization

2015-12-08 Thread Radu Tudoran
Hi, Thanks for the answer - it is helpful. The issue that remains is why the open function is not executed before flatMap to load the data into the OperatorState. I used something like - and I observe that the dataset is not initialized when being used in the flatMap function env.sock