[ https://issues.apache.org/jira/browse/FLINK-31753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dong Lin updated FLINK-31753:
-----------------------------
    Issue Type: Improvement  (was: Bug)

> Support DataStream CoGroup in stream Mode with similar performance as DataSet CoGroup
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-31753
>                 URL: https://issues.apache.org/jira/browse/FLINK-31753
>             Project: Flink
>          Issue Type: Improvement
>          Components: Library / Machine Learning
>            Reporter: Dong Lin
>            Assignee: Dong Lin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: ml-2.3.0
>
>
> DataSet has been deprecated and will be removed from Flink. However,
> DataStream CoGroup is still considerably slower than DataSet when co-grouping
> two bounded streams.
> Here are the benchmark results of co-grouping two bounded streams with 4*10^6
> records from each stream under different modes. The co-group function is
> chosen to be very lightweight so that the benchmark is dominated by Flink's
> co-group overhead.
> DataSet: 5.6 sec
> DataStream batch mode: 15.4 sec
> DataStream stream mode with rocksdb: 81 sec
> We should be able to perform the co-group operation in DataStream stream mode
> with performance comparable to DataSet, so that users don't have to take a big
> regression in order to migrate from DataSet to DataStream.
> We will first add a utility function in Flink ML to unblock the migration of
> some algorithms from Alink to Flink ML.
> Here is the code used to benchmark DataSet's CoGroup.
> {code:java}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().enableObjectReuse();
> env.getConfig().disableGenericTypes();
> env.setRestartStrategy(RestartStrategies.noRestart());
> env.setParallelism(1);
> DataSet<Tuple3<Integer, Integer, Double>> data1 =
>         env.fromCollection(
>                 new DataGenerator(numRecords),
>                 Types.TUPLE(Types.INT, Types.INT, Types.DOUBLE));
> DataSet<Tuple3<Integer, Integer, Double>> data2 =
>         env.fromCollection(
>                 new DataGenerator(numRecords),
>                 Types.TUPLE(Types.INT, Types.INT, Types.DOUBLE));
> data1.coGroup(data2)
>         .where((KeySelector<Tuple3<Integer, Integer, Double>, Integer>) tuple -> tuple.f0)
>         .equalTo((KeySelector<Tuple3<Integer, Integer, Double>, Integer>) tuple -> tuple.f0)
>         .with(
>                 new RichCoGroupFunction<
>                         Tuple3<Integer, Integer, Double>,
>                         Tuple3<Integer, Integer, Double>,
>                         Integer>() {
>                     @Override
>                     public void open(Configuration parameters) throws Exception {
>                         super.open(parameters);
>                     }
>                     @Override
>                     public void close() throws Exception {
>                         super.close();
>                     }
>                     @Override
>                     public void coGroup(
>                             Iterable<Tuple3<Integer, Integer, Double>> iterable,
>                             Iterable<Tuple3<Integer, Integer, Double>> iterable1,
>                             Collector<Integer> collector)
>                             throws Exception {
>                         collector.collect(1);
>                     }
>                 })
>         .write(new CountingAndDiscardingSink(), "/tmp");
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)