Hi all,

We recently migrated from Flink to Kafka Streams in production and are facing a major issue. Any quick help would be really appreciated.
There are 72 input data topic partitions and 72 control stream topic partitions. We run a minimum of 12 nodes with 6 stream threads on each instance, and we use autoscaling based on CPU load. We also have scenarios where instances go down and are replaced by new ones.

The problem we see is unequal partition allocation among instances. For example, on one node each stream thread has 3 data partitions assigned and CPU usage on that node is about 80%, whereas on another node only 4 stream threads have assignments, with one partition each. As a result, some partitions have very high lag while others lag by only a few thousand. This is impacting our production system.

Is there a way to distribute the partitions evenly so that incoming data is processed without significant lag?

Streams configuration:

acceptable.recovery.lag = 10000
application.id = prod-v1
application.server = *.*.*.*:80
bootstrap.servers = [*]
buffered.records.per.partition = 1000
built.in.metrics.version = latest
cache.max.bytes.buffering = 104857600
client.id =
commit.interval.ms = 10000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class org.apache.kafka.streams.processor.WallclockTimestampExtractor
default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
default.windowed.key.serde.inner = null
default.windowed.value.serde.inner = null
max.task.idle.ms = 0
max.warmup.replicas = 2
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 1
num.stream.threads = 6
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
probing.rebalance.interval.ms = 600000
processing.guarantee = at_least_once
receive.buffer.bytes = 52428800
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 2
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /mnt/state
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000

Thanks,
Navneeth
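As a rough sanity check on the numbers in the post, here is a small Java sketch of the even-spread arithmetic. It assumes (this is not stated in the post) that the data and control topics are co-partitioned and consumed by the same sub-topology, so there are 72 active tasks in total; it ignores standby tasks, and the 14-instance case is a hypothetical scale-out. The class and method names are illustrative only.

```java
// Rough arithmetic for the task spread described in the post.
// ASSUMPTION (not confirmed by the post): the data and control topics are
// co-partitioned and read by the same sub-topology, so Kafka Streams creates
// one active task per partition number, i.e. 72 active tasks in total.
// Standby tasks (num.standby.replicas = 1) are ignored here.
public class TaskSpread {

    // Most active tasks any single stream thread needs under an even spread
    // (ceiling division of tasks over all threads).
    static int idealTasksPerThread(int tasks, int instances, int threadsPerInstance) {
        int threads = instances * threadsPerInstance;
        return (tasks + threads - 1) / threads;
    }

    // Most active tasks any single instance needs under an even spread.
    static int idealTasksPerInstance(int tasks, int instances) {
        return (tasks + instances - 1) / instances;
    }

    // Threads that cannot own an active task because there are more threads
    // than tasks (e.g. after autoscaling beyond 12 instances).
    static int threadsWithoutActiveTask(int tasks, int instances, int threadsPerInstance) {
        return Math.max(0, instances * threadsPerInstance - tasks);
    }

    public static void main(String[] args) {
        int tasks = 72;
        int threadsPerInstance = 6;

        // Prints "12 instances: 1 task(s) per thread, 6 per instance".
        System.out.println("12 instances: " + idealTasksPerThread(tasks, 12, threadsPerInstance)
                + " task(s) per thread, " + idealTasksPerInstance(tasks, 12) + " per instance");

        // Observed in the post: one instance has 3 tasks on each thread
        // (assumed here to be all 6 threads), another has 4 threads with 1 task each.
        System.out.println("busiest instance: " + 6 * 3 + " tasks, lightest: " + 4 * 1);

        // Hypothetical scale-out to 14 instances: 84 threads for 72 tasks.
        System.out.println("14 instances: " + threadsWithoutActiveTask(tasks, 14, threadsPerInstance)
                + " thread(s) without an active task");
    }
}
```

Under that assumption, 12 instances with 6 threads each gives exactly one active task per thread (6 per instance), compared with the 18 versus 4 tasks per instance observed, and any scale-out beyond 12 instances only adds threads that cannot own an active task.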