Hi, I have been testing the behavior of multiple Kafka broker instances on the same machine, and I am seeing inconsistent behavior: records sent by the producer to the buffer are not always available in the queue.
Kafka versions tried:

Scenario:

1. Ran two broker instances on the same machine, with broker 1 as the initial leader and broker 2 as the initial follower.
2. Stopped broker 1. Broker 2 then became the leader.
3. The producer sends records for topic TEST through the `send()` method, followed by `flush()`. Logically, the records should go to broker 2. No error or exception is thrown by the code, so it is assumed that the data has been sent to the buffer successfully.
4. When checking the record count for topic TEST on broker 2, the sent records are not added to the existing record count for that topic. Command used (where TEST is the topic):

   `kafka-run-class.bat kafka.tools.GetOffsetShell --broker-list localhost:9094 --topic TEST --time -1`

NOTE: **The problem in step 4 does not occur every time; it is inconsistent.** In the runs where it does not work, if broker 1 is brought up and then shut down again, the records are always available in the queue on broker 2 after performing step 3.

Configurations:

Overall producer configuration (mostly default values):

    acks = all
    batch.size = 16384
    block.on.buffer.full = false
    bootstrap.servers = <BROKER-1-IP:9091>, <BROKER-2-IP:9091>
    buffer.memory = 33554432
    client.id = producer-1
    compression.type = none
    connections.max.idle.ms = 540000
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 1
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.fetch.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    timeout.ms = 30000
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

Broker 1 (server.properties):

    broker.id=1
    port=9091
    advertised.host.name=<System-IP>
    auto.create.topics.enable=true
    default.replication.factor=2
    leader.imbalance.check.interval.seconds=20
    topic.metadata.refresh.interval.ms=-1

Broker 2 (server1.properties):

    broker.id=2
    port=9094
    advertised.host.name=<System-IP>
    auto.create.topics.enable=true
    default.replication.factor=2
    leader.imbalance.check.interval.seconds=20
    topic.metadata.refresh.interval.ms=-1

Has anyone faced a similar scenario, and why does this issue occur? Let me know if any more details are needed.

Thanks,
Ranjith
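To compare the record counts from step 4 before and after a send, the `GetOffsetShell` output can be totalled across partitions. The helper below is a hypothetical sketch, not part of Kafka. It assumes the tool prints one `topic:partition:offset` line per partition (as with `--time -1`), and that no records have been removed by retention, since the latest offset only equals the record count when the partition's earliest offset is still 0.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical helper (not a Kafka API): parses GetOffsetShell output,
 * assumed to be one "topic:partition:offset" line per partition, and
 * sums the latest offsets of a topic across its partitions.
 */
public class EndOffsets {

    /** Returns partition -> latest offset for the given topic. */
    public static Map<Integer, Long> parse(String output, String topic) {
        Map<Integer, Long> offsets = new TreeMap<>();
        for (String line : output.split("\\R")) {
            String[] parts = line.trim().split(":");
            // Skip blank lines, log noise, and lines for other topics.
            if (parts.length != 3 || !parts[0].equals(topic)) {
                continue;
            }
            // Some tool versions can print several comma-separated offsets;
            // with --time -1 the first one is the latest.
            String offset = parts[2];
            int comma = offset.indexOf(',');
            if (comma >= 0) {
                offset = offset.substring(0, comma);
            }
            try {
                offsets.put(Integer.parseInt(parts[1].trim()), Long.parseLong(offset.trim()));
            } catch (NumberFormatException e) {
                // Ignore lines that only look like topic:partition:offset.
            }
        }
        return offsets;
    }

    /** Sum of the latest offsets across all partitions of the topic. */
    public static long total(String output, String topic) {
        long sum = 0;
        for (long offset : parse(output, topic).values()) {
            sum += offset;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Made-up sample output for illustration, not captured from the brokers above.
        String before = "TEST:0:120\nTEST:1:118\n";
        String after = "TEST:0:125\nTEST:1:123\n";
        long appended = total(after, "TEST") - total(before, "TEST");
        System.out.println("records appended: " + appended);
    }
}
```

Running the same command against broker 2 before and after step 3 and passing both outputs to `total()` gives the number of records actually appended; a result of 0 after a send that threw no exception is the symptom described above.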