Hello Kurt,

Assuming that your sink is blocking, I would first make sure it is not chained with the preceding operators; otherwise the same thread both writes the output and performs the windowing/triggering. You can call disableChaining() after addSink to prevent this [1].
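For illustration, a minimal sketch against your pipeline (CopySink, ListAggregator and the connection options are taken from your snippet):

env.addSource(new FlinkKafkaConsumer011<>(CXP_MARKET_EXAMPLE, new SugarDeserializer(), localKafkaProperties))
    .keyBy((KeySelector<Envelope, String>) value -> value.getPayload().getSource().getTable())
    .window(GlobalWindows.create())
    .trigger(PurgingTrigger.of(ProcessingTimeoutTrigger.of(CountTrigger.of(5000), Duration.of(1, ChronoUnit.MINUTES))))
    .aggregate(new ListAggregator())
    .addSink(new CopySink(new SimpleJdbcConnectionProvider(localConnectionOptions)))
    .name("copysink")
    .disableChaining();  // break the chain: the blocking sink gets its own task,
                         // so windowing/triggering keeps running while it writes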
Besides that, you could probably use the existing JDBC batching functionality by configuring JdbcExecutionOptions [2] and passing it to JdbcSink.sink() [3]; see the sketch at the bottom of this mail.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups
[2] https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/connector/jdbc/JdbcExecutionOptions.Builder.html#withBatchSize-int-
[3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/jdbc.html

Regards,
Roman

On Thu, Apr 29, 2021 at 6:12 PM Kurtis Walker <kurtis.wal...@sugarcrm.com> wrote:
>
> Hello,
>
> I’m building a POC for a Flink app that loads data from Kafka into a
> Greenplum data warehouse. I’ve built a custom sink operation that bulk
> loads a number of rows. I’m using a global window, triggering on a count
> of records, to collect the rows for each sink invocation. I’m finding
> that while the sink is running, the previous operations of my app stop
> processing messages until the sink operation completes. I guess this is
> the backpressure logic kicking in. The cost is that I get about 60% of
> the theoretically possible throughput. Is there any configuration that
> would let me control that backpressure so that Flink buffers rows when it
> encounters it? Ideally, when a sink operation completes, the next batch
> of rows would be ready for the sink to pick up immediately. Here’s my
> code:
>
> env.addSource(new FlinkKafkaConsumer011<>(CXP_MARKET_EXAMPLE, new SugarDeserializer(), localKafkaProperties))
>     .keyBy((KeySelector<Envelope, String>) value -> value.getPayload().getSource().getTable())
>     .window(GlobalWindows.create())
>     .trigger(PurgingTrigger.of(ProcessingTimeoutTrigger.of(CountTrigger.of(5000), Duration.of(1, ChronoUnit.MINUTES))))
>     .aggregate(new ListAggregator())
>     .addSink(new CopySink(new SimpleJdbcConnectionProvider(localConnectionOptions))).name("copysink");
>
> Thanks!
>
> Kurt
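P.S. Here is the JdbcSink sketch mentioned above. It is only illustrative: the MarketRow POJO, the table/column names and the connection details are made up; adapt them to your Envelope/aggregate types. Since JdbcSink batches the inserts itself, the global window + ListAggregator stage may become unnecessary.

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;

// rows is a DataStream<MarketRow> (hypothetical type for your records)
rows.addSink(JdbcSink.sink(
        "INSERT INTO market_example (id, payload) VALUES (?, ?)",  // assumed table/columns
        (statement, row) -> {
            statement.setLong(1, row.getId());
            statement.setString(2, row.getPayload());
        },
        JdbcExecutionOptions.builder()
                .withBatchSize(5000)          // flush after 5000 rows ...
                .withBatchIntervalMs(60_000)  // ... or after 1 minute, mirroring your trigger
                .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:postgresql://greenplum-host:5432/db")  // assumed URL
                .withDriverName("org.postgresql.Driver")
                .build()));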