[ https://issues.apache.org/jira/browse/KAFKA-8755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16900693#comment-16900693 ]
Bruno Cadonna commented on KAFKA-8755:
--------------------------------------

In a first analysis, I narrowed down the issue to the setting of the offset limits in the following two code snippets.

{{AbstractTask#updateOffsetLimits()}}
{code:java}
final OffsetAndMetadata metadata = consumer.committed(partition);
final long offset = metadata != null ? metadata.offset() : 0L;
{code}

{{ProcessorStateManager#offsetLimit}}
{code:java}
final Long limit = offsetLimits.get(partition);
return limit != null ? limit : Long.MAX_VALUE;
{code}

Currently, my guess is that {{AbstractTask#updateOffsetLimits()}} is called during initialization and closing of the stand-by task, but not during normal processing. The reason is that during normal processing {{StandbyTask#commit()}} is never called because the flag {{commitNeeded}} is never set to true. This occurs only for stand-by tasks for optimized source tables. Further investigation is needed to confirm my guess.

> Stand-by Task of an Optimized Source Table Does Not Write Anything to its State Store
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8755
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8755
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.0
>            Reporter: Bruno Cadonna
>            Priority: Major
>              Labels: newbie
>         Attachments: StandbyTaskTest.java
>
>
> With the following topology:
> {code:java}
> builder.table(
>     INPUT_TOPIC,
>     Consumed.with(Serdes.Integer(), Serdes.Integer()),
>     Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(stateName)
> )
> {code}
> and with topology optimization turned on, Kafka Streams uses the input topic
> {{INPUT_TOPIC}} as the change log topic for state store {{stateName}}.
> A stand-by task for such a topology should read from {{INPUT_TOPIC}} and should
> write the records to its state store so that the streams client that runs the
> stand-by task can take over the execution of the topology in case of a
> failure with an up-to-date replica of the state.
> Currently, the stand-by task described above reads from the input topic but
> does not write the records to its state store. Thus, after a failure the
> stand-by task cannot provide any up-to-date state store and the streams
> client needs to construct the state from scratch before it can take over the
> execution.
> The described behaviour can be reproduced with the attached test.

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
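
To illustrate how the two snippets in the comment could interact, here is a minimal standalone sketch. It is not the Kafka Streams code: the class {{OffsetLimitSketch}}, its methods, and the partition name {{input-0}} are hypothetical, and it assumes that a stand-by task only applies records whose offset is below the current limit. Under that assumption, a limit that is set to 0 at initialization and never refreshed would keep every record out of the state store, which would match the guess in the comment.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the offset-limit bookkeeping; not the Kafka Streams classes.
final class OffsetLimitSketch {
    private final Map<String, Long> offsetLimits = new HashMap<>();

    // Mirrors the first snippet: a missing committed offset turns into limit 0.
    void updateOffsetLimit(final String partition, final Long committedOffset) {
        final long offset = committedOffset != null ? committedOffset : 0L;
        offsetLimits.put(partition, offset);
    }

    // Mirrors the second snippet: a partition without an entry has no effective limit.
    long offsetLimit(final String partition) {
        final Long limit = offsetLimits.get(partition);
        return limit != null ? limit : Long.MAX_VALUE;
    }

    // Assumption: only records with an offset below the limit are applied to the store.
    int countApplicable(final String partition, final long[] recordOffsets) {
        final long limit = offsetLimit(partition);
        int applied = 0;
        for (final long offset : recordOffsets) {
            if (offset < limit) {
                applied++;
            }
        }
        return applied;
    }

    public static void main(final String[] args) {
        final long[] offsets = {0L, 1L, 2L};
        final OffsetLimitSketch sketch = new OffsetLimitSketch();

        // No limit recorded yet: all three records pass.
        System.out.println(sketch.countApplicable("input-0", offsets));

        // Nothing committed at initialization: the limit becomes 0 and no record passes.
        sketch.updateOffsetLimit("input-0", null);
        System.out.println(sketch.countApplicable("input-0", offsets));

        // A later refresh with committed offset 2 would let offsets 0 and 1 pass.
        sketch.updateOffsetLimit("input-0", 2L);
        System.out.println(sketch.countApplicable("input-0", offsets));
    }
}
```

In this sketch the middle case is the interesting one: as long as the limit is not refreshed after initialization, nothing is applied, regardless of how many records arrive.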