Hello,
I’m working on creating an application that leverages Kafka and Kafka Streams.
I have some issues with application startup that I’ve been unable to solve,
even with the help of my teammates, so I’m writing here in the hope that
someone can help. I’d be very grateful.
Application description:
The application runs in AWS and uses the AWS MSK service for the brokers. The
application runs in parallel on multiple nodes; we typically have 3, but it’s
meant to scale to tens if needed. The number of brokers is also currently 3.
The Kafka version is 2.6.0, both in MSK and in the Kafka libraries included
with the application.
The application runs on the US west coast, while the Kafka brokers are in
Europe, so there is some network latency in between. (There’s another group of
3 servers also running in Europe, with a different application.id configured,
so the servers in a given geographic location have their own consumer group.)
The application uses a Kafka Streams ReadOnlyKeyValueStore to consume a Kafka
topic, say topic R, which has key-value pairs. The key is a string or other
multibyte value, the value is a serialised structure with a number (Long) and
some other data. The application provides a REST API through which clients can
make requests with some key, and the application returns the number from the
value, which should be the latest number seen for the given key in the topic.
The goal is for the API to respond within milliseconds, e.g. 5 or 10 ms. (This
is why the application servers are geographically far away from the brokers:
to provide low latency in that location.)
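For context, the relevant part of the topology is roughly as in the sketch
below. This is a simplified illustration, not our exact code: the topic name
"R" and the store name "prevalence-ratings" are placeholders, and the value is
shown as a plain Long even though the real value is a serialised structure.

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StoreQueryParameters;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    public class RatingsTopologySketch {

        public static KafkaStreams build(Properties props) {
            StreamsBuilder builder = new StreamsBuilder();
            // Materialize the input topic into a local state store, which is
            // backed by the changelog topic seen in the logs.
            builder.table("R",
                    Consumed.with(Serdes.String(), Serdes.Long()),
                    Materialized.as("prevalence-ratings"));
            return new KafkaStreams(builder.build(), props);
        }

        // The REST handler reads the latest value for a key from the local store.
        public static Long latestValue(KafkaStreams streams, String key) {
            ReadOnlyKeyValueStore<String, Long> store = streams.store(
                    StoreQueryParameters.fromNameAndType(
                            "prevalence-ratings",
                            QueryableStoreTypes.keyValueStore()));
            return store.get(key);
        }
    }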
If the requested key is not local on a given server, the application determines
which server has that key based on the Kafka metadata, and forwards the request
to that server. This part works fine, at least in terms of Kafka use.
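The forwarding decision uses the standard interactive-queries metadata API,
roughly like the sketch below (simplified; the HTTP forwarding itself and the
store name are omitted or placeholders):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyQueryMetadata;
    import org.apache.kafka.streams.state.HostInfo;

    public class KeyRoutingSketch {

        private final KafkaStreams streams;
        private final HostInfo self; // matches the application.server config

        public KeyRoutingSketch(KafkaStreams streams, HostInfo self) {
            this.streams = streams;
            this.self = self;
        }

        // Returns null if the key is served locally, otherwise the host to forward to.
        public HostInfo forwardTarget(String key) {
            KeyQueryMetadata metadata = streams.queryMetadataForKey(
                    "prevalence-ratings", key, Serdes.String().serializer());
            if (metadata == null || metadata.equals(KeyQueryMetadata.NOT_AVAILABLE)) {
                throw new IllegalStateException("Key metadata not available (rebalancing?)");
            }
            HostInfo active = metadata.getActiveHost();
            return active.equals(self) ? null : active;
        }
    }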
The key space is expected to be very large, perhaps tens or hundreds of
millions of keys, maybe more. The application is still in development, so we
have not seen that many in practice yet; with generated test data it’s at most
a few thousand or tens of thousands.
Problem description:
The reason I’m writing here is to get help with Kafka/Kafka Streams startup
issues. Sometimes, but much too frequently, when all the servers are
restarted, e.g. due to deploying a new version of the application, some of the
application instances will not start up cleanly.
At first there was an error with the message “topic may have migrated to
another instance”. This was eventually worked around by retrying for more than
10 minutes, after which there was apparently a rebalance and the instance in
question was able to synchronise with Kafka and join the consumer group. This
still happens, and a startup time of over 10 minutes is not desirable, but at
least it’s no longer blocking development.
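The workaround is essentially a retry loop around the store lookup, along
these lines (a simplified sketch, not our exact code; the timeout, sleep and
store name are placeholders):

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StoreQueryParameters;
    import org.apache.kafka.streams.errors.InvalidStateStoreException;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    public class StoreStartupRetry {

        public static ReadOnlyKeyValueStore<String, Long> waitForStore(
                KafkaStreams streams, long timeoutMs) throws InterruptedException {
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (true) {
                try {
                    // Throws InvalidStateStoreException with the "may have
                    // migrated to another instance" message while the
                    // instance is still rebalancing or restoring.
                    return streams.store(StoreQueryParameters.fromNameAndType(
                            "prevalence-ratings",
                            QueryableStoreTypes.keyValueStore()));
                } catch (InvalidStateStoreException e) {
                    if (System.currentTimeMillis() >= deadline) {
                        throw e;
                    }
                    Thread.sleep(1_000L);
                }
            }
        }
    }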
Now there’s a second startup issue, with an exception
org.apache.kafka.common.errors.DisconnectException being thrown by
org.apache.kafka.clients.FetchSessionHandler with the message “Error sending
fetch request (sessionId=INVALID, epoch=INITIAL) to node 2:”
Before the timeout there’s a restore log message “stream-thread
[query-api-us-west-2-0943f8d4-1720-4b3b-904d-d2efa190a135-StreamThread-1]
Restoration in progress for 20 partitions.” followed by a dump of the 20
partitions, e.g.
“{query-api-us-west-2-query-api-us-west-2-prevalence-ratings-changelog-49:
position=0, end=37713, totalRestored=0}” -- the position and totalRestored are
always 0.
The partitions are for the changelog topic associated with the above-mentioned
topic R. There are 60 partitions in R in total, so 20 matches the expected
count per server (60/3). I’m assuming the changelog has the same number of
partitions as the source topic.
These log messages repeat every 31 seconds or so.
The Kafka Streams state never reaches RUNNING; the application waits for that
before it starts serving requests.
This error can persist even if the application is restarted.
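For completeness, the “wait for RUNNING” part is implemented with a state
listener, roughly like the sketch below (simplified; the timeout handling is a
placeholder):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.streams.KafkaStreams;

    public class StartupGate {

        public static boolean startAndWaitForRunning(KafkaStreams streams, long timeoutMs)
                throws InterruptedException {
            CountDownLatch running = new CountDownLatch(1);
            streams.setStateListener((newState, oldState) -> {
                if (newState == KafkaStreams.State.RUNNING) {
                    running.countDown();
                }
            });
            streams.start();
            // With the DisconnectException loop described above, this latch
            // never opens, so the REST API never starts serving requests.
            return running.await(timeoutMs, TimeUnit.MILLISECONDS);
        }
    }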
I’ve looked into network issues, but there don’t seem to be any. At times the
servers run fine, so this seems to be intermittent. Also, it’s possible to use
the command-line Kafka tools, e.g. kafka-topics.sh, to list the topics, so
communication with the Kafka brokers can work just fine from the server even
while the application is stuck in the failing state. The issue seems to be
somehow with the application, quite likely with the configuration.
I have tried increasing the configuration value fetch.max.wait.ms from 500
(the default) to 1000 and even to 10000, with no apparent effect.
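In case the way it is set matters: a consumer-level override in Streams is
applied roughly like this (a sketch with only the relevant lines, not
necessarily exactly how our configuration code looks; other settings such as
bootstrap servers are omitted):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class StreamsPropsSketch {
        public static Properties build() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "query-api-us-west-2");
            // Consumer-level override of fetch.max.wait.ms (tried 1000 and 10000).
            props.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), 10000);
            return props;
        }
    }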
There do not seem to be any issues with the brokers themselves: there are no
errors in the broker logs, and all the metrics recommended by AWS for
monitoring MSK look normal.
Kafka Streams configuration values are below; most are defaults:
StreamsConfig values:
acceptable.recovery.lag = 10000
application.id = query-api-us-west-2
application.server = ip-10-200-246-134.us-west-2.compute.internal:8080
bootstrap.servers =
[b-3.ew1-pcp-ci-prevalence.vgugfz.c3.kafka.eu-west-1.amazonaws.com:9094,
b-1.ew1-pcp-ci-prevalence.vgugfz.c3.kafka.eu-west-1.amazonaws.com:9094,
b-2.ew1-pcp-ci-prevalence.vgugfz.c3.kafka.eu-west-1.amazonaws.com:9094]
buffered.records.per.partition = 1000
built.in.metrics.version = latest
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = 30000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class
org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
default.key.serde = class
org.apache.kafka.common.serialization.Serdes$StringSerde
default.production.exception.handler = class
org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class
org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class
org.apache.kafka.common.serialization.Serdes$LongSerde
max.task.idle.ms = 0
max.warmup.replicas = 2
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
partition.grouper = class
org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
probing.rebalance.interval.ms = 600000
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = ssl
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /data/query-api/kafka-streams
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000
ConsumerConfig values:
allow.auto.create.topics = false
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers =
[b-3.ew1-pcp-ci-prevalence.vgugfz.c3.kafka.eu-west-1.amazonaws.com:9094,
b-1.ew1-pcp-ci-prevalence.vgugfz.c3.kafka.eu-west-1.amazonaws.com:9094,
b-2.ew1-pcp-ci-prevalence.vgugfz.c3.kafka.eu-west-1.amazonaws.com:9094]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id =
query-api-us-west-2-0943f8d4-1720-4b3b-904d-d2efa190a135-StreamThread-1-consumer
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = query-api-us-west-2
group.instance.id = null
heartbeat.interval.ms = 10000
interceptor.classes = []
internal.leave.group.on.close = false
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy =
[org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = ssl
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 30000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
Best regards,
Mikko Hänninen
--
Mikko Hänninen
Senior Developer, Security Research and Technologies
F-Secure
[email protected]
[cid:[email protected]]
www.f-secure.com
[cid:[email protected]]<https://www.facebook.com/F-Secure-107471754306/>[cid:[email protected]]<https://twitter.com/fsecure>
[cid:[email protected]] <https://www.youtube.com/f-secure>
[cid:[email protected]]
<https://www.linkedin.com/company/f-secure-corporation/>
[cid:[email protected]] <https://blog.f-secure.com/>