Hi,

Thank you for replying.

We are using Java as the client library.
This is the version:

$ java -version
openjdk version "14.0.1" 2020-04-14
OpenJDK Runtime Environment (build 14.0.1+7)
OpenJDK 64-Bit Server VM (build 14.0.1+7, mixed mode, sharing)

Is there any article where I can find out whether this Java version is KIP
392 compatible?

For reference, this is the label that we have in our consumer pod:

client.rack=us-east-1a

So we are assuming that by having this label in the pod, this consumer pod
will fetch data from the broker present in us-east-1a AZ. It would be
helpful to know if we have to set this somewhere inside the pod as an
environment variable as well.

Thanks,


On Wed, Oct 30, 2024 at 6:47 PM Ömer Şiar Baysal <osiarbay...@gmail.com>
wrote:

> Hi,
>
> The log shown on the blog post was for the console consumer tool.  These
> log messages emitted for the consumer process so you need to configure
> log4j configuration for the kafka tools if you want to achieve the same
> thing.
>
> The functionality provided by the KIP-392 is only realized only if client
> implementation supports it. So the question is which library are you using
> for the consumers?
>
> Regards,
> OSB
>
> On Wed, Oct 30, 2024, 13:51 Soham Chakraborty <dec.so...@gmail.com> wrote:
>
> > Hi,
> >
> > We are trying to implement the feature from KIP 392 to allow consumers to
> > fetch from the closest replica. We are using AWS and Kubernetes (kOps)
> and
> > the consumers do not always fetch from the broker in the same AZ,
> resulting
> > in cross AZ traffic cost. The entire purpose of this mail/exercise is to
> > solve that.
> >
> > There is a very nice documentation from AWS which tells what and how to
> do:
> >
> >
> https://aws.amazon.com/blogs/big-data/reduce-network-traffic-costs-of-your-amazon-msk-consumers-with-rack-awareness/
> >
> > We have followed all these steps:
> >
> > * added the two configuration parameters on kafka brokers (EC2 instances)
> > * added a label `client.rack=<AZ_WHERE_THE_K8S_POD_IS_RUNNING>` to the
> pod
> > with an admission controller (kyverno - which mutates the pods during
> > placement to nodes)
> >
> > After doing these, we expected the traffic cost to go down but AWS cost
> > explorer thinks otherwise, which is why we are thinking that something
> > might be wrong somewhere. To find out we are resorting to good ol logs
> from
> > kafka brokers. And that is the reason for this mail, we haven't been able
> > to come up with a good logging configuration that shows us from where the
> > consumers are fetching data. We are expecting to see something like what
> > the blog article from AWS is saying:
> >
> > +++++
> > [2022-04-27 18:04:18,200] DEBUG [Consumer
> > clientId=consumer-console-consumer-99846-1,
> groupId=console-consumer-99846]
> > Added READ_UNCOMMITTED fetch request for partition order-0 at position
> > FetchPosition{offset=39, offsetEpoch=Optional[0],
> > currentLeader=LeaderAndEpoch{leader=Optional[
> > b-1.mskcluster-msk.jcojml.c23.kafka.us-east-1.amazonaws.com:9092 (id: 1
> > rack: use1-az2)], epoch=0}} to node
> > b-3.mskcluster-msk.jcojml.c23.kafka.us-east-1.amazonaws.com:9092 (id: 3
> > rack: use1-az1) (org.apache.kafka.clients.consumer.internals.Fetcher)
> > +++++
> >
> > We tried using the configuration provided in the article but that also
> > didn't help out.
> >
> > This is our configuration:
> >
> > +++++
> > # ./kafka-topics.sh --version
> > 2.8.0 (Commit:ebb1d6e21cc92130)
> >
> > # grep -v "^#" ../config/server.properties | grep .
> > broker.id=11
> > broker.rack=us-east-1a
> > num.network.threads=64
> > num.io.threads=8
> > socket.send.buffer.bytes=102400
> > socket.receive.buffer.bytes=102400
> > socket.request.max.bytes=104857600
> > message.max.bytes=10485760
> > log.dirs=/mnt/data/kafka
> > num.partitions=1
> > num.recovery.threads.per.data.dir=4
> > offsets.topic.replication.factor=2
> > transaction.state.log.replication.factor=2
> > transaction.state.log.min.isr=1
> > delete.topic.enable=true
> > auto.create.topics.enable=false
> > log.retention.hours=24
> > log.segment.bytes=1073741824
> > log.retention.check.interval.ms=300000
> > zookeeper.connect=zk11.foo.bar.com,zk12.foo.bar.com,zk13.foo.bar.com,
> > zk14.foo.bar.com,zk15.foo.bar.com
> > zookeeper.connection.timeout.ms=6000
> > group.initial.rebalance.delay.ms=5000
> > replica.selector.class =
> > org.apache.kafka.common.replica.RackAwareReplicaSelector
> >
> > # cat ../config/log4j.properties
> > log4j.rootLogger=INFO,ROLLINGFILE
> >
> > log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
> > log4j.appender.CONSOLE.Threshold=WARN
> > log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
> > log4j.appender.CONSOLE.layout.ConversionPattern=%c{35}:%L - %m%n
> >
> > log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender
> > log4j.appender.ROLLINGFILE.Threshold=INFO
> > log4j.appender.ROLLINGFILE.File=/var/log/kafka/server.log
> > log4j.appender.ROLLINGFILE.MaxFileSize=200MB
> > log4j.appender.ROLLINGFILE.MaxBackupIndex=10
> > log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
> > log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d %p [%t] %c{35}:%L
> -
> > %m%n
> >
> > log4j.logger.org.apache.kafka.clients.consumer.internals.Fetcher=DEBUG
> > +++++
> >
> > Can anyone please tell how we should customize the logging properties to
> > see messages shown above.
> >
> > Thanks,
> >
>

Reply via email to