Hi,

I think (but please someone verify) that an OperatorState is actually not
required -- I think that open() is called after a failure and recovery, too.
So you can use a regular member variable to store the data instead of an
OperatorState. In case of failure, you just re-read the data as on a regular
start-up.
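For example, something along these lines (an untested sketch -- the file
path and the contains() filter are just placeholders):

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class ReferenceDataFlatMap
        extends RichFlatMapFunction<Tuple1<String>, String> {

    // regular member variable instead of an OperatorState;
    // open() runs on start-up and again after failure/recovery,
    // so the data is simply re-read in both cases
    private transient Set<String> dataset;

    @Override
    public void open(Configuration parameters) throws Exception {
        dataset = new HashSet<String>();
        BufferedReader reader =
                new BufferedReader(new FileReader("/path/to/reference-data.txt"));
        String line;
        while ((line = reader.readLine()) != null) {
            dataset.add(line);
        }
        reader.close();
    }

    @Override
    public void flatMap(Tuple1<String> value, Collector<String> out)
            throws Exception {
        // the reference data is guaranteed to be loaded at this point
        if (dataset.contains(value.f0)) {
            out.collect(value.f0);
        }
    }
}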
-Matthias

On 12/08/2015 09:38 AM, Radu Tudoran wrote:
> Hi,
>
> Thanks for the answer - it is helpful.
> The issue that remains is why the open() function is not executed before
> the flatMap to load the data into the OperatorState.
>
> I used something like the following, and I observe that the dataset is
> not initialized when it is used in the flatMap function:
>
> env.socketTextStream(...)
>     .map(...)   // to transform data to a Tuple1<String>
>     .keyBy(0)   // to enable the usage of the OperatorState, which I saw
>                 // requires a keyed stream
>     .flatMap(new RichFlatMapFunction<Tuple1<String>, String>() {
>
>         private OperatorState<String> dataset;
>
>         @Override
>         public void flatMap(...) {
>             // use dataset ... it is empty
>         }
>
>         @Override
>         public void open(...) {
>             // load dataset
>         }
>     });
>
> Dr. Radu Tudoran
> Research Engineer
> IT R&D Division
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> European Research Center
> Riesstrasse 25, 80992 München
>
> E-mail: radu.tudo...@huawei.com
> Mobile: +49 15209084330
> Telephone: +49 891588344173
>
> -----Original Message-----
> From: Matthias J. Sax [mailto:mj...@apache.org]
> Sent: Tuesday, December 08, 2015 8:42 AM
> To: user@flink.apache.org
> Subject: Re: Question about DataStream serialization
>
> Hi Radu,
>
> you are right. The open() method is called for each parallel instance of
> a rich function. Thus, if all instances use the same code, you might read
> the same data multiple times.
>
> The easiest way to distinguish different instances within open() is to
> use the RuntimeContext. It offers two methods, "int
> getNumberOfParallelSubtasks()" and "int getIndexOfThisSubtask()", that
> you can use to compute your own partitioning within open().
>
> For example (just a sketch):
>
> @Override
> public void open(Configuration parameters) throws Exception {
>     RuntimeContext context = super.getRuntimeContext();
>     int dop = context.getNumberOfParallelSubtasks();
>     int idx = context.getIndexOfThisSubtask();
>
>     // open file
>     // get size of file in bytes
>
>     // seek to partition #idx:
>     long seek = fileSize * idx / dop;
>
>     // read "fileSize/dop" bytes
> }
>
> Hope this helps.
>
> -Matthias
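(To fill in that sketch from above: with a RandomAccessFile it could look
roughly like this. Still untested, the file name is a placeholder, and
records that span partition boundaries are not handled.)

@Override
public void open(Configuration parameters) throws Exception {
    RuntimeContext context = getRuntimeContext();
    int dop = context.getNumberOfParallelSubtasks();
    int idx = context.getIndexOfThisSubtask();

    // open the file and get its size in bytes
    RandomAccessFile file = new RandomAccessFile("/path/to/data.txt", "r");
    long fileSize = file.length();

    // compute the byte range of partition #idx and seek to its start
    long start = fileSize * idx / dop;
    long end = fileSize * (idx + 1) / dop;
    file.seek(start);

    // read roughly "fileSize/dop" bytes for this subtask
    byte[] buffer = new byte[(int) (end - start)];
    file.readFully(buffer);
    file.close();

    // ... parse "buffer" into this subtask's share of the data ...
}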
> On 12/08/2015 04:28 AM, Radu Tudoran wrote:
>> Hi,
>>
>> Taking the example you mentioned of using a RichFlatMapFunction and
>> reading a file in the open() method:
>>
>> Would this open function be executed on each node where the
>> RichFlatMapFunction gets executed? (I have done some tests and I get
>> the feeling it does -- but I wanted to double-check.)
>>
>> If so, would this mean that the same data will be loaded multiple times
>> on each parallel instance? Is there any way this can be prevented, and
>> the data be hashed and partitioned somehow across the nodes?
>>
>> Would using the operator state help?
>>
>> OperatorState<MyList<String>> dataset;
>>
>> I would be curious in this case how the open function could look like
>> to initialize the data for this operator state.
>>
>> I have tried to just read a file and write it into the dataset, but I
>> encountered a strange behavior: it looks like the flatMap function gets
>> executed before the open function, which leads to using an empty
>> dataset in the flatMap function, while the dataset gets loaded only
>> once the flatMap finishes executing. Is this an error or am I doing
>> something wrong?
>>
>> Dr. Radu Tudoran
>> Research Engineer
>> IT R&D Division
>>
>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>> European Research Center
>> Riesstrasse 25, 80992 München
>>
>> E-mail: radu.tudoran@huawei.com
>> Mobile: +49 15209084330
>> Telephone: +49 891588344173
>>
>> *From:* Robert Metzger [mailto:rmetz...@apache.org]
>> *Sent:* Tuesday, December 01, 2015 6:21 PM
>> *To:* user@flink.apache.org
>> *Cc:* Goetz Brasche
>> *Subject:* Re: Question about DataStream serialization
>>
>> Hi Radu,
>>
>> both emails reached the mailing list :)
>>
>> You can not reference DataSets or DataStreams from inside user-defined
>> functions. Both are just abstractions for a data set or stream, so the
>> elements are not really inside the set.
>>
>> We don't have any support for mixing the DataSet and DataStream API.
>>
>> For your use case, I would recommend you to use a RichFlatMapFunction
>> and read the text file in the open() call.
>> On Tue, Dec 1, 2015 at 5:03 PM, Radu Tudoran <radu.tudo...@huawei.com> wrote:
>>
>> Hello,
>>
>> I am not sure if this message was received on the user list; if so, I
>> apologize for the duplicate message.
>>
>> I have the following scenario:
>>
>> * Reading a fixed set:
>>   DataStream<String> fixedset = env.readTextFile(…
>>
>> * Reading a continuous stream of data:
>>   DataStream<String> stream = ….
>>
>> I would need, for each event read from the continuous stream, to make
>> some operations on it and on the fixed set together.
>>
>> I have tried something like:
>>
>> final myObject.referenceStaticSet = fixedset;
>> stream.map(new MapFunction<String, String>() {
>>     @Override
>>     public String map(String arg0) throws Exception {
>>         // for example: final string2add = arg0;
>>         // the goal of the function below would be to add string2add
>>         // to the fixed set
>>         myObject.referenceStaticSet = myObject.referenceStaticSet.flatMap(
>>             new FlatMapFunction<String, String>() {
>>                 @Override
>>                 public void flatMap(String arg0, Collector<String> arg1) {
>>                     // for example, adding the string2add object to the
>>                     // fixed set: arg1.collect(string2add);
>>                 }
>>             …
>>     }
>>
>> However, I get an exception (Exception in thread "main"
>> org.apache.flink.api.common.InvalidProgramException) that the object is
>> not serializable ("Object MyClass$3@a71081 not serializable").
>>
>> Looking into this, I see that the issue is that the DataStream<> is not
>> serializable. What would be the solution to this issue?
>>
>> As I mentioned before, I would like, for each event from the continuous
>> stream, to use the initial fixed set, add the event to it, and apply an
>> operation.
>>
>> Stephan was mentioning at some point some possibility to create a
>> DataSet and launch a batch processing job while operating in stream
>> mode. In case this is possible, can you give me a reference for it,
>> because it might be the right solution to use in this case? I am
>> thinking that I could keep the fixed set as a DataSet and, as each new
>> event comes, transform it into a DataSet, then join it with the
>> reference set and apply an operation.
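(Concretely, for this last use case, the recommendation above would mean:
do not try to mutate the DataStream inside the function; keep the fixed set
as a plain local collection, load it in open(), and update and use it per
event. A rough, untested sketch -- the file name and the final operation
are placeholders, and note that with parallelism > 1 each parallel instance
holds its own copy of the set:)

stream.map(new RichMapFunction<String, Integer>() {

    // plain local copy of the fixed set, not a DataStream
    private transient Set<String> fixedSet;

    @Override
    public void open(Configuration parameters) throws Exception {
        // re-read the fixed set on start-up (and after recovery)
        fixedSet = new HashSet<String>();
        BufferedReader reader =
                new BufferedReader(new FileReader("/path/to/fixedset.txt"));
        String line;
        while ((line = reader.readLine()) != null) {
            fixedSet.add(line);
        }
        reader.close();
    }

    @Override
    public Integer map(String event) throws Exception {
        // add the event to the set and apply some operation on the result
        fixedSet.add(event);
        return fixedSet.size(); // placeholder operation
    }
});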
>> Regards,
>>
>> Dr. Radu Tudoran
>> Research Engineer
>> IT R&D Division
>>
>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>> European Research Center
>> Riesstrasse 25, 80992 München
>>
>> E-mail: radu.tudo...@huawei.com
>> Mobile: +49 15209084330
>> Telephone: +49 891588344173
>>
>> *From:* Vieru, Mihail [mailto:mihail.vi...@zalando.de]
>> *Sent:* Tuesday, December 01, 2015 4:55 PM
>> *To:* user@flink.apache.org
>> *Subject:* NPE with Flink Streaming from Kafka
>>
>> Hi,
>>
>> we get the following NullPointerException after ~50 minutes when running
>> a streaming job with windowing and state that reads data from Kafka and
>> writes the result to the local FS.
>>
>> There are around 170 million messages to be processed; Flink 0.10.1
>> stops at ~8 million.
>>
>> Flink runs locally, started with the "start-cluster-streaming.sh" script.
>>
>> 12/01/2015 15:06:24  Job execution switched to status RUNNING.
>> 12/01/2015 15:06:24  Source: Custom Source -> Map -> Map(1/1) switched to SCHEDULED
>> 12/01/2015 15:06:24  Source: Custom Source -> Map -> Map(1/1) switched to DEPLOYING
>> 12/01/2015 15:06:24  Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to SCHEDULED
>> 12/01/2015 15:06:24  Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to DEPLOYING
>> 12/01/2015 15:06:24  Source: Custom Source -> Map -> Map(1/1) switched to RUNNING
>> 12/01/2015 15:06:24  Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to RUNNING
>> 12/01/2015 15:56:08  Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to CANCELED
>> 12/01/2015 15:56:08  Source: Custom Source -> Map -> Map(1/1) switched to FAILED
>> java.lang.Exception
>>     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.NullPointerException
>>     at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
>>     at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
>>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>>     at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>>     at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
>>     at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
>>
>> Any ideas on what could cause this behaviour?
>>
>> Best,
>>
>> Mihail