Hello DEV!

I am currently working on issue [1].
The contribution consists of migrating the batch SQL test from a bash 
implementation to a Java one.

After a couple of rounds of review, we agreed to also remove the existing 
uses of deprecated APIs, which come from how the source table is generated.
The main suggestion is to use `DynamicTableSource` and the latest framework to 
create a new table source.

The result is this commit [2].

I would like a double-check from the community (and possibly from any volunteer 
reviewer :)), as I hit many blockers and found it quite difficult to arrive at 
this implementation.
Here are the difficulties I faced:

• At first, I just wanted to use `TableEnvironment.fromValues()`. However, I 
could not get anywhere, as the test was hanging.
I still don't know whether there is some sort of deadlock or stack overflow, 
but I literally let it run for hours.
So far, by creating a minimal test case, I have noticed that it hangs when you 
generate thousands of rows.
I am planning to understand this better and possibly file an issue.
• Once I got stuck there, I decided to jump into the implementation of a 
`DynamicTableSource`
• I followed the tutorial at [3], but it uses deprecated interfaces, so I had 
to explore the Flink repo a bit (how can I contribute to the blog post to fix 
that?)
• Finally, I found the classes I needed: `SourceProvider.of()` (instead of 
`SourceFunctionProvider`) and the wonderful `FromElementsSource`, which gives 
you nice no-op split enumerators and checkpoint enumerators. Good.
• `SourceProvider.of()` only accepts a `Source<RowData, ?, ?>`, but `RowData` 
is not `Serializable` (I find this weird, so I am definitely doing something 
wrong here...)!
• So I implemented my own `RowData` and made it `Serializable`
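
To make the last two points concrete, here is a minimal, Flink-free sketch of the problem. Everything in it is hypothetical and for illustration only: `Row` is a stand-in for an interface that, like `RowData`, does not extend `Serializable`, and `SerializableRow` stands in for the custom implementation. It assumes that an element-based source ships its elements with plain Java serialization, which is my guess at why the elements have to be `Serializable`. It is not the code in [2].

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;

/** Hypothetical stand-in for a row interface that does not extend Serializable. */
interface Row {
    int getArity();

    Object getField(int pos);
}

/** Hypothetical row implementation that opts into Java serialization. */
final class SerializableRow implements Row, Serializable {
    private static final long serialVersionUID = 1L;

    // Every field value must itself be Serializable (Integer, String, Double, ...).
    private final Object[] fields;

    SerializableRow(Object... fields) {
        this.fields = fields.clone();
    }

    @Override
    public int getArity() {
        return fields.length;
    }

    @Override
    public Object getField(int pos) {
        return fields[pos];
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof SerializableRow
                && Arrays.equals(fields, ((SerializableRow) o).fields);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(fields);
    }
}

final class RowSerializationSketch {

    /** Writes the value with Java serialization and reads it back as a new object. */
    static <T extends Serializable> T roundTrip(T value)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        SerializableRow original = new SerializableRow(1, "hello", 3.5);
        SerializableRow copy = roundTrip(original);
        // The copy is a distinct object that carries the same field values.
        System.out.println(copy.equals(original) && copy != original);
    }
}
```

Passing a `Row` implementation that does not implement `Serializable` to `roundTrip` is rejected at compile time by the `T extends Serializable` bound, which mirrors the wall I hit with `RowData`.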


The result is what you see at [2]. What do you think?
Honestly, it was quite cumbersome, and I would like to hear your opinion about 
this.
Especially, are there any action items we can extract from this?
I would suggest:

• Fix `fromValues`
• Improve documentation for new Table sources with example implementations
• Update the blog post to use non-deprecated methods/interfaces


Thank you for your advice and attention.
Best,
Lorenzo

[1] https://issues.apache.org/jira/browse/FLINK-20398
[2] https://github.com/apache/flink/pull/24471/commits/9f706dda924c679dff0ca53f7cfef9b546003472
[3] https://flink.apache.org/2021/09/07/implementing-a-custom-source-connector-for-table-api-and-sql-part-one/
