Yes, that could happen if a key was not updated for a longer period than
topic retention time.

If you want to force a changelog creation, you can do a dummy aggregate
instead of using KStreamBuilder#table()


> KTable table = KStreamBuilder.stream("topic").groupByKey().reduce(new 
> Reducer() {
>     @Override
>     public Object apply(Object oldValue, Object newValue) {
>         return newValue;
>     }
> }, "someStoreName");


-Matthias


On 2/8/17 11:39 AM, Mathieu Fenniak wrote:
> I think there could be correctness implications... the default
> cleanup.policy of delete would mean that topic entries past the retention
> policy might have been removed.  If you scale up the application, new
> application instances won't be able to restore a complete table into its
> local state store.  An operation like a join against that KTable would find
> no records where there should be record.
> 
> Mathieu
> 
> 
> On Wed, Feb 8, 2017 at 12:15 PM, Eno Thereska <eno.there...@gmail.com>
> wrote:
> 
>> If you fail to set the policy to compact, there shouldn't be any
>> correctness implications, however your topics will grow larger than
>> necessary.
>>
>> Eno
>>
>>> On 8 Feb 2017, at 18:56, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
>>>
>>> What are the ramifications of failing to do this?
>>>
>>> On Tue, Feb 7, 2017 at 9:16 PM, Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> Yes, that is correct.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 2/7/17 6:39 PM, Mathieu Fenniak wrote:
>>>>> Hey kafka users,
>>>>>
>>>>> Is it correct that a Kafka topic that is used for a KTable should be
>> set
>>>> to
>>>>> cleanup.policy=compact?
>>>>>
>>>>> I've never noticed until today that the KStreamBuilder#table()
>>>>> documentation says: "However, no internal changelog topic is created
>>>> since
>>>>> the original input topic can be used for recovery"... [1], which seems
>>>> like
>>>>> it is only true if the topic is configured for compaction.  Otherwise
>> the
>>>>> original input topic won't necessarily contain the data necessary for
>>>>> recovery of the state store.
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/kafka/blob/e108a8b4ed4512b021f9326cf07951
>>>> 7523c83060/streams/src/main/java/org/apache/kafka/streams/
>>>> kstream/KStreamBuilder.java#L355
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Mathieu
>>>>>
>>>>
>>>>
>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to