Thanks Roman. I had already tried disableChaining, but it didn't have any effect. The built-in JDBC sink is really slow compared to a bulk load (close to 100x slower), but I tested it anyway and saw the same issue: when a given message triggers the JDBC sink to write a batch, everything else waits for it.
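For reference, one way to keep the pipeline moving while a bulk load is in flight is to hand each finished batch to a single background thread inside the sink, so the operator thread only blocks when the next batch arrives before the previous load has finished. This is only a rough sketch, not the CopySink from this thread: bulkLoad() is a hypothetical placeholder for the Greenplum COPY call, and without tying the flush into Flink's checkpointing a failure can lose the in-flight batch.

    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.flink.types.Row;

    // Sketch of a sink that runs the bulk load off the operator thread.
    public class BackgroundCopySink extends RichSinkFunction<List<Row>> {

        private transient ExecutorService executor;
        private transient Future<?> inFlight;

        @Override
        public void open(Configuration parameters) {
            executor = Executors.newSingleThreadExecutor();
        }

        @Override
        public void invoke(List<Row> batch, Context context) throws Exception {
            // Block only if the previous load is still running, so upstream
            // windowing/triggering can continue while a load is in flight.
            if (inFlight != null) {
                inFlight.get();
            }
            inFlight = executor.submit(() -> bulkLoad(batch));
        }

        @Override
        public void close() throws Exception {
            if (inFlight != null) {
                inFlight.get(); // wait for the last batch before shutting down
            }
            if (executor != null) {
                executor.shutdown();
            }
        }

        // Hypothetical placeholder for the Greenplum COPY / bulk insert.
        private void bulkLoad(List<Row> batch) {
        }
    }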
From: Roman Khachatryan <ro...@apache.org>
Date: Thursday, April 29, 2021 at 2:49 PM
To: Kurtis Walker <kurtis.wal...@sugarcrm.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Backpressure configuration

Hello Kurt,

Assuming that your sink is blocking, I would first make sure that it is not chained with the preceding operators. Otherwise, the same thread will output data and perform windowing/triggering. You can add disableChaining after addSink to prevent this [1].

Besides that, you could probably use the existing JDBC batching functionality by configuring JdbcExecutionOptions [2] and providing it to JdbcSink.sink() [3].

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups
[2] https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/connector/jdbc/JdbcExecutionOptions.Builder.html#withBatchSize-int-
[3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/jdbc.html

Regards,
Roman

On Thu, Apr 29, 2021 at 6:12 PM Kurtis Walker <kurtis.wal...@sugarcrm.com> wrote:
>
> Hello,
>
> I'm building a POC for a Flink app that loads data from Kafka into a
> Greenplum data warehouse. I've built a custom sink operation that bulk
> loads a number of rows. I'm using a global window, triggering on a count
> of records, to collect the rows for each sink. I'm finding that while the
> sink is running, the previous operations of my app stop processing
> messages until the sink operation completes. I guess this is the
> backpressure logic kicking in; the cost is that I get about 60% of the
> theoretically possible throughput. Is there any configuration that would
> let me control that backpressure so that Flink buffers rows when it
> encounters backpressure? Ideally, when a sink operation completes, the
> next batch of rows would be ready for the sink to pick up immediately.
> Here's my code:
>
> env.addSource(new FlinkKafkaConsumer011<>(CXP_MARKET_EXAMPLE, new SugarDeserializer(), localKafkaProperties))
>     .keyBy((KeySelector<Envelope, String>) value -> value.getPayload().getSource().getTable())
>     .window(GlobalWindows.create())
>     .trigger(PurgingTrigger.of(ProcessingTimeoutTrigger.of(CountTrigger.of(5000), Duration.of(1, ChronoUnit.MINUTES))))
>     .aggregate(new ListAggregator())
>     .addSink(new CopySink(new SimpleJdbcConnectionProvider(localConnectionOptions))).name("copysink");
>
> Thanks!
>
> Kurt
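For completeness, a rough sketch of how Roman's two suggestions above could be wired together. The INSERT statement, connection URL, and Envelope accessors below are made up for illustration; note that since JdbcExecutionOptions buffers rows itself and flushes on size or interval, the keyBy/window/aggregate stage from the original pipeline becomes unnecessary.

    import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
    import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
    import org.apache.flink.connector.jdbc.JdbcSink;

    JdbcExecutionOptions executionOptions = JdbcExecutionOptions.builder()
            .withBatchSize(5000)         // flush after 5000 buffered rows...
            .withBatchIntervalMs(60_000) // ...or after one minute, whichever comes first
            .withMaxRetries(3)
            .build();

    JdbcConnectionOptions connectionOptions =
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withUrl("jdbc:postgresql://greenplum-host:5432/warehouse") // placeholder URL
                    .withDriverName("org.postgresql.Driver")
                    .build();

    env.addSource(new FlinkKafkaConsumer011<>(CXP_MARKET_EXAMPLE, new SugarDeserializer(), localKafkaProperties))
        .addSink(JdbcSink.sink(
                "INSERT INTO cxp_market (id, payload) VALUES (?, ?)", // hypothetical table
                (statement, envelope) -> {
                    statement.setString(1, envelope.getPayload().getId());   // made-up accessor
                    statement.setString(2, envelope.getPayload().toString());
                },
                executionOptions,
                connectionOptions))
        .name("jdbc-sink")
        .disableChaining(); // run the sink in its own task, per [1]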