Hello Kurt,

Assuming that your sink is blocking, I would first make sure that it
is not chained with the preceding operators; otherwise the same thread
has to both run the windowing/triggering and write to the database.
You can call disableChaining() right after addSink to prevent this [1].
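
Applied to your snippet, only the sink part changes:

    .aggregate(new ListAggregator())
    .addSink(new CopySink(new SimpleJdbcConnectionProvider(localConnectionOptions)))
    .name("copysink")
    .disableChaining(); // the sink now runs in its own task, so the window
                        // operator can keep filling the network buffers
                        // while the sink is busy writing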

Besides that, you could probably use the existing JDBC batching
functionality by configuring JdbcExecutionOptions [2] and passing it
to JdbcSink.sink() [3].
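
Roughly like this (just a sketch: the INSERT statement, the field
mapping and the connection settings below are placeholders you would
need to adapt, and the batch size and interval mirror your current
trigger settings):

    JdbcSink.<Envelope>sink(
            "INSERT INTO target_table (source_table, payload) VALUES (?, ?)", // placeholder SQL
            (statement, envelope) -> {
                // hypothetical mapping from your Envelope type
                statement.setString(1, envelope.getPayload().getSource().getTable());
                statement.setString(2, envelope.getPayload().toString());
            },
            JdbcExecutionOptions.builder()
                    .withBatchSize(5000)         // flush after 5000 rows...
                    .withBatchIntervalMs(60_000) // ...or after one minute, whichever comes first
                    .withMaxRetries(3)
                    .build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withUrl("jdbc:postgresql://greenplum-host:5432/db") // placeholder URL
                    .withDriverName("org.postgresql.Driver") // Greenplum speaks the Postgres protocol
                    .build());

If you pass the result straight to addSink, you could probably drop
the global window, trigger and aggregation entirely and let the sink
do the buffering.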

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups
[2] https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/connector/jdbc/JdbcExecutionOptions.Builder.html#withBatchSize-int-
[3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/jdbc.html

Regards,
Roman

On Thu, Apr 29, 2021 at 6:12 PM Kurtis Walker
<kurtis.wal...@sugarcrm.com> wrote:
>
> Hello,
>
>   I’m building a POC for a Flink app that loads data from Kafka into a
> Greenplum data warehouse.  I’ve built a custom sink operation that will
> bulk load a number of rows.  I’m using a global window, triggering on a
> record count, to collect the rows for each sink.  I’m finding that while
> the sink is running, the previous operations of my app stop processing
> messages until the sink operation completes.  I guess this is the
> backpressure logic kicking in.  The cost is that I get about 60% of the
> theoretically possible throughput.  Is there any configuration that would
> let me control that backpressure so that Flink buffers rows while the
> sink is busy?  In the ideal world, when a sink operation completes, the
> next batch of rows is ready for the sink to pick up immediately.  Here’s
> my code:
>
>         env.addSource(new FlinkKafkaConsumer011<>(CXP_MARKET_EXAMPLE,
>                 new SugarDeserializer(), localKafkaProperties))
>             .keyBy((KeySelector<Envelope, String>) value ->
>                 value.getPayload().getSource().getTable())
>             .window(GlobalWindows.create())
>             .trigger(PurgingTrigger.of(ProcessingTimeoutTrigger.of(
>                 CountTrigger.of(5000), Duration.of(1, ChronoUnit.MINUTES))))
>             .aggregate(new ListAggregator())
>             .addSink(new CopySink(new SimpleJdbcConnectionProvider(localConnectionOptions)))
>             .name("copysink");
>
> Thanks!
>
> Kurt