[ https://issues.apache.org/jira/browse/FLINK-8663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084870#comment-17084870 ]
SmallWong commented on FLINK-8663: ---------------------------------- The first 5 values is be trigger to calcalate when using window(10, 5), at this moment the window size is less then 10. > Execution of DataStreams result in non functionality of size of Window for > countWindow > -------------------------------------------------------------------------------------- > > Key: FLINK-8663 > URL: https://issues.apache.org/jira/browse/FLINK-8663 > Project: Flink > Issue Type: Bug > Components: API / DataStream > Affects Versions: 1.4.0 > Environment: package com.vnl.stocks; > import java.util.concurrent.TimeUnit; > import org.apache.flink.api.common.functions.MapFunction; > import org.apache.flink.streaming.api.datastream.AllWindowedStream; > import org.apache.flink.streaming.api.datastream.DataStream; > import org.apache.flink.streaming.api.datastream.WindowedStream; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import > org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; > import org.apache.flink.streaming.api.windowing.time.Time; > import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; > import org.apache.flink.streaming.api.windowing.windows.TimeWindow; > public class StocksProcessing { > > public static void main(String[] args) throws Exception { > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > > //Read from a socket stream at map it to StockPrice objects > DataStream<StockPrice> socketStockStream = env > .socketTextStream("localhost", 9999) > .map(new MapFunction<String, StockPrice>() { > private String[] tokens; > > @Override > public StockPrice map(String value) throws > Exception { > tokens = value.split(","); > return new StockPrice(tokens[0], > Double.parseDouble(tokens[1])); > } > }); > > socketStockStream.print(); > //Generate other stock streams > DataStream<StockPrice> SPX_stream = env.addSource(new > StockSource("SPX", 10)); > // DataStream<StockPrice> FTSE_stream = env.addSource(new > StockSource("FTSE", 20)); > // DataStream<StockPrice> DJI_stream = env.addSource(new > StockSource("DJI", 30)); > // DataStream<StockPrice> BUX_stream = env.addSource(new > StockSource("BUX", 40)); > > //Merge all stock streams together > > DataStream<StockPrice> stockStream = > socketStockStream.union(SPX_stream/*, FTSE_stream, DJI_stream, BUX_stream*/); > > > // stockStream.print(); > Thread.sleep(100); > > AllWindowedStream<StockPrice, GlobalWindow> windowedStream = > stockStream > .countWindowAll(10, 5); > > //.keyBy("symbol") > //.timeWindowAll(Time.of(10, TimeUnit.SECONDS), > Time.of(1, TimeUnit.SECONDS)); > > //stockStream.keyBy("symbol"); > //Compute some simple statistics on a rolling window > DataStream<StockPrice> lowest = > windowedStream.maxBy("price"); > //DataStream<StockPrice> highest = windowedStream.; > /*DataStream<StockPrice> maxByStock = > windowedStream.groupBy("symbol") > .maxBy("price").flatten(); > DataStream<StockPrice> rollingMean = > windowedStream.groupBy("symbol") > .mapWindow(new WindowMean()).flatten();*/ > lowest.print(); > > Thread.sleep(100); > /* > AllWindowedStream<StockPrice, GlobalWindow> > windowedStream1 = lowest > .countWindowAll(5,2); > //windowedStream1.print(); > DataStream<StockPrice> highest = > windowedStream1.minBy("price");*/ > //highest.print(); > > env.execute("Stock stream"); > } > } > Reporter: Subham > Priority: Major > > I used AllWindowedStream<?,GlobalWindow> to process a stream and generate > maximum of my window using countWindowAll functions. In this output the size > and slide of window works incorrectly. > Refer below example for the bug > Initial stream : 1,2,3,4,5,6......... > Output 1: (Find min for window 10,5) : 1,6,11.....(This is correct) > However if i calculate maximum, I get output as: > Output 2: (Find max for window 10,5) : 5,10,15.... (which is wrong) > Expected: 10,15,20.... > > Please resolve this error. > -- This message was sent by Atlassian Jira (v8.3.4#803005)