
We are using the GenericWriteAheadSink to buffer up values and then send them to a 
SQL Server database with a fast bulk copy upload. However, when I watch my 
process running, it seems to spend a huge amount of time iterating the Iterable<T> 
provided to the sendValues() method. It takes so long that I’ve had to 
increase the checkpoint timeout, because it causes the whole workflow to stall.
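
For reference, here is a minimal sketch of a batched send that could be called from inside sendValues(). It is plain Java with no Flink or JDBC types. `BulkUploader`, `sendInBatches` and the batch size are hypothetical stand-ins for the real SQL Server bulk-copy call and its tuning, and the `copy` function stands in for however the POJO is cloned (e.g. a copy constructor). The boolean result is assumed to mirror sendValues()'s contract, where `false` means "not sent, try again later".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class BatchedBulkSend {

    /** Hypothetical stand-in for the SQL Server bulk-copy upload. */
    public interface BulkUploader<T> {
        void upload(List<T> batch) throws Exception;
    }

    /**
     * Drains the iterable in batches of at most batchSize, copying each element
     * (the iterator may reuse instances) and uploading each batch as soon as it fills.
     * Returns true if every batch was uploaded, false on the first failure.
     */
    public static <T> boolean sendInBatches(
            Iterable<T> values, UnaryOperator<T> copy, int batchSize, BulkUploader<T> uploader) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<T> batch = new ArrayList<>(batchSize);
        try {
            for (T value : values) {
                batch.add(copy.apply(value)); // defensive copy: the instance may be reused
                if (batch.size() == batchSize) {
                    uploader.upload(batch);
                    batch = new ArrayList<>(batchSize); // fresh list in case the uploader keeps a reference
                }
            }
            if (!batch.isEmpty()) {
                uploader.upload(batch); // final partial batch
            }
            return true;
        } catch (Exception e) {
            // Batches uploaded before the failure are NOT rolled back in this sketch.
            return false;
        }
    }

    public static void main(String[] args) {
        List<Integer> sizes = new ArrayList<>();
        List<Integer> input = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            input.add(i);
        }
        boolean ok = sendInBatches(input, x -> x, 4, batch -> sizes.add(batch.size()));
        System.out.println(ok + " " + sizes); // true [4, 4, 2]
    }
}
```

Uploading as each batch fills keeps memory bounded to one batch of copies rather than the whole checkpoint's worth; it does not remove the deserialization cost itself. Because nothing is rolled back, a retry after a partial failure could re-send earlier batches.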

I am using Flink 1.14.0 and have attached a simple, self-contained example. If 
I had to guess, I would say there is a very large deserialization overhead when 
reading the checkpointed data back, even though I’m currently using a 
HashMapStateBackend. I have profiled the application and it definitely seems to 
spend most of its time there. The object involved is just a plain POJO.

A second “issue” is that I am forced to clone the objects provided by the 
iterator: when I dug into the code I could see that a 
ReusingMutableToRegularIteratorWrapper class is being used, and the objects it 
returns are reused, alternating between 2 instances. I don’t know the reasoning 
behind this (except perhaps to avoid creating extra garbage?), but it would be 
nice if I could specify a “non-reusing” iterator; otherwise there is a 
deserialization AND a clone for every object in the list.
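
The effect described above can be reproduced in plain Java. The hypothetical `TwoInstanceIterator` below only imitates the observed behaviour (each value is written into one of two alternating mutable instances); it is not Flink's class. Collecting the returned references directly leaves the list pointing at just two objects, while copying each element keeps the values intact.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class ReuseDemo {

    /** Plain mutable POJO, standing in for the record type. */
    public static class Row {
        public int id;
        public Row() {}
        public Row(Row other) { this.id = other.id; } // copy constructor used for cloning
    }

    /** Imitates an iterator that writes each value into one of two reused instances. */
    static class TwoInstanceIterator implements Iterator<Row> {
        private final int[] ids;
        private int pos = 0;
        private Row current = new Row();
        private Row spare = new Row();

        TwoInstanceIterator(int... ids) { this.ids = ids; }

        @Override
        public boolean hasNext() { return pos < ids.length; }

        @Override
        public Row next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            Row target = spare; // reuse the instance handed out two calls ago (if any)
            spare = current;
            current = target;
            current.id = ids[pos++];
            return current;
        }
    }

    /** Collects the ids seen in the final list, with or without copying each element. */
    public static List<Integer> collectIds(boolean copy, int... ids) {
        List<Row> rows = new ArrayList<>();
        Iterator<Row> it = new TwoInstanceIterator(ids);
        while (it.hasNext()) {
            Row r = it.next();
            rows.add(copy ? new Row(r) : r);
        }
        List<Integer> out = new ArrayList<>();
        for (Row r : rows) {
            out.add(r.id);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collectIds(false, 1, 2, 3, 4)); // [3, 4, 3, 4]
        System.out.println(collectIds(true, 1, 2, 3, 4));  // [1, 2, 3, 4]
    }
}
```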

Any pointers or advice on a better way to send large amounts of data to a SQL 
Server sink would be appreciated.


Attachment: SlowBulkCopySinkMain.java
