(it's preferred to create another email thread for a different topic to make it 
easier to look back)

Yes, there could be room for optimizations, e.g., see this: 


> On 16 May 2017, at 16:01, João Peixoto <joao.harti...@gmail.com> wrote:
> Follow up doubt (let me know if a new question should be created).
> Do we always need a repartitioning topic?
> If I'm reading things correctly when we change the key of a record we need
> make sure the new key falls on the same partition that we are processing.
> This makes a lot of sense if after such change we'd need to join on some
> other stream/table or cases where we sink to a topic.
> However, in cases where none of these things, the repartition topic does
> nothing? If this is true can we somehow not create it?
> On Sun, May 14, 2017 at 7:58 PM João Peixoto <joao.harti...@gmail.com>
> wrote:
>> Very useful links, thank you.
>> Part of my original misunderstanding was that the at-least-once guarantee
>> was considered fulfilled if the record reached a sink node.
>> Thanks for all the feedback, you may consider my question answered.
>> Feel free to ask further questions about the use case if found interesting.
>> On Sun, May 14, 2017 at 4:31 PM Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>> Yes.
>>> It is basically "documented", as Streams guarantees at-least-once
>>> semantics. Thus, we make sure, you will not loose any data in case of
>>> failure. (ie, the overall guarantee is documented)
>>> To achieve this, we always flush before we commit offsets. (This is not
>>> explicitly documented as it's an implementation detail.)
>>> There is some doc's in the wiki:
>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Internal+Data+Management#KafkaStreamsInternalDataManagement-Commits
>>> This might also help in case you want to dig into the code:
>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Architecture
>>> -Matthias
>>> On 5/14/17 4:07 PM, João Peixoto wrote:
>>>> I think I now understand what Matthias meant when he said "If you use a
>>>> global remote store, you would not need to back your changes in a
>>> changelog
>>>> topic, as the store would not be lost if case of failure".
>>>> I had the misconception that if a state store threw an exception during
>>>> "flush", all messages received between now and the previous flush would
>>> be
>>>> "lost", hence the need for a changelog topic. However, it seems that the
>>>> "repartition" topic actually solves this problem.
>>>> There's very little information about the latter, at least that I could
>>>> find, but an entry seems to be added whenever a record enters the
>>>> "aggregate", but the state store "consumer" of this topic only updates
>>> its
>>>> offset after the flush completes, meaning that the repartition topic
>>> will
>>>> be replayed! It seems this problem is already solved for me, I'd
>>> appreciate
>>>> if someone could point me to the documentation or code that backs up the
>>>> above.
>>>> On Sat, May 13, 2017 at 3:11 PM João Peixoto <joao.harti...@gmail.com>
>>>> wrote:
>>>>> Replies in line as well
>>>>> On Sat, May 13, 2017 at 3:25 AM Eno Thereska <eno.there...@gmail.com>
>>>>> wrote:
>>>>>> Hi João,
>>>>>> Some answers inline:
>>>>>>> On 12 May 2017, at 18:27, João Peixoto <joao.harti...@gmail.com>
>>> wrote:
>>>>>>> Thanks for the comments, here are some clarifications:
>>>>>>> I did look at interactive queries, if I understood them correctly it
>>>>>> means
>>>>>>> that my state store must hold all the results in order for it to be
>>>>>>> queried, either in memory or through disk (RocksDB).
>>>>>> Yes, that's correct.
>>>>>>> 1. The retention policy on my aggregate operations, in my case, is 90
>>>>>> days,
>>>>>>> which is way too much data to hold in memory
>>>>>> It will depend on how much data/memory you have, but perhaps it could
>>> be
>>>>>> too much to hold in memory
>>>>>> for that long (especially because some failure is bound to happen in
>>> 90
>>>>>> days)
>>>>>>> 2. My stream instances do no have access to disk, even if they did,
>>>>>>> wouldn't it mean I'd need almost twice the disk space to hold the
>>> same
>>>>>>> data? I.e. kafka brokers golding the topics + RocksDB holding the
>>> state?
>>>>>> That's interesting, is there a reason why the streams instances don't
>>>>>> have access to a local file system? I'm curious
>>>>>> what kind of deployment you have.
>>>>> My instances are deployed on Kubernetes
>>>>>> It is true that twice the disk space is needed to hold the data on
>>>>>> RocksDb as well as in the Kafka changelog topic,
>>>>>> however that is no different that the current situation where the
>>> data is
>>>>>> stored to a remote database right?
>>>>>> I understand your point that you might not have access to local disk
>>>>>> though.
>>>>> Not quite. In my scenario the state store would be backed by the
>>> changelog
>>>>> AND MongoDB. On bootstrap we would fetch the state stored in the
>>> changelog
>>>>> into a structure like a LoadingCache (from Guava's), which would load
>>>>> additional fields from MongoDB if needed. Therefore the changelog could
>>>>> hold say 12 hours of records and 90 days is stored in MongoDB only.
>>> That's
>>>>> exactly the whole strategy I'm trying to validate. The implications
>>> would
>>>>> be that in case of failure we'd need to recover within 12 hours or else
>>>>> we'd need to replay from the source topics.
>>>>>>> 3. Because a crash may happen between an entry is added to the
>>> changelog
>>>>>>> and the data store is flushed, I need to get all the changes
>>> everytime
>>>>>> if I
>>>>>>> want to guarantee that all data is eventually persisted. This is why
>>>>>>> checkpoint files may not work for me.
>>>>>> The upcoming exactly-once support in 0.11 will help with these kind of
>>>>>> guarantees.
>>>>> Not sure I understand how exactly-once would help in this case.
>>>>>>> Standby tasks looks great, I forgot about those.
>>>>>>> I'm at the design phase so this is all tentative. Answering Matthias
>>>>>>> questions
>>>>>>> My state stores are local. As mentioned above I do not have access to
>>>>>> disk
>>>>>>> therefore I need to recover all data from somewhere, in this case I'm
>>>>>>> thinking about the changelog.
>>>>>> So this is where I get confused a bit, since you mention that your
>>> state
>>>>>> stores are "local", i.e., the streams instance
>>>>>> does have access to a local file system.
>>>>> When I said "local" I meant that the state stores are partial for each
>>>>> instance, i.e. they only have the partitions the task is responsible
>>> for
>>>>> (normal behavior) rather than a global store.
>>>>>>> I read about Kafka Connect but have never used it, maybe that'll
>>>>>> simplify
>>>>>>> things, but I need to do some studying there.
>>>>>>> The reason why even though my stores are local but still I want to
>>> store
>>>>>>> them on a database and not use straight up RocksDB (or global
>>> stores) is
>>>>>>> because this would allow me to migrate my current processing
>>> pipeline to
>>>>>>> Kafka Streams without needing to change the frontend part of the
>>>>>>> application, which fetches data from MongoDB.
>>>>>> Makes sense.
>>>>>>> PS When you mention Global State Stores I'm thinking of
>>> http://docs.confluent.io/3.2.0/streams/developer-guide.html#querying-remote-state-stores-for-the-entire-application
>>>>>> ,
>>>>>>> is this correct?
>>>>>> No I think Matthias is saying that if you have a remote server
>>> somewhere
>>>>>> where you store all your data (like a shared file system).
>>>>>> This is not something Kafka would provide.
>>>>>> Eno
>>>>>>> On Fri, May 12, 2017 at 10:02 AM Matthias J. Sax <
>>> matth...@confluent.io
>>>>>>> wrote:
>>>>>>>> Hi,
>>>>>>>> I am not sure about your overall setup. Are your stores local
>>> (similar
>>>>>>>> to RocksDB) or are you using one global remote store? If you use a
>>>>>>>> global remote store, you would not need to back your changes in a
>>>>>>>> changelog topic, as the store would not be lost if case of failure.
>>>>>>>> Also (in case that your stores are remote), did you consider using
>>>>>> Kafka
>>>>>>>> Connect to export your data into an external store like MySQL or
>>>>>> MongoDB
>>>>>>>> instead of writing your own custom stores for Streams?
>>>>>>>> If your stores are local, why do you write custom stores? I am
>>> curious
>>>>>>>> to understand why RocksDB does not serve your needs.
>>>>>>>> About your two comment:
>>>>>>>> (1) Streams uses RocksDB by default and the default implementation
>>> is
>>>>>>>> using "checkpoint files" in next release. Those checkpoint files
>>> track
>>>>>>>> the changelog offsets of the data that got flushed to disc. This
>>> allows
>>>>>>>> to reduce the startup time, as only the tail of the changelog needs
>>> to
>>>>>>>> be read to bring the store up to date. For this, you would always
>>> (1)
>>>>>>>> write to the changelog, (2) write to you store. Each time you need
>>> to
>>>>>>>> flush, you know that all data is in the changelog already. After
>>> each
>>>>>>>> flush, you can update the "local offset checkpoint file".
>>>>>>>> I guess, if you use local stores you can apply a similar pattern in
>>> you
>>>>>>>> custom store implementation. (And as mentioned above, for global
>>> remote
>>>>>>>> store you would not need the changelog anyway. -- This also applies
>>> to
>>>>>>>> your recovery question from below.)
>>>>>>>> (2) You can configure standby task (via StreamConfig
>>>>>>>> "num.standby.replicas"). This will set up standby tasks that
>>> passively
>>>>>>>> replicate your stores to another instance. In error case, state
>>> will be
>>>>>>>> migrated to those "hot standbys" reducing recovery time
>>> significantly.
>>>>>>>> About your question:
>>>>>>>> (1) Yes.
>>>>>>>> (2) Partly parallel (ie, if you run with multiple threads -- cf.
>>>>>>>> StreamsConfig "num.streams.thread"). Each thread, flushes all it's
>>>>>>>> stores sequentially.
>>>>>>>> (3) Yes. There will be a store for each partition. (If store is
>>> local.)
>>>>>>>> (4) Yes. The overall processing loop is sequential (cf.
>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Architecture
>>>>>>>> )
>>>>>>>> Also, the next commit point is computed after a successful commit --
>>>>>>>> thus, if one commit is delayed, all consecutive commit points are
>>>>>>>> "shifted" by this delay.
>>>>>>>> -Matthias
>>>>>>>> On 5/12/17 9:00 AM, João Peixoto wrote:
>>>>>>>>> On a stream definition I perform an "aggregate" which is configured
>>>>>> with
>>>>>>>> a
>>>>>>>>> state store.
>>>>>>>>> *Goal*: Persist the aggregation results into a database, e.g.
>>> MySQL or
>>>>>>>>> MongoDB
>>>>>>>>> *Working approach*:
>>>>>>>>> I have created a custom StateStore backed by a changelog topic like
>>>>>> the
>>>>>>>>> builtin state stores. Whenever the store gets flushed I save to the
>>>>>>>>> database, mark the record as being persisted and log the change in
>>> the
>>>>>>>>> changelog.
>>>>>>>>> If something goes wrong during processing, the changelog guarantees
>>>>>> that
>>>>>>>> I
>>>>>>>>> do not lose data, restores the state and if some data point was not
>>>>>>>>> persisted, the next stream instance will persist it on its flush
>>>>>>>> operation.
>>>>>>>>> 1. I cannot store too much data in the changelog, even with
>>>>>> compaction,
>>>>>>>> if
>>>>>>>>> I have too much data, bootstrapping a stream instance would take a
>>>>>> long
>>>>>>>> time
>>>>>>>>> 2. On the other hand, if I take too long to recover from a
>>> failure, I
>>>>>> may
>>>>>>>>> lose data. So there seems to be a delicate tradeoff here
>>>>>>>>> *Questions*:
>>>>>>>>> 1. Is this a reasonable use case?
>>>>>>>>> 2. In a scenario where my stream would have a fanout (multiple
>>>>>>>> sub-streams
>>>>>>>>> based on the same stream), each branch would perform different
>>>>>>>> "aggregate"
>>>>>>>>> operations, each with its own state store. Are state stores
>>> flushed in
>>>>>>>>> parallel or sequentially?
>>>>>>>>> 3. The above also applies per-partition. As a stream definition is
>>>>>>>>> parallelized by partition, will one instance hold different store
>>>>>>>> instances
>>>>>>>>> for each one?
>>>>>>>>> 4. Through synthetic sleeps I simulated slow flushes, slower than
>>> the
>>>>>>>>> commit interval. The stream seems to be ok with it and didn't
>>> throw, I
>>>>>>>>> assume the Kafka consumer does not poll more records until all of
>>> the
>>>>>>>>> previous poll's are committed, but I couldn't find documentation to
>>>>>> back
>>>>>>>>> this statement. Is there a timeout for "commit" operations?
>>>>>>>>> Sample code
>>>>>>>>> public class AggregateHolder {
>>>>>>>>>   private Long commonKey;
>>>>>>>>>   private List<Double> rawValues = new ArrayList<>();
>>>>>>>>>   private boolean persisted;
>>>>>>>>> // ...
>>>>>>>>> }
>>>>>>>>> And stream definition
>>>>>>>>> source.groupByKey(Serdes.String(), recordSerdes)
>>>>>>>>>             .aggregate(
>>>>>>>>>                     AggregateHolder::new,
>>>>>>>>>                     (aggKey, value, aggregate) ->
>>>>>>>>> aggregate.addValue(value.getValue()),
>>>>>>>>>                     new DemoStoreSupplier<>(/* ... */)
>>>>>>>>>             )
>>>>>>>>>             .foreach((key, agg) -> log.debug("Aggregate: {}={}
>>> ({})",
>>>>>>>>> key, agg.getAverage(), agg));

Reply via email to