This is what the assignment looks like after a full restart. N represents a
node, and the second column is the number of partitions assigned to it.
There are just two input topics in the topology, each with the same number
of partitions. I was expecting each node to have 6 partitions assigned.

N1 7
N2 5
N3 7
N4 5
N5 7
N6 5
N7 7
N8 7
N9 7
N10 6
N11 5
N12 4
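
As a quick sanity check on the numbers above, here is a minimal Python sketch that compares each node's count with the even share of 72 / 12 = 6. The variable names are only illustrative and it does not use any Kafka tooling; it just redoes the arithmetic on the table.

```python
# Per-node partition counts copied from the table above.
counts = {
    "N1": 7, "N2": 5, "N3": 7, "N4": 5, "N5": 7, "N6": 5,
    "N7": 7, "N8": 7, "N9": 7, "N10": 6, "N11": 5, "N12": 4,
}

total = sum(counts.values())
expected = total // len(counts)  # even share per node

# Difference between each node's actual count and the even share.
deviation = {node: n - expected for node, n in counts.items()}
over = [node for node, d in deviation.items() if d > 0]
under = [node for node, d in deviation.items() if d < 0]

print(f"total={total} expected_per_node={expected}")
print("above even share:", over)
print("below even share:", under)
```

Only N10 ends up with exactly the expected 6 partitions; six nodes are one above and the rest are one or two below.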


There are 72 partitions, and here is the allocation.

0 N1
1 N2
2 N3
3 N4
4 N5
5 N6
6 N7
7 N8
8 N9
9 N10
10 N11
11 N12
12 N1
13 N5
14 N6
15 N7
16 N5
17 N9
18 N7
19 N8
20 N9
21 N10
22 N3
23 N12
24 N1
25 N7
26 N3
27 N4
28 N5
29 N6
30 N1
31 N2
32 N9
33 N10
34 N11
35 N7
36 N1
37 N9
38 N3
39 N4
40 N5
41 N2
42 N11
43 N8
44 N6
45 N10
46 N8
47 N9
48 N1
49 N2
50 N3
51 N4
52 N3
53 N5
54 N6
55 N8
56 N8
57 N10
58 N11
59 N12
60 N1
61 N2
62 N3
63 N4
64 N5
65 N7
66 N7
67 N8
68 N9
69 N10
70 N11
71 N12
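
To cross-check the per-partition list against the per-node summary, a small Python sketch like the following tallies how many partitions each node owns. The `ALLOCATION` list is just the node numbers from the list above transcribed in partition order (an illustrative encoding, not output from any Kafka tool).

```python
from collections import Counter

# Node number for each partition 0..71, copied row by row from the list above.
ALLOCATION = [
    1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
    1, 5, 6, 7, 5, 9, 7, 8, 9, 10, 3, 12,
    1, 7, 3, 4, 5, 6, 1, 2, 9, 10, 11, 7,
    1, 9, 3, 4, 5, 2, 11, 8, 6, 10, 8, 9,
    1, 2, 3, 4, 3, 5, 6, 8, 8, 10, 11, 12,
    1, 2, 3, 4, 5, 7, 7, 8, 9, 10, 11, 12,
]

# Count how many partitions each node owns.
per_node = Counter(f"N{n}" for n in ALLOCATION)

for n in range(1, 13):
    key = f"N{n}"
    print(key, per_node[key])
```

The tally reproduces the per-node counts listed at the top of this message, so the two views are consistent.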

On Wed, Jun 2, 2021 at 9:40 PM Navneeth Krishnan <reachnavnee...@gmail.com>
wrote:

> We are using Kafka version 2.6.1 on the brokers and 2.6.2 for Streams.
>
> Thanks
>
> On Wed, Jun 2, 2021 at 7:18 PM Navneeth Krishnan <reachnavnee...@gmail.com>
> wrote:
>
>> Hi All,
>>
>> We recently migrated from Flink to Kafka Streams in production and we are
>> facing a major issue. Any quick help would really be appreciated.
>>
>> There are 72 input data topic partitions and 72 control stream topic
>> partitions. There is a minimum of 12 nodes with 6 stream threads on each
>> instance, and we are using auto scaling based on CPU load. We also have
>> scenarios where an instance goes down and is replaced by a new instance.
>>
>> The problem we see is unequal partition allocation among instances. For
>> example, one node has 3 data partitions allocated per stream thread and
>> its CPU usage is about 80%, whereas on another node only 4 stream threads
>> have allocations, with one partition each.
>>
>> Is there a way to distribute the partitions equally so that the incoming
>> data can be processed without much lag? In this case some partitions have
>> very high lag while others are only in the few thousands.
>> This is impacting our production system.
>>
>> Streams Configuration:
>>         acceptable.recovery.lag = 10000
>>         application.id = prod-v1
>>         application.server = *.*.*.*:80
>>         bootstrap.servers = [*]
>>         buffered.records.per.partition = 1000
>>         built.in.metrics.version = latest
>>         cache.max.bytes.buffering = 104857600
>>         client.id =
>>         commit.interval.ms = 10000
>>         connections.max.idle.ms = 540000
>>         default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
>>         default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
>>         default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
>>         default.timestamp.extractor = class org.apache.kafka.streams.processor.WallclockTimestampExtractor
>>         default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
>>         default.windowed.key.serde.inner = null
>>         default.windowed.value.serde.inner = null
>>         max.task.idle.ms = 0
>>         max.warmup.replicas = 2
>>         metadata.max.age.ms = 300000
>>         metric.reporters = []
>>         metrics.num.samples = 2
>>         metrics.recording.level = INFO
>>         metrics.sample.window.ms = 30000
>>         num.standby.replicas = 1
>>         num.stream.threads = 6
>>         partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
>>         poll.ms = 100
>>         probing.rebalance.interval.ms = 600000
>>         processing.guarantee = at_least_once
>>         receive.buffer.bytes = 52428800
>>         reconnect.backoff.max.ms = 1000
>>         reconnect.backoff.ms = 50
>>         replication.factor = 2
>>         request.timeout.ms = 40000
>>         retries = 0
>>         retry.backoff.ms = 100
>>         rocksdb.config.setter = null
>>         security.protocol = PLAINTEXT
>>         send.buffer.bytes = 131072
>>         state.cleanup.delay.ms = 600000
>>         state.dir = /mnt/state
>>         topology.optimization = none
>>         upgrade.from = null
>>         windowstore.changelog.additional.retention.ms = 86400000
>>
>> Thanks,
>> Navneeth
>>
>>
