Follow up doubt (let me know if a new question should be created): do we always need a repartitioning topic?

If I'm reading things correctly, when we change the key of a record we need to make sure the new key falls on the same partition that we are processing. This makes a lot of sense if, after such a change, we need to join on some other stream/table, or in cases where we sink to a topic. However, in cases where we do none of these things, does the repartition topic serve any purpose? If it doesn't, can we somehow avoid creating it?
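For context, a minimal sketch of the behavior being asked about (topic names and the trivial key selector are invented, and this uses the newer StreamsBuilder API rather than the 0.10.x API from the thread's time frame): a key-changing operation only marks the stream for repartitioning, and the internal repartition topic is materialized only when a downstream operation actually needs the data co-partitioned by the new key.

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;

    public class RepartitionSketch {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> source = builder.stream("input-topic");

            // Key is changed, but nothing downstream needs co-partitioned data:
            // the stream is only marked for repartitioning and no internal
            // repartition topic is created.
            source.selectKey((key, value) -> value)
                  .to("rekeyed-topic");

            // Key is changed and then aggregated: the count needs all records
            // with the same (new) key on the same partition, so Streams inserts
            // an internal repartition topic before the state store.
            source.selectKey((key, value) -> value)
                  .groupByKey()
                  .count();

            // Printing the topology description shows the "-repartition" topic
            // only for the second pipeline.
            System.out.println(builder.build().describe());
        }
    }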
On Sun, May 14, 2017 at 7:58 PM João Peixoto <joao.harti...@gmail.com> wrote:

> Very useful links, thank you.
>
> Part of my original misunderstanding was that the at-least-once guarantee was considered fulfilled once the record reached a sink node.
>
> Thanks for all the feedback, you may consider my question answered. Feel free to ask further questions about the use case if you find it interesting.
>
> On Sun, May 14, 2017 at 4:31 PM Matthias J. Sax <matth...@confluent.io> wrote:
>
>> Yes.
>>
>> It is basically "documented", as Streams guarantees at-least-once semantics. Thus, we make sure you will not lose any data in case of failure. (I.e., the overall guarantee is documented.)
>>
>> To achieve this, we always flush before we commit offsets. (This is not explicitly documented as it's an implementation detail.)
>>
>> There are some docs in the wiki:
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Internal+Data+Management#KafkaStreamsInternalDataManagement-Commits
>>
>> This might also help in case you want to dig into the code:
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Architecture
>>
>> -Matthias
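To make that flush-before-commit ordering concrete, here is a minimal sketch of the pattern discussed in this thread (the class, method names, and document mapping are invented for illustration, and a recent MongoDB Java driver is assumed): the store buffers dirty aggregates, and because Streams flushes every store before committing consumer offsets, anything not yet flushed is simply reprocessed after a crash rather than lost.

    import java.util.HashMap;
    import java.util.Map;

    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.model.Filters;
    import com.mongodb.client.model.ReplaceOptions;
    import org.bson.Document;

    /** Sketch of a store that persists aggregates to MongoDB on flush(). */
    public class MongoBackedAggregateStore {

        private final MongoCollection<Document> collection;          // assumed to be configured elsewhere
        private final Map<String, Document> dirty = new HashMap<>(); // updates since the last flush

        public MongoBackedAggregateStore(MongoCollection<Document> collection) {
            this.collection = collection;
        }

        /** Called on every update; a real store would also append the change to its changelog topic here. */
        public void put(String key, Document aggregate) {
            dirty.put(key, aggregate);
        }

        /**
         * Streams flushes stores before it commits offsets, so once this method
         * returns, everything processed so far is safely in MongoDB. If the
         * instance crashes before flush() completes, the uncommitted offsets are
         * reprocessed (at-least-once), so nothing is lost -- only redone.
         */
        public void flush() {
            for (Map.Entry<String, Document> entry : dirty.entrySet()) {
                collection.replaceOne(
                        Filters.eq("_id", entry.getKey()),
                        entry.getValue().append("_id", entry.getKey()),
                        new ReplaceOptions().upsert(true));
            }
            dirty.clear();
        }
    }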
>> On 5/14/17 4:07 PM, João Peixoto wrote:
>> > I think I now understand what Matthias meant when he said "If you use a global remote store, you would not need to back your changes in a changelog topic, as the store would not be lost in case of failure".
>> >
>> > I had the misconception that if a state store threw an exception during "flush", all messages received between now and the previous flush would be "lost", hence the need for a changelog topic. However, it seems that the "repartition" topic actually solves this problem.
>> >
>> > There's very little information about the latter, at least that I could find, but an entry seems to be added whenever a record enters the "aggregate", and the state store "consumer" of this topic only updates its offset after the flush completes, meaning that the repartition topic will be replayed! It seems this problem is already solved for me; I'd appreciate it if someone could point me to the documentation or code that backs up the above.
>> >
>> > On Sat, May 13, 2017 at 3:11 PM João Peixoto <joao.harti...@gmail.com> wrote:
>> >
>> >> Replies inline as well.
>> >>
>> >> On Sat, May 13, 2017 at 3:25 AM Eno Thereska <eno.there...@gmail.com> wrote:
>> >>
>> >>> Hi João,
>> >>>
>> >>> Some answers inline:
>> >>>
>> >>>> On 12 May 2017, at 18:27, João Peixoto <joao.harti...@gmail.com> wrote:
>> >>>>
>> >>>> Thanks for the comments, here are some clarifications:
>> >>>>
>> >>>> I did look at interactive queries; if I understood them correctly, they mean that my state store must hold all the results in order for it to be queried, either in memory or on disk (RocksDB).
>> >>>
>> >>> Yes, that's correct.
>> >>>
>> >>>> 1. The retention policy on my aggregate operations, in my case, is 90 days, which is way too much data to hold in memory.
>> >>>
>> >>> It will depend on how much data/memory you have, but perhaps it could be too much to hold in memory for that long (especially because some failure is bound to happen in 90 days).
>> >>>
>> >>>> 2. My stream instances do not have access to disk; even if they did, wouldn't it mean I'd need almost twice the disk space to hold the same data? I.e. Kafka brokers holding the topics + RocksDB holding the state?
>> >>>
>> >>> That's interesting, is there a reason why the streams instances don't have access to a local file system? I'm curious what kind of deployment you have.
>> >>
>> >> My instances are deployed on Kubernetes.
>> >>
>> >>> It is true that twice the disk space is needed to hold the data in RocksDB as well as in the Kafka changelog topic, however that is no different from the current situation where the data is stored in a remote database, right? I understand your point that you might not have access to local disk though.
>> >>
>> >> Not quite. In my scenario the state store would be backed by the changelog AND MongoDB. On bootstrap we would fetch the state stored in the changelog into a structure like a LoadingCache (from Guava), which would load additional fields from MongoDB if needed. Therefore the changelog could hold, say, 12 hours of records while the full 90 days are stored in MongoDB only. That's exactly the strategy I'm trying to validate. The implication would be that in case of failure we'd need to recover within 12 hours, or else we'd need to replay from the source topics.
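A minimal sketch of that bootstrap idea, assuming Guava's LoadingCache and the MongoDB Java driver (the cache sizing, collection handling, and document mapping below are invented for illustration): recent aggregates live in the changelog-restored cache, and anything older falls through to MongoDB on first access.

    import java.util.concurrent.TimeUnit;

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.model.Filters;
    import org.bson.Document;

    public class AggregateCache {

        private final LoadingCache<String, Document> cache;

        public AggregateCache(MongoCollection<Document> aggregates) {
            this.cache = CacheBuilder.newBuilder()
                    .maximumSize(100_000)                  // roughly the "hot" working set
                    .expireAfterAccess(12, TimeUnit.HOURS) // mirrors the 12-hour changelog horizon
                    .build(new CacheLoader<String, Document>() {
                        @Override
                        public Document load(String key) {
                            // Cache miss: the entry is older than what the changelog holds,
                            // so fall back to the long-term copy in MongoDB.
                            Document found = aggregates.find(Filters.eq("_id", key)).first();
                            return found != null ? found : new Document("_id", key);
                        }
                    });
        }

        /** Used while restoring from the changelog and while processing records. */
        public Document get(String key) {
            return cache.getUnchecked(key);
        }

        public void put(String key, Document value) {
            cache.put(key, value);
        }
    }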
>> >>>> 3. Because a crash may happen between an entry being added to the changelog and the data store being flushed, I need to get all the changes every time if I want to guarantee that all data is eventually persisted. This is why checkpoint files may not work for me.
>> >>>
>> >>> The upcoming exactly-once support in 0.11 will help with these kinds of guarantees.
>> >>
>> >> Not sure I understand how exactly-once would help in this case.
>> >>
>> >>>> Standby tasks look great, I forgot about those.
>> >>>>
>> >>>> I'm at the design phase so this is all tentative. Answering Matthias' questions:
>> >>>>
>> >>>> My state stores are local. As mentioned above I do not have access to disk, therefore I need to recover all data from somewhere; in this case I'm thinking about the changelog.
>> >>>
>> >>> So this is where I get confused a bit, since you mention that your state stores are "local", i.e., the streams instance does have access to a local file system.
>> >>
>> >> When I said "local" I meant that the state stores are partial for each instance, i.e. they only have the partitions the task is responsible for (normal behavior) rather than being a global store.
>> >>
>> >>>> I read about Kafka Connect but have never used it; maybe that'll simplify things, but I need to do some studying there.
>> >>>>
>> >>>> The reason why, even though my stores are local, I still want to store them in a database and not use straight-up RocksDB (or global stores) is that this would allow me to migrate my current processing pipeline to Kafka Streams without needing to change the frontend part of the application, which fetches data from MongoDB.
>> >>>
>> >>> Makes sense.
>> >>>
>> >>>> PS When you mention Global State Stores I'm thinking of
>> >>>> http://docs.confluent.io/3.2.0/streams/developer-guide.html#querying-remote-state-stores-for-the-entire-application
>> >>>> Is this correct?
>> >>>
>> >>> No, I think Matthias means a remote server somewhere where you store all your data (like a shared file system). This is not something Kafka would provide.
>> >>>
>> >>> Eno
>> >>>
>> >>>> On Fri, May 12, 2017 at 10:02 AM Matthias J. Sax <matth...@confluent.io> wrote:
>> >>>>
>> >>>>> Hi,
>> >>>>>
>> >>>>> I am not sure about your overall setup. Are your stores local (similar to RocksDB) or are you using one global remote store? If you use a global remote store, you would not need to back your changes in a changelog topic, as the store would not be lost in case of failure.
>> >>>>>
>> >>>>> Also (in case your stores are remote), did you consider using Kafka Connect to export your data into an external store like MySQL or MongoDB instead of writing your own custom stores for Streams?
>> >>>>>
>> >>>>> If your stores are local, why do you write custom stores? I am curious to understand why RocksDB does not serve your needs.
>> >>>>>
>> >>>>> About your two comments:
>> >>>>>
>> >>>>> (1) Streams uses RocksDB by default, and the default implementation uses "checkpoint files" in the next release. Those checkpoint files track the changelog offsets of the data that got flushed to disk. This reduces startup time, as only the tail of the changelog needs to be read to bring the store up to date. For this, you would always (1) write to the changelog, (2) write to your store. Each time you need to flush, you know that all data is in the changelog already. After each flush, you can update the "local offset checkpoint file".
>> >>>>>
>> >>>>> I guess if you use local stores you can apply a similar pattern in your custom store implementation. (And as mentioned above, for a global remote store you would not need the changelog anyway. -- This also applies to your recovery question from below.)
>> >>>>>
>> >>>>> (2) You can configure standby tasks (via StreamsConfig "num.standby.replicas"). This will set up standby tasks that passively replicate your stores to another instance. In the error case, state will be migrated to those "hot standbys", reducing recovery time significantly.
>> >>>>>
>> >>>>> About your questions:
>> >>>>>
>> >>>>> (1) Yes.
>> >>>>> (2) Partly parallel (i.e., if you run with multiple threads -- cf. StreamsConfig "num.stream.threads"). Each thread flushes all its stores sequentially.
>> >>>>> (3) Yes. There will be a store for each partition. (If the store is local.)
>> >>>>> (4) Yes. The overall processing loop is sequential (cf. https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Architecture). Also, the next commit point is computed after a successful commit -- thus, if one commit is delayed, all consecutive commit points are "shifted" by this delay.
>> >>>>>
>> >>>>> -Matthias
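For reference, the knobs mentioned above (standby replicas, stream threads) plus the commit interval are plain StreamsConfig properties; a minimal sketch, where the application id, bootstrap servers, and the chosen values are placeholders rather than recommendations:

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    public class StreamsConfigSketch {
        public static Properties buildConfig() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregation-app"); // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");   // placeholder

            // "num.standby.replicas": keep one passive replica of each state store
            // on another instance so fail-over does not have to replay the changelog.
            props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);

            // "num.stream.threads": threads run in parallel, but each thread
            // flushes its own stores sequentially at commit time.
            props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);

            // "commit.interval.ms": how often stores are flushed and offsets committed.
            props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000);

            return props;
        }
    }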
>> >>>>> On 5/12/17 9:00 AM, João Peixoto wrote:
>> >>>>>> On a stream definition I perform an "aggregate" which is configured with a state store.
>> >>>>>>
>> >>>>>> *Goal*: Persist the aggregation results into a database, e.g. MySQL or MongoDB.
>> >>>>>>
>> >>>>>> *Working approach*: I have created a custom StateStore backed by a changelog topic, like the built-in state stores. Whenever the store gets flushed I save to the database, mark the record as persisted and log the change in the changelog.
>> >>>>>>
>> >>>>>> If something goes wrong during processing, the changelog guarantees that I do not lose data: it restores the state, and if some data point was not persisted, the next stream instance will persist it on its flush operation.
>> >>>>>>
>> >>>>>> 1. I cannot store too much data in the changelog; even with compaction, if I have too much data, bootstrapping a stream instance would take a long time.
>> >>>>>> 2. On the other hand, if I take too long to recover from a failure, I may lose data. So there seems to be a delicate tradeoff here.
>> >>>>>>
>> >>>>>> *Questions*:
>> >>>>>>
>> >>>>>> 1. Is this a reasonable use case?
>> >>>>>> 2. In a scenario where my stream has a fanout (multiple sub-streams based on the same stream), each branch would perform different "aggregate" operations, each with its own state store. Are state stores flushed in parallel or sequentially?
>> >>>>>> 3. The above also applies per partition. As a stream definition is parallelized by partition, will one instance hold different store instances for each one?
>> >>>>>> 4. Through synthetic sleeps I simulated slow flushes, slower than the commit interval. The stream seems to be OK with it and didn't throw; I assume the Kafka consumer does not poll more records until all of the previous poll's records are committed, but I couldn't find documentation to back this statement. Is there a timeout for "commit" operations?
>> >>>>>>
>> >>>>>> Sample code:
>> >>>>>>
>> >>>>>> public class AggregateHolder {
>> >>>>>>
>> >>>>>>     private Long commonKey;
>> >>>>>>     private List<Double> rawValues = new ArrayList<>();
>> >>>>>>     private boolean persisted;
>> >>>>>>
>> >>>>>>     // ...
>> >>>>>> }
>> >>>>>>
>> >>>>>> And stream definition:
>> >>>>>>
>> >>>>>> source.groupByKey(Serdes.String(), recordSerdes)
>> >>>>>>       .aggregate(
>> >>>>>>           AggregateHolder::new,
>> >>>>>>           (aggKey, value, aggregate) -> aggregate.addValue(value.getValue()),
>> >>>>>>           new DemoStoreSupplier<>(/* ... */)
>> >>>>>>       )
>> >>>>>>       .foreach((key, agg) -> log.debug("Aggregate: {}={} ({})", key, agg.getAverage(), agg));
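To make the sample a little more self-contained, here is one possible completion of AggregateHolder, assuming addValue and getAverage behave the way the stream definition above uses them; everything beyond the three original fields is guesswork for illustration.

    import java.util.ArrayList;
    import java.util.List;

    public class AggregateHolder {

        private Long commonKey;                              // kept from the original sketch
        private List<Double> rawValues = new ArrayList<>();
        private boolean persisted;

        // Called from the aggregator; returns "this" so it can serve
        // directly as the new aggregation result.
        public AggregateHolder addValue(Double value) {
            rawValues.add(value);
            persisted = false;   // new data means the row must be re-persisted on the next flush
            return this;
        }

        public double getAverage() {
            return rawValues.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
        }

        public boolean isPersisted() {
            return persisted;
        }

        public void markPersisted() {
            persisted = true;
        }
    }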