Hi Petter,

I'd recommend turning off caching by setting

p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
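
For reference, here is a rough sketch of how that would sit next to the settings from your message. To keep it runnable without the Kafka jars it uses plain java.util.Properties and the literal config keys ("cache.max.bytes.buffering" and so on, which I'm assuming you would normally reach through the StreamsConfig/ConsumerConfig constants). The class name LowLatencyStreamsProps is made up for illustration, and bootstrap servers and application id are left out.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: collects the settings discussed in this thread.
public class LowLatencyStreamsProps {

    public static Properties build() {
        Properties p = new Properties();
        // Disable the record cache so updates are forwarded downstream
        // instead of being buffered until the cache fills or a commit happens.
        p.put("cache.max.bytes.buffering", 0);
        // Settings quoted from the original message, unchanged.
        p.put("commit.interval.ms", 1000);
        p.put("auto.offset.reset", "earliest");
        p.put("replication.factor", 2);
        return p;
    }

    public static void main(String[] args) {
        // Print the resulting settings for a quick sanity check.
        build().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

In a real application you would add the bootstrap servers and application id to the same Properties object before passing it to KafkaStreams.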

2.3.0 also has some known performance issues that are fixed in 2.3.1, which should be released soon so you can upgrade. They shouldn't be noticeable if you turn caching off and aren't reading from or writing to topics with a very high partition count, so the caching is likely the main reason for the latency you see.

I'd also note that Streams, and Kafka in general, is typically tuned for high throughput rather than low latency, so I wouldn't be too concerned about a large latency unless low latency is a specific requirement.

Cheers,
Sophie

On Wed, Oct 9, 2019 at 6:05 AM Petter Arvidsson <petter.arvids...@relayr.io> wrote:

> Hi,
>
> I have a fairly simple kafka streams application that reads messages from
> two topics. The problem I am facing is that the delay between sending
> events to the streams application and it producing results is very high (as
> in several minutes). My question is: how can I make this latency smaller?
>
> The streams application is doing the following:
> ktable1 = topic1
>   -> (filter out messages using flatMap)
>   -> groupBy (with new key, adds internal rekeying topic)
>   -> aggregate (in memory store backed by internal compacted topic)
>
> ktable2 = topic2
>   -> (rekey to same key as ktable1 over internal topic)
>   -> join (with ktable1)
>   -> aggregate (in memory store backed by internal compacted topic)
>
> ktable2.toStream.to(topic2)
>
> Ktable1 keeps configuration that allows messages to pass through and be
> aggregated into ktable2. Ktable2 keeps aggregates based on messages on
> topic2. Ktable2.toStream is then used to put the aggregated messages back
> out on topic2. The "problem" (or misunderstanding as to how kafka streams is
> processing messages) is that the delay from sending a message on topic1 to
> the point where messages received on topic2 are passing the join is several
> minutes. With the settings I have (see below) on a not that heavily loaded
> system, I would assume the latency would be a couple of seconds (based on
> the COMMIT_INTERVAL_MS_CONFIG).
>
> I use the following settings (as well as settings for bootstrap servers,
> application id and so forth):
> p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000)
> p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
> p.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
>
> The store used for the KTables is the one returned by
> "Stores.inMemoryKeyValueStore()".
>
> Kafka libraries use version "2.3.0" and the "kafka-streams-scala" scaladsl
> is used to build the streams. The broker is using version "1.1.0".
>
> Best regards,
> Petter