Follow-up question (let me know if a new thread should be created).

Do we always need a repartitioning topic?

If I'm reading things correctly, when we change the key of a record we need
to make sure the new key falls on the same partition that we are processing.
This makes a lot of sense if, after such a change, we need to join with some
other stream/table, or if we sink to a topic.
However, in cases where we do none of these things, does the repartition
topic serve any purpose? If not, can we somehow avoid creating it?
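
To make the question concrete, here is a minimal sketch in plain Java (not
the Kafka Streams API) of why a key change can move a record to another
partition. The class and method names are hypothetical, and
String.hashCode() modulo the partition count is only an assumed stand-in
for the real partitioner, which hashes the serialized key bytes with
murmur2, so actual partition numbers will differ.

```java
import java.util.function.Function;

public class RepartitionSketch {

    // Assumption: a simplified partitioner (key hash modulo partition count).
    // Kafka's default partitioner uses murmur2 over the serialized key bytes.
    public static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Returns true when the re-keyed record would belong to a different
    // partition than the one its original key was read from.
    public static boolean needsRepartition(String oldKey,
                                           Function<String, String> rekey,
                                           int numPartitions) {
        String newKey = rekey.apply(oldKey);
        return partitionFor(oldKey, numPartitions) != partitionFor(newKey, numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        // Re-keying "a" to "b" changes the target partition under this scheme.
        System.out.println("a -> partition " + partitionFor("a", numPartitions));
        System.out.println("b -> partition " + partitionFor("b", numPartitions));
        System.out.println("re-key a->b moves record: "
                + needsRepartition("a", k -> "b", numPartitions));
        // An identity mapping keeps the record where it is.
        System.out.println("identity moves record: "
                + needsRepartition("a", k -> k, numPartitions));
    }
}
```

If the re-keyed stream only feeds a stateless step (for example a foreach),
nothing downstream depends on which partition the record sits in, which is
the case I am asking about.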

On Sun, May 14, 2017 at 7:58 PM João Peixoto <joao.harti...@gmail.com>
wrote:

> Very useful links, thank you.
>
> Part of my original misunderstanding was that the at-least-once guarantee
> was considered fulfilled if the record reached a sink node.
>
> Thanks for all the feedback, you may consider my question answered.
> Feel free to ask further questions about the use case if found interesting.
> On Sun, May 14, 2017 at 4:31 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> Yes.
>>
>> It is basically "documented", as Streams guarantees at-least-once
>> semantics. Thus, we make sure you will not lose any data in case of
>> failure. (ie, the overall guarantee is documented)
>>
>> To achieve this, we always flush before we commit offsets. (This is not
>> explicitly documented as it's an implementation detail.)
>>
>> There are some docs in the wiki:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Internal+Data+Management#KafkaStreamsInternalDataManagement-Commits
>>
>> This might also help in case you want to dig into the code:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Architecture
>>
>>
>> -Matthias
>>
>> On 5/14/17 4:07 PM, João Peixoto wrote:
>> > I think I now understand what Matthias meant when he said "If you use a
>> > global remote store, you would not need to back your changes in a
>> changelog
>> > topic, as the store would not be lost in case of failure".
>> >
>> > I had the misconception that if a state store threw an exception during
>> > "flush", all messages received between now and the previous flush would
>> be
>> > "lost", hence the need for a changelog topic. However, it seems that the
>> > "repartition" topic actually solves this problem.
>> >
>> > There's very little information about the latter, at least that I could
>> > find, but an entry seems to be added whenever a record enters the
>> > "aggregate", but the state store "consumer" of this topic only updates
>> its
>> > offset after the flush completes, meaning that the repartition topic
>> will
>> > be replayed! It seems this problem is already solved for me, I'd
>> appreciate
>> > if someone could point me to the documentation or code that backs up the
>> > above.
>> >
>> >
>> > On Sat, May 13, 2017 at 3:11 PM João Peixoto <joao.harti...@gmail.com>
>> > wrote:
>> >
>> >> Replies in line as well
>> >>
>> >>
>> >> On Sat, May 13, 2017 at 3:25 AM Eno Thereska <eno.there...@gmail.com>
>> >> wrote:
>> >>
>> >>> Hi João,
>> >>>
>> >>> Some answers inline:
>> >>>
>> >>>> On 12 May 2017, at 18:27, João Peixoto <joao.harti...@gmail.com>
>> wrote:
>> >>>>
>> >>>> Thanks for the comments, here are some clarifications:
>> >>>>
>> >>>> I did look at interactive queries, if I understood them correctly it
>> >>> means
>> >>>> that my state store must hold all the results in order for it to be
>> >>>> queried, either in memory or through disk (RocksDB).
>> >>>
>> >>> Yes, that's correct.
>> >>>
>> >>>
>> >>>> 1. The retention policy on my aggregate operations, in my case, is 90
>> >>> days,
>> >>>> which is way too much data to hold in memory
>> >>>
>> >>> It will depend on how much data/memory you have, but perhaps it could
>> be
>> >>> too much to hold in memory
>> >>> for that long (especially because some failure is bound to happen in
>> 90
>> >>> days)
>> >>>
>> >>>> 2. My stream instances do not have access to disk, even if they did,
>> >>>> wouldn't it mean I'd need almost twice the disk space to hold the
>> same
>> >>>> data? I.e. Kafka brokers holding the topics + RocksDB holding the
>> state?
>> >>>
>> >>> That's interesting, is there a reason why the streams instances don't
>> >>> have access to a local file system? I'm curious
>> >>> what kind of deployment you have.
>> >>>
>> >>
>> >> My instances are deployed on Kubernetes
>> >>
>> >>
>> >>>
>> >>> It is true that twice the disk space is needed to hold the data on
>> >>> RocksDb as well as in the Kafka changelog topic,
>> >>> however that is no different than the current situation where the
>> data is
>> >>> stored to a remote database right?
>> >>> I understand your point that you might not have access to local disk
>> >>> though.
>> >>>
>> >>
>> >> Not quite. In my scenario the state store would be backed by the
>> changelog
>> >> AND MongoDB. On bootstrap we would fetch the state stored in the
>> changelog
>> >> into a structure like a LoadingCache (from Guava's), which would load
>> >> additional fields from MongoDB if needed. Therefore the changelog could
>> >> hold say 12 hours of records and 90 days is stored in MongoDB only.
>> That's
>> >> exactly the whole strategy I'm trying to validate. The implications
>> would
>> >> be that in case of failure we'd need to recover within 12 hours or else
>> >> we'd need to replay from the source topics.
>> >>
>> >>
>> >>>
>> >>>
>> >>>> 3. Because a crash may happen between the time an entry is added to the
>> changelog
>> >>>> and the data store is flushed, I need to get all the changes
>> every time
>> >>> if I
>> >>>> want to guarantee that all data is eventually persisted. This is why
>> >>>> checkpoint files may not work for me.
>> >>>
>> >>> The upcoming exactly-once support in 0.11 will help with these kinds of
>> >>> guarantees.
>> >>>
>> >>
>> >> Not sure I understand how exactly-once would help in this case.
>> >>
>> >>
>> >>>
>> >>>>
>> >>>> Standby tasks look great, I forgot about those.
>> >>>>
>> >>>> I'm at the design phase so this is all tentative. Answering Matthias
>> >>>> questions
>> >>>>
>> >>>> My state stores are local. As mentioned above I do not have access to
>> >>> disk
>> >>>> therefore I need to recover all data from somewhere, in this case I'm
>> >>>> thinking about the changelog.
>> >>>
>> >>> So this is where I get confused a bit, since you mention that your
>> state
>> >>> stores are "local", i.e., the streams instance
>> >>> does have access to a local file system.
>> >>>
>> >>
>> >> When I said "local" I meant that the state stores are partial for each
>> >> instance, i.e. they only have the partitions the task is responsible
>> for
>> >> (normal behavior) rather than a global store.
>> >>
>> >>
>> >>>
>> >>>> I read about Kafka Connect but have never used it, maybe that'll
>> >>> simplify
>> >>>> things, but I need to do some studying there.
>> >>>>
>> >>>> Even though my stores are local, I still want to store them in a
>> >>>> database rather than use plain RocksDB (or global stores), because
>> >>>> this would allow me to migrate my current processing pipeline to
>> >>>> Kafka Streams without needing to change the frontend part of the
>> >>>> application, which fetches data from MongoDB.
>> >>>
>> >>> Makes sense.
>> >>>
>> >>>>
>> >>>> PS When you mention Global State Stores I'm thinking of
>> >>>>
>> >>>
>> http://docs.confluent.io/3.2.0/streams/developer-guide.html#querying-remote-state-stores-for-the-entire-application
>> >>> ,
>> >>>> is this correct?
>> >>>
>> >>> No I think Matthias is saying that if you have a remote server
>> somewhere
>> >>> where you store all your data (like a shared file system).
>> >>> This is not something Kafka would provide.
>> >>>
>> >>> Eno
>> >>>
>> >>>>
>> >>>>
>> >>>> On Fri, May 12, 2017 at 10:02 AM Matthias J. Sax <
>> matth...@confluent.io
>> >>>>
>> >>>> wrote:
>> >>>>
>> >>>>> Hi,
>> >>>>>
>> >>>>> I am not sure about your overall setup. Are your stores local
>> (similar
>> >>>>> to RocksDB) or are you using one global remote store? If you use a
>> >>>>> global remote store, you would not need to back your changes in a
>> >>>>> changelog topic, as the store would not be lost in case of failure.
>> >>>>>
>> >>>>> Also (in case that your stores are remote), did you consider using
>> >>> Kafka
>> >>>>> Connect to export your data into an external store like MySQL or
>> >>> MongoDB
>> >>>>> instead of writing your own custom stores for Streams?
>> >>>>>
>> >>>>> If your stores are local, why do you write custom stores? I am
>> curious
>> >>>>> to understand why RocksDB does not serve your needs.
>> >>>>>
>> >>>>>
>> >>>>> About your two comments:
>> >>>>>
>> >>>>> (1) Streams uses RocksDB by default and the default implementation
>> is
>> >>>>> using "checkpoint files" in the next release. Those checkpoint files
>> track
>> >>>>> the changelog offsets of the data that got flushed to disk. This
>> allows
>> >>>>> to reduce the startup time, as only the tail of the changelog needs
>> to
>> >>>>> be read to bring the store up to date. For this, you would always
>> (1)
>> >>>>> write to the changelog, (2) write to your store. Each time you need
>> to
>> >>>>> flush, you know that all data is in the changelog already. After
>> each
>> >>>>> flush, you can update the "local offset checkpoint file".
>> >>>>>
>> >>>>> I guess, if you use local stores you can apply a similar pattern in
>> your
>> >>>>> custom store implementation. (And as mentioned above, for global
>> remote
>> >>>>> store you would not need the changelog anyway. -- This also applies
>> to
>> >>>>> your recovery question from below.)
>> >>>>>
>> >>>>> (2) You can configure standby tasks (via StreamsConfig
>> >>>>> "num.standby.replicas"). This will set up standby tasks that
>> passively
>> >>>>> replicate your stores to another instance. In case of an error, state
>> will be
>> >>>>> migrated to those "hot standbys" reducing recovery time
>> significantly.
>> >>>>>
>> >>>>>
>> >>>>> About your question:
>> >>>>>
>> >>>>> (1) Yes.
>> >>>>> (2) Partly parallel (ie, if you run with multiple threads -- cf.
>> >>>>> StreamsConfig "num.stream.threads"). Each thread flushes all its
>> >>>>> stores sequentially.
>> >>>>> (3) Yes. There will be a store for each partition. (If store is
>> local.)
>> >>>>> (4) Yes. The overall processing loop is sequential (cf.
>> >>>>>
>> >>>>>
>> >>>
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Architecture
>> >>>>> )
>> >>>>> Also, the next commit point is computed after a successful commit --
>> >>>>> thus, if one commit is delayed, all consecutive commit points are
>> >>>>> "shifted" by this delay.
>> >>>>>
>> >>>>>
>> >>>>> -Matthias
>> >>>>>
>> >>>>>
>> >>>>> On 5/12/17 9:00 AM, João Peixoto wrote:
>> >>>>>> On a stream definition I perform an "aggregate" which is configured
>> >>> with
>> >>>>> a
>> >>>>>> state store.
>> >>>>>>
>> >>>>>> *Goal*: Persist the aggregation results into a database, e.g.
>> MySQL or
>> >>>>>> MongoDB
>> >>>>>>
>> >>>>>> *Working approach*:
>> >>>>>> I have created a custom StateStore backed by a changelog topic like
>> >>> the
>> >>>>>> builtin state stores. Whenever the store gets flushed I save to the
>> >>>>>> database, mark the record as being persisted and log the change in
>> the
>> >>>>>> changelog.
>> >>>>>>
>> >>>>>> If something goes wrong during processing, the changelog guarantees
>> >>> that
>> >>>>> I
>> >>>>>> do not lose data, restores the state and if some data point was not
>> >>>>>> persisted, the next stream instance will persist it on its flush
>> >>>>> operation.
>> >>>>>>
>> >>>>>> 1. I cannot store too much data in the changelog, even with
>> >>> compaction,
>> >>>>> if
>> >>>>>> I have too much data, bootstrapping a stream instance would take a
>> >>> long
>> >>>>> time
>> >>>>>> 2. On the other hand, if I take too long to recover from a
>> failure, I
>> >>> may
>> >>>>>> lose data. So there seems to be a delicate tradeoff here
>> >>>>>>
>> >>>>>> *Questions*:
>> >>>>>>
>> >>>>>> 1. Is this a reasonable use case?
>> >>>>>> 2. In a scenario where my stream would have a fanout (multiple
>> >>>>> sub-streams
>> >>>>>> based on the same stream), each branch would perform different
>> >>>>> "aggregate"
>> >>>>>> operations, each with its own state store. Are state stores
>> flushed in
>> >>>>>> parallel or sequentially?
>> >>>>>> 3. The above also applies per-partition. As a stream definition is
>> >>>>>> parallelized by partition, will one instance hold different store
>> >>>>> instances
>> >>>>>> for each one?
>> >>>>>> 4. Through synthetic sleeps I simulated slow flushes, slower than
>> the
>> >>>>>> commit interval. The stream seems to be ok with it and didn't
>> throw, I
>> >>>>>> assume the Kafka consumer does not poll more records until all of
>> the
>> >>>>>> previous poll's are committed, but I couldn't find documentation to
>> >>> back
>> >>>>>> this statement. Is there a timeout for "commit" operations?
>> >>>>>>
>> >>>>>>
>> >>>>>> Sample code
>> >>>>>>
>> >>>>>> public class AggregateHolder {
>> >>>>>>
>> >>>>>>    private Long commonKey;
>> >>>>>>    private List<Double> rawValues = new ArrayList<>();
>> >>>>>>    private boolean persisted;
>> >>>>>>
>> >>>>>> // ...
>> >>>>>> }
>> >>>>>>
>> >>>>>> And stream definition
>> >>>>>>
>> >>>>>> source.groupByKey(Serdes.String(), recordSerdes)
>> >>>>>>       .aggregate(
>> >>>>>>               AggregateHolder::new,
>> >>>>>>               (aggKey, value, aggregate) -> aggregate.addValue(value.getValue()),
>> >>>>>>               new DemoStoreSupplier<>(/* ... */)
>> >>>>>>       )
>> >>>>>>       .foreach((key, agg) ->
>> >>>>>>               log.debug("Aggregate: {}={} ({})", key, agg.getAverage(), agg));
>> >>>>>>
>> >>>>>
>> >>>>>
>> >>>
>> >>>
>> >
>>
>>
