Hello Qihua,

This will require you to implement and maintain your own database insertion
logic, using any client that your database and programming language support.
Bear in mind that you will lose the optimizations Flink's connector provides,
and you will have more custom code to maintain. On the other hand, it handles
the whole case within a single job.
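To make that concrete, below is a minimal, untested sketch of the approach: a
Process Function that issues the inserts through a plain JDBC
PreparedStatement, forwards only the records the database accepted, and
routes the rejected ones to a side output. The connection URL, the "events"
table and the Tuple2<Long, String> record type are placeholders for your own
schema; production code would also need connection pooling, batching and
retry handling.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class DbGatedKafkaFunction
        extends ProcessFunction<Tuple2<Long, String>, Tuple2<Long, String>> {

    // Records the database rejected end up here instead of the Kafka sink.
    public static final OutputTag<Tuple2<Long, String>> FAILED_INSERTS =
            new OutputTag<Tuple2<Long, String>>("failed-inserts") {};

    private transient Connection connection;
    private transient PreparedStatement insert;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Placeholder connection details; use your own driver and credentials.
        connection = DriverManager.getConnection(
                "jdbc:postgresql://db-host:5432/mydb", "user", "password");
        insert = connection.prepareStatement(
                "INSERT INTO events (id, payload) VALUES (?, ?)");
    }

    @Override
    public void processElement(
            Tuple2<Long, String> event,
            Context ctx,
            Collector<Tuple2<Long, String>> out) {
        try {
            insert.setLong(1, event.f0);
            insert.setString(2, event.f1);
            insert.executeUpdate();
            // Only records accepted by the database continue downstream.
            out.collect(event);
        } catch (Exception e) {
            // Rejected records go to a side output, e.g. a dead-letter topic.
            ctx.output(FAILED_INSERTS, event);
        }
    }

    @Override
    public void close() throws Exception {
        if (insert != null) insert.close();
        if (connection != null) connection.close();
    }
}

You would then wire it up roughly like

    SingleOutputStreamOperator<Tuple2<Long, String>> committed =
            events.process(new DbGatedKafkaFunction());
    committed.sinkTo(kafkaSink);
    committed.getSideOutput(DbGatedKafkaFunction.FAILED_INSERTS)
            .sinkTo(deadLetterSink);

where events, kafkaSink and deadLetterSink are whatever stream and Kafka sinks
you already build, so that only committed records ever reach the main topic.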
If you have more control over what you can do with your database, and latency
to Kafka is not a major concern since there will be more moving parts, then
what @Francesco Guardiani <france...@ververica.com> suggested is also a good
approach. You will need to operate more systems, namely Debezium, but maintain
less custom code (I have appended a rough Flink SQL sketch of ingesting such a
CDC stream below the quoted thread). In the end, how to proceed mostly depends
on your requirements and the resources available to you.

Sincerely,

Ali Bahadir Zeybek

On Wed, Nov 3, 2021 at 10:13 PM Qihua Yang <yang...@gmail.com> wrote:

> Many thanks guys!
> Hi Ali, for approach 2, what is the better way to do the database inserts
> for this case? Currently we simply use the JDBC SQL connector to sink to
> the database.
>
> Thanks,
> Qihua
>
> On Wed, Nov 3, 2021 at 8:13 AM Ali Bahadir Zeybek <a...@ververica.com>
> wrote:
>
>> Hello Qihua,
>>
>> If you do not care about the events that are not committed to the DB,
>> you can use Async I/O [1] and implement logic that
>>
>> - does the database inserts
>> - completes only the original events that are accepted by the DB
>>
>> You can then sink this new datastream to Kafka.
>>
>> If you are also interested in the events that are not committed to the
>> DB, you can use a Process Function [2] and implement logic that
>>
>> - does the database inserts
>> - collects only the original events that are accepted by the DB
>> - sends the ones that are not accepted by the DB to a side output
>>
>> You can then sink this new datastream to Kafka and maybe the side output
>> to another topic.
>>
>> Sincerely,
>>
>> Ali Bahadir Zeybek
>>
>> [1]:
>> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/asyncio
>> [2]:
>> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/process_function
>>
>> On Wed, Nov 3, 2021 at 3:16 PM Francesco Guardiani <
>> france...@ververica.com> wrote:
>>
>>> An alternative is to use a CDC tool like Debezium to stream your table
>>> changes, and then ingest that stream with Flink to push the data on to
>>> Kafka.
>>>
>>> On Wed, Nov 3, 2021 at 6:17 AM Guowei Ma <guowei....@gmail.com> wrote:
>>>
>>>> Hi, Qihua
>>>>
>>>> AFAIK there is no way to do it. Maybe you need to implement a "new"
>>>> sink to achieve this goal.
>>>>
>>>> Best,
>>>> Guowei
>>>>
>>>> On Wed, Nov 3, 2021 at 12:40 PM Qihua Yang <yang...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Our Flink application has two sinks (a DB and a Kafka topic). We want
>>>>> to push the same data to both sinks. Is it possible to push data to
>>>>> the Kafka topic only after the data has been pushed to the DB
>>>>> successfully? If the commit to the DB fails, we don't want that data
>>>>> pushed to Kafka.
>>>>>
>>>>> Thanks,
>>>>> Qihua
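As promised above, here is a rough sketch of the CDC route Francesco
described, assuming Debezium (via Kafka Connect) already publishes the change
events of a hypothetical "orders" table to a Kafka topic named orders.cdc in
the default Debezium JSON envelope. The topic names, columns, bootstrap
servers and the upsert-kafka sink are all placeholders to adapt, and the Kafka
SQL connector plus the JSON formats need to be on the job's classpath.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CdcToKafkaJob {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Source: the Debezium change stream of the database table,
        // published to Kafka by Debezium / Kafka Connect.
        tEnv.executeSql(
                "CREATE TABLE orders_cdc (" +
                "  id BIGINT," +
                "  payload STRING" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'orders.cdc'," +
                "  'properties.bootstrap.servers' = 'kafka:9092'," +
                "  'properties.group.id' = 'cdc-to-kafka'," +
                "  'scan.startup.mode' = 'earliest-offset'," +
                "  'format' = 'debezium-json'" +
                ")");

        // Sink: the downstream Kafka topic. upsert-kafka can consume the
        // changelog (inserts, updates, deletes) the Debezium source produces.
        tEnv.executeSql(
                "CREATE TABLE orders_out (" +
                "  id BIGINT," +
                "  payload STRING," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'upsert-kafka'," +
                "  'topic' = 'orders.out'," +
                "  'properties.bootstrap.servers' = 'kafka:9092'," +
                "  'key.format' = 'json'," +
                "  'value.format' = 'json'" +
                ")");

        // Everything that made it into the database, and hence into the CDC
        // stream, is forwarded to the output topic.
        tEnv.executeSql(
                "INSERT INTO orders_out SELECT id, payload FROM orders_cdc");
    }
}

Since the Debezium source is a changelog stream, updates and deletes on the
table are propagated to the output topic as well, which is why the sketch
uses the upsert-kafka connector rather than the plain kafka sink.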