Hi,

if the open() method is indeed not called before the first flatMap() call, then this would be a bug. Could you please verify that this is the case and maybe provide an example where this is observable?
Cheers,
Aljoscha

> On 08 Dec 2015, at 10:41, Matthias J. Sax <mj...@apache.org> wrote:
>
> Hi,
>
> I think (but please someone verify) that an OperatorState is actually
> not required -- I think that open() is called after a failure and
> recovery, too. So you can use a regular member variable to store the
> data instead of an OperatorState. In case of failure, you just re-read
> the data as on a regular start-up.
>
> -Matthias
>
>
> On 12/08/2015 09:38 AM, Radu Tudoran wrote:
>> Hi,
>>
>> Thanks for the answer - it is helpful.
>> The issue that remains is why the open() function is not executed before
>> flatMap() to load the data into the OperatorState.
>>
>> I used something like the following, and I observe that the dataset is
>> not initialized when it is used in the flatMap function:
>>
>> env.socketTextStream(...)
>>     .map(...)   // to transform the data into a Tuple1<String>
>>     .keyBy(0)   // to enable the usage of the OperatorState, which I saw requires a keyed stream
>>     .flatMap(new RichFlatMapFunction<Tuple1<String>, String>() {
>>
>>         private OperatorState<String> dataset;
>>
>>         @Override
>>         public void flatMap(Tuple1<String> value, Collector<String> out) {
>>             // use dataset ... it is empty here
>>         }
>>
>>         @Override
>>         public void open(Configuration parameters) {
>>             // load dataset
>>         }
>>     });
>>
>>
>> Dr. Radu Tudoran
>> Research Engineer
>> IT R&D Division
>>
>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>> European Research Center
>> Riesstrasse 25, 80992 München
>>
>> E-mail: radu.tudo...@huawei.com
>> Mobile: +49 15209084330
>> Telephone: +49 891588344173
>>
>> -----Original Message-----
>> From: Matthias J. Sax [mailto:mj...@apache.org]
>> Sent: Tuesday, December 08, 2015 8:42 AM
>> To: user@flink.apache.org
>> Subject: Re: Question about DataStream serialization
>>
>> Hi Radu,
>>
>> you are right. The open() method is called for each parallel instance of a
>> rich function. Thus, if all instances use the same code, you might read the
>> same data multiple times.
>>
>> The easiest way to distinguish different instances within open() is to use
>> the RuntimeContext. It offers two methods, "int getNumberOfParallelSubtasks()"
>> and "int getIndexOfThisSubtask()", that you can use to compute your own
>> partitioning within open().
>>
>> For example (just a sketch):
>>
>> @Override
>> public void open(Configuration parameters) throws Exception {
>>     RuntimeContext context = super.getRuntimeContext();
>>     int dop = context.getNumberOfParallelSubtasks();
>>     int idx = context.getIndexOfThisSubtask();
>>
>>     // open file
>>     // get size of file in bytes
>>
>>     // seek to partition #idx:
>>     long seek = fileSize * idx / dop;
>>
>>     // read "fileSize/dop" bytes
>> }
>>
>> Hope this helps.
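>>
>> A slightly fuller (still untested) version of the same sketch, in case it
>> is useful -- it assumes a plain local file that every node can read, keeps
>> this subtask's share in a regular member variable, and ignores the detail
>> that a byte-based split can cut a record in half at the partition borders:
>>
>> // inside your RichFlatMapFunction:
>> private String localPartition;  // this subtask's share of the file
>>
>> @Override
>> public void open(Configuration parameters) throws Exception {
>>     RuntimeContext context = getRuntimeContext();
>>     int dop = context.getNumberOfParallelSubtasks();
>>     int idx = context.getIndexOfThisSubtask();
>>
>>     java.io.RandomAccessFile file =
>>         new java.io.RandomAccessFile("/path/to/data.txt", "r");
>>     long fileSize = file.length();
>>
>>     // this subtask is responsible for the byte range [start, end)
>>     long start = fileSize * idx / dop;
>>     long end = fileSize * (idx + 1) / dop;
>>
>>     file.seek(start);
>>     byte[] buffer = new byte[(int) (end - start)];
>>     file.readFully(buffer);
>>     file.close();
>>
>>     localPartition = new String(buffer, "UTF-8");
>> }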
>>
>> -Matthias
>>
>>
>> On 12/08/2015 04:28 AM, Radu Tudoran wrote:
>>> Hi,
>>>
>>> Taking the example you mentioned of using a RichFlatMapFunction and
>>> reading a file in open():
>>>
>>> Would this open function be executed on each node where the
>>> RichFlatMapFunction gets executed? (I have done some tests and I get
>>> the feeling it does -- but I wanted to double-check.)
>>>
>>> If so, would this mean that the same data is loaded multiple times on
>>> each parallel instance? Is there any way this can be prevented, so that
>>> the data is hashed and partitioned across the nodes?
>>>
>>> Would using the operator state help?
>>>
>>> OperatorState<MyList<String>> dataset;
>>>
>>> I would be curious in this case how the open function could look in
>>> order to initialize the data for this operator state.
>>>
>>> I have tried to just read a file and write it into the dataset, but I
>>> encountered a strange behavior: it looks like the flatMap function gets
>>> executed before the open function, which leads to using an empty dataset
>>> in the flatMap function, while the dataset gets loaded only once flatMap
>>> has finished executing. Is this an error or am I doing something wrong?
>>>
>>>
>>> Dr. Radu Tudoran
>>>
>>> Research Engineer
>>>
>>> IT R&D Division
>>>
>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>>>
>>> European Research Center
>>>
>>> Riesstrasse 25, 80992 München
>>>
>>> E-mail: radu.tudoran@huawei.com
>>>
>>> Mobile: +49 15209084330
>>>
>>> Telephone: +49 891588344173
>>>
>>>
>>> *From:* Robert Metzger [mailto:rmetz...@apache.org]
>>> *Sent:* Tuesday, December 01, 2015 6:21 PM
>>> *To:* user@flink.apache.org
>>> *Cc:* Goetz Brasche
>>> *Subject:* Re: Question about DataStream serialization
>>>
>>> Hi Radu,
>>>
>>> both emails reached the mailing list :)
>>>
>>> You cannot reference DataSets or DataStreams from inside user-defined
>>> functions. Both are just abstractions for a data set or stream, so the
>>> elements are not really inside the set.
>>>
>>> We don't have any support for mixing the DataSet and DataStream API.
>>>
>>> For your use case, I would recommend you to use a RichFlatMapFunction
>>> and read the text file in the open() call.
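>>>
>>> Roughly like this (just an untested sketch; the class name, the file
>>> path, and the way the fixed set is combined with each event are
>>> placeholders you would adapt):
>>>
>>> import java.io.BufferedReader;
>>> import java.io.FileReader;
>>> import java.util.ArrayList;
>>> import java.util.List;
>>>
>>> import org.apache.flink.api.common.functions.RichFlatMapFunction;
>>> import org.apache.flink.configuration.Configuration;
>>> import org.apache.flink.util.Collector;
>>>
>>> public class EnrichWithFixedSet extends RichFlatMapFunction<String, String> {
>>>
>>>     // loaded once per parallel instance in open(), before any flatMap() call
>>>     private List<String> fixedSet;
>>>
>>>     @Override
>>>     public void open(Configuration parameters) throws Exception {
>>>         fixedSet = new ArrayList<>();
>>>         BufferedReader reader =
>>>             new BufferedReader(new FileReader("/path/to/fixedset.txt"));
>>>         String line;
>>>         while ((line = reader.readLine()) != null) {
>>>             fixedSet.add(line);
>>>         }
>>>         reader.close();
>>>     }
>>>
>>>     @Override
>>>     public void flatMap(String event, Collector<String> out) {
>>>         // combine the incoming event with every element of the fixed
>>>         // set; replace this with whatever operation you actually need
>>>         for (String element : fixedSet) {
>>>             out.collect(event + "," + element);
>>>         }
>>>     }
>>> }
>>>
>>> and then simply: stream.flatMap(new EnrichWithFixedSet());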
>>>
>>>
>>> On Tue, Dec 1, 2015 at 5:03 PM, Radu Tudoran <radu.tudo...@huawei.com> wrote:
>>>
>>> Hello,
>>>
>>> I am not sure if this message was received on the user list; if so, I
>>> apologize for the duplicate messages.
>>>
>>> I have the following scenario:
>>>
>>> * Reading a fixed set:
>>>   DataStream<String> fixedset = env.readTextFile(…
>>> * Reading a continuous stream of data:
>>>   DataStream<String> stream = ….
>>>
>>> I would need, for each event read from the continuous stream, to make
>>> some operations on it and on the fixedset together.
>>>
>>> I have tried something like:
>>>
>>> final myObject.referenceStaticSet = fixedset;
>>>
>>> stream.map(new MapFunction<String, String>() {
>>>     @Override
>>>     public String map(String arg0) throws Exception {
>>>
>>>         // for example: final string2add = arg0;
>>>         // the goal of the function below would be to add string2add
>>>         // to the fixedset
>>>         myObject.referenceStaticSet =
>>>             myObject.referenceStaticSet.flatMap(new FlatMapFunction<String, String>() {
>>>
>>>                 @Override
>>>                 public void flatMap(String arg0, Collector<String> arg1) {
>>>                     // for example, adding the string2add object to
>>>                     // the fixed set:
>>>                     arg1.collect(string2add);
>>>                 }
>>>             });
>>>         …
>>>     }
>>> }
>>>
>>> However, I get an exception (Exception in thread "main"
>>> org.apache.flink.api.common.InvalidProgramException) saying that the
>>> object is not serializable (Object MyClass$3@a71081 not serializable).
>>>
>>> Looking into this, I see that the issue is that the DataStream<> is not
>>> serializable. What would be the solution to this issue?
>>>
>>> As I mentioned before, I would like, for each event from the continuous
>>> stream, to use the initial fixed set, add the event to it, and apply an
>>> operation.
>>>
>>> Stephan mentioned at some point a possibility to create a DataSet and
>>> launch a batch processing job while operating in stream mode -- in case
>>> this is possible, can you give me a reference for it, because it might
>>> be a good solution to use here. I am thinking that I could keep the
>>> fixed set as a DataSet and, as each new event comes, transform it into
>>> a DataSet as well, then join it with the reference set and apply an
>>> operation.
>>>
>>> Regards,
>>>
>>>
>>> Dr. Radu Tudoran
>>>
>>> Research Engineer
>>>
>>> IT R&D Division
>>>
>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>>>
>>> European Research Center
>>>
>>> Riesstrasse 25, 80992 München
>>>
>>> E-mail: radu.tudo...@huawei.com
>>>
>>> Mobile: +49 15209084330
>>>
>>> Telephone: +49 891588344173
>>>
>>> *From:* Vieru, Mihail [mailto:mihail.vi...@zalando.de]
>>> *Sent:* Tuesday, December 01, 2015 4:55 PM
>>> *To:* user@flink.apache.org
>>> *Subject:* NPE with Flink Streaming from Kafka
>>>
>>> Hi,
>>>
>>> we get the following NullPointerException after ~50 minutes when
>>> running a streaming job with windowing and state that reads data from
>>> Kafka and writes the result to the local FS.
>>>
>>> There are around 170 million messages to be processed; Flink 0.10.1
>>> stops at ~8 million.
>>>
>>> Flink runs locally, started with the "start-cluster-streaming.sh" script.
>>>
>>> 12/01/2015 15:06:24  Job execution switched to status RUNNING.
>>> 12/01/2015 15:06:24  Source: Custom Source -> Map -> Map(1/1) switched to SCHEDULED
>>> 12/01/2015 15:06:24  Source: Custom Source -> Map -> Map(1/1) switched to DEPLOYING
>>> 12/01/2015 15:06:24  Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to SCHEDULED
>>> 12/01/2015 15:06:24  Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to DEPLOYING
>>> 12/01/2015 15:06:24  Source: Custom Source -> Map -> Map(1/1) switched to RUNNING
>>> 12/01/2015 15:06:24  Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to RUNNING
>>> 12/01/2015 15:56:08  Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to CANCELED
>>> 12/01/2015 15:56:08  Source: Custom Source -> Map -> Map(1/1) switched to FAILED
>>> java.lang.Exception
>>>     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.NullPointerException
>>>     at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
>>>     at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
>>>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>>>     at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
>>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
>>>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>>>     at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
>>>     at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
>>>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
>>>
>>> Any ideas on what could cause this behaviour?
>>>
>>> Best,
>>>
>>> Mihail