Hi all,
I’ve been playing around with a proof-of-concept application with Flink to
assist a colleague of mine. The application is fairly simple (take in a
single input and identify various attributes about it) with the goal of
outputting those to separate tables in Postgres:
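
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object AttributeIdentificationJob {
    @JvmStatic
    fun main(args: Array<String>) {
        val env = StreamExecutionEnvironment.getExecutionEnvironment()

        env
            .addSource(ReadFromKafka())              // single input record
            .process(IdentifyAttributesFunction())   // emits the identified attributes
            .addSink(DynamicJdbcHere())              // placeholder for the sink in question

        // execute program
        env.execute("Attribute Identification")
    }
}
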
Since my attributes may be of varying types (all implementing an
Attribute interface), I don't know whether the existing JdbcSink, or some
variant of it (e.g. one of the dynamic ones that I see listed), could
handle this. Essentially, for a given "bundle" of records, I'd need to
ensure that each type of attribute is upserted into its corresponding
table within a Postgres database.
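
For illustration only, the routing requirement could be sketched as below. Everything here is hypothetical: the `table` property, the two attribute classes, and the table names are made up for the example and are not part of Flink or its JDBC connector.

```kotlin
// Hypothetical attribute model; none of these names come from Flink.
interface Attribute {
    val table: String // target Postgres table for this attribute type
}

data class ColorAttribute(val id: String, val color: String) : Attribute {
    override val table = "color_attributes"
}

data class SizeAttribute(val id: String, val size: Int) : Attribute {
    override val table = "size_attributes"
}

// Split one "bundle" into per-table batches so that each batch
// can be upserted into its own table.
fun groupByTable(bundle: Collection<Attribute>): Map<String, List<Attribute>> =
    bundle.groupBy { it.table }

fun main() {
    val bundle = listOf(
        ColorAttribute("a", "red"),
        SizeAttribute("a", 3),
        ColorAttribute("b", "blue")
    )
    groupByTable(bundle).forEach { (table, rows) ->
        println("$table -> ${rows.size} row(s)")
    }
}
```

Whatever sink ends up handling this would need to do something equivalent per bundle before writing.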
Is that something the connector can handle on its own? Or would I
need to implement my own RichSinkFunction<Collection<Attribute>> that
opens a connection to Postgres and dynamically generates the appropriate
UPSERT statements for the records? As a follow-up: if I did need to write
my own RichSinkFunction, would I need to implement my own checkpointing
for resilience purposes, or does that come along for the ride with
RichSinkFunctions?
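
To make the "dynamically generating UPSERT statements" part concrete, here is a minimal sketch of what the custom-sink route might involve, using Postgres's INSERT ... ON CONFLICT ... DO UPDATE syntax. `TableSpec` and `upsertSql` are hypothetical helpers invented for this example, not Flink or connector APIs, and the table and column names are assumptions.

```kotlin
// Hypothetical description of a target table; not part of Flink or the JDBC connector.
data class TableSpec(
    val name: String,
    val keyColumns: List<String>,   // columns covered by a unique or primary key constraint
    val valueColumns: List<String>  // columns to overwrite when the key already exists
)

// Builds a parameterized Postgres upsert for the given table.
// Identifiers are interpolated as-is, so they must come from trusted code, never from user input.
fun upsertSql(spec: TableSpec): String {
    require(spec.keyColumns.isNotEmpty()) { "ON CONFLICT needs at least one key column" }
    val columns = spec.keyColumns + spec.valueColumns
    val placeholders = columns.joinToString(", ") { "?" }
    val conflictAction =
        if (spec.valueColumns.isEmpty()) "DO NOTHING"
        else "DO UPDATE SET " + spec.valueColumns.joinToString(", ") { "$it = EXCLUDED.$it" }
    return "INSERT INTO ${spec.name} (${columns.joinToString(", ")}) " +
        "VALUES ($placeholders) " +
        "ON CONFLICT (${spec.keyColumns.joinToString(", ")}) $conflictAction"
}

fun main() {
    println(upsertSql(TableSpec("color_attributes", listOf("id"), listOf("color"))))
}
```

Each generated statement could then be prepared once per table and reused for every record of that type. Because an upsert keyed this way is idempotent, replaying records after a restart should leave the tables in the same state, which is part of why the checkpointing question matters.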
Any insight or approaches would be welcome!
Thanks,
Rion