Hi devs,

I think for this approach to work, the internal record schema generated by 
Flink must be exactly the same for batch and stream records, because at runtime 
Flink will use the same serializer to send them downstream. However, that is not 
always the case: in batch mode Flink’s optimizer may realize that some 
fields are never actually used, so the records will not contain those fields. 
Such optimizations may not be applied in streaming mode, so records coming 
from the realtime source will have more fields. In that case, after switching 
to the realtime source, the job will fail, because the record serializer expects 
records with the batch schema, but instead receives records with more fields 
and doesn’t know how to serialize them.

Consider the following DDL:
CREATE TABLE hybrid_table
(
    trade ROW(
        `openTime` BIGINT,
        `closeTime` BIGINT),
    server  STRING,
    tradeTime as to_timestamp(from_unixtime(trade.openTime)),
    WATERMARK FOR tradeTime AS tradeTime - INTERVAL '1' MINUTE
)
    WITH (
        'connector' = 'hybrid',
        'source-identifiers' = 'historical,realtime',
        'historical.connector' = 'filesystem',
        'historical.path' = 's3://path.to.daa',
        'historical.format' = 'json',
        'realtime.connector' = 'kafka',
        'realtime.topic' = 'trades',
        'realtime.properties.bootstrap.servers' = '...',
        'realtime.properties.group.id' = 'flink.tv',
        'realtime.format' = 'json',
        'realtime.scan.startup.mode' = 'earliest-offset'
        )

This query will fail:

select server from hybrid_table

But this query will work:

select * from hybrid_table

In the first query, internal records in the batch source will only have 2 
fields: server and trade. But in the streaming source they will have all the 
fields described in the schema. When switching to the realtime source, the job 
fails because the record serializer expects records with the same schema as in 
the batch source. The IllegalArgumentException happens here 
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/CopyingChainingOutput.java#L74>,
 saying “Row arity: 4, but serializer arity: 2”.

The second example works because all fields are accessed in the SQL query, so 
Flink doesn’t remove any of them from the internal records in batch, and record 
schemas in batch and streaming match exactly.
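
To make the failure mode concrete, here is a minimal sketch in plain Python. 
It is not Flink code: FixedArityRowSerializer is a hypothetical stand-in for a 
row serializer that is created once from the batch plan, and the row values 
are placeholders. Only the arities (2 and 4) and the message text are taken 
from the error quoted above.

```python
class FixedArityRowSerializer:
    """Hypothetical stand-in for a row serializer built once from the planned schema."""

    def __init__(self, arity):
        self.arity = arity

    def copy(self, row):
        # Reject any record whose field count differs from the planned schema.
        if len(row) != self.arity:
            raise ValueError(
                f"Row arity: {len(row)}, but serializer arity: {self.arity}"
            )
        return tuple(row)


# The serializer is derived from the batch plan, where unused fields were pruned.
serializer = FixedArityRowSerializer(arity=2)

batch_row = ("server-1", (1683615600, 1683615660))  # pruned record: 2 fields
stream_row = ("f0", "f1", "f2", "f3")               # unpruned record: 4 placeholder fields

copied = serializer.copy(batch_row)  # records from the bounded source pass

try:
    serializer.copy(stream_row)      # first record after the switch
    error = None
except ValueError as exc:
    error = str(exc)
```

The point of the sketch is that the serializer is fixed before the switch, so 
the first record with a different field count from the second source is 
rejected.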

__
Best regards,
Ilya Soin

On 2023/05/09 07:09:53 Ran Tao wrote:
> Hi, devs.
> 
> I don't know if you have any other considerations for this FLIP. All
> discussions are welcome.
> If there are no other opinions in the next few days, I will try to initiate a
> vote. Thank you all.
> 
> 
> Best Regards,
> Ran Tao
> 
> 
> Ran Tao <ch...@gmail.com> wrote on Mon, Apr 10, 2023 at 15:33:
> 
> > Hi, devs. I want to reopen this discussion because some questions have
> > been resolved or need more discussion.
> >
> > In the previous discussion, there were some questions and problems.
> >
> > @Timo
> > 1. About the option prefix: we decided to use identifiers, e.g.
> >
> > ```
> > create table hybrid_source(
> >  f0 varchar,
> >  f1 varchar,
> >  f2 bigint
> > ) with(
> >  'connector'='hybrid',
> >  'source-identifiers'='historical,realtime',
> >  'historical.connector'='filesystem',
> >  'historical.path' = '/tmp/a.csv',
> >  'historical.format' = 'csv',
> >  'realtime.connector'='kafka',
> >  'realtime.topic' = 'test',
> >  'realtime.properties.bootstrap.servers' = 'localhost:9092',
> >  'realtime.properties.group.id' = 'test',
> >  'realtime.scan.startup.mode' = 'earliest-offset',
> >  'realtime.format' = 'csv'
> > );
> > ```
> >
> > @Martijn Visser <ma...@apache.org>
> > 1. Table API usage
> >
> > I updated the FLIP with the Table API usage.
> >
> > 2. How does the dynamic switched start timestamp work?
> >
> > In this FLIP, we introduce 2 interfaces to support it.
> > If we enable switched-start-position-enabled to use the dynamic switched
> > start timestamp, then the first source's split enumerator needs to
> > implement SupportsGetEndTimestamp, and the next source needs to
> > implement SupportsSwitchedStartTimestamp.
> > We use SupportsGetEndTimestamp and SupportsSwitchedStartTimestamp to get
> > the previous bounded source's end timestamp and apply it to the next
> > streaming source.
> >
> > @John Roesler
> > 1. Source handoff
> >
> > We support both Fixed-Start-Position and Switched-Start-Position. The
> > default is Fixed-Start-Position. Use the option
> > switched-start-position-enabled to control it.
> > In Fixed-Start-Position, the next streaming source uses its own startup
> > strategy, e.g. for Kafka, we use the scan.startup.mode predefined in the
> > user's SQL.
> > In Switched-Start-Position, this is the same question as the dynamic
> > switched start timestamp question from @Martijn above. We offer the
> > SupportsGetEndTimestamp interface to extract the endTimestamp from the
> > first source's split enumerator and pass it to the next source, and the
> > next source uses SupportsSwitchedStartTimestamp to apply it.
> >
> > 2. More child sources
> >
> > Yes, this is consistent with the HybridSource DataStream API; there is no
> > limit on the number of child sources.
> > E.g. below is a case with 3 sources.
> >
> > ```
> > create table hybrid_source(
> >  f0 varchar,
> >  f1 varchar,
> >  f2 bigint
> > ) with(
> >  'connector'='hybrid',
> >  'source-identifiers'='historical01,historical02,realtime',
> >  'historical01.connector'='filesystem',
> >  'historical01.path' = '/tmp/a.csv',
> >  'historical01.format' = 'csv',
> >  'historical02.connector'='filesystem',
> >  'historical02.path' = '/tmp/a.csv',
> >  'historical02.format' = 'csv',
> >  'realtime.connector'='kafka',
> >  'realtime.topic' = 'test',
> >  'realtime.properties.bootstrap.servers' = 'localhost:9092',
> >  'realtime.properties.group.id' = 'testGroup',
> >  'realtime.scan.startup.mode' = 'earliest-offset',
> >  'realtime.format' = 'csv'
> > );
> > ```
> >
> > More details can be found at [1] and [2].
> > Looking forward to your further concerns and opinions.
> >
> > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=235836225
> > [2] https://github.com/apache/flink/pull/21841
> >
> > Best Regards,
> > Ran Tao
> >
> > Ran Tao <ch...@gmail.com> wrote on Thu, Dec 15, 2022 at 16:02:
> >
> >> Hi guys. HybridSource is a good feature, but the released versions have
> >> not supported the Table & SQL API for a long time.
> >>
> >> I have written a FLIP for discussion.
> >> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=235836225
> >>
> >> Sorry for the unclear subject of my previous email. Here I have copied
> >> the response from Timo and sent this email. I look forward to your
> >> comments.
> >>
> >> ```
> >> Hi Ran,
> >>
> >> Thanks for proposing a FLIP. Btw according to the process, the subject
> >> of this email should be `[DISCUSS] FLIP-278: Hybrid Source Connector` so
> >> that people can identify this discussion as a FLIP discussion.
> >>
> >> Supporting the hybrid source for SQL was a long-standing issue on our
> >> roadmap. Happy to give feedback here:
> >>
> >> 1) Options
> >>
> >> Coming up with stable long-term options should be a shared effort.
> >> Having an index as a key could cause unintended side effects if the
> >> index is not correctly chosen, I would suggest we use IDs instead.
> >>
> >> What do you think about the following structure?
> >>
> >> CREATE TABLE ... WITH (
> >>    'sources'='historical;realtime',   -- Config option of type string list
> >>    'historical.connector' = 'filesystem',
> >>    'historical.path' = '/tmp/a.csv',
> >>    'historical.format' = 'csv',
> >>    'realtime.path' = '/tmp/b.csv',
> >>    'realtime.format' = 'csv'
> >> )
> >>
> >> I would limit the IDs to simple [a-z0-9_] identifiers. Once we support
> >> metadata columns, we can also propagate these IDs easily.
> >>
> >> 2) Schema field mappings
> >>
> >> The FLIP mentions `schema-field-mappings`. Could you elaborate on this in
> >> the document?
> >>
> >> 3) Start position strategies
> >>
> >> Have you thought about how we can represent start position strategies?
> >> The FLIP is very minimal but it would be nice to at least hear some
> >> opinions on this topic. Maybe we can come up with some general strategy
> >> that makes the most common use case possible in the near future.
> >>
> >> Thanks,
> >> Timo
> >> ```
> >>
> >> --
> >> Best Regards,
> >> Ran Tao
> >> https://github.com/chucheng92
> >>
> >
>  
