Hi,
if the open() method is indeed not called before the first flatMap() call then 
this would be a bug. Could you please verify that this is the case and maybe 
provide an example where this is observable?

Cheers,
Aljoscha
> On 08 Dec 2015, at 10:41, Matthias J. Sax <mj...@apache.org> wrote:
> 
> Hi,
> 
> I think (but please someone verify) that an OperatorState is actually
> not required -- I think that "open()" is called after a failure and
> recovery, too. So you can use a regular member variable to store the
> data instead of an OperatorState. In case of failure, you just re-read
> the data as on regular start-up.
> 
> -Matthias
> 
> 
> On 12/08/2015 09:38 AM, Radu Tudoran wrote:
>> Hi,
>> 
>> Thanks for the answer - it is helpful.
>> The remaining issue is why the open() function is not executed before 
>> flatMap() to load the data into the OperatorState. 
>> 
>> I used something like the following, and I observe that the dataset is not 
>> initialized when it is used in the flatMap function:
>> 
>> env.socketTextStream(...)
>>     .map(...)      // transform the data to a Tuple1<String>
>>     .keyBy(0)      // enables the OperatorState, which I saw requires a
>>                    // keyed stream
>>     .flatMap(new RichFlatMapFunction<Tuple1<String>, String>() {
>>         private OperatorState<String> dataset;
>> 
>>         @Override
>>         public void flatMap(...) {
>>             // use dataset -> it is empty
>>         }
>> 
>>         @Override
>>         public void open(...) {
>>             // load dataset
>>         }
>>     });
>> 
>> 
>> 
>> Dr. Radu Tudoran
>> Research Engineer
>> IT R&D Division
>> 
>> 
>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>> European Research Center
>> Riesstrasse 25, 80992 München
>> 
>> E-mail: radu.tudo...@huawei.com
>> Mobile: +49 15209084330
>> Telephone: +49 891588344173
>> 
>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
>> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
>> Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
>> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
>> Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
>> This e-mail and its attachments contain confidential information from 
>> HUAWEI, which is intended only for the person or entity whose address is 
>> listed above. Any use of the information contained herein in any way 
>> (including, but not limited to, total or partial disclosure, reproduction, 
>> or dissemination) by persons other than the intended recipient(s) is 
>> prohibited. If you receive this e-mail in error, please notify the sender by 
>> phone or email immediately and delete it!
>> 
>> -----Original Message-----
>> From: Matthias J. Sax [mailto:mj...@apache.org] 
>> Sent: Tuesday, December 08, 2015 8:42 AM
>> To: user@flink.apache.org
>> Subject: Re: Question about DataStream serialization
>> 
>> Hi Radu,
>> 
>> you are right. The open() method is called for each parallel instance of a 
>> rich function. Thus, if all instances use the same code, you might read the 
>> same data multiple times.
>> 
>> The easiest way to distinguish different instances within open() is to use 
>> the RuntimeContext. It offers two methods, "int 
>> getNumberOfParallelSubtasks()" and "int getIndexOfThisSubtask()", that you 
>> can use to compute your own partitioning within open().
>> 
>> For example (just a sketch):
>> 
>> @Override
>> public void open(Configuration parameters) throws Exception {
>>  RuntimeContext context = super.getRuntimeContext();
>>  int dop = context.getNumberOfParallelSubtasks();
>>  int idx = context.getIndexOfThisSubtask();
>> 
>>  // open file
>>  // get size of file in bytes
>> 
>>  // seek to partition #idx:
>>  long seek = fileSize * idx / dop;
>> 
>>  // read "fileSize/dop" bytes
>> }
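
To make the sketch above a bit more concrete, here is one way the byte range
of each subtask could be computed. This is plain Java without any Flink
dependency. The class SubtaskFileRange and its methods are hypothetical names
made up for illustration, and idx/dop are assumed to come from
getIndexOfThisSubtask() and getNumberOfParallelSubtasks() as described above.

```java
/** Hypothetical helper: splits a file of fileSize bytes into dop contiguous ranges. */
final class SubtaskFileRange {

    private SubtaskFileRange() {}

    /** First byte (inclusive) that subtask idx out of dop should read. */
    static long start(long fileSize, int idx, int dop) {
        checkArgs(fileSize, idx, dop);
        return fileSize * idx / dop;
    }

    /** End of the range (exclusive); equal to the start of the next subtask. */
    static long end(long fileSize, int idx, int dop) {
        checkArgs(fileSize, idx, dop);
        return fileSize * (idx + 1) / dop;
    }

    private static void checkArgs(long fileSize, int idx, int dop) {
        if (fileSize < 0 || dop <= 0 || idx < 0 || idx >= dop) {
            throw new IllegalArgumentException("invalid fileSize/idx/dop");
        }
    }

    public static void main(String[] args) {
        long fileSize = 10;
        int dop = 3;
        // Print the half-open byte range [start, end) of every subtask.
        for (int idx = 0; idx < dop; idx++) {
            System.out.println("subtask " + idx + ": ["
                    + start(fileSize, idx, dop) + ", "
                    + end(fileSize, idx, dop) + ")");
        }
    }
}
```

Because each range ends where the next one starts, the ranges cover the file
without gaps or overlaps. A cut at a raw byte offset may still land in the
middle of a record, so a real reader would have to align each range to record
boundaries (for example, by skipping ahead to the next line break).
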
>> 
>> Hope this helps.
>> 
>> -Matthias
>> 
>> 
>> On 12/08/2015 04:28 AM, Radu Tudoran wrote:
>>> Hi,
>>> 
>>> 
>>> 
>>> Taking the example you mentioned of using a RichFlatMapFunction and 
>>> reading a file in open():
>>> 
>>> Would this open() function be executed on each node where the 
>>> RichFlatMapFunction gets executed? (I have done some tests and I get 
>>> the feeling it does, but I wanted to double-check.)
>>> 
>>> If so, would this mean that the same data is loaded multiple times, 
>>> once on each parallel instance? Is there any way to prevent this and 
>>> have the data hashed and partitioned across the nodes?
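
An inline note on the hashing question: one possible approach is for every
parallel instance to keep only the records whose hash maps to its own subtask
index. The sketch below is plain Java with hypothetical names (HashOwnership,
ownerOf, keepOwned). It assumes that idx and dop come from the RuntimeContext
and that incoming events are routed to subtasks by the same function; I would
not assume that keyBy() uses this exact function.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: keeps only the records that hash to a given subtask. */
final class HashOwnership {

    private HashOwnership() {}

    /** Subtask (0 .. dop-1) that owns the key under plain hashCode() modulo dop. */
    static int ownerOf(String key, int dop) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), dop);
    }

    /** Records from the full data set that subtask idx should keep. */
    static List<String> keepOwned(List<String> records, int idx, int dop) {
        List<String> owned = new ArrayList<>();
        for (String record : records) {
            if (ownerOf(record, dop) == idx) {
                owned.add(record);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "c", "d");
        int dop = 2;
        for (int idx = 0; idx < dop; idx++) {
            System.out.println("subtask " + idx + " keeps "
                    + keepOwned(records, idx, dop));
        }
    }
}
```

With this scheme every instance still reads the whole file; only the amount
of data kept in memory per instance is reduced.
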
>>> 
>>> 
>>> 
>>> Would using the operator state help?
>>> 
>>> OperatorState<MyList<String>> dataset;
>>> 
>>> In that case, I would be curious what the open() function would look 
>>> like to initialize the data for this operator state.
>>> 
>>> I have tried to just read a file and write it into the dataset, but I 
>>> encountered strange behavior: it looks like the flatMap function gets 
>>> executed before the open() function, so the flatMap function uses an 
>>> empty dataset, and the dataset only gets loaded after flatMap has 
>>> finished executing. Is this an error, or am I doing something wrong?
>>> 
>>> 
>>> From: Robert Metzger [mailto:rmetz...@apache.org]
>>> Sent: Tuesday, December 01, 2015 6:21 PM
>>> To: user@flink.apache.org
>>> Cc: Goetz Brasche
>>> Subject: Re: Question about DataStream serialization
>>> 
>>> 
>>> 
>>> Hi Radu,
>>> 
>>> 
>>> 
>>> both emails reached the mailing list :)
>>> 
>>> 
>>> 
>>> You cannot reference DataSets or DataStreams from inside user-defined 
>>> functions. Both are just abstractions for a data set or stream, so the 
>>> elements are not really inside the set.
>>> 
>>> We don't have any support for mixing the DataSet and DataStream APIs.
>>> 
>>> For your use case, I would recommend using a RichFlatMapFunction and 
>>> reading the text file in the open() call.
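
As a rough illustration of this recommendation, the sketch below loads the
fixed set once and then uses it for every event. It is plain Java and
deliberately avoids Flink dependencies: FixedSetFunction is a hypothetical
stand-in, and its open() and flatMap() only mimic the roles of the
corresponding RichFlatMapFunction methods, with simplified signatures. How
an event is combined with the fixed set is an assumption as well.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a rich function; this is not a Flink class. */
final class FixedSetFunction {

    // Regular member variable holding the fixed set; filled once in open().
    private List<String> fixedSet = new ArrayList<>();

    /** Plays the role of open(): reads the text file, one element per line. */
    void open(Path file) throws IOException {
        fixedSet = Files.readAllLines(file, StandardCharsets.UTF_8);
    }

    /** Plays the role of flatMap(): pairs one event with every fixed element. */
    void flatMap(String event, Consumer<String> out) {
        for (String element : fixedSet) {
            out.accept(element + "," + event);
        }
    }

    public static void main(String[] args) throws IOException {
        // Write a small temporary file to act as the fixed set.
        Path file = Files.createTempFile("fixedset", ".txt");
        Files.write(file, Arrays.asList("x", "y"), StandardCharsets.UTF_8);

        FixedSetFunction fn = new FixedSetFunction();
        fn.open(file);
        fn.flatMap("event1", System.out::println);

        Files.delete(file);
    }
}
```

The fixed set lives in an ordinary field of the function instance, so no
DataStream or DataSet has to be captured (and serialized) by the function.
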
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> On Tue, Dec 1, 2015 at 5:03 PM, Radu Tudoran <radu.tudo...@huawei.com> wrote:
>>> 
>>> 
>>> 
>>> Hello,
>>> 
>>> 
>>> 
>>> I am not sure whether this message was received on the user list; if 
>>> so, I apologize for the duplicate messages.
>>> 
>>> I have the following scenario:
>>> 
>>> - Reading a fixed set:
>>>   DataStream<String> fixedset = env.readTextFile(…
>>> - Reading a continuous stream of data:
>>>   DataStream<String> stream = ….
>>> 
>>> For each event read from the continuous stream, I need to perform some 
>>> operations on it together with the fixedset.
>>> 
>>> 
>>> 
>>> 
>>> 
>>> I have tried something like:
>>> 
>>> final myObject.referenceStaticSet = fixedset;
>>> stream.map(new MapFunction<String, String>() {
>>>     @Override
>>>     public String map(String arg0) throws Exception {
>>>         // for example: final string2add = arg0;
>>>         // the goal of the function below would be to add string2add
>>>         // to the fixedset
>>>         myObject.referenceStaticSet =
>>>             myObject.referenceStaticSet.flatMap(new FlatMapFunction<String, String>() {
>>>                 @Override
>>>                 public void flatMap(String arg0, Collector<String> arg1)
>>>                     // for example, also adding the string2add object
>>>                     // to the fixed set: arg1.collect(string2add);
>>>             }
>>> …
>>> }
>>> 
>>> 
>>> 
>>> However, I get an exception (Exception in thread "main" 
>>> org.apache.flink.api.common.InvalidProgramException) saying that the 
>>> object is not serializable (Object MyClass$3@a71081 not serializable).
>>> 
>>> Looking into this, I see that the issue is that the DataStream<> is 
>>> not serializable. What would be the solution to this issue?
>>> 
>>> As I mentioned before, for each event from the continuous stream I 
>>> would like to use the initial fixed set, add the event to it, and 
>>> apply an operation.
>>> 
>>> Stephan mentioned at some point the possibility of creating a DataSet 
>>> and launching batch processing while operating in stream mode. In case 
>>> this is possible, can you give me a reference for it? It might be a 
>>> good solution for my case. I am thinking that I could keep the fixed 
>>> set as a DataSet and, as each new event comes in, transform it into a 
>>> DataSet, join it with the reference set, and apply an operation.
>>> 
>>> 
>>> 
>>> Regards,
>>> 
>>> 
>>> From: Vieru, Mihail [mailto:mihail.vi...@zalando.de]
>>> Sent: Tuesday, December 01, 2015 4:55 PM
>>> To: user@flink.apache.org
>>> Subject: NPE with Flink Streaming from Kafka
>>> 
>>> 
>>> 
>>> Hi,
>>> 
>>> we get the following NullPointerException after ~50 minutes when 
>>> running a streaming job with windowing and state that reads data from 
>>> Kafka and writes the result to local FS.
>>> 
>>> There are around 170 million messages to be processed, Flink 0.10.1 
>>> stops at ~8 million.
>>> 
>>> Flink runs locally, started with the "start-cluster-streaming.sh" script.
>>> 
>>> 
>>> 12/01/2015 15:06:24    Job execution switched to status RUNNING.
>>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched
>>> to SCHEDULED
>>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched
>>> to DEPLOYING
>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at
>>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to 
>>> SCHEDULED
>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at
>>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to 
>>> DEPLOYING
>>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched
>>> to RUNNING
>>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at
>>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to 
>>> RUNNING
>>> 12/01/2015 15:56:08    Fast TumblingTimeWindows(5000) of Reduce at
>>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to 
>>> CANCELED
>>> 12/01/2015 15:56:08    Source: Custom Source -> Map -> Map(1/1) switched
>>> to FAILED
>>> java.lang.Exception
>>>    at
>>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>>    at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
>>>    at
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>>    at
>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>>    at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>>    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>    at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.NullPointerException
>>>    at
>>> org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
>>>    at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
>>>    at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>>>    at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
>>>    at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
>>>    at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>>>    at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
>>>    at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
>>>    at
>>> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
>>>    at
>>> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
>>>    at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
>>> 
>>> Any ideas on what could cause this behaviour?
>>> 
>>> 
>>> 
>>> Best,
>>> 
>>> Mihail
>>> 
>>> 
>>> 
>> 
> 
