Hi, community!
When dealing with a retractable stream, I ran into a problem converting a Table to a DataSet / DataStream in batch mode on Flink 1.13.5.

Scenario and process:
1. Database CDC to Kafka
2. Sync the data into Hive with HoodieTableFormat (Apache Hudi)
3. Process the Hoodie table incrementally in streaming mode, or fully in batch mode

Because it is harder to implement a UDAF / UDTF on a retractable table than on a retractable stream, we chose to convert the Table to a DataStream for processing. But we found that:
1. Only BatchTableEnvironment can convert a Table to a DataSet; the other implementations of TableEnvironment throw an exception, as shown in the attached screenshot (a minimal sketch of this legacy path follows after the sign-off).
2. BatchTableEnvironment will be dropped in 1.14 because it only supports the old planner.
3. The TableEnvironment#create API returns a TableEnvironmentImpl instance, and TableEnvironmentImpl works exclusively with the Table API, so it cannot convert a Table to a DataSet.

The relevant code is in org.apache.flink.table.api.bridge.scala.TableConversions.scala.

So, how can a Table be converted to a DataStream in batch mode? Thanks for any replies or suggestions.

In streaming mode:

```
val senv = StreamExecutionEnvironment.getExecutionEnvironment
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tenv: StreamTableEnvironment = StreamTableEnvironment.create(senv, settings)
val table = tenv.sqlQuery("select ...")
val dStream: DataStream[Row] = tenv.toChangelogStream(table)
dStream.map(...) // further processing
```

In batch mode:

```
val settings = EnvironmentSettings.newInstance().inBatchMode().build()
val tenv: TableEnvironment = TableEnvironment.create(settings) // actually returns a TableEnvironmentImpl
val table = tenv.sqlQuery("select ...")
// how to convert the Table to a DataSet / DataStream here?
val dStream: DataSet[Row] = ???
```

Best Regards!
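P.S. For completeness, here is a minimal sketch of the legacy DataSet path from point 1, assuming the old planner and the Scala DataSet bridge are still on the classpath in 1.13; this is exactly the BatchTableEnvironment API that goes away in 1.14:

```
// Minimal sketch (assumption: old planner + Scala DataSet bridge on the classpath).
// BatchTableEnvironment is the only bridge that exposes toDataSet, and it is
// deprecated and removed after 1.13.
import org.apache.flink.api.scala._
import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment
import org.apache.flink.types.Row

val benv  = ExecutionEnvironment.getExecutionEnvironment
val btenv = BatchTableEnvironment.create(benv)

val table = btenv.sqlQuery("select ...")

// Works only with the old planner; TableEnvironmentImpl has no such method.
val dSet: DataSet[Row] = btenv.toDataSet[Row](table)
```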
[attachment: fecb192f-3915-4958-8600-7b9c1afebfec.png (screenshot)]