Hello,


I’m building an application that uses Kafka and Kafka Streams. I’m running 
into application startup issues that I’ve been unable to solve, even with 
the help of my teammates, so I’m writing here in the hope that someone can 
offer help. I’d be very grateful.





Application description:



The application runs in AWS and uses the AWS MSK service for the brokers. 
It runs in parallel on multiple nodes; typically we have 3, but it’s meant 
to scale to tens if needed. The number of brokers is also currently 3. The 
Kafka version is 2.6.0, both in MSK and in the Kafka libraries included 
with the application.



The application runs on the US west coast, while the Kafka brokers are in 
Europe, so there is some network latency between them. (There’s another 
group of 3 servers also running in Europe, configured with a different 
application id, so the servers in a given geographic location have their 
own consumer group.)



The application uses a Kafka Streams ReadOnlyKeyValueStore to consume a 
Kafka topic, say topic R, which holds key-value pairs. The key is a string 
or other multibyte value; the value is a serialised structure containing a 
number (Long) and some other data. The application provides a REST API 
through which clients can make requests with some key, and the application 
returns the number from the value, which should be the latest number seen 
for that key in the topic. The goal is for the API to respond within 
milliseconds, e.g. 5-10 ms. (This is why the application servers are 
geographically far from the brokers: to provide low latency in that 
location.)
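
In outline, the topology and the local lookup look roughly like this (a 
simplified sketch; the topic name, store name and variable names are 
placeholders, not our real ones, and props holds the StreamsConfig values 
dumped below):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StoreQueryParameters;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    // Materialise topic R as a local key-value store, backed by the
    // changelog topic mentioned further below.
    StreamsBuilder builder = new StreamsBuilder();
    builder.table("topic-r",
        Consumed.with(Serdes.String(), Serdes.Long()),
        Materialized.as("ratings-store"));

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();

    // Once the instance is RUNNING, the REST layer queries the store locally:
    ReadOnlyKeyValueStore<String, Long> store = streams.store(
        StoreQueryParameters.fromNameAndType(
            "ratings-store", QueryableStoreTypes.keyValueStore()));
    Long latest = store.get(requestedKey);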



If the requested key is not local on a given server, the application 
determines which server has that key based on the Kafka metadata and 
forwards the request to that server. This part works fine, at least as far 
as the Kafka side is concerned.
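
The forwarding decision uses the standard Streams metadata API, essentially 
like this (again a sketch; localHost/localPort and the actual HTTP 
forwarding are our own application code):

    import org.apache.kafka.streams.KeyQueryMetadata;
    import org.apache.kafka.streams.state.HostInfo;

    // Ask Streams which instance hosts the active copy of the key's partition.
    KeyQueryMetadata keyMetadata = streams.queryMetadataForKey(
        "ratings-store", requestedKey, Serdes.String().serializer());
    HostInfo active = keyMetadata.getActiveHost();
    if (active.host().equals(localHost) && active.port() == localPort) {
        // Serve the request from the local store.
    } else {
        // Forward the REST request to active.host():active.port().
    }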



The key space is expected to be very large, perhaps tens or hundreds of 
millions of keys, maybe more. The application is still in development, so 
we have not seen that many in practice yet; so far it’s at most a few 
thousand to tens of thousands of keys with generated test data.





Problem description:



The reason I’m writing here is to get help with Kafka/Kafka Streams startup 
issues. Sometimes, but much too frequently, when all the servers are 
restarted, e.g. to deploy a new version of the application, some of the 
application instances will not start up cleanly.



At first there was an error with the message “topic may have migrated to 
another instance”. This was eventually worked around by retrying for more 
than 10 minutes, after which there was apparently a rebalance and the 
server in question was able to synchronise with Kafka and join the consumer 
group. This still happens, and a startup time of over 10 minutes is not 
desirable, but at least it’s no longer blocking development.
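
The retry workaround is essentially this (simplified):

    import org.apache.kafka.streams.errors.InvalidStateStoreException;

    // Retry until the store becomes queryable; while a rebalance is in
    // progress the lookup throws InvalidStateStoreException with the
    // "may have migrated to another instance" message.
    ReadOnlyKeyValueStore<String, Long> waitForStore(KafkaStreams streams)
            throws InterruptedException {
        while (true) {
            try {
                return streams.store(StoreQueryParameters.fromNameAndType(
                    "ratings-store", QueryableStoreTypes.keyValueStore()));
            } catch (InvalidStateStoreException e) {
                Thread.sleep(1000);  // store not ready yet, retry
            }
        }
    }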



Now there’s a second startup issue, with an exception 
org.apache.kafka.common.errors.DisconnectException being thrown by 
org.apache.kafka.clients.FetchSessionHandler with the message “Error sending 
fetch request (sessionId=INVALID, epoch=INITIAL) to node 2:”



Before the timeout there’s a restore log message: “stream-thread 
[query-api-us-west-2-0943f8d4-1720-4b3b-904d-d2efa190a135-StreamThread-1] 
Restoration in progress for 20 partitions.” followed by a dump of the 20 
partitions, e.g. 
“{query-api-us-west-2-query-api-us-west-2-prevalence-ratings-changelog-49: 
position=0, end=37713, totalRestored=0}”. The position and totalRestored 
are always 0.

The partitions belong to the changelog topic associated with the 
above-mentioned topic R. Topic R has 60 partitions in total, so 20 matches 
the expected count per server (60/3). I’m assuming the changelog has the 
same number of partitions as the actual topic.
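
In case it helps with diagnosis, the same (lack of) progress should be 
visible through a restore listener; a sketch of one we could attach (it 
must be registered before streams.start()):

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.StateRestoreListener;

    // Logs per-partition restore progress. In the failing state
    // onBatchRestored presumably never fires, matching totalRestored=0.
    streams.setGlobalStateRestoreListener(new StateRestoreListener() {
        @Override
        public void onRestoreStart(TopicPartition tp, String storeName,
                                   long startOffset, long endOffset) {
            System.out.printf("restore start %s [%d..%d]%n", tp, startOffset, endOffset);
        }
        @Override
        public void onBatchRestored(TopicPartition tp, String storeName,
                                    long batchEndOffset, long numRestored) {
            System.out.printf("restored %d records from %s%n", numRestored, tp);
        }
        @Override
        public void onRestoreEnd(TopicPartition tp, String storeName,
                                 long totalRestored) {
            System.out.printf("restore done %s (%d records)%n", tp, totalRestored);
        }
    });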



These log messages repeat every 31 seconds or so.



The Kafka Streams state never reaches RUNNING; the application waits for 
that to happen before it starts serving requests.
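
The wait is done with a state listener, roughly like this (simplified):

    import java.util.concurrent.CountDownLatch;

    // Block the REST layer until Kafka Streams reports RUNNING.
    CountDownLatch running = new CountDownLatch(1);
    streams.setStateListener((newState, oldState) -> {
        if (newState == KafkaStreams.State.RUNNING) {
            running.countDown();
        }
    });
    streams.start();
    running.await();  // in the failing state this never returns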



This error can persist even if the application is restarted.



I’ve looked into network issues, but there don’t seem to be any. At times 
the servers run fine, so this seems to be intermittent. Also, it’s possible 
to use the command-line Kafka tools, e.g. kafka-topics.sh, to list the 
topics, so communication with the Kafka brokers can work just fine from the 
server even while the application is stuck in the failing state. The issue 
seems to be with the application itself, quite likely its configuration.



I have tried increasing the configuration value fetch.max.wait.ms from 500 
(the default) to 1000 and even to 10000, with no apparent effect.
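
For reference, a consumer-level override like this can be set through the 
Streams config, e.g. (sketch; Kafka Streams also has a separate 
restore.consumer. prefix for the restore consumer used during state 
restoration, shown here commented out):

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    // Consumer-level settings can be passed to Streams with the consumer prefix.
    props.put(StreamsConfig.consumerPrefix(
        ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), 10000);
    // The restore consumer can be configured separately if needed:
    // props.put(StreamsConfig.restoreConsumerPrefix(
    //     ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), 10000);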





There do not seem to be any issues with the brokers themselves: there are 
no errors in the broker logs, and all the metrics AWS recommends monitoring 
for MSK look normal.







Kafka Streams configuration values are below; most are defaults:



StreamsConfig values:



acceptable.recovery.lag = 10000

application.id = query-api-us-west-2

application.server = ip-10-200-246-134.us-west-2.compute.internal:8080

bootstrap.servers = 
[b-3.ew1-pcp-ci-prevalence.vgugfz.c3.kafka.eu-west-1.amazonaws.com:9094, 
b-1.ew1-pcp-ci-prevalence.vgugfz.c3.kafka.eu-west-1.amazonaws.com:9094, 
b-2.ew1-pcp-ci-prevalence.vgugfz.c3.kafka.eu-west-1.amazonaws.com:9094]

buffered.records.per.partition = 1000

built.in.metrics.version = latest

cache.max.bytes.buffering = 10485760

client.id =

commit.interval.ms = 30000

connections.max.idle.ms = 540000

default.deserialization.exception.handler = class 
org.apache.kafka.streams.errors.LogAndContinueExceptionHandler

default.key.serde = class 
org.apache.kafka.common.serialization.Serdes$StringSerde

default.production.exception.handler = class 
org.apache.kafka.streams.errors.DefaultProductionExceptionHandler

default.timestamp.extractor = class 
org.apache.kafka.streams.processor.FailOnInvalidTimestamp

default.value.serde = class 
org.apache.kafka.common.serialization.Serdes$LongSerde

max.task.idle.ms = 0

max.warmup.replicas = 2

metadata.max.age.ms = 300000

metric.reporters = []

metrics.num.samples = 2

metrics.recording.level = INFO

metrics.sample.window.ms = 30000

num.standby.replicas = 0

num.stream.threads = 1

partition.grouper = class 
org.apache.kafka.streams.processor.DefaultPartitionGrouper

poll.ms = 100

probing.rebalance.interval.ms = 600000

processing.guarantee = at_least_once

receive.buffer.bytes = 32768

reconnect.backoff.max.ms = 1000

reconnect.backoff.ms = 50

replication.factor = 1

request.timeout.ms = 40000

retries = 0

retry.backoff.ms = 100

rocksdb.config.setter = null

security.protocol = ssl

send.buffer.bytes = 131072

state.cleanup.delay.ms = 600000

state.dir = /data/query-api/kafka-streams

topology.optimization = none

upgrade.from = null

windowstore.changelog.additional.retention.ms = 86400000





ConsumerConfig values:



allow.auto.create.topics = false

auto.commit.interval.ms = 5000

auto.offset.reset = earliest

bootstrap.servers = 
[b-3.ew1-pcp-ci-prevalence.vgugfz.c3.kafka.eu-west-1.amazonaws.com:9094, 
b-1.ew1-pcp-ci-prevalence.vgugfz.c3.kafka.eu-west-1.amazonaws.com:9094, 
b-2.ew1-pcp-ci-prevalence.vgugfz.c3.kafka.eu-west-1.amazonaws.com:9094]

check.crcs = true

client.dns.lookup = use_all_dns_ips

client.id = 
query-api-us-west-2-0943f8d4-1720-4b3b-904d-d2efa190a135-StreamThread-1-consumer

client.rack =

connections.max.idle.ms = 540000

default.api.timeout.ms = 60000

enable.auto.commit = false

exclude.internal.topics = true

fetch.max.bytes = 52428800

fetch.max.wait.ms = 500

fetch.min.bytes = 1

group.id = query-api-us-west-2

group.instance.id = null

heartbeat.interval.ms = 10000

interceptor.classes = []

internal.leave.group.on.close = false

internal.throw.on.fetch.stable.offset.unsupported = false

isolation.level = read_uncommitted

key.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer

max.partition.fetch.bytes = 1048576

max.poll.interval.ms = 300000

max.poll.records = 1000

metadata.max.age.ms = 300000

metric.reporters = []

metrics.num.samples = 2

metrics.recording.level = INFO

metrics.sample.window.ms = 30000

partition.assignment.strategy = 
[org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]

receive.buffer.bytes = 65536

reconnect.backoff.max.ms = 1000

reconnect.backoff.ms = 50

request.timeout.ms = 30000

retry.backoff.ms = 100

sasl.client.callback.handler.class = null

sasl.jaas.config = null

sasl.kerberos.kinit.cmd = /usr/bin/kinit

sasl.kerberos.min.time.before.relogin = 60000

sasl.kerberos.service.name = null

sasl.kerberos.ticket.renew.jitter = 0.05

sasl.kerberos.ticket.renew.window.factor = 0.8

sasl.login.callback.handler.class = null

sasl.login.class = null

sasl.login.refresh.buffer.seconds = 300

sasl.login.refresh.min.period.seconds = 60

sasl.login.refresh.window.factor = 0.8

sasl.login.refresh.window.jitter = 0.05

sasl.mechanism = GSSAPI

security.protocol = ssl

security.providers = null

send.buffer.bytes = 131072

session.timeout.ms = 30000

ssl.cipher.suites = null

ssl.enabled.protocols = [TLSv1.2, TLSv1.3]

ssl.endpoint.identification.algorithm = https

ssl.engine.factory.class = null

ssl.key.password = null

ssl.keymanager.algorithm = SunX509

ssl.keystore.location = null

ssl.keystore.password = null

ssl.keystore.type = JKS

ssl.protocol = TLSv1.3

ssl.provider = null

ssl.secure.random.implementation = null

ssl.trustmanager.algorithm = PKIX

ssl.truststore.location = null

ssl.truststore.password = null

ssl.truststore.type = JKS

value.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer





Best regards,

Mikko Hänninen
--
Mikko Hänninen
Senior Developer, Security Research and Technologies
F-Secure
mikko.hanni...@f-secure.com

www.f-secure.com