I assume that the provided FetchStock code is not complete. As the
exception indicates, you somehow store a LocalStreamEnvironment in you
source function. The StreamExecutionEnvironments are not serializable and
cannot be part of the source function’s closure.

Cheers,
Till
​

On Tue, Apr 19, 2016 at 2:32 PM, subash basnet <yasub...@gmail.com> wrote:

> Hello all,
>
> My requirement is to re-read the csv file from a file path at certain time
> intervals and process the csv data. The csv file gets updated at regular
> intervals.
> Below is my code:
> StreamExecutionEnvironment see =
> StreamExecutionEnvironment.getExecutionEnvironment();
> *DataStream<String> dataStream = getCsvDataStream(see);*
> DataStream<Stock> edits = see.addSource(new FetchStock("path/to/csv"));
>
> In FetchStock.java
> public class FetchStock extends RichSourceFunction<Stock> {
> public FetchStock(String csvPath) {
> this.csvPath = csvPath;
> }
> }
>
> I am trying to adapt code from *WikipediaAnalysis, *but getting the below
> not serializable exception on adding source:
> *Exception in thread "main"
> org.apache.flink.api.common.InvalidProgramException: Object
> wikiedits.FetchStock@d7b1517 not serializable*
> at
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1131)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1075)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1057)
> at wikiedits.StockAnalysis.main(StockAnalysis.java:30)
> *Caused by: java.io.NotSerializableException:
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment*
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
> at
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
> ... 6 more
>
>
> I have attached Stock.java which is just a model with getters and setters.
> Not sure what am I doing wrong.
>
> Best Regards,
> Subash Basnet
>

Reply via email to