Hi,

We are using Apache Kafka Streams 0.10.2.0 in an application. We use a Kafka Streams topology to pass processed data on to the next topic until the end of processing. The consumer application is deployed in AWS ECS containers.

We observed that the consumer picks up very old messages to process, even though they have already been processed. The issue occurs randomly when the service scales up or down, or during new deployments. I understand that some messages can be reprocessed during a consumer rebalance. In this case, however, a large number of messages that were successfully processed long ago (more than 10 days earlier) are being reprocessed.

We are not able to understand the root cause. Is it failing to commit offsets properly and then picking up random messages in different topologies? The result is inconsistent behavior: a single message can be reprocessed in any of the topologies.

Here is the configuration we are using:

    Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "UniqueKey");
    streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, key);
    streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 60000);
    streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 6);

Here is the code snippet for the processors:

    final KStreamBuilder builder = new KStreamBuilder();
    builder.addSource(key, Serdes.String().deserializer(), executor.getDeserializer(), key);
    builder.addProcessor(key + "_processor", () -> new KafkaProcessor(), key);
    builder.addSink(key + "_sink", key + "_sink", key + "_processor");

    final KafkaStreams streams = new KafkaStreams(builder, StreamConfigurations.getStreamsConfgurations(key, kafkaHost));
    streams.start();
    streams.setUncaughtExceptionHandler((t, th) -> {
        // Pass the throwable itself so the stack trace is logged
        _logger.error("UncaughtException in Kafka StreamThread " + t.getName() + " exception = ", th);
    });
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

I have looked at some blog posts on Kafka reprocessing and am thinking of trying the additional configuration below:

    streamsConfiguration.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
    streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000); // default is 10000
    streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5000); // default is 3000
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    streamsConfiguration.put(ProducerConfig.ACKS_CONFIG, "1");
    streamsConfiguration.put(ProducerConfig.RETRIES_CONFIG, 10);

I also found that Kafka Streams sets enable.auto.commit to false by default and does not let you override it, and that auto.offset.reset is set to earliest. Could this cause the inconsistent behavior?

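To make the auto.offset.reset question concrete, here is a minimal sketch of how I understand a consumer's starting position to be chosen for one partition. It is a toy model and not Kafka code: the class name OffsetResetSketch, the method startingOffset, and the offset numbers are all hypothetical, and the "none" policy is left out. The assumption it encodes is that the reset policy only applies when the group has no usable committed offset for the partition.

```java
import java.util.OptionalLong;

/** Toy model (not Kafka code) of how a starting offset is picked for one partition. */
public class OffsetResetSketch {

    /**
     * @param committed   last committed offset for the group, empty if none is stored
     * @param logStart    earliest offset still retained in the partition
     * @param logEnd      offset the next produced record would get
     * @param resetPolicy value of auto.offset.reset: "earliest" or "latest"
     */
    public static long startingOffset(OptionalLong committed, long logStart,
                                      long logEnd, String resetPolicy) {
        if (committed.isPresent()) {
            long offset = committed.getAsLong();
            // A committed offset inside the retained range is simply resumed.
            if (offset >= logStart && offset <= logEnd) {
                return offset;
            }
        }
        // No committed offset, or it is out of range: the reset policy decides.
        switch (resetPolicy) {
            case "earliest":
                return logStart; // re-reads everything still retained
            case "latest":
                return logEnd;   // skips everything produced so far
            default:
                throw new IllegalArgumentException("unsupported policy: " + resetPolicy);
        }
    }

    public static void main(String[] args) {
        // Hypothetical partition retaining offsets 100..5199.
        System.out.println(startingOffset(OptionalLong.of(5000), 100, 5200, "earliest")); // 5000
        System.out.println(startingOffset(OptionalLong.empty(), 100, 5200, "earliest"));  // 100
        System.out.println(startingOffset(OptionalLong.empty(), 100, 5200, "latest"));    // 5200
    }
}
```

If this model is right, a missing committed offset combined with "earliest" would restart the partition from its oldest retained record, which would look like old messages being reprocessed, while "latest" would instead skip any unprocessed messages.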
I have found the following kinds of warnings and exceptions.

Warning when Kafka Streams sends the response to the next topic:

* 2017-05-03 17:24:52.908|INSTANCEID_IS_UNDEFINED|lampnode.devops.tesco.org|kafka-producer-network-thread | MediaDownload-StreamThread-6-producer|WARN |o.a.k.c.NetworkClient|707|Error while fetching metadata with correlation id 41 : {MediaValidate=LEADER_NOT_AVAILABLE}

Randomly, while processing messages on the consumer side:

* org.apache.kafka.streams.errors.LockException: task [0_91] Failed to lock the state directory: /tmp/kafka-streams/Flare.MediaPreProcess/0_91
* org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-32] failed to suspend stream tasks

The Kafka logs show this exception for different topics:

[2017-05-03 13:13:52,734] ERROR [ReplicaFetcherThread-0-4], Error for partition [__consumer_offsets,45] to broker 4:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)

Please let me know if you need any further details.

Thanks,
Alpa