Hello,

Since you are running on EBS rather than local SSD, the first suspect I'd
have is write and storage amplification. This can be verified from RocksDB's
own statistics, and Bill shared the code to expose such metrics for
investigation:
https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#rocksdb-statistics

Guozhang

On Thu, Sep 21, 2017 at 9:32 AM, Ian Duffy <i...@ianduffy.ie> wrote:

> Have you checked the EBS burst balance on the disks that the Streams
> application is running on?
>
> On 21 September 2017 at 04:28, dev loper <spark...@gmail.com> wrote:
>
> > Hi Bill,
> >
> > I will repeat my tests with RocksDB enabled and revert to you with
> > details. It might take 1-2 days to get back to you since I am
> > traveling, but I will try my level best to get it tonight.
> >
> > On Mon, Sep 18, 2017 at 5:30 PM, Bill Bejeck <b...@confluent.io> wrote:
> >
> > > I'm following up from your other thread as well here. Thanks for the
> > > info above, that is helpful.
> > >
> > > I think the AWS instance type might be a factor here, but let's do
> > > some more homework first.
> > >
> > > As a next step, we could enable logging for RocksDB so we can
> > > observe its performance.
> > >
> > > Here is some sample code that will allow logging at the INFO level
> > > as well as print out statistics (using RocksDB's internal stats)
> > > every 15 minutes.
> > >
> > > Would you mind reverting your Streams application to use a
> > > persistent store again?
> > >
> > > Then let it run until you observe the behavior you described before,
> > > and if you don't mind, share the logs with me so we can look them
> > > over. Thanks!
> > >
> > > import org.apache.kafka.streams.state.RocksDBConfigSetter;
> > > import org.rocksdb.InfoLogLevel;
> > > import org.rocksdb.Options;
> > >
> > > import java.util.Map;
> > >
> > > public class RocksDbLogsConfig implements RocksDBConfigSetter {
> > >
> > >     @Override
> > >     public void setConfig(String storeName, Options options,
> > >                           Map<String, Object> configs) {
> > >         options.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
> > >         options.createStatistics();
> > >         options.setStatsDumpPeriodSec(900);
> > >         options.setDbLogDir("UPDATE WITH PATH WHERE YOU WANT LOG FILES");
> > >     }
> > > }
> > >
> > > To use the RocksDbLogsConfig class, you'll need to update your
> > > Streams configs like so:
> > >
> > > props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
> > >           RocksDbLogsConfig.class);
> > >
> > > Thanks,
> > > Bill
> > >
> > > On Sat, Sep 16, 2017 at 11:22 PM, dev loper <spark...@gmail.com> wrote:
> > >
> > > > Hi Bill,
> > > >
> > > > Thank you for pointing that out, but in the actual code I am
> > > > calling iter.close() in the finally block if the iterator is not
> > > > null. I don't see any issues when running with light traffic; as
> > > > soon as I switch to production traffic I start seeing these issues.
> > > >
> > > > Below I have provided additional details about our current
> > > > application. If you are looking for specific logs or details,
> > > > please let me know and I will get them captured.
> > > >
> > > > In the production environment I am receiving 10,000 messages per
> > > > second. There are 36 partitions for the topic, and there are around
> > > > 2,500 unique entities per partition for which I have to maintain
> > > > state.
> > > >
> > > > Below I have mentioned the hardware configuration and the number of
> > > > instances we are using for this solution. Please let me know if
> > > > hardware is the limiting factor here. We didn't go for a higher
> > > > configuration since the load average on these instances was quite
> > > > low and I could hardly see any CPU spikes.
> > > >
> > > > Kafka Broker Machine Details: 2 broker instances with the
> > > > configuration below (current CPU usage 2%-8%)
> > > >
> > > >     Instance Type: AWS T2 Large
> > > >     Machine Configuration: 2 vCPUs, 8 GB RAM, Storage: EBS
> > > >
> > > > Kafka Streams Instances: 3 Kafka Streams application instances
> > > > (current CPU usage 8%-24%)
> > > >
> > > >     Instance Type: AWS M4 Large
> > > >     Machine Configuration: 2 vCPUs, 8 GB RAM, Storage: EBS
> > > >     (dedicated EBS bandwidth 450 Mbps)
> > > >
> > > > On Sat, Sep 16, 2017 at 10:05 PM, Bill Bejeck <b...@confluent.io> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > It's hard to say exactly without a little more information.
> > > > >
> > > > > On a side note, I don't see where you are closing the
> > > > > KeyValueIterator in the code above. Not closing a
> > > > > KeyValueIterator on a persistent state store can cause a resource
> > > > > leak over time, so I'd add `iter.close()` right before your
> > > > > `logger.info` call. It might be worth retrying at that point.
> > > > >
> > > > > Thanks,
> > > > > Bill
> > > > >
> > > > > On Sat, Sep 16, 2017 at 9:14 AM, dev loper <spark...@gmail.com> wrote:
> > > > >
> > > > > > Hi Kafka Streams Users,
> > > > > >
> > > > > > I am trying to improve the performance of the Kafka Streams
> > > > > > persistent state store. In our application we are using the
> > > > > > Kafka Streams Processor API with a persistent state store. When
> > > > > > the application starts up it performs well, but over a period
> > > > > > of time the performance deteriorates. I am computing certain
> > > > > > results in a computeAnalytics method, and this method is not
> > > > > > taking time at all.
> > > > > > This method is being called within both process and punctuate,
> > > > > > and I am storing the updated object back to the store. Over a
> > > > > > period of time punctuate takes a huge amount of time to
> > > > > > complete, and I can see the majority of the time is spent
> > > > > > storing and iterating the records. The record count is just
> > > > > > 2,500 per partition. I am not sure where I am going wrong or
> > > > > > how I can improve the performance.
> > > > > >
> > > > > > Below is one such sample log record:
> > > > > >
> > > > > > INFO | 07:59:58 | processors.MyProcessor (MyProcessor.java:123)
> > > > > > - Time Metrics for punctuate for TimeStamp :: 1505564655878
> > > > > > processed Records :: 2109 totalTimeTakenToProcessRecords :: 2
> > > > > > totalTimeTakenToStoreRecord :: 27605 total time Taken to
> > > > > > retrieve Records :: 12787 Total time Taken :: 40394
> > > > > >
> > > > > > Below is pseudo code for my processor, which closely resembles
> > > > > > the code I am using in my application.
> > > > > >
> > > > > > MyProcessor() {
> > > > > >
> > > > > > process(Key objectkey, Update eventupdate) {
> > > > > >     long timestamp = context.timestamp();
> > > > > >     AnalyticeObj storeobj = store.get(objectkey);
> > > > > >
> > > > > >     if (storeobj == null) {
> > > > > >         storeobj = new AnalyticeObj(objectkey, eventupdate, timestamp);
> > > > > >     } else {
> > > > > >         storeobj.update(eventupdate, timestamp);
> > > > > >     }
> > > > > >     storeobj = storeobj.computeAnalytics();
> > > > > >
> > > > > >     store.put(objectkey, storeobj);
> > > > > >     context.commit();
> > > > > > }
> > > > > >
> > > > > > // Every 5 seconds
> > > > > > punctuate(long timestamp) {
> > > > > >     long startTime = System.currentTimeMillis();
> > > > > >     long totalTimeTakenToProcessRecords = 0;
> > > > > >     long totalTimeTakenToStoreRecords = 0;
> > > > > >     long counter = 0;
> > > > > >     KeyValueIterator<Key, AnalyticeObj> iter = this.store.all();
> > > > > >     while (iter.hasNext()) {
> > > > > >         KeyValue<Key, AnalyticeObj> entry = iter.next();
> > > > > >
> > > > > >         if (entry.value.hasExpired(timestamp)) {
> > > > > >             store.delete(entry.key);
> > > > > >         } else {
> > > > > >             long processStartTime = System.currentTimeMillis();
> > > > > >             AnalyticeObj storeobj = entry.value.computeAnalytics(timestamp);
> > > > > >             totalTimeTakenToProcessRecords +=
> > > > > >                 System.currentTimeMillis() - processStartTime;
> > > > > >
> > > > > >             long storeStartTime = System.currentTimeMillis();
> > > > > >             store.put(entry.key, storeobj);
> > > > > >             totalTimeTakenToStoreRecords +=
> > > > > >                 System.currentTimeMillis() - storeStartTime;
> > > > > >         }
> > > > > >         counter++;
> > > > > >     }
> > > > > >     logger.info(" Time Metrics for punctuate"
> > > > > >         + " for TimeStamp :: " + timestamp
> > > > > >         + " processed Records :: " + counter
> > > > > >         + " totalTimeTakenToProcessRecords :: " + totalTimeTakenToProcessRecords
> > > > > >         + " totalTimeTakenToStoreRecord :: " + totalTimeTakenToStoreRecords
> > > > > >         + " total time Taken to retrieve Records :: "
> > > > > >         + (System.currentTimeMillis()
> > > > > >             - (startTime + totalTimeTakenToProcessRecords
> > > > > >                 + totalTimeTakenToStoreRecords))
> > > > > >         + " Total time Taken :: " + (System.currentTimeMillis() - startTime));
> > > > > > }
> > > > > > }

-- 
-- Guozhang
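[Editor's note] Bill's advice above, closing the KeyValueIterator returned by store.all(), can be enforced structurally rather than with a manual finally block: Kafka Streams' KeyValueIterator extends Closeable, so a try-with-resources statement guarantees close() runs even if the per-record work throws mid-scan. A minimal self-contained sketch of the pattern, using a hypothetical in-memory stand-in (CloseableIterator, TrackingIterator) instead of a real Kafka Streams store:

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in for org.apache.kafka.streams.state.KeyValueIterator, whose
// all() result must be closed to release the underlying RocksDB snapshot.
interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
    @Override
    void close(); // no checked exception, mirroring KeyValueIterator
}

public class IteratorCloseSketch {

    // Wraps a plain map iterator and records whether close() was called.
    static class TrackingIterator implements CloseableIterator<Map.Entry<String, Long>> {
        private final Iterator<Map.Entry<String, Long>> delegate;
        boolean closed = false;

        TrackingIterator(Map<String, Long> store) {
            this.delegate = store.entrySet().iterator();
        }

        @Override public boolean hasNext() { return delegate.hasNext(); }
        @Override public Map.Entry<String, Long> next() { return delegate.next(); }
        @Override public void close() { closed = true; }
    }

    // Punctuate-style scan: try-with-resources closes the iterator on every
    // exit path, including exceptions thrown by per-record processing.
    static long scan(TrackingIterator iter) {
        long counter = 0;
        try (TrackingIterator it = iter) {
            while (it.hasNext()) {
                it.next(); // per-record work would go here
                counter++;
            }
        }
        return counter;
    }

    public static void main(String[] args) {
        Map<String, Long> store = new LinkedHashMap<>();
        store.put("entity-1", 10L);
        store.put("entity-2", 20L);

        TrackingIterator iter = new TrackingIterator(store);
        long seen = scan(iter);
        System.out.println(seen + " closed=" + iter.closed);
    }
}
```

In the punctuate method above, the same shape would be `try (KeyValueIterator<Key, AnalyticeObj> iter = store.all()) { ... }`, removing the need for a null check and a finally block.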