Hi,

I think (but please someone verify) that an OperatorState is actually
not required -- I think that "open()" is called after a failure and
recovery, too. So you can use a regular member variable to store the
data instead of an OperatorState. In case of failure, you just re-read
the data as on regular start-up.
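The member-variable pattern described above can be sketched in plain Java (this is a stand-in for a Flink RichFlatMapFunction, not actual Flink API; the class name, the filtering logic, and the hard-coded reference data are illustrative only):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of the member-variable pattern: the reference data lives in a plain
// field and is (re)loaded in open(). Since open() also runs after failure and
// recovery, the data is simply re-read, as on a regular start-up.
class ReferenceDataFunction {

    // plain member variable instead of an OperatorState
    private transient List<String> dataset;

    // in Flink this would be open(Configuration); called before any flatMap()
    public void open() {
        dataset = loadReferenceData();
    }

    // emits the value only if it occurs in the reference data
    public List<String> flatMap(String value) {
        List<String> out = new ArrayList<>();
        if (dataset.contains(value)) {
            out.add(value);
        }
        return out;
    }

    // placeholder for reading the real file in the actual job
    private List<String> loadReferenceData() {
        return new ArrayList<>(Arrays.asList("a", "b"));
    }
}
```

On recovery the framework calls open() again, so the dataset field is repopulated without any checkpointed state.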

-Matthias


On 12/08/2015 09:38 AM, Radu Tudoran wrote:
> Hi,
> 
> Thanks for the answer - it is helpful.
> The issue that remains is why the open function is not executed before 
> the flatmap to load the data into the OperatorState. 
> 
> I used something like the following, and I observe that the dataset is not 
> initialized when it is used in the flatmap function:
> 
> env.socketTextStream(...)
>   .map(...)     // to transform data to a Tuple1<String>
>   .keyBy(0)     // to enable the usage of the OperatorState, which I saw
>                 // requires a keyed stream
>   .flatMap(new RichFlatMapFunction<Tuple1<String>, String>() {
> 
>     private OperatorState<String> dataset;
> 
>     @Override
>     public void flatMap(Tuple1<String> value, Collector<String> out) {
>       // use dataset ... it is empty
>     }
> 
>     @Override
>     public void open(Configuration parameters) {
>       // load dataset
>     }
>   })
> 
> 
> 
> Dr. Radu Tudoran
> Research Engineer
> IT R&D Division
> 
> 
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> European Research Center
> Riesstrasse 25, 80992 München
> 
> E-mail: radu.tudo...@huawei.com
> Mobile: +49 15209084330
> Telephone: +49 891588344173
> 
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
> This e-mail and its attachments contain confidential information from HUAWEI, 
> which is intended only for the person or entity whose address is listed 
> above. Any use of the information contained herein in any way (including, but 
> not limited to, total or partial disclosure, reproduction, or dissemination) 
> by persons other than the intended recipient(s) is prohibited. If you receive 
> this e-mail in error, please notify the sender by phone or email immediately 
> and delete it!
> 
> -----Original Message-----
> From: Matthias J. Sax [mailto:mj...@apache.org] 
> Sent: Tuesday, December 08, 2015 8:42 AM
> To: user@flink.apache.org
> Subject: Re: Question about DataStream serialization
> 
> Hi Radu,
> 
> you are right. The open() method is called for each parallel instance of a 
> rich function. Thus, if all instances use the same code, you might read the 
> same data multiple times.
> 
> The easiest way to distinguish different instances within open() is to use 
> the RuntimeContext. It offers two methods, "int 
> getNumberOfParallelSubtasks()" and "int getIndexOfThisSubtask()", that you can 
> use to compute your own partitioning within open().
> 
> For example (just a sketch):
> 
> @Override
> public void open(Configuration parameters) throws Exception {
>   RuntimeContext context = super.getRuntimeContext();
>   int dop = context.getNumberOfParallelSubtasks();
>   int idx = context.getIndexOfThisSubtask();
> 
>   // open file
>   // get size of file in bytes
> 
>   // seek to partition #idx:
>   long seek = fileSize * idx / dop;
> 
>   // read "fileSize/dop" bytes
> }
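(The seek arithmetic in the sketch above can be checked with a small, self-contained helper; the class name and the sample numbers below are illustrative only.)

```java
// Byte-range arithmetic for splitting a file of fileSize bytes across dop
// parallel subtasks: subtask idx reads the half-open range [start, end).
class PartitionRange {

    static long start(long fileSize, int idx, int dop) {
        return fileSize * idx / dop;        // same formula as "seek" above
    }

    static long end(long fileSize, int idx, int dop) {
        return fileSize * (idx + 1) / dop;  // the next subtask's start
    }

    public static void main(String[] args) {
        long fileSize = 1000;
        int dop = 3;
        for (int idx = 0; idx < dop; idx++) {
            System.out.println("subtask " + idx + ": bytes ["
                    + start(fileSize, idx, dop) + ", "
                    + end(fileSize, idx, dop) + ")");
        }
    }
}
```

The ranges are contiguous and cover the whole file, since each subtask's end equals the next subtask's start. (In practice you would also need to handle records that span a partition boundary.)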
> 
> Hope this helps.
> 
> -Matthias
> 
> 
> On 12/08/2015 04:28 AM, Radu Tudoran wrote:
>> Hi,
>>
>>  
>>
>> Taking the example you mentioned of using a RichFlatMapFunction and 
>> reading a file in open():
>>
>> Would this open function be executed on each node where the 
>> RichFlatMapFunction gets executed? (I have done some tests and I get 
>> the feeling it does, but I wanted to double-check.)
>>
>> If so, would this mean that the same data will be loaded multiple 
>> times on each parallel instance? Is there any way this can be 
>> prevented, and the data hashed and partitioned somehow across nodes?
>>
>>  
>>
>> Would using the operator state help?
>>
>> "OperatorState<MyList<String>> dataset;"
>>
>> I would be curious in this case what the open function could look like 
>> to initialize the data for this operator state:
>>
>>  
>>
>>  
>>
>> I have tried to just read a file and write it into the dataset, but I 
>> encountered a strange behavior: the flatmap function appears to execute 
>> before the open function, which leads to using an empty dataset in the 
>> flatmap function; only after this finishes executing does the dataset 
>> get loaded. Is this an error, or am I doing something wrong?
>>
>>  
>>
>>  
>>
>>  
>>
>> Dr. Radu Tudoran
>>
>> Research Engineer
>>
>> IT R&D Division
>>
>>  
>>
>>
>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>>
>> European Research Center
>>
>> Riesstrasse 25, 80992 München
>>
>>  
>>
>> E-mail: radu.tudoran@huawei.com
>>
>> Mobile: +49 15209084330
>>
>> Telephone: +49 891588344173
>>
>>  
>>
>>
>>  
>>
>> *From:*Robert Metzger [mailto:rmetz...@apache.org]
>> *Sent:* Tuesday, December 01, 2015 6:21 PM
>> *To:* user@flink.apache.org
>> *Cc:* Goetz Brasche
>> *Subject:* Re: Question about DataStream serialization
>>
>>  
>>
>> Hi Radu,
>>
>>  
>>
>> both emails reached the mailing list :)
>>
>>  
>>
>> You cannot reference DataSets or DataStreams from inside user-defined 
>> functions. Both are just abstractions for a data set or 
>> stream, so the elements are not really inside the set.
>>
>>  
>>
>> We don't have any support for mixing the DataSet and DataStream API.
>>
>>  
>>
>> For your use case, I would recommend using a RichFlatMapFunction 
>> and reading the text file in the open() call.
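That suggestion, as a plain-Java sketch (not the actual Flink API; the class name and the hard-coded fixed set stand in for reading the text file in open()):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch: the fixed set is read once in open(); each stream element is then
// combined with it in flatMap().
class EnrichWithFixedSet {

    private List<String> fixedSet;

    // in the real job, open() would read the text file here
    public void open() {
        fixedSet = new ArrayList<>(Arrays.asList("x", "y"));
    }

    // pairs the incoming event with every entry of the fixed set
    public List<String> flatMap(String event) {
        List<String> out = new ArrayList<>();
        for (String s : fixedSet) {
            out.add(event + "," + s);
        }
        return out;
    }
}
```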
>>
>>  
>>
>>  
>>
>>  
>>
>> On Tue, Dec 1, 2015 at 5:03 PM, Radu Tudoran <radu.tudo...@huawei.com> 
>> wrote:
>>
>>  
>>
>> Hello,
>>
>>  
>>
>> I am not sure if this message was received on the user list; if so, I 
>> apologize for the duplicate message.
>>
>>  
>>
>> I have the following scenario
>>
>>  
>>
>> ·         Reading a fixed set
>>
>> DataStream<String> fixedset = env.readTextFile(…
>>
>> ·         Reading a continuous stream of data
>>
>> DataStream<String> stream = ….
>>
>>  
>>
>> I would need, for each event read from the continuous stream, to 
>> perform some operations on it and on the fixedset together.
>>
>>  
>>
>>  
>>
>> I have tried something like
>>
>>  
>>
>> final myObject.referenceStaticSet = fixedset;
>>
>> stream.map(new MapFunction<String, String>() {
>>
>>     @Override
>>     public String map(String arg0) throws Exception {
>>
>>         // for example: final string2add = arg0;
>>         // the goal of the function below would be to add the string2add
>>         // to the fixedset
>>         myObject.referenceStaticSet = myObject.referenceStaticSet.flatMap(
>>             new FlatMapFunction<String, String>() {
>>
>>                 @Override
>>                 public void flatMap(String arg0, Collector<String> arg1) {
>>                     // for example adding to the fixed set also the
>>                     // string2add object:
>>                     arg1.collect(string2add);
>>                 }
>>             });
>>         …
>>     }
>> })
>>
>>  
>>
>> However, I get an exception (Exception in thread "main"
>> org.apache.flink.api.common.InvalidProgramException) that the object 
>> is not serializable (Object MyClass$3@a71081 not serializable).
>>
>>  
>>
>> Looking into this I see that the issue is that the DataStream<> is 
>> not serializable. What would be the solution to this issue?
>>
>>  
>>
>> As I mentioned before, I would like that for each event from the 
>> continuous stream to use the initial fixed set, add the event to it 
>> and apply an operation.
>>
>> Stephan was mentioning at some point some possibility to create a 
>> DataSet and launch a batch processing job while operating in stream mode; 
>> in case this is possible, can you give me a reference for it, because 
>> it might be the right solution to use here. I am thinking that I 
>> could keep the fixed set as a DataSet and, as each new event comes, 
>> transform it into a DataSet, then join it with the reference set and apply 
>> an operation.
>>
>>  
>>
>> Regards,
>>
>>  
>>
>>  
>>
>>  
>>
>>  
>>
>> Dr. Radu Tudoran
>>
>> Research Engineer
>>
>> IT R&D Division
>>
>>  
>>
>>
>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>>
>> European Research Center
>>
>> Riesstrasse 25, 80992 München
>>
>>  
>>
>> E-mail: radu.tudo...@huawei.com
>>
>> Mobile: +49 15209084330
>>
>> Telephone: +49 891588344173
>>
>>  
>>
>>
>>  
>>
>> *From:*Vieru, Mihail [mailto:mihail.vi...@zalando.de]
>> *Sent:* Tuesday, December 01, 2015 4:55 PM
>> *To:* user@flink.apache.org
>> *Subject:* NPE with Flink Streaming from Kafka
>>
>>  
>>
>> Hi,
>>
>> we get the following NullPointerException after ~50 minutes when 
>> running a streaming job with windowing and state that reads data from 
>> Kafka and writes the result to local FS.
>>
>> There are around 170 million messages to be processed, Flink 0.10.1 
>> stops at ~8 million.
>>
>> Flink runs locally, started with the "start-cluster-streaming.sh" script.
>>
>>
>> 12/01/2015 15:06:24    Job execution switched to status RUNNING.
>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched
>> to SCHEDULED
>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched
>> to DEPLOYING
>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at
>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to 
>> SCHEDULED
>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at
>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to 
>> DEPLOYING
>> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched
>> to RUNNING
>> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at
>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to 
>> RUNNING
>> 12/01/2015 15:56:08    Fast TumblingTimeWindows(5000) of Reduce at
>> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to 
>> CANCELED
>> 12/01/2015 15:56:08    Source: Custom Source -> Map -> Map(1/1) switched
>> to FAILED
>> java.lang.Exception
>>     at
>> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>     at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
>>     at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>     at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>     at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.NullPointerException
>>     at
>> org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
>>     at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
>>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>>     at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>>     at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
>>     at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
>>     at
>> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
>>     at
>> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
>>     at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$Periodi
>> cOffsetCommitter.run(FlinkKafkaConsumer.java:632)
>>
>> Any ideas on what could cause this behaviour?
>>
>>  
>>
>> Best,
>>
>> Mihail
>>
>>  
>>
> 
