If I'm doing a KStream.leftJoin(KTable), how would I set this configuration for just the KTable portion?

I.e., I have:

KStream = KStreamBuilder.stream()
KTable = KStreamBuilder.table()
...
(join occurs.. data flows.. ppl are brought closer together.. there is peace in the valley.. for me... )
...
KafkaStreams = new KafkaStreams(KStreamBuilder, config_with_cleanup_policy_or_not?)
KafkaStreams.start()

On Wed, Feb 8, 2017 at 12:30 PM, Eno Thereska <eno.there...@gmail.com> wrote:

> Yeah makes sense. I was looking at it from the point of view of keeping
> all data forever.
>
> Eno
>
> > On 8 Feb 2017, at 20:27, Matthias J. Sax <matth...@confluent.io> wrote:
> >
> > Yes, that could happen if a key was not updated for a longer period than
> > topic retention time.
> >
> > If you want to force a changelog creation, you can do a dummy aggregate
> > instead of using KStreamBuilder#table()
> >
> >> KTable table = KStreamBuilder.stream("topic").groupByKey().reduce(new Reducer() {
> >>     @Override
> >>     public Object apply(Object oldValue, Object newValue) {
> >>         return newValue;
> >>     }
> >> }, "someStoreName");
> >
> > -Matthias
> >
> > On 2/8/17 11:39 AM, Mathieu Fenniak wrote:
> >> I think there could be correctness implications... the default
> >> cleanup.policy of delete would mean that topic entries past the retention
> >> policy might have been removed. If you scale up the application, new
> >> application instances won't be able to restore a complete table into their
> >> local state store. An operation like a join against that KTable would find
> >> no records where there should be records.
> >>
> >> Mathieu
> >>
> >> On Wed, Feb 8, 2017 at 12:15 PM, Eno Thereska <eno.there...@gmail.com>
> >> wrote:
> >>
> >>> If you fail to set the policy to compact, there shouldn't be any
> >>> correctness implications, however your topics will grow larger than
> >>> necessary.
> >>>
> >>> Eno
> >>>
> >>>> On 8 Feb 2017, at 18:56, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
> >>>>
> >>>> What are the ramifications of failing to do this?
> >>>>
> >>>> On Tue, Feb 7, 2017 at 9:16 PM, Matthias J. Sax <matth...@confluent.io>
> >>>> wrote:
> >>>>
> >>>>> Yes, that is correct.
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 2/7/17 6:39 PM, Mathieu Fenniak wrote:
> >>>>>> Hey kafka users,
> >>>>>>
> >>>>>> Is it correct that a Kafka topic that is used for a KTable should be
> >>>>>> set to cleanup.policy=compact?
> >>>>>>
> >>>>>> I've never noticed until today that the KStreamBuilder#table()
> >>>>>> documentation says: "However, no internal changelog topic is created
> >>>>>> since the original input topic can be used for recovery"... [1], which
> >>>>>> seems like it is only true if the topic is configured for compaction.
> >>>>>> Otherwise the original input topic won't necessarily contain the data
> >>>>>> necessary for recovery of the state store.
> >>>>>>
> >>>>>> [1]
> >>>>>> https://github.com/apache/kafka/blob/e108a8b4ed4512b021f9326cf079517523c83060/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java#L355
> >>>>>>
> >>>>>> Thanks,
> >>>>>>
> >>>>>> Mathieu
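
For anyone following along, here is a rough sketch of the failure mode Mathieu describes in the quoted thread. It is a toy model, not the Kafka API: the "topic" is just an in-memory list, and the class name, method names, keys, and timestamps (CleanupPolicySketch, applyDelete, applyCompact, restore, "user-1", etc.) are all made up for illustration. It also simplifies real broker behaviour, which deletes whole segments and compacts lazily. The idea is only to show that a table restored from a delete-policy topic can be missing a key that a compacted topic would still hold, so a left join against it finds nothing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: none of this is Kafka code. All names are hypothetical.
final class CleanupPolicySketch {

    /** One record in the pretend topic. */
    static final class Rec {
        final String key;
        final String value;
        final long timestamp;

        Rec(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Imitates cleanup.policy=delete: drop records older than the retention window. */
    static List<Rec> applyDelete(List<Rec> log, long now, long retentionMs) {
        List<Rec> kept = new ArrayList<>();
        for (Rec r : log) {
            if (now - r.timestamp <= retentionMs) {
                kept.add(r);
            }
        }
        return kept;
    }

    /** Imitates cleanup.policy=compact: keep the latest record per key, whatever its age. */
    static List<Rec> applyCompact(List<Rec> log) {
        Map<String, Rec> latest = new LinkedHashMap<>();
        for (Rec r : log) {
            latest.remove(r.key); // re-insert so the newest record also moves to the end
            latest.put(r.key, r);
        }
        return new ArrayList<>(latest.values());
    }

    /** What a fresh application instance would do: replay the topic into a local store. */
    static Map<String, String> restore(List<Rec> log) {
        Map<String, String> store = new HashMap<>();
        for (Rec r : log) {
            store.put(r.key, r.value);
        }
        return store;
    }

    /** Left join of one stream record against the table; a missing key shows up as "null". */
    static String leftJoin(String key, String streamValue, Map<String, String> table) {
        return streamValue + "+" + table.get(key);
    }

    /** "user-1" is written once at t=0 and never updated; "user-2" is updated recently. */
    static List<Rec> sampleLog() {
        List<Rec> log = new ArrayList<>();
        log.add(new Rec("user-1", "alice", 0L));
        log.add(new Rec("user-2", "bob", 900L));
        log.add(new Rec("user-2", "bobby", 950L));
        return log;
    }

    public static void main(String[] args) {
        List<Rec> log = sampleLog();
        Map<String, String> fromDelete = restore(applyDelete(log, 1000L, 500L));
        Map<String, String> fromCompact = restore(applyCompact(log));
        System.out.println("delete:  " + leftJoin("user-1", "click", fromDelete));  // delete:  click+null
        System.out.println("compact: " + leftJoin("user-1", "click", fromCompact)); // compact: click+alice
    }
}
```

With the delete policy, "user-1" has aged out of the topic, so the restored table has no entry for it and the join comes back empty. With compaction, the last value for every key survives, which is why the KStreamBuilder#table() docs can say the input topic is usable for recovery.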