Hey Rion,

> Accessing Statement Execution / Results

There is currently no way to get the update count from the database,
unfortunately.
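If you need those counts today, one workaround is to bypass JdbcSink for
that table and write a small RichSinkFunction over plain JDBC, where
PreparedStatement#executeUpdate exposes the affected-row count per
statement. A minimal sketch is below; the Widget type, the widgets table,
the jdbcUrl parameter, and the metric name are all made up for
illustration, and the 1-vs-2 affected-rows convention noted in the comment
is specific to MySQL's ON DUPLICATE KEY UPDATE:

import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import java.sql.Connection
import java.sql.DriverManager
import java.sql.PreparedStatement

// Hypothetical record type for the example.
data class Widget(val id: String, val name: String)

class WidgetUpsertSink(private val jdbcUrl: String) : RichSinkFunction<Widget>() {

    @Transient private lateinit var connection: Connection
    @Transient private lateinit var statement: PreparedStatement
    @Transient private lateinit var insertedCounter: Counter

    override fun open(parameters: Configuration) {
        connection = DriverManager.getConnection(jdbcUrl)
        // MySQL-style upsert; adjust for your database's dialect.
        statement = connection.prepareStatement(
            "INSERT INTO widgets (id, name) VALUES (?, ?) " +
                "ON DUPLICATE KEY UPDATE name = VALUES(name)"
        )
        insertedCounter = runtimeContext.metricGroup.counter("widgetsInserted")
    }

    override fun invoke(value: Widget, context: SinkFunction.Context) {
        statement.setString(1, value.id)
        statement.setString(2, value.name)
        // With MySQL's ON DUPLICATE KEY UPDATE, executeUpdate() reports 1 for
        // a fresh insert and 2 for an update of an existing row, so the count
        // distinguishes new widgets from pre-existing ones.
        if (statement.executeUpdate() == 1) {
            insertedCounter.inc()
        }
    }

    override fun close() {
        if (::statement.isInitialized) statement.close()
        if (::connection.isInitialized) connection.close()
    }
}

Note that this gives up the batching and retry behavior that JdbcSink
provides, so it is a trade-off rather than a drop-in replacement.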
> Batching Mechanisms (withBatchIntervalMs & withBatchSize)

These parameters should have "OR" semantics: the pending batch should be
flushed to the database as soon as either threshold is reached, whichever
happens first. The decision to flush based on the current batch size is
made in JdbcBatchingOutputFormat#writeRecord [1]. A simplified sketch of
how the two triggers combine is appended after the quoted message below.

[1] https://github.com/apache/flink/blob/c0cf91ce6fbe55f9f1df1fb1626a50e79cb9fc50/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java#L166

Thanks from the community for using Flink and for your feedback =)

Regards,
Roman

On Fri, Mar 19, 2021 at 9:05 PM Rion Williams <rionmons...@gmail.com> wrote:
>
> Hey all,
>
> I've been working with JdbcSink and it's really made my life much
> easier, but I had two questions about it that folks might be able to
> answer or provide some clarity around.
>
> Accessing Statement Execution / Results
>
> Is there any mechanism in place (or out of the box) to support reading
> the results of statements executed by the JdbcSink, or would I need to
> implement my own to support it?
>
> The problem I'm trying to solve relates to observability (i.e.
> metrics): incrementing specific counters based on the response from a
> given statement execution. For example, if I need to upsert 40
> incoming widgets, some of which may be the same widget, I only want to
> increment my metric when a widget didn't already exist, which I could
> determine from the response to the underlying queries.
>
> Batching Mechanisms (withBatchIntervalMs & withBatchSize)
>
> This was another great feature that I was happy to see, since I didn't
> want to write my own windowing logic for something as trivial as this.
> I noticed some odd behavior when I attempted to drive it via
> configuration:
>
> private fun getJdbcExecutionOptions(parameters: ParameterTool): JdbcExecutionOptions {
>     var executionOptions = JdbcExecutionOptions.builder()
>     if (parameters.getBoolean("database.batching.enabled", false)) {
>         if (parameters.has("database.batching.ms")) {
>             val batchingIntervalMs = parameters.getLong("database.batching.ms")
>             executionOptions = executionOptions
>                 .withBatchIntervalMs(batchingIntervalMs)
>         }
>
>         if (parameters.has("database.batching.records")) {
>             val batchingRecords = parameters.getInt("database.batching.records")
>             executionOptions = executionOptions
>                 .withBatchSize(batchingRecords)
>         }
>     }
>
>     return executionOptions.build()
> }
>
> With settings of 60000 (batchIntervalMs) and 100 (batchSize), it was
> around 7-8 minutes before a write to the destination took place;
> however, when previously using just the batchIntervalMs configuration,
> I'd see it consistently write out once a minute.
>
> I was looking through the source, and it seems the time-based
> emissions are scheduled asynchronously. I may have missed something,
> but I didn't explicitly see anywhere that a records-based emission
> would affect the scheduled emission.
>
> I'm just trying to get confirmation that these work together as an OR
> operation (i.e. flush the pending records once a given number of
> records has been seen or once a time interval has elapsed).
>
> Thanks so much; you folks have been an incredible community in my
> short time here. I've enjoyed working with Flink and contributing, and
> I hope to continue to do much more!
>
> Rion
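P.S. Here is the simplified sketch mentioned above. It is a hypothetical
model of the two flush triggers, not the actual JdbcBatchingOutputFormat
code: writeRecord() flushes when the batch reaches batchSize, while a
scheduled task flushes whatever is pending every batchIntervalMs, so
whichever trigger fires first wins.

import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

class BatchingWriter<T>(
    private val batchSize: Int,
    private val batchIntervalMs: Long,
    private val flushAction: (List<T>) -> Unit
) {
    private val batch = mutableListOf<T>()
    private val scheduler = Executors.newSingleThreadScheduledExecutor()

    init {
        // Time-based trigger: flush whatever is pending every batchIntervalMs,
        // independent of how full the batch is.
        scheduler.scheduleWithFixedDelay(
            { synchronized(this) { flush() } },
            batchIntervalMs,
            batchIntervalMs,
            TimeUnit.MILLISECONDS
        )
    }

    // Size-based trigger: flush as soon as the batch reaches batchSize.
    fun writeRecord(record: T) {
        synchronized(this) {
            batch.add(record)
            if (batch.size >= batchSize) {
                flush()
            }
        }
    }

    private fun flush() {
        if (batch.isNotEmpty()) {
            flushAction(batch.toList())
            batch.clear()
        }
    }

    fun close() {
        scheduler.shutdown()
        synchronized(this) { flush() }
    }
}

Under this model, with batchSize = 100 and batchIntervalMs = 60000, a
batch is written either when the 100th record arrives or at the 60-second
tick, whichever comes first.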