I'm less familiar with that part of the code but that sounds correct to me.
Your default request timeout is 300 seconds though, is that right? Seems
like it should be large enough in most scenarios. Did you see any network
outages around that time?
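
For reference, the 300100 ms in the error quoted below equals the configured
request.timeout.ms (300000) plus linger.ms (100). A minimal sketch of that
arithmetic follows. The class and method names are hypothetical, and the idea
that the producer derives its batch expiry this way when delivery.timeout.ms
is not set explicitly is an assumption, not something confirmed in this thread.

```java
import java.util.Properties;

public class TimeoutSketch {
    // Timeout-related settings as quoted in the thread, kept as plain string keys
    // so the sketch needs no Kafka dependency.
    static Properties timeoutProps() {
        Properties props = new Properties();
        props.put("request.timeout.ms", "300000");
        props.put("linger.ms", "100");
        props.put("retry.backoff.ms", "100");
        return props;
    }

    // Assumption: with no explicit delivery.timeout.ms, a batch is expired after
    // roughly request.timeout.ms + linger.ms.
    static long assumedBatchExpiryMs(Properties props) {
        long requestTimeoutMs = Long.parseLong(props.getProperty("request.timeout.ms"));
        long lingerMs = Long.parseLong(props.getProperty("linger.ms"));
        return requestTimeoutMs + lingerMs;
    }

    public static void main(String[] args) {
        // Prints the sum of the two settings, for comparison with the error message.
        System.out.println(assumedBatchExpiryMs(timeoutProps()));
    }
}
```

If that assumption holds, raising request.timeout.ms further only moves the
expiry; it would not explain why the batches sat unsent for that long.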

On Wed, Oct 16, 2019 at 10:30 AM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:

> Hi Sophie,
>
> A follow-up question: I set the cache to 0 and after 12 hours
> running, some nodes turn dead with error messages: task [1_0] Abort
> sending since an error caught with a previous record (key
> 333283323209294164cf16bb333c14a3506001b8fec3XXX3\x00\x00\x01m\xD5\x155\x80
> value [B@33574f49 timestamp null) to topic
> TEST-KTABLE-SUPPRESS-STATE-STORE-0000000009-changelog due to
> org.apache.kafka.common.errors.TimeoutException: Expiring 28 record(s)
> for TEST-KTABLE-SUPPRESS-STATE-STORE-0000000009-changelog-0:300100 ms
> has passed since batch creation
>
> My timeout related setting:
> request.timeout.ms = 300000
> poll.ms = 100
> retry.backoff.ms = 100
> linger.ms = 100
>
> Once the node has above exception, it will turn to DEAD state and
> can't recover. Do you know what might cause this timeout issue? Does
> it mean it takes too long to send the messages to this changelog
> topic?
>
> Thanks
> Kathy
>
> On Mon, Oct 14, 2019 at 10:54 PM Sophie Blee-Goldman
> <sop...@confluent.io> wrote:
> >
> > Glad that helped!
> >
> > Honestly I can't say I recognize that exception either but I'm fairly
> > confident it's not directly related to rocksdb or Streams. It sounds
> > like a connection somewhere got screwed up, which would be more of a
> > configuration issue I probably can't help with.
> >
> > Of course it's always possible rocksdb is doing something weird that
> > we're unaware of. If you revert the changes you just made and don't see
> > these issues, I'd say try continuing to use your own rocksdb config
> > setter and/or reset the app (including clearing local state).
> >
> > Cheers,
> > Sophie
> >
> > On Mon, Oct 14, 2019 at 4:35 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
> >
> > > Hi Sophie,
> > >
> > > Thanks for the information! After setting cache.max.bytes.buffering to
> > > zero, commenting out my customized rocksDB config and using default
> > > rocksDB config, I do see the repartition topic bytesout goes up. But I
> > > noticed that some nodes have IO exception as: An I/O error has
> > > occurred while writing a response message entity to the container
> > > output
> > > stream.(org.glassfish.jersey.server.internal.process.MappableException:
> > > org.apache.catalina.connector.ClientAbortException:
> > > java.io.IOException: Broken pipe). Is it also related to the rocksDB
> > > read and write? Anything I should do to get rid of this exception?
> > >
> > > Thanks a lot!
> > >
> > > On Mon, Oct 14, 2019 at 6:10 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
> > > >
> > > > Ah ok, 2.3.0 has a known performance issue in the caching layer which
> > > > tends to get worse the larger the cache size. That might explain what
> > > > you're seeing with regard to the traffic correlation.
> > > >
> > > > It's fixed in 2.3.1 which should be released very soon, but until then
> > > > you might want to try turning off the Streams cache (by setting
> > > > cache.max.bytes.buffering to zero, although you can also use
> > > > .withCachingDisabled to turn it off only for the large/problem store).
> > > >
> > > > Of course any memory you reclaim by turning off the Streams cache can
> > > > just go towards the rocksdb cache instead, just note that the rocksdb
> > > > cache comes from off-heap memory while the Streams cache would be taken
> > > > from heap memory allocated to the jvm.
> > > >
> > > > On Mon, Oct 14, 2019 at 2:48 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
> > > >
> > > > > Hi Sophie,
> > > > >
> > > > > Thanks for the help!
> > > > > I'm using version 2.3.0.
> > > > >
> > > > > The repartition topic with huge lag is the one created during the
> > > > > first reduce method, named "XX-KSTREAM-STATE-STORE-0030-repartition".
> > > > > All other internal topics have almost zero lag. For my case, how could
> > > > > I find out if rocksDB causes the lag? One thing I noticed is, when the
> > > > > source traffic is about 30K/sec, I don't have any lag for the entire
> > > > > system but when the traffic goes up to 100K/sec, it has a huge lag. As
> > > > > you mentioned, if the memory usage is high, should I set any rocksDB
> > > > > memory related config to a higher value? Thanks a lot!
> > > > > My topology is like below:
> > > > >
> > > > > final KStream<String, byte[]> source = builder.stream(inputTopic);
> > > > > KStream<String, Event> deserializedStream = source.mapValues( ... });
> > > > >
> > > > > KStream<Windowed<String>, Event> dedupedStream =
> > > > > deserializedStream.selectKey( ... )
> > > > > .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Event.class)))
> > > > > .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
> > > > > .reduce((value1, value2) -> value2)
> > > > > .suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > > > .toStream();
> > > > >
> > > > > dedupedStream.selectKey( ... )
> > > > > .mapValues( ... )
> > > > > .filter(...)
> > > > > .groupByKey(Grouped.with(Serdes.String(), new MessagetSerde()))
> > > > > .reduce((value1, value2) -> {
> > > > >     long count1 = value1.getCount();
> > > > >     long count2 = value2.getCount();
> > > > >     value2.setCount(count1 + count2);
> > > > >     return value2;
> > > > > }
> > > > > )
> > > > > .toStream()
> > > > > .selectKey( ... )
> > > > > .to(outputTopic);
> > > > >
> > > > > On Mon, Oct 14, 2019 at 3:53 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
> > > > > >
> > > > > > Out of curiosity, which version are you using?
> > > > > >
> > > > > > There's nothing that really jumps out at me as problematic in your
> > > > > > RocksDBConfigSetter, but note that I think you may need to increase
> > > > > > the number of threads in the "LOW priority thread pool" in addition
> > > > > > to setting the maxBackgroundCompactions -- this can be done as
> > > > > >
> > > > > > options.setEnv(Env.getDefault().setBackgroundThreads(n,
> > > > > > Env.COMPACTION_POOL));
> > > > > >
> > > > > > Is your disk throughput possibly the bottleneck? Note that if the
> > > > > > repartition topic is followed by a subtopology doing heavy processing
> > > > > > this will likely show up as lag like you describe. Also, if you have
> > > > > > a large number of stateful tasks (large number of stateful
> > > > > > operations, and/or large number of partitions) each one will have its
> > > > > > own separate rocksdb instance, and the memory usage could be quite
> > > > > > high (which can cause rocks to page in/out things like index blocks
> > > > > > which always need to be read before a lookup) -- I'd recommend also
> > > > > > setting
> > > > > >
> > > > > > tableConfig.setPinL0FilterAndIndexBlocksInCache(true);
> > > > > >
> > > > > >
> > > > > > On Sun, Oct 13, 2019 at 6:40 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > I'm running a Kafka Streams app with a windowing function. I noticed
> > > > > > > that the internal -repartition topic has huge lag while the system
> > > > > > > CPU usage is low and the app is stable (join rate is almost 0).
> > > > > > >
> > > > > > > The repartition topic is an internal topic and created by the
> > > > > > > application automatically. The bytes in per sec for this topic is
> > > > > > > about 65MB/sec while the bytes out for this topic is only 15MB/sec.
> > > > > > > I have tried a couple of configs to customize RocksDB, but none of
> > > > > > > them could increase the bytes out value.
> > > > > > >
> > > > > > > I changed the default RocksDB block cache size to 125MB and block
> > > > > > > size to 125MB as well. Also set the max write buffer number to 3.
> > > > > > > But it didn't help.
> > > > > > >
> > > > > > > May I know what I missed here? What's the best way to find why the
> > > > > > > internal repartition topic has huge lag?
> > > > > > >
> > > > > > > Thanks for all the help!!
> > > > > > >
> > > > > > > My RocksDB config:
> > > > > > > public static class CustomRocksDBConfig implements RocksDBConfigSetter {
> > > > > > >     private org.rocksdb.Cache cache =
> > > > > > >         new org.rocksdb.LRUCache(125 * 1024L * 1024L);
> > > > > > >
> > > > > > >     @Override
> > > > > > >     public void setConfig(final String storeName, final Options options,
> > > > > > >             final Map<String, Object> configs) {
> > > > > > >         int n = Runtime.getRuntime().availableProcessors();
> > > > > > >         options.setMaxBackgroundCompactions(n);
> > > > > > >         options.setWriteBufferSize(125 * 1024 * 1024);
> > > > > > >         BlockBasedTableConfig tableConfig =
> > > > > > >             (BlockBasedTableConfig) options.tableFormatConfig();
> > > > > > >         tableConfig.setBlockCache(cache);
> > > > > > >         tableConfig.setBlockCacheSize(125 * 1024 * 1024L);
> > > > > > >         tableConfig.setBlockSize(125 * 1024L);
> > > > > > >         tableConfig.setCacheIndexAndFilterBlocks(true);
> > > > > > >         options.setTableFormatConfig(tableConfig);
> > > > > > >         options.setMaxWriteBufferNumber(3);
> > > > > > >     }
> > > > > > >
> > > > > > >     public void close(final String storeName, final Options options) {
> > > > > > >         // See #5 below.
> > > > > > >         cache.close();
> > > > > > >     }
> > > > > > >
> > > > > > > }
> > > > > > >
> > > > > > > Thanks
> > > > > > > Kathy
> > > > > > >
> > > > >
> > >
>
