We are using Kafka version 2.6.1 on the broker and 2.6.2 for Kafka Streams.

Thanks

On Wed, Jun 2, 2021 at 7:18 PM Navneeth Krishnan <reachnavnee...@gmail.com>
wrote:

> Hi All,
>
> We recently migrated from Flink to Kafka Streams in production and we are
> facing a major issue. Any quick help would really be appreciated.
>
> There are 72 input data topic partitions and 72 control stream topic
> partitions. There is a minimum of 12 nodes with 6 stream threads on each
> instance, and we use auto scaling based on CPU load. We also have
> scenarios where an instance goes down and is replaced by a new instance.
>
> The problem we see is unequal partition allocation among instances. For
> example, one node has 3 data partitions allocated per stream thread and
> its CPU is at about 80%, whereas on another node only 4 stream threads
> have allocations, with one partition each.
>
> Is there a way to distribute the partitions equally so that the incoming
> data can be processed without much lag? At the moment some partitions have
> very high lag while others lag by only a few thousand. This is impacting
> our production system.
>
> Streams Configuration:
>         acceptable.recovery.lag = 10000
>         application.id = prod-v1
>         application.server = *.*.*.*:80
>         bootstrap.servers = [*]
>         buffered.records.per.partition = 1000
>         built.in.metrics.version = latest
>         cache.max.bytes.buffering = 104857600
>         client.id =
>         commit.interval.ms = 10000
>         connections.max.idle.ms = 540000
>         default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
>         default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
>         default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
>         default.timestamp.extractor = class org.apache.kafka.streams.processor.WallclockTimestampExtractor
>         default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
>         default.windowed.key.serde.inner = null
>         default.windowed.value.serde.inner = null
>         max.task.idle.ms = 0
>         max.warmup.replicas = 2
>         metadata.max.age.ms = 300000
>         metric.reporters = []
>         metrics.num.samples = 2
>         metrics.recording.level = INFO
>         metrics.sample.window.ms = 30000
>         num.standby.replicas = 1
>         num.stream.threads = 6
>         partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
>         poll.ms = 100
>         probing.rebalance.interval.ms = 600000
>         processing.guarantee = at_least_once
>         receive.buffer.bytes = 52428800
>         reconnect.backoff.max.ms = 1000
>         reconnect.backoff.ms = 50
>         replication.factor = 2
>         request.timeout.ms = 40000
>         retries = 0
>         retry.backoff.ms = 100
>         rocksdb.config.setter = null
>         security.protocol = PLAINTEXT
>         send.buffer.bytes = 131072
>         state.cleanup.delay.ms = 600000
>         state.dir = /mnt/state
>         topology.optimization = none
>         upgrade.from = null
>         windowstore.changelog.additional.retention.ms = 86400000
>
> Thanks,
> Navneeth
>
>
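
To put numbers on the imbalance described in the quoted message, here is a rough sketch in plain Java with no Kafka dependency. It only redoes the arithmetic from the figures given there (72 data partitions, at least 12 nodes, 6 stream threads per node) and says nothing about how the Streams assignor actually places tasks. The class and method names are made up for illustration. It assumes that all 6 threads on the busy node hold 3 data partitions each, which the message implies but does not state, and it ignores the control topic partitions.

```java
/** Back-of-the-envelope check of the partition figures quoted in the message. */
class AllocationSketch {

    /** Data partitions per stream thread under a perfectly even spread, rounded up. */
    static int evenSharePerThread(int partitions, int nodes, int threadsPerNode) {
        int totalThreads = nodes * threadsPerNode;
        return (partitions + totalThreads - 1) / totalThreads; // ceiling division
    }

    /** Data partitions held by one node, given its active threads and partitions per thread. */
    static int partitionsOnNode(int activeThreads, int partitionsPerThread) {
        return activeThreads * partitionsPerThread;
    }

    public static void main(String[] args) {
        int partitions = 72;     // data topic partitions (from the message)
        int nodes = 12;          // minimum node count (from the message)
        int threadsPerNode = 6;  // num.stream.threads (from the message)

        int perThread = evenSharePerThread(partitions, nodes, threadsPerNode);
        int perNode = perThread * threadsPerNode;

        // Assumption: every thread on the busy node holds 3 data partitions.
        int busyNode = partitionsOnNode(threadsPerNode, 3);
        // From the message: only 4 threads have an assignment, one partition each.
        int lightNode = partitionsOnNode(4, 1);

        System.out.println("even share per thread: " + perThread);
        System.out.println("even share per node:   " + perNode);
        System.out.println("busy node holds:       " + busyNode);
        System.out.println("light node holds:      " + lightNode);
    }
}
```

With the minimum of 12 nodes there are 72 stream threads for 72 data partitions, so an even spread would be one data partition per thread. Under the assumption above, the busy node carries about three times that share, while the light node carries less than it.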
