Hi, community!

While dealing with a retractable stream, I ran into a problem converting a Table 
to a DataSet / DataStream in batch mode on Flink 1.13.5. 


Scenario and process: 
- 1. Database CDC to Kafka
- 2. Sync the data into Hive with HoodieTableFormat (Apache Hudi)
- 3. Incrementally process the hoodie table in streaming mode, or fully process 
it in batch mode (see the sketch after this list). 
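
For context, a rough sketch of how the hoodie table from step 2 might be exposed to 
Flink SQL. The schema, path, and column names are placeholders, not our real DDL; 
'read.streaming.enabled' is the Hudi connector option we use to switch between the 
incremental (streaming) read and the full (batch) read, where it defaults to false.

```
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Hypothetical Hudi source table; schema, path and option values are placeholders.
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tenv = TableEnvironment.create(settings)

tenv.executeSql(
  """
    |CREATE TABLE hoodie_source (
    |  id BIGINT,
    |  data STRING,
    |  ts TIMESTAMP(3)
    |) WITH (
    |  'connector' = 'hudi',
    |  'path' = 'hdfs://.../warehouse/hoodie_source',
    |  'table.type' = 'MERGE_ON_READ',
    |  'read.streaming.enabled' = 'true'
    |)
    |""".stripMargin)
```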


Because it is more difficult to implement a UDAF / UDTF on a retractable table than 
on a retractable stream, we chose to convert the Table to a DataStream for 
processing. However, we found that: 


- 1. Only BatchTableEnvironment can convert a Table to a DataSet; the other 
implementations of TableEnvironment throw an exception, as shown below.  
- 2. BatchTableEnvironment will be dropped in 1.14 because it only supports the 
old planner.  
- 3. The TableEnvironment#create API returns a TableEnvironmentImpl instance, 
which works exclusively with the Table API and cannot convert a Table to a 
DataSet.


[screenshot attachment: org.apache.flink.table.api.bridge.scala.TableConversions.scala]
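
For reference, a minimal sketch of the old-planner path from point 1 above, which 
is the only way we found to get a DataSet out of a Table, and exactly the API that 
disappears in 1.14 (the variable names here are just placeholders):

```
import org.apache.flink.api.scala._
import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment
import org.apache.flink.types.Row

// Old-planner path: works on 1.13.5, but BatchTableEnvironment is
// deprecated and removed in 1.14, so it is not a long-term option.
val benv = ExecutionEnvironment.getExecutionEnvironment
val btenv = BatchTableEnvironment.create(benv)
val table = btenv.sqlQuery("select ...")
val dataSet: DataSet[Row] = btenv.toDataSet[Row](table)
```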




So, how can we convert a Table to a DataStream in batch mode? Thanks for any 
replies or suggestions.




In streaming mode:


```
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

// senv: StreamExecutionEnvironment and setting: EnvironmentSettings are defined elsewhere
val tenv: StreamTableEnvironment = StreamTableEnvironment.create(senv, setting)
val table = tenv.sqlQuery("select ...")
val dStream: DataStream[Row] = tenv.toChangelogStream(table)
dStream.map(...) // downstream DataStream processing (elided)
```
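
Since the whole point is the retractable stream, each Row of that changelog stream 
is tagged with a RowKind (+I / -U / +U / -D) that the downstream operators have to 
handle. A minimal sketch of such a stage, continuing from the block above (the 
field index and stringification are placeholders):

```
import org.apache.flink.types.RowKind

// Keep only inserts and after-images of updates; drop retractions and deletes.
dStream
  .filter(row => row.getKind == RowKind.INSERT || row.getKind == RowKind.UPDATE_AFTER)
  .map(row => String.valueOf(row.getField(0))) // placeholder: stringify the first column
```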


In batch mode:


```
// setting: EnvironmentSettings (batch mode) is defined elsewhere
val tenv: TableEnvironment = TableEnvironment.create(setting) // actually returns a TableEnvironmentImpl
val table = tenv.sqlQuery("select ...")

// how to convert the Table to a DataSet / DataStream here?
val dStream: DataSet[Row] = ?
```


Best Regards!

