Do you use a keyBy() between the source and the window operator? One thing I can think of is the following:
- With the higher source parallelism, you have more logical connections (each source rebalances across all window operators).
  - With source parallelism 20, you have 20 * 160 = 3200 logical connections (fewer physical TCP connections, because Flink multiplexes).
  - With source parallelism 40, you have 40 * 160 = 6400 logical connections.

With more logical connections, you need more network buffers. While you seem to have enough buffers to make the 6400 connections work, it may be just a bit too little to balance out some short-lived skew/latency effects in the network. With the 3200 connections, each connection can claim twice the number of buffers, giving it more elasticity to balance out network latency effects.

I would try to double the number of network buffers for the case where the source has a parallelism of 40, and see if that helps.

Greetings,
Stephan


On Wed, Aug 3, 2016 at 12:07 PM, Stephan Ewen <[email protected]> wrote:

> Hi!
>
> Are you running on ProcessingTime or on EventTime?
>
> Thanks,
> Stephan
>
>
> On Wed, Aug 3, 2016 at 11:57 AM, neo21 zerro <[email protected]> wrote:
>
>> Hi guys,
>>
>> Thanks for getting back to me.
>>
>> So to clarify:
>> Topology-wise: Flink Kafka source (does Avro deserialization and a small map) -> window operator which does batching for 3 seconds -> HBase sink
>>
>> Experiments:
>>
>> 1. Flink source: parallelism 40 (20 idle tasks) -> window operator: parallelism 160 -> HBase sink: parallelism 160
>>    - roughly 10.000 requests/sec on HBase
>> 2. Flink source: parallelism 20 -> window operator: parallelism 160 -> HBase sink: parallelism 160
>>    - roughly 100.000 requests/sec on HBase (10x more)
>>
>> @Stephan as described below, the parallelism of the sink was kept the same. I agree with you that there is nothing to backpressure on the source ;) However, my understanding right now is that only backpressure can explain this situation. Since we only change the source parallelism, other things like the HBase parallelism are kept the same.
>>
>> @Sameer all of those things are valid points. We make sure that we reduce row locking by partitioning the data on the HBase sinks. We are just trying to find out why this limitation is happening. And since the same setup is used and only the source parallelism is changed, I don't expect this to be an HBase issue.
>>
>> Thanks guys!
>>
>>
>>
>> On Wednesday, August 3, 2016 11:38 AM, Sameer Wadkar <[email protected]> wrote:
>> What is the parallelism of the sink, or of the operator which writes to the sinks, in the first case? HBase puts are constrained by the following:
>> 1. How your regions are distributed. Are you pre-splitting your regions for the table? Do you know the number of regions your HBase tables are split into?
>> 2. Are all the sinks writing to all the regions? Meaning, are you getting records in the sink operator which could potentially go to any region? This can become a big bottleneck if you have 40 sinks talking to all regions. I pre-split my regions based on key salting and use custom partitioning to ensure each sink operator writes to only a few regions, and my performance shot up by several orders of magnitude.
>> 3. You are probably doing this, but adding puts in batches helps in general; however, having each batch contain puts for too many regions hurts.
>>
>> If the source parallelism is the same as the parallelism of the other operators, the 40 sinks communicating with all regions might be a problem. When you go down to 20 sinks you might actually be getting better performance due to less resource contention on HBase.
>>
>>
>> > On Aug 3, 2016, at 4:14 AM, neo21 zerro <[email protected]> wrote:
>> >
>> > Hello everybody,
>> >
>> > I'm using Flink Kafka consumer 0.8.x with Kafka 0.8.2 and Flink 1.0.3 on YARN.
>> > In Kafka I have a topic which has 20 partitions, and my Flink topology reads from Kafka (source) and writes to HBase (sink).
>> >
>> > When:
>> > 1. the Flink source has parallelism set to 40 (20 of the tasks are idle), I see 10.000 requests/sec on HBase
>> > 2. the Flink source has parallelism set to 20 (exact number of partitions), I see 100.000 requests/sec on HBase (so a 10x improvement)
>> >
>> >
>> > It's clear that HBase is not the limiting factor in my topology.
>> > Assumption: the Flink backpressure mechanism kicks in more aggressively in case 1 and is limiting the ingestion of tuples in the topology.
>> >
>> > The question: in the first case, why are those 20 sources which are sitting idle contributing so much to the backpressure?
>> >
>> >
>> > Thanks guys!
>> >
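Stephan's connection arithmetic can be checked with a few lines of Python. This is a back-of-the-envelope model under a simplifying assumption that is not from the thread: the buffers available to the source-to-window exchange form one fixed pool that is split evenly over all logical channels. Real Flink 1.x sizes its network buffers per TaskManager (the `taskmanager.network.numberOfBuffers` setting) and assigns them per result partition, so treat the output as ratios, not predictions. `TOTAL_BUFFERS` is a hypothetical pool size chosen only for illustration.

```python
"""Back-of-the-envelope model of the 'more channels, fewer buffers each' argument.

Assumption (not from the thread): one fixed buffer pool, split evenly across
all logical channels of the source -> window exchange.
"""

WINDOW_PARALLELISM = 160
TOTAL_BUFFERS = 32_768  # hypothetical pool size, for illustration only


def logical_channels(source_parallelism: int, window_parallelism: int) -> int:
    # An all-to-all exchange (rebalance or keyBy) connects every source
    # subtask to every window subtask.
    return source_parallelism * window_parallelism


def buffers_per_channel(total_buffers: int, channels: int) -> float:
    # Even split of the pool across channels (simplifying assumption).
    return total_buffers / channels


if __name__ == "__main__":
    for p in (20, 40):
        ch = logical_channels(p, WINDOW_PARALLELISM)
        share = buffers_per_channel(TOTAL_BUFFERS, ch)
        print(f"source parallelism {p}: {ch} channels, {share:.2f} buffers/channel")

    # Doubling the pool for parallelism 40 restores the per-channel share
    # that parallelism 20 had with the original pool.
    doubled = buffers_per_channel(2 * TOTAL_BUFFERS, logical_channels(40, WINDOW_PARALLELISM))
    print(f"parallelism 40 with doubled pool: {doubled:.2f} buffers/channel")
```

Under this model, doubling the pool for the parallelism-40 job gives each of its 6400 channels the same share that each of the 3200 channels had in the parallelism-20 job, which is the experiment suggested at the top of the thread.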

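Sameer's point 2 (salted, pre-split regions plus custom partitioning so that each sink writes to only a few regions) can be illustrated with the sketch below. Sameer did not share his code, so everything here is an assumption: the table is pre-split into `NUM_SALT_BUCKETS` regions, one per two-digit salt prefix, and each sink subtask owns a contiguous range of buckets. `NUM_SALT_BUCKETS`, `SINK_PARALLELISM` and all function names are hypothetical. In a Flink job, the `sink_for_bucket` logic would go into a `Partitioner` passed to `DataStream.partitionCustom(...)`; Python is used here only to keep the sketch short.

```python
"""Hypothetical sketch of key salting plus custom partitioning for HBase sinks.

Assumptions: the table is pre-split into NUM_SALT_BUCKETS regions (one per
salt prefix), and each sink subtask owns a contiguous range of buckets.
"""
import zlib
from collections import defaultdict

NUM_SALT_BUCKETS = 16  # hypothetical: one pre-split HBase region per salt prefix
SINK_PARALLELISM = 8   # hypothetical number of HBase sink subtasks


def salt_bucket(row_key: str) -> int:
    # crc32 is deterministic across runs (unlike Python's built-in hash for str),
    # so a given key always maps to the same bucket, and hence the same region.
    return zlib.crc32(row_key.encode("utf-8")) % NUM_SALT_BUCKETS


def salted_key(row_key: str) -> str:
    # Prefix the row key with its bucket so that the pre-split regions line up with buckets.
    return f"{salt_bucket(row_key):02d}-{row_key}"


def sink_for_bucket(bucket: int) -> int:
    # Contiguous bucket ranges per sink: each sink only ever writes to
    # NUM_SALT_BUCKETS / SINK_PARALLELISM regions instead of all of them.
    return bucket * SINK_PARALLELISM // NUM_SALT_BUCKETS


def regions_per_sink(keys):
    # Which buckets (regions) each sink subtask ends up writing to.
    touched = defaultdict(set)
    for k in keys:
        b = salt_bucket(k)
        touched[sink_for_bucket(b)].add(b)
    return touched


if __name__ == "__main__":
    keys = [f"user-{i}" for i in range(10_000)]
    for sink, buckets in sorted(regions_per_sink(keys).items()):
        print(f"sink {sink} writes to regions {sorted(buckets)}")
    print(salted_key("user-42"))
```

With 16 buckets and 8 sinks, each sink touches at most 2 regions; without the custom partitioning, a rebalanced stream would send puts for all 16 regions to every sink, which is the contention Sameer describes.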