I'm following up from your other thread as well here. Thanks for the info above, that is helpful.
I think the AWS instance type might be a factor here, but let's do some more homework first. For a next step, we could enable logging for RocksDB so we can observe the performance. Here is some sample code that will allow logging at the INFO level as well as print out statistics (using RocksDB internal stats) every 15 minutes. Would you mind reverting your Streams application to use a persistent store again? Then let it run until you observe the behavior you described before and if you don't mind share the logs with me so we can look them over. Thanks! import org.apache.kafka.streams.state.RocksDBConfigSetter; import org.rocksdb.InfoLogLevel; import org.rocksdb.Options; import java.util.Map; public class RocksDbLogsConfig implements RocksDBConfigSetter { @Override public void setConfig(String storeName, Options options, Map<String, Object> configs) { options.setInfoLogLevel(InfoLogLevel.INFO_LEVEL); options.createStatistics(); options.setStatsDumpPeriodSec(900); options.setDbLogDir("UPDATE WITH PATH WHERE YOU WANT LOG FILES"); } } To use the RocksDbLogsConfig class, you'll need to update your Streams configs like so: props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, RocksDbLogsConfig.class); Thanks Bill On Sat, Sep 16, 2017 at 11:22 PM, dev loper <spark...@gmail.com> wrote: > Hi Bill. > > Thank you pointing out, But in actual code I am calling iter.close() in the > finally block if the iterator is not null. I don't see any issues when I am > running it on light traffic. As soon as I switch to production traffic I > start seeing these issues. > > Below I have provided additional details about our current application. If > you are looking for specific logs or details , please let me know. I will > get the details captured. > > In production environment I am receiving 10,000 messages per second. There > are 36 partitions for the topic and there are around 2500 unique entities > per partition for which I have to maintain the state. > > Below I have mentioned the hardware configuration and number of instances > we are using for this solution. Please let me know if hardware is the > limiting factor here. We didn't go for higher configuration since the load > average on these instances were quite low and I could hardly see any CPU > spikes . > > > Kafka Machine Machine Details: - 2 Broker Instances with below > Configuration , (Current CPU Usage 2%- 8%) > > Instance Type : AWS T2 Large > Machine Configuration : 2 VCPU;s, 8gb Ram, Storage : EBS > > Kafka Streams Instance : 3 Kafka Streams Application Instances (Current > CPU Usage 8%- 24%) > > Instance Type : AWS M4 Large > Machine Configuration : 2 VCPU;s, 8gb Ram, Storage : EBS (Dedicated EBS > bandwidth 450 mbps) > > > > On Sat, Sep 16, 2017 at 10:05 PM, Bill Bejeck <b...@confluent.io> wrote: > > > Hi, > > > > It's hard to say exactly without a little more information. > > > > On a side note, I don't see where you are closing the KeyValueIterator in > > the code above. Not closing a KeyValueIterator on a Permanent State Store > > can cause a resource leak over time, so I'd add `iter.close()` right > before > > your `logger.info` call. It might be worth retrying at that point. > > > > Thanks, > > Bill > > > > On Sat, Sep 16, 2017 at 9:14 AM, dev loper <spark...@gmail.com> wrote: > > > > > Hi Kafka Streams Users, > > > > > > I am trying to improve the performance of Kafka Streams State Store > > > Persistent Store. In our application we are using Kafka Streams > Processor > > > API and using Persistent State Store.. My application when starts up > it > > > performing well but over a period of time the performance > deteriorated. I > > > am computing certain results in computeAnalytics method and this method > > is > > > not taking time at all. This method is being called within both process > > and > > > punctuate and I am storing the updated object back to store. Over the > > > period of time its taking huge time for completing the punctuate > process > > > and I could see majority of the time is spent in storing the records > and > > > Iterating the records. The record size is just 2500 per partition. I am > > not > > > where I am going wrong and how can I improve the performance. > > > > > > Below is one such sample log record. > > > > > > INFO | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) - Time > > > Metrics for punctuate for TimeStamp :: 1505564655878 processed Records > > :: > > > 2109 totalTimeTakenToProcessRecords :: 2 totalTimeTakenToStoreRecord :: > > > 27605toal time Taken to retrieve Records :: 12787 Total time Taken :: > > 40394 > > > > > > Below I have given my pseudo code for my processor which exactly > > resembles > > > the code which I am using in my application. > > > > > > MyProcessor(){ > > > > > > process(Key objectkey, Update eventupdate){ > > > long timestamp=context.timestamp(); > > > AnalyticeObj storeobj=store.get(objectkey); > > > > > > if( storeobj ===null) > > > { > > > storeobj=new AnalyticeObj(objectkey,eventupdate,timestamp) > > > } > > > else > > > { > > > storeobj.update(eventupdate,timestamp) > > > } > > > storeobj=storeobj.computeAnalytics(); > > > > > > store.put(objectkey,storeobj); > > > context.commit(); > > > } > > > // Every 5 seconds > > > punctuate(long timestamp) > > > { > > > long startTime = System.currentTimeMillis(); > > > long totalTimeTakenToProcessRecords=0; > > > long totalTimeTakenToStoreRecords=0; > > > long counter=0; > > > KeyValueIterator iter=this.visitStore.all(); > > > while (iter.hasNext()) { > > > KeyValue<Key, AnalyticeObj> entry = iter.next(); > > > > > > if(AnalyticeObj.hasExpired(timestamp) > > > store.remove(entry.key) > > > else > > > { > > > long processStartTime=System.currentTimeMillis(); > > > AnalyticeObj storeobj= entry.value.computeAnalytics( > timestamp); > > > > > > totalTimeTakenToProcessRecords=totalTimeTakenToProcessRecords > > > +(System.currentTimeMillis()-processStartTime); > > > > > > long storeStartTime=System.currentTimeMillis(); > > > store.put(entry.key,storeobj); > > > > > > totalTimeTakenToStoreRecords=totalTimeTakenToStoreRecords+( > > > System.currentTimeMillis()-storeStartTime); > > > } > > > counter++; > > > } > > > logger.info(" Time Metrics for punctuate " > > > " for TimeStamp :: " + "" + timestamp + " processed > > > Records :: " > > > + counter +" totalTimeTakenToProcessRecords :: > > > "+totalTimeTakenToProcessRecords +" totalTimeTakenToStoreRecord :: > > > "+totalTimeTakenToStoreRecords > > > +"toal time Taken to retrieve Records :: "+ > > > (System.currentTimeMillis() - > > > (startTime+totalTimeTakenToProcessRecords > +totalTimeTakenToStoreRecords) > > )+" > > > Total time Taken :: " + (System.currentTimeMillis() - startTime)); > > > } > > > } > > > > > >