My stream goes stale after a while and simply stops receiving new
messages, i.e. it no longer polls.

I'm using Kafka Streams 0.10.2.1 (same happens with 0.10.2.0) and the
brokers are running 0.10.1.1.

The stream state is RUNNING and there are no exceptions in the logs.

Looking at the JMX metrics, the threads are there and running, just not
doing anything.
The metric "consumer-coordinator-metrics > heartbeat-response-time-max"
(The max time taken to receive a response to a heartbeat request) reads
43,361 seconds (almost 12 hours) which is consistent with the time of the
hang. Shouldn't this trigger a failure somehow?
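
A minimal sketch of reading this metric from inside the application's JVM
rather than through a JMX console. It assumes the consumer coordinator
MBeans are registered under
"kafka.consumer:type=consumer-coordinator-metrics,client-id=*"; the class
and method names (HeartbeatMetricProbe, readHeartbeatMax) are hypothetical
and only for illustration. The value is returned exactly as the client
reports it, with no unit conversion.

```java
import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.Map;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class HeartbeatMetricProbe {

    // Collects "heartbeat-response-time-max" from every consumer coordinator
    // MBean found in the given server, keyed by the full MBean name.
    static Map<String, Double> readHeartbeatMax(MBeanServer server) throws Exception {
        // Assumed MBean naming; the wildcard matches any client id.
        ObjectName pattern = new ObjectName(
                "kafka.consumer:type=consumer-coordinator-metrics,client-id=*");
        Map<String, Double> result = new HashMap<>();
        for (ObjectName name : server.queryNames(pattern, null)) {
            Object value = server.getAttribute(name, "heartbeat-response-time-max");
            if (value instanceof Number) {
                result.put(name.toString(), ((Number) value).doubleValue());
            }
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        Map<String, Double> values = readHeartbeatMax(server);
        if (values.isEmpty()) {
            System.out.println("No consumer coordinator MBeans found in this JVM");
        }
        for (Map.Entry<String, Double> e : values.entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

Called periodically from a scheduled task in the same JVM as the stream,
something like this could feed an external watchdog that alerts when the
value grows far beyond the configured heartbeat interval.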

The stream configuration looks something like this:

    Properties props = new Properties();
    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
              CustomTimestampExtractor.class.getName());
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, streamName);
    props.put(StreamsConfig.CLIENT_ID_CONFIG, streamName);
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
              myConfig.getBrokerList());
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
              Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
              Serdes.ByteArray().getClass().getName());
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
              myConfig.getCommitIntervalMs()); // 5000
    props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
              myConfig.getStreamThreadsCount()); // 1
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
              myConfig.getMaxCacheBytes()); // 524_288_000L
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);

The stream LEFT JOINs 2 topics, one of them being a KTable, and outputs to
another topic.

Thanks in advance for the help!
