Hi Robert,
sorry, I should have been clearer in my initial mail. The two cases I was 
comparing are:

1) distinct() before insert (which is necessary, as we have a unique key 
constraint in our database), no distinct() before update
2) distinct() before insert AND distinct() before update

The test data used actually only contains unique values for the affected field 
though, so the dataset size is not reduced in case 2.

In case 1 the insert does not start until all the data has arrived at 
distinct(), while the update is already running (slowing down upstream 
operators as well). In case 2 both sinks wait for their respective distinct() 
operators (which are reached much faster now), then start at roughly the same 
time, leading to a shorter net job time for case 2 compared to case 1.
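To illustrate the effect (not our actual job, just a toy model in plain Python with invented numbers): a bounded channel in front of a slow consumer backpressures the producer, while an unbounded buffer, playing the role of a blocking distinct(), lets the producer run at full speed.

```python
import queue
import threading
import time

N = 200
SINK_DELAY = 0.001  # the "slow sink" spends this long per record

def source_runtime(bounded):
    """Time the source needs to emit N records when the slow sink reads
    from a bounded (backpressuring) vs. unbounded (blocking-barrier)
    channel. Purely a toy stand-in for Flink's pipelined execution."""
    q = queue.Queue(maxsize=5 if bounded else 0)  # maxsize=0 means unbounded

    def sink():
        for _ in range(N):
            q.get()
            time.sleep(SINK_DELAY)  # slow per-record work (e.g. a DB update)

    t = threading.Thread(target=sink)
    t.start()
    start = time.perf_counter()
    for i in range(N):
        q.put(i)  # blocks when the bounded queue is full -> backpressure
    elapsed = time.perf_counter() - start
    t.join()
    return elapsed

with_bp = source_runtime(bounded=True)
without_bp = source_runtime(bounded=False)
print(f"source with backpressure: {with_bp:.3f}s, without: {without_bp:.3f}s")
```

With the bounded queue the source is throttled to roughly the sink's pace; with the unbounded buffer it finishes almost immediately, which mirrors why the extra distinct() shortens the net job time.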

A pause operator might be useful, yes.

The update should not be an inherently much more expensive operation, as the 
WHERE clause only contains the table's primary key.
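As a starting point for the microbenchmarks Robert suggests below, something like the following could do (a sqlite3 toy stand-in, not our actual database or schema; table and column names are made up). With the WHERE clause restricted to the primary key, each update should amount to a single index lookup:

```python
import sqlite3
import time

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
# hypothetical schema: integer primary key plus one payload column
cur.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT)")

n = 10_000
start = time.perf_counter()
cur.executemany("INSERT INTO t (id, val) VALUES (?, ?)",
                [(i, "x") for i in range(n)])
conn.commit()
insert_s = time.perf_counter() - start

start = time.perf_counter()
# the WHERE clause only touches the primary key -> one index lookup per row
cur.executemany("UPDATE t SET val = ? WHERE id = ?",
                [("y", i) for i in range(n)])
conn.commit()
update_s = time.perf_counter() - start

print(f"insert: {insert_s:.3f}s, update: {update_s:.3f}s")
```

Varying n, the batch size, and the indexes here would mimic the knobs Robert mentions (index structures, batch sizes, drivers), before repeating the measurement against the real database.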

Cheers,
Max
— 
Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com * 0176 1000 
75 50
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

> Am 21.01.2016 um 15:57 schrieb Robert Metzger <rmetz...@apache.org>:
> 
> Hi Max,
> 
> Is the distinct() operation reducing the size of the DataSet? If so, I assume 
> you have an idempotent update and the job is faster because fewer updates are 
> done?
> If the distinct() operator is not changing anything, then the job might be 
> faster because the INSERT is done while Flink is still executing the 
> distinct() operation, so the insert is over when the updates start. 
> This would mean that concurrent inserts and updates on the database are much 
> slower than doing them sequentially.
> 
> I'm wondering if there is a way in Flink to explicitly ask for spilling an 
> intermediate operator to "pause" execution:
> 
> Source ----- > (spill for pausing) ---> (update sink)
>         \
>          ------- > (insert)
> 
> I don't have a lot of practical experience with RDBMS, but I guess updates 
> are slower because an index lookup + update is necessary. Maybe optimizing 
> the database configuration / schema / indexes is more promising. I think it's 
> indeed much nicer to avoid any unnecessary steps in Flink.
> 
> Did you do any "microbenchmarks" for the update and insert part? I guess that 
> would help a lot to understand the impact of certain index structures, 
> batch sizes, or database drivers.
> 
> Regards,
> Robert
> 
> 
> 
> 
> On Thu, Jan 21, 2016 at 3:35 PM, Maximilian Bode 
> <maximilian.b...@tngtech.com> wrote:
> Hi everyone,
> 
> in a Flink (0.10.1) job with two JDBCOutputFormat sinks, one of them (doing a 
> database update) is performing slower than the other one (an insert). The job 
> as a whole is also slow, as upstream operators are slowed down by 
> backpressure. I am able to speed up the whole job by introducing an a priori 
> unnecessary .distinct(), which blocks downstream execution of the slow sink; 
> that sink in turn seems to execute faster when given all data at once.
> 
> Any ideas what is going on here? Is there something I can do without 
> introducing unnecessary computation steps?
> 
> Cheers,
> Max
> 
> 
