Hi All,

We recently migrated from Flink to Kafka Streams in production and are
facing a major issue. Any quick help would be much appreciated.

There are 72 input data topic partitions and 72 control stream topic
partitions. There is a minimum of 12 nodes with 6 stream threads on each
instance, and we use auto scaling based on CPU load. We also have
scenarios where an instance goes down and is replaced by a new instance.

The problem we see is unequal partition allocation among instances. For
example, one node has 3 data partitions allocated per stream thread and the
CPU on that node is about 80%, whereas on another node only 4 stream
threads have allocations and each is assigned one partition.
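
To put numbers on the imbalance, here is a minimal sketch of the arithmetic in Java. It assumes each data partition is paired with its control partition in a single task, so 72 partitions give 72 active tasks; that pairing is an assumption about our topology, not something confirmed from the assignment logs. The class and method names are made up for illustration.

```java
public class PartitionSpread {
    /** Total stream threads across the cluster. */
    static int totalThreads(int nodes, int threadsPerNode) {
        return nodes * threadsPerNode;
    }

    /** Largest share any owner gets under a perfectly even spread (ceiling division). */
    static int evenShare(int tasks, int owners) {
        return (tasks + owners - 1) / owners;
    }

    public static void main(String[] args) {
        int tasks = 72;          // assumption: one active task per data/control partition pair
        int nodes = 12;          // minimum node count from the description
        int threadsPerNode = 6;  // num.stream.threads

        int threads = totalThreads(nodes, threadsPerNode);
        int evenPerNode = evenShare(tasks, nodes);

        // Observed values taken from the description above.
        int busyNode = 3 * threadsPerNode; // 3 partitions on each of 6 threads
        int lightNode = 4 * 1;             // 4 threads with 1 partition each

        System.out.println("stream threads in cluster: " + threads);
        System.out.println("even share per node:       " + evenPerNode);
        System.out.println("observed busy node:        " + busyNode);
        System.out.println("observed light node:       " + lightNode);
    }
}
```

Under these assumptions an even spread would be about 6 data partitions per node at the minimum cluster size, against the 18 and 4 we observe. With more than 12 nodes after scale-out the even share per node can only stay the same or drop.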

Is there a way to distribute the partitions equally so that the incoming
data can be processed without much lag? At the moment some partitions have
very high lag while others are only a few thousand behind. This is
impacting our production system.

Streams Configuration:
        acceptable.recovery.lag = 10000
        application.id = prod-v1
        application.server = *.*.*.*:80
        bootstrap.servers = [*]
        buffered.records.per.partition = 1000
        built.in.metrics.version = latest
        cache.max.bytes.buffering = 104857600
        client.id =
        commit.interval.ms = 10000
        connections.max.idle.ms = 540000
        default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
        default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
        default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
        default.timestamp.extractor = class org.apache.kafka.streams.processor.WallclockTimestampExtractor
        default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
        default.windowed.key.serde.inner = null
        default.windowed.value.serde.inner = null
        max.task.idle.ms = 0
        max.warmup.replicas = 2
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        num.standby.replicas = 1
        num.stream.threads = 6
        partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
        poll.ms = 100
        probing.rebalance.interval.ms = 600000
        processing.guarantee = at_least_once
        receive.buffer.bytes = 52428800
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        replication.factor = 2
        request.timeout.ms = 40000
        retries = 0
        retry.backoff.ms = 100
        rocksdb.config.setter = null
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        state.cleanup.delay.ms = 600000
        state.dir = /mnt/state
        topology.optimization = none
        upgrade.from = null
        windowstore.changelog.additional.retention.ms = 86400000
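
Of the settings above, the ones that, as far as I understand, feed into task placement are the standby, warm-up and probing-rebalance values. The sketch below only collects them into a java.util.Properties object with the values from the dump. The class name AssignmentSettings is hypothetical, and the comments reflect my reading of the documentation rather than confirmed behaviour in our cluster.

```java
import java.util.Properties;

public class AssignmentSettings {
    /** Builds the subset of the dumped configuration that relates to task placement. */
    static Properties fromDump() {
        Properties props = new Properties();

        // Threads per instance; tasks are spread over these threads.
        props.setProperty("num.stream.threads", "6");

        // Number of standby copies kept for each stateful task.
        props.setProperty("num.standby.replicas", "1");

        // Maximum lag (in offsets) at which an instance still counts as
        // caught up enough to receive an active stateful task.
        props.setProperty("acceptable.recovery.lag", "10000");

        // Maximum number of warm-up replicas that may be assigned at once.
        props.setProperty("max.warmup.replicas", "2");

        // Maximum wait before a rebalance is triggered to check whether
        // warm-up replicas have caught up (10 minutes here).
        props.setProperty("probing.rebalance.interval.ms", "600000");

        return props;
    }

    public static void main(String[] args) {
        // Print the collected settings for inspection.
        fromDump().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

If state restoration is what keeps tasks pinned to the busy nodes, these values would be the first ones to look at, but that is a guess on my part.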

Thanks,
Navneeth
