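I assume that the provided FetchStock code is not complete. As the exception indicates, you somehow store a LocalStreamEnvironment in your source function, most likely as a field. The source function is serialized when you call addSource, so everything it references must be serializable as well. StreamExecutionEnvironments are not serializable and cannot be part of the source function’s closure.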
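To illustrate the failure mode, here is a minimal sketch that uses plain Java serialization only, with no Flink on the classpath. The stack trace shows that the closure check ends up in java.io.ObjectOutputStream, so the same rule applies. All names here (ClosureDemo, Env, BadSource, GoodSource, isSerializable) are hypothetical stand-ins and not part of your code or of Flink; Env merely plays the role of the non-serializable environment.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class ClosureDemo {

    // Hypothetical stand-in for a non-serializable object such as the environment.
    static class Env { }

    // Mimics a source function that keeps the environment as a field.
    static class BadSource implements Serializable {
        private final Env env = new Env(); // non-serializable field breaks serialization
        private final String csvPath;

        BadSource(String csvPath) {
            this.csvPath = csvPath;
        }
    }

    // Mimics a source function that only keeps serializable state.
    static class GoodSource implements Serializable {
        private final String csvPath;

        GoodSource(String csvPath) {
            this.csvPath = csvPath;
        }
    }

    // Returns true if the object can be written with standard Java serialization.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("BadSource serializable:  " + isSerializable(new BadSource("path/to/csv")));
        System.out.println("GoodSource serializable: " + isSerializable(new GoodSource("path/to/csv")));
    }
}
```

If the environment is only needed while the job is being assembled, keeping it as a local variable in main and passing just the path into the source avoids the problem.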
Cheers,
Till

On Tue, Apr 19, 2016 at 2:32 PM, subash basnet <yasub...@gmail.com> wrote:

> Hello all,
>
> My requirement is to re-read the csv file from a file path at certain time
> intervals and process the csv data. The csv file gets updated at regular
> intervals.
> Below is my code:
>
> StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
> *DataStream<String> dataStream = getCsvDataStream(see);*
> DataStream<Stock> edits = see.addSource(new FetchStock("path/to/csv"));
>
> In FetchStock.java
>
> public class FetchStock extends RichSourceFunction<Stock> {
>     public FetchStock(String csvPath) {
>         this.csvPath = csvPath;
>     }
> }
>
> I am trying to adapt code from *WikipediaAnalysis*, but getting the below
> not serializable exception on adding source:
>
> *Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object wikiedits.FetchStock@d7b1517 not serializable*
>     at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1131)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1075)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1057)
>     at wikiedits.StockAnalysis.main(StockAnalysis.java:30)
> *Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.environment.LocalStreamEnvironment*
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
>     at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
>     ... 6 more
>
> I have attached Stock.java which is just a model with getters and setters.
> Not sure what am I doing wrong.
>
> Best Regards,
> Subash Basnet