This is what the assignment looks like after a full restart. N represents a node, and the second column is the number of partitions assigned to that node. There are just two input topics in the topology, each with the same number of partitions. I was expecting each node to have 6 partitions assigned.
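For reference, per-thread partition counts like the ones below can be dumped from inside each instance with something like the following sketch (Kafka Streams 2.6 API; the logAssignment helper name is just illustrative):

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;

// Illustrative helper: prints, for each stream thread on this instance, the
// number of input partitions across its active tasks, plus an instance total.
static void logAssignment(final KafkaStreams streams) {
    int total = 0;
    for (final ThreadMetadata thread : streams.localThreadsMetadata()) {
        int partitions = 0;
        for (final TaskMetadata task : thread.activeTasks()) {
            partitions += task.topicPartitions().size();
        }
        System.out.println(thread.threadName() + " -> " + partitions + " partitions");
        total += partitions;
    }
    System.out.println("instance total: " + total + " partitions");
}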
N1  7
N2  5
N3  7
N4  5
N5  7
N6  5
N7  7
N8  7
N9  7
N10 6
N11 5
N12 4

There are 72 partitions, and here is the allocation (partition, then the node it is assigned to):

 0 N1    1 N2    2 N3    3 N4    4 N5    5 N6    6 N7    7 N8    8 N9    9 N10  10 N11  11 N12
12 N1   13 N5   14 N6   15 N7   16 N5   17 N9   18 N7   19 N8   20 N9   21 N10  22 N3   23 N12
24 N1   25 N7   26 N3   27 N4   28 N5   29 N6   30 N1   31 N2   32 N9   33 N10  34 N11  35 N7
36 N1   37 N9   38 N3   39 N4   40 N5   41 N2   42 N11  43 N8   44 N6   45 N10  46 N8   47 N9
48 N1   49 N2   50 N3   51 N4   52 N3   53 N5   54 N6   55 N8   56 N8   57 N10  58 N11  59 N12
60 N1   61 N2   62 N3   63 N4   64 N5   65 N7   66 N7   67 N8   68 N9   69 N10  70 N11  71 N12

On Wed, Jun 2, 2021 at 9:40 PM Navneeth Krishnan <reachnavnee...@gmail.com> wrote:

> We are using Kafka version 2.6.1 on the broker and 2.6.2 for streams.
>
> Thanks
>
> On Wed, Jun 2, 2021 at 7:18 PM Navneeth Krishnan <reachnavnee...@gmail.com>
> wrote:
>
>> Hi All,
>>
>> We recently migrated from Flink to Kafka Streams in production and we are
>> facing a major issue. Any quick help would really be appreciated.
>>
>> There are 72 input data topic partitions and 72 control stream topic
>> partitions. There is a minimum of 12 nodes with 6 stream threads on each
>> instance, and we use auto scaling based on CPU load. We also have
>> scenarios where an instance goes down and is replaced by a new instance.
>>
>> The problem we see is unequal partition allocation among instances. For
>> example, one node has 3 data partitions allocated per stream thread and
>> its CPU is at about 80%, whereas on another node only 4 stream threads
>> have any allocation, with one partition each.
>>
>> Is there a way to distribute the partitions equally so that the incoming
>> data can be processed without much lag? Right now some partitions have
>> very high lag while others are only a few thousand messages behind. This
>> is impacting our production system.
>>
>> Streams Configuration:
>> acceptable.recovery.lag = 10000
>> application.id = prod-v1
>> application.server = *.*.*.*:80
>> bootstrap.servers = [*]
>> buffered.records.per.partition = 1000
>> built.in.metrics.version = latest
>> cache.max.bytes.buffering = 104857600
>> client.id =
>> commit.interval.ms = 10000
>> connections.max.idle.ms = 540000
>> default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
>> default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
>> default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
>> default.timestamp.extractor = class org.apache.kafka.streams.processor.WallclockTimestampExtractor
>> default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
>> default.windowed.key.serde.inner = null
>> default.windowed.value.serde.inner = null
>> max.task.idle.ms = 0
>> max.warmup.replicas = 2
>> metadata.max.age.ms = 300000
>> metric.reporters = []
>> metrics.num.samples = 2
>> metrics.recording.level = INFO
>> metrics.sample.window.ms = 30000
>> num.standby.replicas = 1
>> num.stream.threads = 6
>> partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
>> poll.ms = 100
>> probing.rebalance.interval.ms = 600000
>> processing.guarantee = at_least_once
>> receive.buffer.bytes = 52428800
>> reconnect.backoff.max.ms = 1000
>> reconnect.backoff.ms = 50
>> replication.factor = 2
>> request.timeout.ms = 40000
>> retries = 0
>> retry.backoff.ms = 100
>> rocksdb.config.setter = null
>> security.protocol = PLAINTEXT
>> send.buffer.bytes = 131072
>> state.cleanup.delay.ms = 600000
>> state.dir = /mnt/state
>> topology.optimization = none
>> upgrade.from = null
>> windowstore.changelog.additional.retention.ms = 86400000
>>
>> Thanks,
>> Navneeth
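For anyone trying to reproduce this, the assignment-related subset of the configuration above corresponds to the following StreamsConfig settings (a sketch only; the bootstrap servers value is a placeholder, the rest is copied from the dump above):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "prod-v1");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");        // placeholder
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 6);                   // num.stream.threads
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);                 // num.standby.replicas
props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 2);                  // max.warmup.replicas
props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 10000L);         // acceptable.recovery.lag
props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 600000L);  // probing.rebalance.interval.ms
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);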