Yes.

It is basically "documented", as Streams guarantees at-least-once
semantics. Thus, we make sure you will not lose any data in case of
failure. (I.e., the overall guarantee is documented.)

To achieve this, we always flush before we commit offsets. (This is not
explicitly documented as it's an implementation detail.)
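
A minimal sketch of that ordering, in plain Java without the Kafka Streams
API. The class, method, and field names below are hypothetical and only
model the idea; this is not the actual Streams code. It assumes a failed
flush loses the buffered records, which is why the offset must not move.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FlushBeforeCommitSketch {

    /** Hypothetical stand-in for a state store; not a Kafka Streams interface. */
    static class SketchStore {
        final List<String> buffered = new ArrayList<>();
        final List<String> persisted = new ArrayList<>();
        boolean failNextFlush = false;

        void put(String record) {
            buffered.add(record);
        }

        void flush() {
            if (failNextFlush) {
                failNextFlush = false;
                buffered.clear(); // assumption: unflushed records are lost on failure
                throw new IllegalStateException("simulated flush failure");
            }
            persisted.addAll(buffered);
            buffered.clear();
        }
    }

    /**
     * Reads up to batchSize records starting at committedOffset, flushes the
     * store, and only then "commits" by returning the advanced offset.
     * If the flush fails, the old offset is returned, so the same records
     * are read again on the next call.
     */
    static int processBatch(List<String> input, int committedOffset,
                            int batchSize, SketchStore store) {
        int end = Math.min(input.size(), committedOffset + batchSize);
        for (int i = committedOffset; i < end; i++) {
            store.put(input.get(i));
        }
        try {
            store.flush();          // step 1: flush first
        } catch (IllegalStateException e) {
            return committedOffset; // no commit, so the batch is replayed
        }
        return end;                 // step 2: commit the offset afterwards
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c", "d");
        SketchStore store = new SketchStore();
        int offset = 0;

        store.failNextFlush = true;
        offset = processBatch(input, offset, 2, store); // flush fails, offset stays 0
        offset = processBatch(input, offset, 2, store); // "a" and "b" are replayed
        offset = processBatch(input, offset, 2, store); // "c" and "d"

        System.out.println(offset + " " + store.persisted); // prints "4 [a, b, c, d]"
    }
}
```

Note that this ordering gives at-least-once, not exactly-once: if a flush
succeeds but the commit that follows does not, the same records would be
processed and flushed again after a restart.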

There are some docs in the wiki:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Internal+Data+Management#KafkaStreamsInternalDataManagement-Commits

This might also help in case you want to dig into the code:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Architecture


-Matthias

On 5/14/17 4:07 PM, João Peixoto wrote:
> I think I now understand what Matthias meant when he said "If you use a
> global remote store, you would not need to back your changes in a changelog
> topic, as the store would not be lost in case of failure".
> 
> I had the misconception that if a state store threw an exception during
> "flush", all messages received between now and the previous flush would be
> "lost", hence the need for a changelog topic. However, it seems that the
> "repartition" topic actually solves this problem.
> 
> There's very little information about the latter, at least that I could
> find, but an entry seems to be added whenever a record enters the
> "aggregate", but the state store "consumer" of this topic only updates its
> offset after the flush completes, meaning that the repartition topic will
> be replayed! It seems this problem is already solved for me, I'd appreciate
> if someone could point me to the documentation or code that backs up the
> above.
> 
> 
> On Sat, May 13, 2017 at 3:11 PM João Peixoto <joao.harti...@gmail.com>
> wrote:
> 
>> Replies in line as well
>>
>>
>> On Sat, May 13, 2017 at 3:25 AM Eno Thereska <eno.there...@gmail.com>
>> wrote:
>>
>>> Hi João,
>>>
>>> Some answers inline:
>>>
>>>> On 12 May 2017, at 18:27, João Peixoto <joao.harti...@gmail.com> wrote:
>>>>
>>>> Thanks for the comments, here are some clarifications:
>>>>
>>>> I did look at interactive queries, if I understood them correctly it
>>> means
>>>> that my state store must hold all the results in order for it to be
>>>> queried, either in memory or through disk (RocksDB).
>>>
>>> Yes, that's correct.
>>>
>>>
>>>> 1. The retention policy on my aggregate operations, in my case, is 90
>>> days,
>>>> which is way too much data to hold in memory
>>>
>>> It will depend on how much data/memory you have, but perhaps it could be
>>> too much to hold in memory
>>> for that long (especially because some failure is bound to happen in 90
>>> days)
>>>
>>>> 2. My stream instances do not have access to disk, even if they did,
>>>> wouldn't it mean I'd need almost twice the disk space to hold the same
>>>> data? I.e. kafka brokers holding the topics + RocksDB holding the state?
>>>
>>> That's interesting, is there a reason why the streams instances don't
>>> have access to a local file system? I'm curious
>>> what kind of deployment you have.
>>>
>>
>> My instances are deployed on Kubernetes
>>
>>
>>>
>>> It is true that twice the disk space is needed to hold the data on
>>> RocksDb as well as in the Kafka changelog topic,
>>> however that is no different than the current situation where the data is
>>> stored to a remote database right?
>>> I understand your point that you might not have access to local disk
>>> though.
>>>
>>
>> Not quite. In my scenario the state store would be backed by the changelog
>> AND MongoDB. On bootstrap we would fetch the state stored in the changelog
>> into a structure like a LoadingCache (from Guava's), which would load
>> additional fields from MongoDB if needed. Therefore the changelog could
>> hold say 12 hours of records and 90 days is stored in MongoDB only. That's
>> exactly the whole strategy I'm trying to validate. The implications would
>> be that in case of failure we'd need to recover within 12 hours or else
>> we'd need to replay from the source topics.
>>
>>
>>>
>>>
>>>> 3. Because a crash may happen between an entry being added to the changelog
>>>> and the data store being flushed, I need to get all the changes every time
>>> if I
>>>> want to guarantee that all data is eventually persisted. This is why
>>>> checkpoint files may not work for me.
>>>
>>> The upcoming exactly-once support in 0.11 will help with these kinds of
>>> guarantees.
>>>
>>
>> Not sure I understand how exactly-once would help in this case.
>>
>>
>>>
>>>>
>>>> Standby tasks look great, I forgot about those.
>>>>
>>>> I'm at the design phase so this is all tentative. Answering Matthias
>>>> questions
>>>>
>>>> My state stores are local. As mentioned above I do not have access to
>>> disk
>>>> therefore I need to recover all data from somewhere, in this case I'm
>>>> thinking about the changelog.
>>>
>>> So this is where I get confused a bit, since you mention that your state
>>> stores are "local", i.e., the streams instance
>>> does have access to a local file system.
>>>
>>
>> When I said "local" I meant that the state stores are partial for each
>> instance, i.e. they only have the partitions the task is responsible for
>> (normal behavior) rather than a global store.
>>
>>
>>>
>>>> I read about Kafka Connect but have never used it, maybe that'll
>>> simplify
>>>> things, but I need to do some studying there.
>>>>
>>>> Even though my stores are local, I still want to store them in a
>>>> database rather than use RocksDB directly (or global stores), because
>>>> this would allow me to migrate my current processing pipeline to
>>>> Kafka Streams without needing to change the frontend part of the
>>>> application, which fetches data from MongoDB.
>>>
>>> Makes sense.
>>>
>>>>
>>>> PS When you mention Global State Stores I'm thinking of
>>>>
>>> http://docs.confluent.io/3.2.0/streams/developer-guide.html#querying-remote-state-stores-for-the-entire-application
>>> ,
>>>> is this correct?
>>>
>>> No, I think Matthias means a remote server somewhere where you store
>>> all your data (like a shared file system).
>>> This is not something Kafka would provide.
>>>
>>> Eno
>>>
>>>>
>>>>
>>>> On Fri, May 12, 2017 at 10:02 AM Matthias J. Sax <matth...@confluent.io
>>>>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am not sure about your overall setup. Are your stores local (similar
>>>>> to RocksDB) or are you using one global remote store? If you use a
>>>>> global remote store, you would not need to back your changes in a
>>>>> changelog topic, as the store would not be lost in case of failure.
>>>>>
>>>>> Also (in case that your stores are remote), did you consider using
>>> Kafka
>>>>> Connect to export your data into an external store like MySQL or
>>> MongoDB
>>>>> instead of writing your own custom stores for Streams?
>>>>>
>>>>> If your stores are local, why do you write custom stores? I am curious
>>>>> to understand why RocksDB does not serve your needs.
>>>>>
>>>>>
>>>>> About your two comments:
>>>>>
>>>>> (1) Streams uses RocksDB by default, and the default implementation
>>>>> will use "checkpoint files" in the next release. Those checkpoint files
>>>>> track the changelog offsets of the data that got flushed to disk. This
>>>>> reduces the startup time, as only the tail of the changelog needs to
>>>>> be read to bring the store up to date. For this, you would always (1)
>>>>> write to the changelog, (2) write to your store. Each time you need to
>>>>> flush, you know that all data is in the changelog already. After each
>>>>> flush, you can update the "local offset checkpoint file".
>>>>>
>>>>> I guess, if you use local stores you can apply a similar pattern in your
>>>>> custom store implementation. (And as mentioned above, for global remote
>>>>> store you would not need the changelog anyway. -- This also applies to
>>>>> your recovery question from below.)
>>>>>
>>>>> (2) You can configure standby tasks (via StreamsConfig
>>>>> "num.standby.replicas"). This will set up standby tasks that passively
>>>>> replicate your stores to another instance. In case of an error, state
>>>>> will be migrated to those "hot standbys", reducing recovery time
>>>>> significantly.
>>>>>
>>>>>
>>>>> About your question:
>>>>>
>>>>> (1) Yes.
>>>>> (2) Partly parallel (i.e., if you run with multiple threads -- cf.
>>>>> StreamsConfig "num.stream.threads"). Each thread flushes all its
>>>>> stores sequentially.
>>>>> (3) Yes. There will be a store for each partition. (If store is local.)
>>>>> (4) Yes. The overall processing loop is sequential (cf.
>>>>>
>>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Architecture
>>>>> )
>>>>> Also, the next commit point is computed after a successful commit --
>>>>> thus, if one commit is delayed, all consecutive commit points are
>>>>> "shifted" by this delay.
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 5/12/17 9:00 AM, João Peixoto wrote:
>>>>>> On a stream definition I perform an "aggregate" which is configured
>>> with
>>>>> a
>>>>>> state store.
>>>>>>
>>>>>> *Goal*: Persist the aggregation results into a database, e.g. MySQL or
>>>>>> MongoDB
>>>>>>
>>>>>> *Working approach*:
>>>>>> I have created a custom StateStore backed by a changelog topic like
>>> the
>>>>>> builtin state stores. Whenever the store gets flushed I save to the
>>>>>> database, mark the record as being persisted and log the change in the
>>>>>> changelog.
>>>>>>
>>>>>> If something goes wrong during processing, the changelog guarantees
>>> that
>>>>> I
>>>>>> do not lose data, restores the state and if some data point was not
>>>>>> persisted, the next stream instance will persist it on its flush
>>>>> operation.
>>>>>>
>>>>>> 1. I cannot store too much data in the changelog, even with
>>> compaction,
>>>>> if
>>>>>> I have too much data, bootstrapping a stream instance would take a
>>> long
>>>>> time
>>>>>> 2. On the other hand, if I take too long to recover from a failure, I
>>> may
>>>>>> lose data. So there seems to be a delicate tradeoff here
>>>>>>
>>>>>> *Questions*:
>>>>>>
>>>>>> 1. Is this a reasonable use case?
>>>>>> 2. In a scenario where my stream would have a fanout (multiple
>>>>> sub-streams
>>>>>> based on the same stream), each branch would perform different
>>>>> "aggregate"
>>>>>> operations, each with its own state store. Are state stores flushed in
>>>>>> parallel or sequentially?
>>>>>> 3. The above also applies per-partition. As a stream definition is
>>>>>> parallelized by partition, will one instance hold different store
>>>>> instances
>>>>>> for each one?
>>>>>> 4. Through synthetic sleeps I simulated slow flushes, slower than the
>>>>>> commit interval. The stream seems to be ok with it and didn't throw, I
>>>>>> assume the Kafka consumer does not poll more records until all of the
>>>>>> previous poll's are committed, but I couldn't find documentation to
>>> back
>>>>>> this statement. Is there a timeout for "commit" operations?
>>>>>>
>>>>>>
>>>>>> Sample code
>>>>>>
>>>>>> public class AggregateHolder {
>>>>>>
>>>>>>    private Long commonKey;
>>>>>>    private List<Double> rawValues = new ArrayList<>();
>>>>>>    private boolean persisted;
>>>>>>
>>>>>> // ...
>>>>>> }
>>>>>>
>>>>>> And stream definition
>>>>>>
>>>>>> source.groupByKey(Serdes.String(), recordSerdes)
>>>>>>              .aggregate(
>>>>>>                      AggregateHolder::new,
>>>>>>                      (aggKey, value, aggregate) ->
>>>>>> aggregate.addValue(value.getValue()),
>>>>>>                      new DemoStoreSupplier<>(/* ... */)
>>>>>>              )
>>>>>>              .foreach((key, agg) -> log.debug("Aggregate: {}={} ({})",
>>>>>> key, agg.getAverage(), agg));
>>>>>>
>>>>>
>>>>>
>>>
>>>
> 
