We are using Kafka version 2.6.1 on the broker and 2.6.2 for Streams. Thanks
On Wed, Jun 2, 2021 at 7:18 PM Navneeth Krishnan <reachnavnee...@gmail.com> wrote:
> Hi All,
>
> We recently migrated from Flink to Kafka Streams in production and we are
> facing a major issue. Any quick help would really be appreciated.
>
> There are 72 input data topic partitions and 72 control stream topic
> partitions. There is a minimum of 12 nodes with 6 stream threads on each
> instance, and we are using auto scaling based on CPU load. We also have
> scenarios where an instance goes down and is replaced by a new instance.
>
> The problem we see is unequal partition allocation among instances. For
> example, one node has 3 data partitions allocated per stream thread and
> its CPU is at about 80%, whereas on another node only 4 stream threads
> have assignments, with one partition each.
>
> Is there a way to distribute the partitions evenly so that incoming data
> can be processed without much lag? Right now some partitions have very
> high lag while others are only in the few thousands. This is impacting
> our production system.
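To put numbers on the imbalance, here is the arithmetic for the balanced case as a small Java sketch. It assumes (this is not stated above) that the data and control topics are co-partitioned and consumed by one sub-topology, so there are 72 tasks in total. With the minimum of 12 instances x 6 threads there are exactly 72 threads, so an even assignment would be 6 tasks per instance and 1 per thread. If all 6 threads on the busy node really hold 3 partitions each, that node carries 18 tasks, three times its fair share. The class and method names (`EvenTaskSpread`, `evenSpread`) are hypothetical.

```java
import java.util.Arrays;

public class EvenTaskSpread {
    // Spread `tasks` as evenly as possible over `slots` (instances or threads):
    // each slot gets tasks / slots, and the first tasks % slots slots get one extra.
    static int[] evenSpread(int tasks, int slots) {
        int[] counts = new int[slots];
        for (int i = 0; i < slots; i++) {
            counts[i] = tasks / slots + (i < tasks % slots ? 1 : 0);
        }
        return counts;
    }

    public static void main(String[] args) {
        int tasks = 72;              // assumption: co-partitioned data + control topics -> 72 tasks
        int instances = 12;          // minimum instance count from the auto scaling group
        int threadsPerInstance = 6;  // num.stream.threads

        int[] perInstance = evenSpread(tasks, instances);
        int[] perThread = evenSpread(tasks, instances * threadsPerInstance);
        System.out.println("ideal tasks per instance: " + Arrays.toString(perInstance));
        System.out.println("ideal max tasks per thread: "
                + Arrays.stream(perThread).max().getAsInt());

        // Reported skew, assuming all 6 threads on the busy node hold 3 partitions each.
        int busiest = 6 * 3;
        int quietest = 4 * 1;
        System.out.println("busiest node: " + busiest + " tasks, quietest node: " + quietest
                + " tasks, fair share: " + perInstance[0]);
    }
}
```

When the auto scaling group grows past 12 instances the fair share drops unevenly, e.g. `evenSpread(72, 13)` gives seven instances 6 tasks and six instances 5 tasks.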
>
> Streams Configuration:
> acceptable.recovery.lag = 10000
> application.id = prod-v1
> application.server = *.*.*.*:80
> bootstrap.servers = [*]
> buffered.records.per.partition = 1000
> built.in.metrics.version = latest
> cache.max.bytes.buffering = 104857600
> client.id =
> commit.interval.ms = 10000
> connections.max.idle.ms = 540000
> default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
> default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
> default.timestamp.extractor = class org.apache.kafka.streams.processor.WallclockTimestampExtractor
> default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> default.windowed.key.serde.inner = null
> default.windowed.value.serde.inner = null
> max.task.idle.ms = 0
> max.warmup.replicas = 2
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> num.standby.replicas = 1
> num.stream.threads = 6
> partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
> poll.ms = 100
> probing.rebalance.interval.ms = 600000
> processing.guarantee = at_least_once
> receive.buffer.bytes = 52428800
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> replication.factor = 2
> request.timeout.ms = 40000
> retries = 0
> retry.backoff.ms = 100
> rocksdb.config.setter = null
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> state.cleanup.delay.ms = 600000
> state.dir = /mnt/state
> topology.optimization = none
> upgrade.from = null
> windowstore.changelog.additional.retention.ms = 86400000
>
> Thanks,
> Navneeth
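One plausible contributor, given Streams 2.6 and the settings above (state.dir, num.standby.replicas = 1), is the warmup-based task assignment introduced by KIP-441. For stateful tasks, Kafka Streams prefers to keep an active task on an instance whose local state is within acceptable.recovery.lag of the changelog end. If the balanced target is not caught up, it leaves the task where it is and places a warmup replica on the target instead, with at most max.warmup.replicas warmups in flight at once, and re-checks every probing.rebalance.interval.ms. The sketch below gives a rough lower bound on how long rebalancing could take under these settings. It assumes that every warmup catches up within one probing interval and that 12 tasks must leave the busiest node (a hypothetical figure: 18 tasks versus a fair share of 6). `WarmupConvergenceEstimate` and `minProbingRounds` are hypothetical helpers, not Kafka APIs.

```java
public class WarmupConvergenceEstimate {
    // Lower bound on probing rebalances needed to move `tasksToMove` stateful tasks
    // when at most `maxWarmupReplicas` warmups may exist at once, assuming each
    // warmup catches up (lag <= acceptable.recovery.lag) within one probing interval.
    static long minProbingRounds(int tasksToMove, int maxWarmupReplicas) {
        return (tasksToMove + maxWarmupReplicas - 1) / maxWarmupReplicas; // ceiling division
    }

    public static void main(String[] args) {
        int maxWarmupReplicas = 2;          // max.warmup.replicas
        long probingIntervalMs = 600_000L;  // probing.rebalance.interval.ms
        // Hypothetical: busiest node holds 18 active tasks, fair share is 6.
        int tasksToMove = 18 - 6;

        long rounds = minProbingRounds(tasksToMove, maxWarmupReplicas);
        long minutes = rounds * probingIntervalMs / 60_000L;
        System.out.println("at least " + rounds + " probing rebalances, about "
                + minutes + " minutes");
    }
}
```

Because auto scaling and instance replacement bring up fresh instances with no local state, the assignment may not have time to converge between scaling events. The real assignor also weighs standby placement, so treat the estimate as indicative only.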