If I'm doing a KStream.leftJoin(KTable), how would I set this configuration
for just the KTable portion?

IE I have

KStream = KStreamBuilder.stream()
KTable = KStreamBuilder.table()

...
(join occurs.. data flows.. ppl are brought closer together.. there is
peace in the valley.. for me... )
...

KafkaStreams = new KafkaStreams(KStreamBuilder,
config_with_cleanup_policy_or_not?)
KafkaStreams.start()
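
To make the concern in the quoted thread below concrete, here is a toy
model in plain Java (no Kafka dependency) of what a new instance could
restore from the KTable's source topic under each cleanup policy. It is
only a sketch: the class and method names, the sample records, and the
retention numbers are all made up for illustration, and real log
cleanup works on segments and handles tombstones, which this ignores.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model, not Kafka code: compares what can be restored
// from a topic under time-based deletion versus compaction.
public class ChangelogRetentionSketch {

    /** One record in the toy topic: key, value, and the time it was written. */
    static final class Record {
        final String key;
        final String value;
        final long timestamp;

        Record(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Simplified cleanup.policy=delete: drop records older than the retention window. */
    static List<Record> applyDelete(List<Record> log, long now, long retentionMs) {
        List<Record> kept = new ArrayList<>();
        for (Record r : log) {
            if (now - r.timestamp <= retentionMs) {
                kept.add(r);
            }
        }
        return kept;
    }

    /** Simplified cleanup.policy=compact: keep the latest record per key, whatever its age. */
    static List<Record> applyCompact(List<Record> log) {
        Map<String, Record> latest = new LinkedHashMap<>();
        for (Record r : log) {
            latest.remove(r.key); // re-insert so the survivor keeps its log position
            latest.put(r.key, r);
        }
        return new ArrayList<>(latest.values());
    }

    /** Rebuild a table by replaying whatever is left in the topic. */
    static Map<String, String> restore(List<Record> log) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Record r : log) {
            table.put(r.key, r.value);
        }
        return table;
    }

    /** Made-up sample data: "alice" is written once, early; "bob" is updated recently. */
    static List<Record> sampleLog() {
        List<Record> log = new ArrayList<>();
        log.add(new Record("alice", "v1", 0L));
        log.add(new Record("bob", "v1", 10L));
        log.add(new Record("bob", "v2", 900L));
        return log;
    }

    public static void main(String[] args) {
        List<Record> log = sampleLog();
        long now = 1000L;
        long retentionMs = 500L;
        System.out.println("delete:  " + restore(applyDelete(log, now, retentionMs)));
        System.out.println("compact: " + restore(applyCompact(log)));
    }
}
```

With this sample data the delete model loses "alice" entirely, because
her only record is older than the retention window, so a join against
the restored table would miss her. The compact model keeps the latest
value for both keys.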

On Wed, Feb 8, 2017 at 12:30 PM, Eno Thereska <eno.there...@gmail.com>
wrote:

> Yeah makes sense. I was looking at it from the point of view of keeping
> all data forever.
>
> Eno
>
> > On 8 Feb 2017, at 20:27, Matthias J. Sax <matth...@confluent.io> wrote:
> >
> > Yes, that could happen if a key was not updated for a longer period than
> > topic retention time.
> >
> > If you want to force a changelog creation, you can do a dummy aggregate
> > instead of using KStreamBuilder#table()
> >
> >
> >> KTable table = KStreamBuilder.stream("topic").groupByKey().reduce(new Reducer() {
> >>    @Override
> >>    public Object apply(Object oldValue, Object newValue) {
> >>        return newValue;
> >>    }
> >> }, "someStoreName");
> >
> >
> > -Matthias
> >
> >
> > On 2/8/17 11:39 AM, Mathieu Fenniak wrote:
> >> I think there could be correctness implications... the default
> >> cleanup.policy of delete would mean that topic entries past the
> >> retention policy might have been removed.  If you scale up the
> >> application, new application instances won't be able to restore a
> >> complete table into their local state stores.  An operation like a join
> >> against that KTable would find no records where there should be records.
> >>
> >> Mathieu
> >>
> >>
> >> On Wed, Feb 8, 2017 at 12:15 PM, Eno Thereska <eno.there...@gmail.com>
> >> wrote:
> >>
> >>> If you fail to set the policy to compact, there shouldn't be any
> >>> correctness implications, however your topics will grow larger than
> >>> necessary.
> >>>
> >>> Eno
> >>>
> >>>> On 8 Feb 2017, at 18:56, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
> >>>>
> >>>> What are the ramifications of failing to do this?
> >>>>
> >>>> On Tue, Feb 7, 2017 at 9:16 PM, Matthias J. Sax <matth...@confluent.io>
> >>>> wrote:
> >>>>
> >>>>> Yes, that is correct.
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>>
> >>>>> On 2/7/17 6:39 PM, Mathieu Fenniak wrote:
> >>>>>> Hey kafka users,
> >>>>>>
> >>>>>> Is it correct that a Kafka topic that is used for a KTable should be
> >>>>>> set to cleanup.policy=compact?
> >>>>>>
> >>>>>> I've never noticed until today that the KStreamBuilder#table()
> >>>>>> documentation says: "However, no internal changelog topic is created
> >>>>>> since the original input topic can be used for recovery"... [1], which
> >>>>>> seems like it is only true if the topic is configured for compaction.
> >>>>>> Otherwise the original input topic won't necessarily contain the data
> >>>>>> necessary for recovery of the state store.
> >>>>>>
> >>>>>> [1]
> >>>>>> https://github.com/apache/kafka/blob/e108a8b4ed4512b021f9326cf079517523c83060/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java#L355
> >>>>>>
> >>>>>> Thanks,
> >>>>>>
> >>>>>> Mathieu
> >>>>>>
> >>>>>
> >>>>>
> >>>
> >>>
> >>
> >
>
>
