Hi devs, I want to reopen this discussion because some of the earlier questions have been resolved and others need further discussion.
In the previous discussion, there were some questions and problems.

@Timo
1. About the option prefix: we decided to use identifiers, e.g.

```
create table hybrid_source(
  f0 varchar,
  f1 varchar,
  f2 bigint
) with(
  'connector' = 'hybrid',
  'source-identifiers' = 'historical,realtime',
  'historical.connector' = 'filesystem',
  'historical.path' = '/tmp/a.csv',
  'historical.format' = 'csv',
  'realtime.connector' = 'kafka',
  'realtime.topic' = 'test',
  'realtime.properties.bootstrap.servers' = 'localhost:9092',
  'realtime.properties.group.id' = 'test',
  'realtime.scan.startup.mode' = 'earliest-offset',
  'realtime.format' = 'csv'
);
```

@Martijn Visser <martijnvis...@apache.org>
1. Table API usage: I updated the FLIP with the Table API usage.
2. How does the dynamic switched start timestamp work? In this FLIP, we introduce two interfaces to support it. If switched-start-position-enabled is set and we want to use a dynamic switched start timestamp, then the first source's split enumerator needs to implement SupportsGetEndTimestamp, and the next source needs to implement SupportsSwitchedStartTimestamp. We use SupportsGetEndTimestamp and SupportsSwitchedStartTimestamp to get the end timestamp of the previous bounded source and apply it to the next streaming source.

@John Roesler
1. Source handoff: we support both Fixed-Start-Position and Switched-Start-Position; the default is Fixed-Start-Position, controlled by the option switched-start-position-enabled. With Fixed-Start-Position, the next streaming source uses its own startup strategy, e.g. for Kafka we use the scan.startup.mode predefined in the user's SQL. With Switched-Start-Position, this is the same question as "how does the dynamic switched start timestamp work" from @Martijn above: we offer the SupportsGetEndTimestamp interface to extract the end timestamp from the first source's split enumerator and pass it to the next source, and the next source uses SupportsSwitchedStartTimestamp to apply it.
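To make the handoff concrete, here is a minimal sketch in Java. The interface names SupportsGetEndTimestamp and SupportsSwitchedStartTimestamp come from the FLIP, but the method signatures, class names, and the driver below are hypothetical illustrations only, not the actual Flink implementation:

```java
// Hypothetical shapes of the two FLIP interfaces (signatures are assumptions).
interface SupportsGetEndTimestamp {
    // Reported by the bounded source's split enumerator when it finishes.
    long getEndTimestamp();
}

interface SupportsSwitchedStartTimestamp {
    // Applied to the next (streaming) source before it starts reading.
    void applySwitchedStartTimestamp(long startTimestamp);
}

// Hypothetical bounded (historical) split enumerator tracking the max
// timestamp seen across its finished splits.
class BoundedFileEnumerator implements SupportsGetEndTimestamp {
    private final long maxSeenTimestamp;

    BoundedFileEnumerator(long maxSeenTimestamp) {
        this.maxSeenTimestamp = maxSeenTimestamp;
    }

    @Override
    public long getEndTimestamp() {
        return maxSeenTimestamp;
    }
}

// Hypothetical streaming (realtime) source that can be re-positioned.
class StreamingKafkaSource implements SupportsSwitchedStartTimestamp {
    long startTimestamp = -1L; // -1 means: use the startup mode from the DDL

    @Override
    public void applySwitchedStartTimestamp(long ts) {
        this.startTimestamp = ts;
    }
}

public class SwitchedStartDemo {
    public static void main(String[] args) {
        BoundedFileEnumerator historical = new BoundedFileEnumerator(1_700_000_000_000L);
        StreamingKafkaSource realtime = new StreamingKafkaSource();
        // At switch time, the hybrid source passes the bounded source's
        // end timestamp to the next source.
        realtime.applySwitchedStartTimestamp(historical.getEndTimestamp());
        System.out.println("realtime start timestamp = " + realtime.startTimestamp);
        // prints: realtime start timestamp = 1700000000000
    }
}
```

In Fixed-Start-Position mode, applySwitchedStartTimestamp is simply never called, so the streaming source falls back to its DDL-configured startup mode.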
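Relatedly, on the identifier-prefixed options above: a hybrid table factory could group the flattened DDL options into per-source option maps by stripping the identifier prefix. This is a hedged sketch under assumed names (OptionSplitter, splitByIdentifier), not the actual FLIP-278 implementation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class OptionSplitter {
    // Groups flattened table options by the identifiers listed in
    // 'source-identifiers', stripping the "<id>." prefix so each child
    // factory sees plain options like 'connector' and 'path'.
    public static Map<String, Map<String, String>> splitByIdentifier(
            Map<String, String> tableOptions) {
        String[] ids = tableOptions.get("source-identifiers").split(",");
        Map<String, Map<String, String>> perSource = new LinkedHashMap<>();
        for (String id : ids) {
            Map<String, String> child = new LinkedHashMap<>();
            String prefix = id + ".";
            for (Map.Entry<String, String> e : tableOptions.entrySet()) {
                if (e.getKey().startsWith(prefix)) {
                    child.put(e.getKey().substring(prefix.length()), e.getValue());
                }
            }
            perSource.put(id, child);
        }
        return perSource;
    }

    public static void main(String[] args) {
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("connector", "hybrid");
        opts.put("source-identifiers", "historical,realtime");
        opts.put("historical.connector", "filesystem");
        opts.put("historical.path", "/tmp/a.csv");
        opts.put("realtime.connector", "kafka");
        System.out.println(splitByIdentifier(opts));
    }
}
```

Because the identifiers are explicit in 'source-identifiers', ordering is also explicit, which avoids the index-ordering pitfalls Timo raised below.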
2. More child sources: yes, this is consistent with the hybrid source DataStream API; there is no limit on the number of child sources. E.g., here is a 3-source case:

```
create table hybrid_source(
  f0 varchar,
  f1 varchar,
  f2 bigint
) with(
  'connector' = 'hybrid',
  'source-identifiers' = 'historical01,historical02,realtime',
  'historical01.connector' = 'filesystem',
  'historical01.path' = '/tmp/a.csv',
  'historical01.format' = 'csv',
  'historical02.connector' = 'filesystem',
  'historical02.path' = '/tmp/a.csv',
  'historical02.format' = 'csv',
  'realtime.connector' = 'kafka',
  'realtime.topic' = 'test',
  'realtime.properties.bootstrap.servers' = 'localhost:9092',
  'realtime.properties.group.id' = 'testGroup',
  'realtime.scan.startup.mode' = 'earliest-offset',
  'realtime.format' = 'csv'
);
```

More details can be found at [1] & [2]. Looking forward to more of your concerns and opinions.

1. https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=235836225
2. https://github.com/apache/flink/pull/21841

Best Regards,
Ran Tao

Ran Tao <chucheng...@gmail.com> wrote on Thu, Dec 15, 2022, at 16:02:

> Hi guys. HybridSource is a good feature, but the released version has not
> supported the Table & SQL API for a long time.
>
> I have written a FLIP for discussion:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=235836225
>
> Sorry for the unclear subject of my previous email, so I have copied the
> response from Timo here and sent this email. Looking forward to your comments.
>
> ```
> Hi Ran,
>
> Thanks for proposing a FLIP. Btw, according to the process, the subject
> of this email should be `[DISCUSS] FLIP-278: Hybrid Source Connector` so
> that people can identify this discussion as a FLIP discussion.
>
> Supporting the hybrid source for SQL has been a long-standing issue on our
> roadmap. Happy to give feedback here:
>
> 1) Options
>
> Coming up with stable long-term options should be a shared effort.
> Having an index as a key could cause unintended side effects if the
> index is not correctly chosen; I would suggest we use IDs instead.
>
> What do you think about the following structure?
>
> CREATE TABLE ... WITH (
>   'sources' = 'historical;realtime',  -- Config option of type string list
>   'historical.connector' = 'filesystem',
>   'historical.path' = '/tmp/a.csv',
>   'historical.format' = 'csv',
>   'realtime.path' = '/tmp/b.csv',
>   'realtime.format' = 'csv'
> )
>
> I would limit the IDs to simple [a-z0-9_] identifiers. Once we support
> metadata columns, we can also propagate these IDs easily.
>
> 2) Schema field mappings
>
> The FLIP mentions `schema-field-mappings`; could you elaborate on this in
> the document?
>
> 3) Start position strategies
>
> Have you thought about how we can represent start position strategies?
> The FLIP is very minimal, but it would be nice to at least hear some
> opinions on this topic. Maybe we can come up with some general strategy
> that makes the most common use cases possible in the near future.
>
> Thanks,
> Timo
> ```
>
> --
> Best Regards,
> Ran Tao
> https://github.com/chucheng92
>