Hi Muazim,

Flink is an incremental computing framework. In streaming mode it treats the data as unbounded, so every record that arrives triggers the computation logic, because Flink cannot know when the data will end. From your description, I understand that you need a computation over the full data set and only want the result once, after all the data has been seen. In that case, you can try setting the execution mode to batch mode [1].
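To picture the difference, here is a small plain-Java model of the two behaviours. It is not Flink code and does not reflect how Flink is implemented; the class EmissionModel and its two methods are names made up for this illustration. The first method emits an updated sum after every element, as an unbounded stream must; the second emits once, which is only possible when the input is known to be finite.

```java
import java.util.ArrayList;
import java.util.List;

class EmissionModel {
    // Streaming-style: the end of the input is unknown, so an updated
    // sum is emitted after every element.
    static List<Double> rollingSums(List<Double> input) {
        List<Double> emitted = new ArrayList<>();
        double sum = 0.0;
        for (double value : input) {
            sum += value;
            emitted.add(sum);
        }
        return emitted;
    }

    // Batch-style: the input is known to be finite, so a single
    // result is emitted after the last element.
    static double finalSum(List<Double> input) {
        double sum = 0.0;
        for (double value : input) {
            sum += value;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Double> input = List.of(2.0, 3.0, 4.0);
        System.out.println("streaming-style: " + rollingSums(input));
        System.out.println("batch-style:     " + finalSum(input));
    }
}
```

The rolling list is what a keyed reduce produces in streaming mode; the single value is what you are asking for.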
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

[1]: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/

Best,
Ron

Muazim Wani <muazim1...@gmail.com> wrote on Tue, Aug 1, 2023 at 18:04:

> Hi Team,
> I am new to Flink. I have a use case where I have a DataStream of
> Doubles and I am trying to get the total sum of the whole DataStream.
>
> I have used ReduceFunction and AggregateFunction.
>
> Case 1: With ReduceFunction the output is a DataStream of rolling sums.
> To get the final sum I have to traverse the output stream, and the last
> value would be my total. In my case, I don't want to iterate over the
> whole DataStream to get the final sum, and I also don't want to use an
> extra DataStream just to store the final aggregated value.
>
> Case 2: I am able to access the aggregate() method only after
> countWindow(), and countWindow() requires a size. As I don't know the
> size of my DataStream (the user will be sending data to me), I can't
> use it.
>
> Below is my implementation of ReduceFunction:
>
> StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>
> DataStreamSource<Double> dataStream =
>         env.fromElements(2.00, 3.00, 4.00, 11.00, 13.00, 14.00);
>
> DataStream<Double> singleOutputStreamOperator =
>         dataStream.keyBy(value -> "key").reduce(new ReduceFunction<Double>() {
>             @Override
>             public Double reduce(Double aDouble, Double t1) throws Exception {
>                 return aDouble + t1;
>             }
>         });
>
> singleOutputStreamOperator.print();
> DataStream.Collector<Double> doubleCollector = new DataStream.Collector<>();
> singleOutputStreamOperator.collectAsync(doubleCollector);
> singleOutputStreamOperator.executeAndCollect("Aggregation");
>
> Double result = null;
> while (doubleCollector.getOutput().hasNext()) {
>     result = doubleCollector.getOutput().next();
>     System.out.println("result = " + result);
> }
>
> The output looks like: 2.0, 5.0, 9.0, 20.0, 33.0. I simply want to get
> 33.0 as my aggregated value, store it in a variable, and give it to the
> user.
>
> Is there any better way to solve this for my use case?
>
> Thanks and regards
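
Putting the batch-mode suggestion together with the job from the quoted message, a minimal sketch might look like the one below. It assumes a Flink version that offers setRuntimeMode and executeAndCollect on the DataStream API (1.12 or later, as far as I know) and a bounded source such as fromElements. The class name BatchSum and the method totalSum are made up for illustration.

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.CloseableIterator;

class BatchSum {
    // Hypothetical helper: runs the job in BATCH mode and returns the total.
    static double totalSum() throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // In BATCH mode a keyed reduce emits only the final value per key,
        // not one updated value per input record.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        DataStream<Double> total = env
                .fromElements(2.0, 3.0, 4.0, 11.0, 13.0, 14.0)
                .keyBy(value -> "key")   // one constant key: a single global sum
                .reduce(Double::sum);

        double result = 0.0;
        // executeAndCollect runs the job and hands back the output records.
        try (CloseableIterator<Double> it = total.executeAndCollect("Aggregation")) {
            while (it.hasNext()) {
                result = it.next();      // a single record is expected here
            }
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("total = " + totalSum());
    }
}
```

Batch mode only works when every source is bounded, so this fits the case where the job starts once the user has finished sending data. Routing everything through one constant key also means the reduce runs with an effective parallelism of one.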