Hi,
We are using Apache Kafka Streams 0.10.2.0 in an application. We use a Kafka 
Streams topology to pass the processed data on to the next topic until 
processing completes.
We deploy the consumer application in AWS ECS containers. We have observed the 
consumer picking up very old messages to process, even though they were 
processed earlier. The issue happens randomly when the service scales up/down 
or when we roll out a new deployment. I understand that some messages can be 
reprocessed during a consumer rebalance, but in this case it reprocesses a 
large number of messages that were successfully processed a long time ago 
(more than 10 days old).
We have not been able to find the root cause. Is it failing to commit the 
offsets properly and then picking up arbitrary messages in the different 
topologies? The result is inconsistent behavior, with a given message being 
re-processed in any of the topologies.
Here are the configurations we are using:
    Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "UniqueKey");
    streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, key);
    streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 60000);
    streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 6);

Here is the code snippet for the processors:
    final KStreamBuilder builder = new KStreamBuilder();
    builder.addSource(key, Serdes.String().deserializer(), executor.getDeserializer(), key);
    builder.addProcessor(key + "_processor", () -> new KafkaProcessor(), key);
    builder.addSink(key + "_sink", key + "_sink", key + "_processor");
    final KafkaStreams streams = new KafkaStreams(builder, StreamConfigurations.getStreamsConfigurations(key, kafkaHost));
    // Register the handler before start() so failures in the stream threads are not missed.
    streams.setUncaughtExceptionHandler((t, th) ->
        _logger.error("Uncaught exception in Kafka StreamThread " + t.getName(), th));
    streams.start();
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
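
To rule out an offset-commit problem on our side, we are thinking of reading 
the committed offsets for the group back directly. Below is a minimal sketch 
(in Kafka Streams the consumer group id equals the application.id, so 
"UniqueKey" here; the topic and partition are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
    // Kafka Streams uses application.id as the consumer group id.
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "UniqueKey");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        // Placeholder topic/partition; committed() queries the coordinator without joining the group.
        TopicPartition tp = new TopicPartition("MediaValidate", 0);
        OffsetAndMetadata committed = consumer.committed(tp);
        System.out.println(tp + " committed offset: " + (committed == null ? "none" : committed.offset()));
    }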

I have looked into some of the Kafka re-processing blog posts and am thinking 
of trying some additional configurations, listed below:
    streamsConfiguration.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
    streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000); // default is 10000
    streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5000); // default is 3000
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    streamsConfiguration.put(ProducerConfig.ACKS_CONFIG, "1"); // acks is a String config
    streamsConfiguration.put(ProducerConfig.RETRIES_CONFIG, 10);


I also found that Kafka Streams sets enable.auto.commit to false by default, 
does not let you override that value, and defaults auto.offset.reset to 
earliest. Could this cause the inconsistent behavior?
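
Since Streams commits offsets on its own schedule, the cadence is controlled 
by commit.interval.ms (default 30000 ms). As a sketch, assuming that committing 
more often shrinks the window a rebalance can replay, we could also try:

    // Assumption: a shorter commit interval reduces how many records are
    // replayed after an unclean rebalance; 10s here versus the 30s default.
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);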


I have seen the following types of exceptions:

Warning when Kafka Streams sends output to the next topic:


*       2017-05-03 17:24:52.908|INSTANCEID_IS_UNDEFINED|lampnode.devops.tesco.org|kafka-producer-network-thread | MediaDownload-StreamThread-6-producer|WARN |o.a.k.c.NetworkClient|707|Error while fetching metadata with correlation id 41 : {MediaValidate=LEADER_NOT_AVAILABLE}

Randomly while processing messages on the consumer end:


*       org.apache.kafka.streams.errors.LockException: task [0_91] Failed to lock the state directory: /tmp/kafka-streams/Flare.MediaPreProcess/0_91

*       org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-32] failed to suspend stream tasks
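
The LockException points at the default state directory under /tmp. On the 
assumption that a dedicated, container-local path avoids lock contention on 
/tmp in our ECS setup, we are also considering setting state.dir explicitly 
(the path below is hypothetical):

    // Hypothetical container-local volume; would need to match the ECS task definition.
    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");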



The Kafka broker logs show this exception for different topics:

[2017-05-03 13:13:52,734] ERROR [ReplicaFetcherThread-0-4], Error for partition [__consumer_offsets,45] to broker 4: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)


Please let me know if you need any further details.

Thanks,
Alpa
