It would be great if we could help to simplify debugging. Any ideas?
-Matthias

On 5/9/17 7:11 AM, João Peixoto wrote:
> I'll leave it to your discretion; after realizing the problem it was an
> easy workaround, the bad experience was debugging and figuring out what
> was going on.
>
> Thanks for the help once again
>
> JP
>
> On Tue, May 9, 2017 at 4:36 AM Eno Thereska <eno.there...@gmail.com> wrote:
>
>> Yeah that's a good point, I'm not taking action then.
>>
>> Eno
>>
>> On Mon, May 8, 2017 at 10:38 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>
>>> Hey,
>>>
>>> I am not against opening a JIRA, but I am wondering what we should
>>> describe/report there. If I understand the scenario correctly, João uses
>>> a custom RocksDB store and calls seek() in user code land. As it is a
>>> bug in RocksDB that seek takes so long, I am not sure what we could
>>> improve within Streams to prevent this? The only thing I am seeing
>>> right now is that we could reduce `max.poll.interval.ms`, which we just
>>> increased to guard against failures during long state recreation phases.
>>>
>>> Any thoughts?
>>>
>>> -Matthias
>>>
>>> On 5/3/17 8:48 AM, João Peixoto wrote:
>>>> That'd be great, as I'm not familiar with the protocol there.
>>>>
>>>> On Wed, May 3, 2017 at 8:41 AM Eno Thereska <eno.there...@gmail.com> wrote:
>>>>
>>>>> Cool, thanks, shall we open a JIRA?
>>>>>
>>>>> Eno
>>>>>
>>>>>> On 3 May 2017, at 16:16, João Peixoto <joao.harti...@gmail.com> wrote:
>>>>>>
>>>>>> Actually I need to apologize, I pasted the wrong issue; I meant to paste
>>>>>> https://github.com/facebook/rocksdb/issues/261.
>>>>>>
>>>>>> RocksDB did not produce a crash report since it didn't actually crash. I
>>>>>> performed thread dumps on stale and not-stale instances, which revealed
>>>>>> the common behavior, and I collect and plot several Kafka metrics,
>>>>>> including "punctuate" durations, so I know it took a long time and
>>>>>> eventually finished.
>>>>>>
>>>>>> Joao
>>>>>>
>>>>>> On Wed, May 3, 2017 at 6:22 AM Eno Thereska <eno.there...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi there,
>>>>>>>
>>>>>>> Thanks for double checking. Does RocksDB actually crash or produce a
>>>>>>> crash dump? I'm curious how you know that the issue is
>>>>>>> https://github.com/facebook/rocksdb/issues/1121, so just double checking
>>>>>>> with you.
>>>>>>>
>>>>>>> If that's indeed the case, do you mind opening a JIRA (a copy-paste of
>>>>>>> the below should suffice)? Alternatively let us know and we'll open it.
>>>>>>> Sounds like we should handle this better.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Eno
>>>>>>>
>>>>>>>> On May 3, 2017, at 5:49 AM, João Peixoto <joao.harti...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> I believe I found the root cause of my problem. I seem to have hit this
>>>>>>>> RocksDB bug: https://github.com/facebook/rocksdb/issues/1121
>>>>>>>>
>>>>>>>> In my stream configuration I have a custom transformer used for
>>>>>>>> deduplicating records, heavily inspired by the
>>>>>>>> EventDeduplicationLambdaIntegrationTest
>>>>>>>> <https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java#L161>
>>>>>>>> but adjusted to my use case, with special emphasis on the "punctuate"
>>>>>>>> method.
>>>>>>>>
>>>>>>>> All the stale instances had the main stream thread "RUNNING" the
>>>>>>>> "punctuate" method of this transformer, which in turn was running
>>>>>>>> RocksDB's "seekToFirst".
>>>>>>>>
>>>>>>>> Also, during my debugging one such instance finished the "punctuate"
>>>>>>>> method, which took ~11h, exactly the time the instance was stuck for.
>>>>>>>> Changing the backing state store from "persistent" to "inMemory" solved
>>>>>>>> my issue; at least after several days running, there are no stuck
>>>>>>>> instances.
>>>>>>>>
>>>>>>>> This leads me to ask: shouldn't Kafka detect such a situation fairly
>>>>>>>> quickly, instead of just stopping polling? My guess is that the
>>>>>>>> heartbeat thread, which is now separate, continues working fine, and
>>>>>>>> since by definition the stream runs a message through the whole
>>>>>>>> pipeline, this step probably just looked like it was VERY slow. Not
>>>>>>>> sure what the best approach here would be.
>>>>>>>>
>>>>>>>> PS: The linked code clearly states "This code is for demonstration
>>>>>>>> purposes and was not tested for production usage", so that's on me.
>>>>>>>>
>>>>>>>> On Tue, May 2, 2017 at 11:20 AM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>
>>>>>>>>> Did you check the logs? Maybe you need to increase the log level to
>>>>>>>>> DEBUG to get some more information.
>>>>>>>>>
>>>>>>>>> Did you double check committed offsets via bin/kafka-consumer-groups.sh?
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>> On 4/28/17 9:22 AM, João Peixoto wrote:
>>>>>>>>>> My stream gets stale after a while and it simply does not receive any
>>>>>>>>>> new messages, aka does not poll.
>>>>>>>>>>
>>>>>>>>>> I'm using Kafka Streams 0.10.2.1 (same happens with 0.10.2.0) and the
>>>>>>>>>> brokers are running 0.10.1.1.
>>>>>>>>>>
>>>>>>>>>> The stream state is RUNNING and there are no exceptions in the logs.
>>>>>>>>>>
>>>>>>>>>> Looking at the JMX metrics, the threads are there and running, just
>>>>>>>>>> not doing anything. The metric "consumer-coordinator-metrics >
>>>>>>>>>> heartbeat-response-time-max" (the max time taken to receive a response
>>>>>>>>>> to a heartbeat request) reads 43,361 seconds (almost 12 hours), which
>>>>>>>>>> is consistent with the time of the hang. Shouldn't this trigger a
>>>>>>>>>> failure somehow?
>>>>>>>>>>
>>>>>>>>>> The stream configuration looks something like this:
>>>>>>>>>>
>>>>>>>>>> Properties props = new Properties();
>>>>>>>>>> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>>>>>>>>>>     CustomTimestampExtractor.class.getName());
>>>>>>>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, streamName);
>>>>>>>>>> props.put(StreamsConfig.CLIENT_ID_CONFIG, streamName);
>>>>>>>>>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, myConfig.getBrokerList());
>>>>>>>>>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>>>>>>>>>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
>>>>>>>>>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, myConfig.getCommitIntervalMs()); // 5000
>>>>>>>>>> props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
>>>>>>>>>> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, myConfig.getStreamThreadsCount()); // 1
>>>>>>>>>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, myConfig.getMaxCacheBytes()); // 524_288_000L
>>>>>>>>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>>>>>>>>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
>>>>>>>>>>
>>>>>>>>>> The stream LEFT JOINs 2 topics, one of them being a KTable, and
>>>>>>>>>> outputs to another topic.
>>>>>>>>>>
>>>>>>>>>> Thanks in advance for the help!
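For anyone hitting this later, a deduplication transformer along the lines João describes might look roughly like the sketch below. This is a minimal illustration against the 0.10.2 Transformer API, not the code from the linked example or from João's application: the store name ("dedup-store"), the key/value types, the punctuation interval, and the purge window are all assumptions. The relevant point is that all() on a persistent (RocksDB-backed) store first seeks its iterator to the first key, which is exactly the call that stalled here.

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Sketch only: store name, types, and intervals are assumptions.
public class DeduplicationTransformer
        implements Transformer<String, byte[], KeyValue<String, byte[]>> {

    private ProcessorContext context;
    private KeyValueStore<String, Long> seen;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        this.seen = (KeyValueStore<String, Long>) context.getStateStore("dedup-store");
        context.schedule(60_000L); // punctuate roughly every minute of stream time (0.10.2 API)
    }

    @Override
    public KeyValue<String, byte[]> transform(final String key, final byte[] value) {
        if (seen.get(key) != null) {
            return null; // already seen, drop the duplicate
        }
        seen.put(key, context.timestamp());
        return KeyValue.pair(key, value);
    }

    @Override
    public KeyValue<String, byte[]> punctuate(final long timestamp) {
        // Purge entries older than one hour. On a persistent (RocksDB) store,
        // all() seeks to the first key before iterating, the operation that
        // was observed to take hours under the RocksDB issue discussed above.
        try (final KeyValueIterator<String, Long> iter = seen.all()) {
            while (iter.hasNext()) {
                final KeyValue<String, Long> entry = iter.next();
                if (timestamp - entry.value > 3_600_000L) {
                    seen.delete(entry.key);
                }
            }
        }
        return null; // nothing to forward from punctuate
    }

    @Override
    public void close() {}
}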
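And a rough sketch of how the store and the topology described in the thread might be wired together with the transformer above, again with assumed topic names, store name, and order of operations. The point of interest is that swapping .persistent() for .inMemory() in the store definition is the workaround João mentions, and that lowering max.poll.interval.ms (which Streams raises by default) is the knob I was referring to.

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.state.Stores;

public class DedupTopologySketch {

    public static void main(final String[] args) {
        final KStreamBuilder builder = new KStreamBuilder();

        // Store backing the deduplication transformer. Using .inMemory() here is
        // the workaround from the thread; .persistent() is the RocksDB-backed
        // variant that exhibited the multi-hour seek.
        builder.addStateStore(Stores.create("dedup-store")
                .withKeys(Serdes.String())
                .withValues(Serdes.Long())
                .inMemory()
                .build());

        final KStream<String, byte[]> events = builder.stream("input-topic");
        final KTable<String, byte[]> lookup = builder.table("lookup-topic", "lookup-store");

        events.leftJoin(lookup, (final byte[] event, final byte[] enrichment) -> event) // merge as needed
              .transform(DeduplicationTransformer::new, "dedup-store")
              .to("output-topic");

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dedup-stream");      // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
        // Remaining settings as in the configuration quoted above. To surface a
        // stuck punctuate as a rebalance rather than an indefinite hang, one could
        // lower the consumer's max.poll.interval.ms, e.g.:
        // props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000);

        final KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}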