Hard to say. A couple of things you could try: upgrade to 0.10.2.1 (released last week) -- it contains a couple of bug fixes with regard to rebalancing and state store locks.

Also, when your application "jumps back", is this somewhere in the middle of your input topic or is it "earliest"? If it is earliest, it might indicate that the auto.offset.reset policy was triggered, i.e., no valid committed offset was found. Also, can you check if your application is committing regularly? By default, this should happen in 30-second intervals. You can use the bin/kafka-consumer-groups.sh command to check the committed offsets.
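For example, something like this (the broker address is just a placeholder -- use your own; the group to describe is your application.id, i.e., "UniqueKey" in your config below; IIRC on 0.10.x you still need the --new-consumer flag, while newer versions infer it from --bootstrap-server):

  bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server broker:9092 \
      --describe --group UniqueKey

If the CURRENT-OFFSET column keeps advancing while the application is running, commits are working; if the group shows no committed offsets right after a deployment or scaling event, that would explain why auto.offset.reset kicks in.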
Hope this helps.

-Matthias

On 5/3/17 11:07 AM, Khatri, Alpa wrote:
> Hi,
> We are using apache kafka-streams 0.10.2.0 in an application. We are
> leveraging the kafka-streams topology for passing the processed data on to
> the next topic till the end of processing.
> Also, we use an AWS ECS container to deploy the consumer application. We
> observed the consumer picking up very old messages to process, although
> they had been processed earlier. This issue happens randomly at the time
> of service scaling up/down or in case of new deployments. I understand
> that at the time of consumer rebalancing some of the messages can be
> reprocessed. But in this case, it is reprocessing a large amount of
> messages which were successfully processed a long time back (more than 10
> days old).
> We are not able to understand the root cause of this issue. Is it not
> committing the offsets properly and picking up random messages in a
> different topology? This leads to the inconsistent behavior of one message
> being re-processed in any of the topologies.
> Here are the configurations we are using:
>
> Properties streamsConfiguration = new Properties();
> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "UniqueKey");
> streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, key);
> streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>     Serdes.String().getClass().getName());
> streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>     Serdes.String().getClass().getName());
> streamsConfiguration.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 60000);
> streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 6);
>
> Here is the code snippet for the processors:
>
> final KStreamBuilder builder = new KStreamBuilder();
> builder.addSource(key, Serdes.String().deserializer(),
>     executor.getDeserializer(), key);
> builder.addProcessor(key + "_processor", () -> new KafkaProcessor(), key);
> builder.addSink(key + "_sink", key + "_sink", key + "_processor");
> final KafkaStreams streams = new KafkaStreams(builder,
>     StreamConfigurations.getStreamsConfigurations(key, kafkaHost));
> streams.start();
> streams.setUncaughtExceptionHandler((t, th) -> {
>     _logger.error("UncaughtException in Kafka StreamThread " + t.getName(), th);
> });
> Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>
> I have looked into some of the Kafka re-processing blogs and am thinking
> of trying some more configurations, listed below:
>
> streamsConfiguration.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
>     Integer.MAX_VALUE);
> streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
>     20000); // default is 10000
> streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,
>     5000); // default is 3000
> streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
> streamsConfiguration.put(ProducerConfig.ACKS_CONFIG, "1");
> streamsConfiguration.put(ProducerConfig.RETRIES_CONFIG, 10);
>
> I also found that Kafka Streams by default sets enable.auto.commit to
> false and doesn't let you override the value, and that auto.offset.reset
> is set to earliest. Can this be an issue for the inconsistent behavior?
>
> I have found the following types of exceptions:
>
> Warning when kafka-streams sends the response to the next topic:
>
>   * 2017-05-03 17:24:52.908|INSTANCEID_IS_UNDEFINED|lampnode.devops.tesco.org|kafka-producer-network-thread
>     | MediaDownload-StreamThread-6-producer|WARN|o.a.k.c.NetworkClient|707|Error
>     while fetching metadata with correlation id 41 : {MediaValidate=LEADER_NOT_AVAILABLE}
>
> Randomly while processing a message at the consumer end:
>
>   * org.apache.kafka.streams.errors.LockException: task [0_91] Failed to
>     lock the state directory: /tmp/kafka-streams/Flare.MediaPreProcess/0_91
>
>   * org.apache.kafka.streams.errors.StreamsException: stream-thread
>     [StreamThread-32] failed to suspend stream tasks
>
> The Kafka logs show this exception for different topics:
>
> [2017-05-03 13:13:52,734] ERROR [ReplicaFetcherThread-0-4], Error for
> partition [__consumer_offsets,45] to broker 4:
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
>
> Please let me know if you need any kind of details.
>
> Thanks,
> Alpa