Hi,

Taking the example you mentioned of using RichFlatMapFunction and in the open() 
reading a file.
Would this open function be executed on each node where the RichFlatMapFunction 
gets executed? (I have done some tests and I would get the feeling it does – 
but I wanted to double - check )
If so, would this mean that the same data will be loaded multiple times on each 
parallel instance? Is there anyway, this can be prevented and the data to be 
hashed and partitioned somehow across nodes?

Would using the operator state help?:
“
OperatorState<MyList<String>> dataset;
”
I would be curious in this case how could the open function look like to 
initialize the data for this operator state:


I have tried to just read a file and write it into the dataset, but I 
encountered a strange behavior that would look like the flatmap function gets 
executed before the open function, which leads to using an empty dataset in the 
flatmap function while when this finish executing the dataset gets loaded. Is 
this an error or I am doing something wrong?



Dr. Radu Tudoran
Research Engineer
IT R&D Division

[cid:image007.jpg@01CD52EB.AD060EE0]
HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, 
www.huawei.com<http://www.huawei.com/>
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!

From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: Tuesday, December 01, 2015 6:21 PM
To: user@flink.apache.org
Cc: Goetz Brasche
Subject: Re: Question about DataStream serialization

Hi Radu,

both emails reached the mailing list :)

You can not reference to DataSets or DataStreams from inside user defined 
functions. Both are just abstractions for a data set or stream, so the elements 
are not really inside the set.

We don't have any support for mixing the DataSet and DataStream API.

For your use case, I would recommend you to use a RichFlatMapFunction and in 
the open() call read the text file.



On Tue, Dec 1, 2015 at 5:03 PM, Radu Tudoran 
<radu.tudo...@huawei.com<mailto:radu.tudo...@huawei.com>> wrote:

Hello,

I am not sure if this message was received on the user list, if so I apologies 
for duplicate messages

I have the following scenario


•         Reading a fixed set
DataStream<String> fixedset = env.readtextFile(…

•         Reading a continuous stream of data
DataStream<String> stream = ….

I would need that for each event read from the continuous stream to make some 
operations onit and on the fixedsettoghether


I have tried something like

final myObject.referenceStaticSet = fixedset;
stream.map(new MapFunction<String, String>() {
                     @Override
                     public String map(String arg0) throws Exception {

                           //for example:   final string2add = arg0;
                                                                //the goal of 
below function would be to add the string2add to the fixedset
                           myObject.referenceStaticSet = 
myObject.referenceStaticSet.flatMap(new FlatMapFunction<String, String>() {

                                  @Override
                                  public void flatMap(String arg0, 
Collector<String> arg1)
                                                                                
//for example adding to the fixed set also the string2add object:   
arg1.collect(string2add);
                                                                                
}
…
}

However,  I get an exception (Exception in thread "main" 
org.apache.flink.api.common.InvalidProgramException: ) that object is not 
serializable (Object MyClass$3@a71081 not serializable )

Looking into this I see that the issues is that the DataStream<> is not 
serializable. What would be the solution to this issue?

As I mentioned before, I would like that for each event from the continuous 
stream to use the initial fixed set, add the event to it and apply an operation.
Stephan was mentioning at some point some possibility to create a DataSet and 
launch a batch processing while operating in stream mode– in case this is 
possible, can you give me a reference for it, because it might be the good 
solution to  use in case. I am thinking that I could keep the fixed set as a 
DataSet and as each new event comes, transform it into a dataset and then join 
with reference set and apply an operation

Regards,




Dr. Radu Tudoran
Research Engineer
IT R&D Division

[cid:image007.jpg@01CD52EB.AD060EE0]
HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com<mailto:radu.tudo...@huawei.com>
Mobile: +49 15209084330<tel:%2B49%2015209084330>
Telephone: +49 891588344173<tel:%2B49%20891588344173>

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, 
www.huawei.com<http://www.huawei.com/>
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!

From: Vieru, Mihail 
[mailto:mihail.vi...@zalando.de<mailto:mihail.vi...@zalando.de>]
Sent: Tuesday, December 01, 2015 4:55 PM
To: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: NPE with Flink Streaming from Kafka

Hi,
we get the following NullPointerException after ~50 minutes when running a 
streaming job with windowing and state that reads data from Kafka and writes 
the result to local FS.
There are around 170 million messages to be processed, Flink 0.10.1 stops at ~8 
million.
Flink runs locally, started with the "start-cluster-streaming.sh" script.

12/01/2015 15:06:24    Job execution switched to status RUNNING.
12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to 
SCHEDULED
12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to 
DEPLOYING
12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at 
main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to SCHEDULED
12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at 
main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to DEPLOYING
12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched to 
RUNNING
12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at 
main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to RUNNING
12/01/2015 15:56:08    Fast TumblingTimeWindows(5000) of Reduce at 
main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to CANCELED
12/01/2015 15:56:08    Source: Custom Source -> Map -> Map(1/1) switched to 
FAILED
java.lang.Exception
    at 
org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
    at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
    at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at 
org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
    at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
    at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
    at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
    at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
    at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
    at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
    at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
    at 
org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
    at 
org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
    at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
Any ideas on what could cause this behaviour?

Best,
Mihail

Reply via email to