Yes, that could happen if a key was not updated for a longer period than topic retention time.
If you want to force a changelog creation, you can do a dummy aggregate instead of using KStreamBuilder#table() > KTable table = KStreamBuilder.stream("topic").groupByKey().reduce(new > Reducer() { > @Override > public Object apply(Object oldValue, Object newValue) { > return newValue; > } > }, "someStoreName"); -Matthias On 2/8/17 11:39 AM, Mathieu Fenniak wrote: > I think there could be correctness implications... the default > cleanup.policy of delete would mean that topic entries past the retention > policy might have been removed. If you scale up the application, new > application instances won't be able to restore a complete table into its > local state store. An operation like a join against that KTable would find > no records where there should be record. > > Mathieu > > > On Wed, Feb 8, 2017 at 12:15 PM, Eno Thereska <eno.there...@gmail.com> > wrote: > >> If you fail to set the policy to compact, there shouldn't be any >> correctness implications, however your topics will grow larger than >> necessary. >> >> Eno >> >>> On 8 Feb 2017, at 18:56, Jon Yeargers <jon.yearg...@cedexis.com> wrote: >>> >>> What are the ramifications of failing to do this? >>> >>> On Tue, Feb 7, 2017 at 9:16 PM, Matthias J. Sax <matth...@confluent.io> >>> wrote: >>> >>>> Yes, that is correct. >>>> >>>> >>>> -Matthias >>>> >>>> >>>> On 2/7/17 6:39 PM, Mathieu Fenniak wrote: >>>>> Hey kafka users, >>>>> >>>>> Is it correct that a Kafka topic that is used for a KTable should be >> set >>>> to >>>>> cleanup.policy=compact? >>>>> >>>>> I've never noticed until today that the KStreamBuilder#table() >>>>> documentation says: "However, no internal changelog topic is created >>>> since >>>>> the original input topic can be used for recovery"... [1], which seems >>>> like >>>>> it is only true if the topic is configured for compaction. Otherwise >> the >>>>> original input topic won't necessarily contain the data necessary for >>>>> recovery of the state store. >>>>> >>>>> [1] >>>>> https://github.com/apache/kafka/blob/e108a8b4ed4512b021f9326cf07951 >>>> 7523c83060/streams/src/main/java/org/apache/kafka/streams/ >>>> kstream/KStreamBuilder.java#L355 >>>>> >>>>> Thanks, >>>>> >>>>> Mathieu >>>>> >>>> >>>> >> >> >
signature.asc
Description: OpenPGP digital signature