Rion,

A given JdbcSink can only write to one table, but if the number of tables involved isn't unreasonable, you could use a separate sink for each table, and use side outputs [1] from a process function to steer each record to the appropriate sink.
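Very roughly, the wiring could look something like the sketch below. I'm making up two concrete Attribute subtypes (NameAttribute, AddressAttribute) along with table and column names, since I don't know your schema, and I haven't tested this:

import org.apache.flink.connector.jdbc.JdbcConnectionOptions
import org.apache.flink.connector.jdbc.JdbcExecutionOptions
import org.apache.flink.connector.jdbc.JdbcSink
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.util.OutputTag
import java.sql.PreparedStatement

// Hypothetical attribute subtypes -- substitute your own.
data class NameAttribute(val id: String, val name: String) : Attribute
data class AddressAttribute(val id: String, val address: String) : Attribute

// One OutputTag per target table. The anonymous-object syntax preserves the
// generic type information that Flink needs.
val nameTag = object : OutputTag<NameAttribute>("names") {}
val addressTag = object : OutputTag<AddressAttribute>("addresses") {}

// Steers each attribute subtype to the side output for its table.
class RouteAttributesFunction : ProcessFunction<Attribute, Attribute>() {
    override fun processElement(value: Attribute, ctx: Context, out: Collector<Attribute>) {
        when (value) {
            is NameAttribute -> ctx.output(nameTag, value)
            is AddressAttribute -> ctx.output(addressTag, value)
            else -> out.collect(value) // anything unrouted stays on the main output
        }
    }
}

fun main() {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()

    val connectionOptions = JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:postgresql://localhost:5432/attributes") // made-up connection details
        .withDriverName("org.postgresql.Driver")
        .withUsername("user")
        .withPassword("password")
        .build()

    val routed = env
        .addSource(ReadFromKafka())
        .process(IdentifyAttributesFunction()) // emits Attribute, as in your job
        .process(RouteAttributesFunction())

    // One JdbcSink per table, each reading from its own side output.
    routed.getSideOutput(nameTag).addSink(
        JdbcSink.sink(
            "INSERT INTO names (id, name) VALUES (?, ?) ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name",
            { ps: PreparedStatement, attr: NameAttribute ->
                ps.setString(1, attr.id)
                ps.setString(2, attr.name)
            },
            JdbcExecutionOptions.builder().withBatchSize(100).build(),
            connectionOptions
        )
    )

    routed.getSideOutput(addressTag).addSink(
        JdbcSink.sink(
            "INSERT INTO addresses (id, address) VALUES (?, ?) ON CONFLICT (id) DO UPDATE SET address = EXCLUDED.address",
            { ps: PreparedStatement, attr: AddressAttribute ->
                ps.setString(1, attr.id)
                ps.setString(2, attr.address)
            },
            JdbcExecutionOptions.builder().withBatchSize(100).build(),
            connectionOptions
        )
    )

    env.execute("Attribute Identification")
}

Each of those JdbcSink instances flushes its batch on checkpoint, which gives you at-least-once delivery; since the statements are upserts, replays after a failure are effectively idempotent, so you get exactly-once results in the tables without writing any sink code yourself.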
I suggest you avoid trying to implement a sink yourself. In general, custom sinks need to implement their own checkpointing, though there is a generic two-phase commit sink (TwoPhaseCommitSinkFunction) you can use as a starting point for implementing a transactional sink. FYI, the JDBC sink has been reworked for 1.13 to include exactly-once guarantees based on the XA standard [2].

Regards,
David

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
[2] https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/datastream/jdbc/#jdbcsinkexactlyoncesink

On Fri, Mar 5, 2021 at 7:34 PM Rion Williams <rionmons...@gmail.com> wrote:
> Hi all,
>
> I’ve been playing around with a proof-of-concept application with Flink to
> assist a colleague of mine. The application is fairly simple (take in a
> single input and identify various attributes about it) with the goal of
> outputting those to separate tables in Postgres:
>
> object AttributeIdentificationJob {
>     @JvmStatic
>     fun main(args: Array<String>) {
>         val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>
>         stream
>             .addSource(ReadFromKafka())
>             .process(IdentifyAttributesFunction())
>             .addSink(DynamicJdbcHere())
>
>         // execute program
>         stream.execute("Attribute Identification")
>     }
> }
>
> Considering my attributes may be of varying types (all implementing an
> Attribute interface), I don't know if the existing JdbcSink functionality
> or some variant of it (i.e. one of the dynamic ones that I see listed)
> could handle this functionality. Essentially, for a given "bundle" of
> records, I'd need to ensure that each respective type of attribute was
> upserted into its corresponding table within a Postgres database.
>
> Is that something that the connector can handle on its own? Or would I
> need to implement my own RichSinkFunction<Collection<Attribute>> that
> could handle opening a connection to Postgres and dynamically generating
> the appropriate UPSERT statements to handle sending the records? As a
> follow-up to that, if I did need to write my own RichSinkFunction, would I
> need to implement my own checkpointing for resilience purposes, or does
> that come along for the ride with RichSinkFunctions?
>
> Any insight or approaches would be welcome!
>
> Thanks,
>
> Rion
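P.S. For completeness, here is roughly what the 1.13 exactly-once variant from [2] would look like for one of those tables, reusing the hypothetical NameAttribute and table/column names from the sketch above (again untested, and the connection details are made up):

import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions
import org.apache.flink.connector.jdbc.JdbcExecutionOptions
import org.apache.flink.connector.jdbc.JdbcSink
import org.apache.flink.util.function.SerializableSupplier
import org.postgresql.xa.PGXADataSource
import java.sql.PreparedStatement
import javax.sql.XADataSource

// Exactly-once sink for the hypothetical "names" table, coordinated via XA transactions.
val exactlyOnceNamesSink = JdbcSink.exactlyOnceSink<NameAttribute>(
    "INSERT INTO names (id, name) VALUES (?, ?) ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name",
    { ps: PreparedStatement, attr: NameAttribute ->
        ps.setString(1, attr.id)
        ps.setString(2, attr.name)
    },
    JdbcExecutionOptions.builder().build(),
    JdbcExactlyOnceOptions.defaults(),
    SerializableSupplier<XADataSource> {
        // Postgres-specific XA data source; made-up connection details.
        PGXADataSource().apply {
            setUrl("jdbc:postgresql://localhost:5432/attributes")
            setUser("user")
            setPassword("password")
        }
    }
)

// Wired up exactly like the at-least-once version:
// routed.getSideOutput(nameTag).addSink(exactlyOnceNamesSink)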