Hi Navneeth,
I need some clarifications to be able to help you.
First of all, it would be useful to know if your topology is stateful,
i.e., if it has to maintain state. Since you have two topics with 72
partitions each but only 72 tasks (i.e., partition groups) that need
to be distributed over the nodes, I am assuming you have a join in your
topology, which would make the topology (and its tasks) stateful. Is this
correct?
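For illustration, here is a minimal sketch of the kind of topology I have
in mind, i.e., a windowed join over two co-partitioned topics (the topic
names and serdes are made up for the example, not taken from your setup):

    import java.time.Duration;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.JoinWindows;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.kstream.StreamJoined;

    StreamsBuilder builder = new StreamsBuilder();
    // Hypothetical topic names; if both topics have 72 partitions, the join
    // creates 72 tasks, each of which keeps its join state in a local store.
    KStream<String, String> data =
        builder.stream("data-topic", Consumed.with(Serdes.String(), Serdes.String()));
    KStream<String, String> control =
        builder.stream("control-topic", Consumed.with(Serdes.String(), Serdes.String()));
    data.join(control,
            (d, c) -> d + "|" + c,
            JoinWindows.of(Duration.ofMinutes(5)),
            StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()))
        .to("joined-output", Produced.with(Serdes.String(), Serdes.String()));

In such a topology the tasks are stateful, so the assignor has to take the
location of the state into account when it distributes them.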
You say that one node has 3 data partitions assigned per stream thread
which would mean that a node gets assigned 18 tasks if 6 stream threads
are configured per node. I cannot see a node with 18 tasks assigned in
the assignment in your previous e-mail. Can you clarify?
Streams' task assignment algorithm tries to balance the workload and at
the same time tries to minimize the restoration of stateful tasks. Since
it cannot always achieve both at once, Streams performs a couple of
probing rebalances, one every ten minutes (configurable with
probing.rebalance.interval.ms), and moves tasks around until the workload
is balanced. That means the assignment can be unbalanced in the beginning
and improve over time. To know whether Streams still performs probing
rebalances, you can check the log files for INFO messages with the
following content:
"Decided on assignment: ... with followup probing rebalance."
and
"Decided on assignment: ... with no followup probing rebalance."
Did you see the assignment you posted in your previous e-mail after the
latter log message? If not, it could be that the distribution of the
tasks will improve after the next probing rebalances.
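Just as a side note, if you want the probing rebalances to happen more
frequently while you investigate, you could lower that interval. A minimal
sketch, assuming you build the Streams configuration in Java (the broker
address is a placeholder, and your config dump shows the default of
600000 ms):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "prod-v1");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
    // Probing rebalances are triggered at this interval until the assignment
    // is balanced; e.g. five minutes instead of the default ten.
    props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 300_000L);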
That said, keep in mind that Streams does not guarantee an optimal
distribution. The assignment is best-effort: Streams will try to
distribute the tasks over the nodes as well as it can, but there are no
guarantees.
Finally, the rule of thumb is to use as many stream threads on each node
as the node has cores, to utilize the CPU efficiently.
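For example, a minimal sketch (again assuming a Java-built configuration)
that ties the number of stream threads to the core count of the node:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    // Rule of thumb: one stream thread per core on the node.
    int cores = Runtime.getRuntime().availableProcessors();
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, cores);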
Best,
Bruno
On 03.06.21 07:47, Navneeth Krishnan wrote:
This is what the assignment looks like after a full restart. N represents a node
and the second column is the number of partitions assigned to it. There are just
two input topics with an equal number of partitions in the topology. I was
expecting each node to have 6 partitions assigned.
N1 7
N2 5
N3 7
N4 5
N5 7
N6 5
N7 7
N8 7
N9 7
N10 6
N11 5
N12 4
There are 72 partitions and here is the allocation (partition -> node):
0 N1
1 N2
2 N3
3 N4
4 N5
5 N6
6 N7
7 N8
8 N9
9 N10
10 N11
11 N12
12 N1
13 N5
14 N6
15 N7
16 N5
17 N9
18 N7
19 N8
20 N9
21 N10
22 N3
23 N12
24 N1
25 N7
26 N3
27 N4
28 N5
29 N6
30 N1
31 N2
32 N9
33 N10
34 N11
35 N7
36 N1
37 N9
38 N3
39 N4
40 N5
41 N2
42 N11
43 N8
44 N6
45 N10
46 N8
47 N9
48 N1
49 N2
50 N3
51 N4
52 N3
53 N5
54 N6
55 N8
56 N8
57 N10
58 N11
59 N12
60 N1
61 N2
62 N3
63 N4
64 N5
65 N7
66 N7
67 N8
68 N9
69 N10
70 N11
71 N12
On Wed, Jun 2, 2021 at 9:40 PM Navneeth Krishnan <reachnavnee...@gmail.com>
wrote:
We are using kafka version 2.6.1 on broker and 2.6.2 for streams.
Thanks
On Wed, Jun 2, 2021 at 7:18 PM Navneeth Krishnan <reachnavnee...@gmail.com>
wrote:
Hi All,
We recently migrated from Flink to Kafka Streams in production and we are
facing a major issue. Any quick help would really be appreciated.
There are 72 input data topic partitions and 72 control stream topic
partitions. There is a minimum of 12 nodes with 6 stream threads on each
instance, and we are using auto scaling based on CPU load. We also have
scenarios where an instance goes down and is replaced by a new instance.
Now the problem we see is unequal partition allocation among
instances. For example, one node has 3 data partitions allocated per stream
thread and the CPU on that node is at about 80%, whereas on another node
only 4 stream threads have any allocation and each of them is assigned
one partition.
Is there a way to distribute the partitions equally so that the incoming
data can be processed without much lag? Right now some partitions have very
high lag while others are only a few thousand records behind.
This is impacting our production system.
Streams Configuration:
acceptable.recovery.lag = 10000
application.id = prod-v1
application.server = *.*.*.*:80
bootstrap.servers = [*]
buffered.records.per.partition = 1000
built.in.metrics.version = latest
cache.max.bytes.buffering = 104857600
client.id =
commit.interval.ms = 10000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class
org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
default.key.serde = class
org.apache.kafka.common.serialization.Serdes$ByteArraySerde
default.production.exception.handler = class
org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class
org.apache.kafka.streams.processor.WallclockTimestampExtractor
default.value.serde = class
org.apache.kafka.common.serialization.Serdes$ByteArraySerde
default.windowed.key.serde.inner = null
default.windowed.value.serde.inner = null
max.task.idle.ms = 0
max.warmup.replicas = 2
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 1
num.stream.threads = 6
partition.grouper = class
org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
probing.rebalance.interval.ms = 600000
processing.guarantee = at_least_once
receive.buffer.bytes = 52428800
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 2
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /mnt/state
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000
Thanks,
Navneeth