Runtime error is because you have non-serializable code in your 'map' operator. DataStream<String> stockPrices = streamExecEnv.addSource(new LookupStockPrice(stockSymbol)); stockPrices.print();
The approach that you took will create infinite stockprice sources inside the map operator. I don' think it will be an easy DAG for flink to resolve. One approach could be to have a separate keyed stream for stock price, window them for a specific period and look for the price in that window. This is an interesting use case. I'm sure the flink team tried this. Thanks, Sandeep On Fri, Dec 16, 2016 at 12:16 PM, hnadathur <nksrih...@gmail.com> wrote: > I'm trying to build a sample application using Flink that does the > following: > 1. Reads a stream of stock symbols (e.g. 'CSCO', 'FB') from a Kafka queue > 2. For each symbol performs a real-time lookup of current prices and > streams > the values > > The program compiles fine but I get the following run-time error message: > "The implementation of the MapFunction is not serializable. The object > probably contains or references non serializable fields". > > I suspect the problem is due to the way I'm accessing the > StreamExecutionEnvironment. Could someone please provide pointers to how I > can use values from a data stream to create a new streaming data source? > Any > response is appreciated. > > Relevant code snippet is provided below: > > public class RetrieveStockPrices { > > @SuppressWarnings("serial") > public static void main(String[] args) throws Exception { > final StreamExecutionEnvironment streamExecEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > > streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic. > IngestionTime); > > Properties properties = new Properties(); > properties.setProperty("bootstrap.servers", > "localhost:9092"); > properties.setProperty("zookeeper.connect", > "localhost:2181"); > properties.setProperty("group.id", "stocks"); > > DataStream<String> streamOfStockSymbols = > streamExecEnv.addSource(new > FlinkKafkaConsumer08<String>("stocksymbol", new SimpleStringSchema(), > properties)); > streamOfStockSymbols.map(new MapFunction<String, String> > () { > @Override > public String map(String stockSymbol) throws > Exception { > DataStream<String> stockPrices = > streamExecEnv.addSource(new > LookupStockPrice(stockSymbol)); > stockPrices.print(); > return null; > } > }); > > streamExecEnv.execute("Retrieve Stock Prices"); > } > > } > > > public class LookupStockPrice extends RichSourceFunction<String> { > public String stockSymbol = null; > public boolean isRunning = true; > > public LookupStockPrice(String inSymbol) { > stockSymbol = inSymbol; > } > > @Override > public void open(Configuration parameters) throws Exception { > isRunning = true; > } > > > @Override > public void cancel() { > isRunning = false; > } > > @Override > public void run(SourceFunction.SourceContext<String> ctx) > throws Exception { > while (isRunning) { > //TODO: query Google Finance API > ctx.collect("12.5"); > } > } > } > > > > > -- > View this message in context: http://apache-flink-user- > mailing-list-archive.2336050.n4.nabble.com/How-do-I-use- > values-from-a-data-stream-to-create-a-new-streaming-data- > source-tp10680.html > Sent from the Apache Flink User Mailing List archive. mailing list archive > at Nabble.com. >