Hello,

I am not sure whether this message was received on the user list; if it was, I apologize for the duplicate.
I have the following scenario:
 · Reading a fixed set: DataStream<String> fixedset = env.readTextFile(…
 · Reading a continuous stream of data: DataStream<String> stream = ….

For each event read from the continuous stream, I need to perform some operations on it together with the fixed set. I have tried something like:

    myObject.referenceStaticSet = fixedset;
    stream.map(new MapFunction<String, String>() {
        @Override
        public String map(String arg0) throws Exception {
            // for example:
            final String string2add = arg0;
            // the goal of the function below is to add string2add to the fixed set
            myObject.referenceStaticSet = myObject.referenceStaticSet.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String arg0, Collector<String> arg1) {
                    // for example, also add the string2add object to the fixed set:
                    arg1.collect(string2add);
                }
                …
    }

However, I get an exception (Exception in thread "main" org.apache.flink.api.common.InvalidProgramException) saying that an object is not serializable (Object MyClass$3@a71081 not serializable). Looking into this, I see that the issue is that DataStream<> is not serializable. What would be the solution to this issue?

As I mentioned before, for each event from the continuous stream I would like to take the initial fixed set, add the event to it, and apply an operation.

Stephan mentioned at some point the possibility of creating a DataSet and launching a batch job while operating in stream mode. If this is possible, could you point me to a reference for it? It might be a good solution in this case. I am thinking that I could keep the fixed set as a DataSet and, as each new event comes in, transform the event into a DataSet, join it with the reference set, and apply an operation.

Regards,
Dr. Radu Tudoran
Research Engineer
IT R&D Division
HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München
E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN

From: Vieru, Mihail [mailto:mihail.vi...@zalando.de]
Sent: Tuesday, December 01, 2015 4:55 PM
To: user@flink.apache.org
Subject: NPE with Flink Streaming from Kafka

Hi,

We get the following NullPointerException after ~50 minutes when running a streaming job with windowing and state that reads data from Kafka and writes the result to the local FS. There are around 170 million messages to be processed; Flink 0.10.1 stops at ~8 million. Flink runs locally, started with the "start-cluster-streaming.sh" script.

12/01/2015 15:06:24 Job execution switched to status RUNNING.
12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) switched to SCHEDULED
12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) switched to DEPLOYING
12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to SCHEDULED
12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to DEPLOYING
12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) switched to RUNNING
12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to RUNNING
12/01/2015 15:56:08 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to CANCELED
12/01/2015 15:56:08 Source: Custom Source -> Map -> Map(1/1) switched to FAILED
java.lang.Exception
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
    at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
    at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
    at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
    at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
    at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
    at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
    at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
    at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
    at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)

Any ideas on what could cause this behaviour?

Best,
Mihail
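Regarding the fixed-set question in the first message of this thread: the InvalidProgramException appears because Flink serializes user functions such as the anonymous MapFunction with Java serialization before shipping them to the workers, so every object the function references must be serializable, and a DataStream is not. The plain-Java sketch below (no Flink dependency) reproduces that kind of failure and shows one possible workaround, under the assumption that the fixed set is small enough to hold in memory: instead of referencing a DataStream, the function carries its own serializable copy of the fixed set (in Flink such a copy could, for example, be read once on the client or loaded in a rich function's open() method), and for each event it copies that set, adds the event, and applies an operation. All names here (FixedSetDemo, StreamHandle, CapturingMapper, ReferenceSetMapper, isSerializable) are hypothetical, and counting distinct elements is only a placeholder for the real operation.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class FixedSetDemo {

    /** Hypothetical stand-in for a non-serializable handle such as a DataStream. */
    static final class StreamHandle {
        final String name;
        StreamHandle(String name) { this.name = name; }
    }

    /** Hypothetical per-event function type; Flink's MapFunction is likewise Serializable. */
    interface SerializableMapper<I, O> extends Function<I, O>, Serializable {}

    /** Mirrors the failing attempt: the function holds a reference to a non-serializable handle. */
    static final class CapturingMapper implements SerializableMapper<String, String> {
        private final StreamHandle referenceSet;
        CapturingMapper(StreamHandle referenceSet) { this.referenceSet = referenceSet; }

        @Override
        public String apply(String event) { return event + "@" + referenceSet.name; }
    }

    /** Workaround sketch: the function carries its own serializable copy of the fixed set. */
    static final class ReferenceSetMapper implements SerializableMapper<String, Integer> {
        private final ArrayList<String> fixedSet;
        ReferenceSetMapper(List<String> fixedSet) { this.fixedSet = new ArrayList<>(fixedSet); }

        @Override
        public Integer apply(String event) {
            // Copy the fixed set so that every event starts from the same initial set.
            List<String> working = new ArrayList<>(fixedSet);
            working.add(event);
            // Placeholder operation (hypothetical): number of distinct elements.
            return (int) working.stream().distinct().count();
        }
    }

    /** Returns true if Java serialization of obj succeeds, false on NotSerializableException. */
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        ReferenceSetMapper mapper = new ReferenceSetMapper(List.of("a", "b", "c"));
        System.out.println(isSerializable(new CapturingMapper(new StreamHandle("fixedset")))); // false
        System.out.println(isSerializable(mapper)); // true
        System.out.println(mapper.apply("d")); // 4
        System.out.println(mapper.apply("a")); // 3
    }
}
```

The per-event copy keeps every event starting from the same initial set, which matches the behaviour described in the question, at the cost of one copy of the set per event.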