Hi Austin, That is exactly what I want. Is it possible to use JdbcTableSource as the first source? Looks like only FileSource can be used as the first source? Below is the error.
val jdbcSource = JdbcTableSource.builder() .setOptions(options) .setReadOptions(readOptions) .setLookupOptions(lookupOptions) .setSchema(schema) .build() val hybridSource = HybridSource.builder(jdbcSource) .addSource(kafkaSource) .build(); [image: Screen Shot 2021-11-05 at 10.41.59 PM.png] [image: Screen Shot 2021-11-05 at 10.42.13 PM.png] On Fri, Nov 5, 2021 at 5:11 PM Austin Cawley-Edwards < austin.caw...@gmail.com> wrote: > Hey Qihua, > > If I understand correctly, you should be able to with the HybridSource, > released in 1.14 [1] > > Best, > Austin > > [1]: > https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/hybridsource/ > > On Fri, Nov 5, 2021 at 3:53 PM Qihua Yang <yang...@gmail.com> wrote: > >> Hi, >> >> Our stream has two sources. one is a Kafka topic, one is a database. Is >> it possible to consume from kafka topic only after DB scan is completed? We >> configured it in batch mode. >> >> Thanks, >> Qihua >> >