(It's preferred to create another email thread for a different topic, to make it easier to look back.)
Yes, there could be room for optimizations, e.g., see this:
http://mail-archives.apache.org/mod_mbox/kafka-users/201705.mbox/%3cCAJikTEUHR=r0ika6vlf_y+qajxg8f_q19og_-s+q-gozpqb...@mail.gmail.com%3e

Eno

> On 16 May 2017, at 16:01, João Peixoto <joao.harti...@gmail.com> wrote:
>
> Follow-up doubt (let me know if a new question should be created).
>
> Do we always need a repartition topic?
>
> If I'm reading things correctly, when we change the key of a record we need to make sure the new key falls on the partition that will process it. This makes a lot of sense if, after such a change, we need to join on some other stream/table, or in cases where we sink to a topic. However, in cases where we do none of these things, does the repartition topic serve any purpose? If it doesn't, can we somehow avoid creating it?
>
> On Sun, May 14, 2017 at 7:58 PM João Peixoto <joao.harti...@gmail.com> wrote:
>
>> Very useful links, thank you.
>>
>> Part of my original misunderstanding was that the at-least-once guarantee was considered fulfilled once the record reached a sink node.
>>
>> Thanks for all the feedback, you may consider my question answered. Feel free to ask further questions about the use case if you find it interesting.
>>
>> On Sun, May 14, 2017 at 4:31 PM Matthias J. Sax <matth...@confluent.io> wrote:
>>
>>> Yes.
>>>
>>> It is basically "documented", as Streams guarantees at-least-once semantics. Thus, we make sure you will not lose any data in case of failure. (I.e., the overall guarantee is documented.)
>>>
>>> To achieve this, we always flush before we commit offsets. (This is not explicitly documented, as it's an implementation detail.)
>>>
>>> There are some docs in the wiki:
>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Internal+Data+Management#KafkaStreamsInternalDataManagement-Commits
>>>
>>> This might also help in case you want to dig into the code:
>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Architecture
>>>
>>> -Matthias
>>>
>>> On 5/14/17 4:07 PM, João Peixoto wrote:
>>>> I think I now understand what Matthias meant when he said "If you use a global remote store, you would not need to back your changes in a changelog topic, as the store would not be lost in case of failure".
>>>>
>>>> I had the misconception that if a state store threw an exception during "flush", all messages received between now and the previous flush would be "lost", hence the need for a changelog topic. However, it seems that the "repartition" topic actually solves this problem.
>>>>
>>>> There's very little information about the latter, at least that I could find, but an entry seems to be added whenever a record enters the "aggregate", and the state store "consumer" of this topic only updates its offset after the flush completes, meaning that the repartition topic will be replayed! It seems this problem is already solved for me; I'd appreciate it if someone could point me to the documentation or code that backs up the above.
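(A minimal sketch of the key-change case discussed above, on the 0.10.2-era DSL; "input-topic" and newKeyFor are hypothetical placeholders, not from the thread:)

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, Long> source =
            builder.stream(Serdes.String(), Serdes.Long(), "input-topic");

    // Changing the key flags the stream for repartitioning: the aggregation
    // below writes through an internal "<appId>-counts-repartition" topic so
    // that every record with the new key lands on the partition owning its state.
    source.selectKey((key, value) -> newKeyFor(value))  // newKeyFor: hypothetical re-keying function
          .groupByKey(Serdes.String(), Serdes.Long())
          .count("counts");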
>>>> On Sat, May 13, 2017 at 3:11 PM João Peixoto <joao.harti...@gmail.com> wrote:
>>>>
>>>>> Replies inline as well.
>>>>>
>>>>> On Sat, May 13, 2017 at 3:25 AM Eno Thereska <eno.there...@gmail.com> wrote:
>>>>>
>>>>>> Hi João,
>>>>>>
>>>>>> Some answers inline:
>>>>>>
>>>>>>> On 12 May 2017, at 18:27, João Peixoto <joao.harti...@gmail.com> wrote:
>>>>>>>
>>>>>>> Thanks for the comments, here are some clarifications:
>>>>>>>
>>>>>>> I did look at interactive queries; if I understood them correctly, my state store must hold all the results in order for them to be queried, either in memory or on disk (RocksDB).
>>>>>>
>>>>>> Yes, that's correct.
>>>>>>
>>>>>>> 1. The retention policy on my aggregate operations, in my case, is 90 days, which is way too much data to hold in memory.
>>>>>>
>>>>>> It will depend on how much data/memory you have, but it could indeed be too much to hold in memory for that long (especially because some failure is bound to happen within 90 days).
>>>>>>
>>>>>>> 2. My stream instances do not have access to disk. Even if they did, wouldn't it mean I'd need almost twice the disk space to hold the same data, i.e. Kafka brokers holding the topics plus RocksDB holding the state?
>>>>>>
>>>>>> That's interesting; is there a reason why the streams instances don't have access to a local file system? I'm curious what kind of deployment you have.
>>>>>
>>>>> My instances are deployed on Kubernetes.
>>>>>
>>>>>> It is true that twice the disk space is needed to hold the data in RocksDB as well as in the Kafka changelog topic; however, that is no different than the current situation, where the data is stored in a remote database, right? I understand your point that you might not have access to local disk, though.
>>>>>
>>>>> Not quite. In my scenario the state store would be backed by the changelog AND MongoDB. On bootstrap we would fetch the state stored in the changelog into a structure like a LoadingCache (from Guava), which would load additional fields from MongoDB if needed. Therefore the changelog could hold, say, 12 hours of records while the full 90 days is stored only in MongoDB. That's exactly the strategy I'm trying to validate. The implication would be that in case of failure we'd need to recover within 12 hours, or else we'd need to replay from the source topics.
>>>>>
>>>>>>> 3. Because a crash may happen between the time an entry is added to the changelog and the time the data store is flushed, I need to get all the changes every time if I want to guarantee that all data is eventually persisted. This is why checkpoint files may not work for me.
>>>>>>
>>>>>> The upcoming exactly-once support in 0.11 will help with these kinds of guarantees.
>>>>>
>>>>> Not sure I understand how exactly-once would help in this case.
>>>>>
>>>>>>> Standby tasks look great, I forgot about those.
>>>>>>>
>>>>>>> I'm at the design phase, so this is all tentative. Answering Matthias's questions:
>>>>>>>
>>>>>>> My state stores are local. As mentioned above, I do not have access to disk, therefore I need to recover all data from somewhere; in this case I'm thinking about the changelog.
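(A minimal sketch of the changelog-plus-MongoDB bootstrap idea described above, assuming Guava's LoadingCache and the MongoDB Java driver; "collection" is a hypothetical MongoCollection<Document>, and AggregateHolder.fromDocument is a hypothetical factory method:)

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import com.mongodb.client.model.Filters;
    import org.bson.Document;

    // Hot entries are restored from the changelog into the cache up front;
    // anything older than the changelog's retention is loaded lazily from MongoDB.
    LoadingCache<String, AggregateHolder> cache = CacheBuilder.newBuilder()
            .maximumSize(100_000)  // bound the in-memory footprint
            .build(new CacheLoader<String, AggregateHolder>() {
                @Override
                public AggregateHolder load(String key) {
                    Document doc = collection.find(Filters.eq("_id", key)).first();
                    // Start a fresh aggregate if MongoDB has no record for this key.
                    return doc == null ? new AggregateHolder()
                                       : AggregateHolder.fromDocument(doc);
                }
            });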
>>>>>> So this is where I get confused a bit, since you mention that your state stores are "local", i.e., the streams instance does have access to a local file system.
>>>>>
>>>>> When I said "local" I meant that the state stores are partial for each instance, i.e. they only hold the partitions the task is responsible for (the normal behavior), rather than being a global store.
>>>>>
>>>>>>> I read about Kafka Connect but have never used it; maybe that'll simplify things, but I need to do some studying there.
>>>>>>>
>>>>>>> The reason why, even though my stores are local, I still want to store them in a database and not use straight-up RocksDB (or global stores) is that this would allow me to migrate my current processing pipeline to Kafka Streams without needing to change the frontend part of the application, which fetches data from MongoDB.
>>>>>>
>>>>>> Makes sense.
>>>>>>
>>>>>>> PS: When you mention global state stores I'm thinking of
>>>>>>> http://docs.confluent.io/3.2.0/streams/developer-guide.html#querying-remote-state-stores-for-the-entire-application
>>>>>>> Is this correct?
>>>>>>
>>>>>> No, I think Matthias means a remote server somewhere where you store all your data (like a shared file system). This is not something Kafka would provide.
>>>>>>
>>>>>> Eno
>>>>>>
>>>>>>> On Fri, May 12, 2017 at 10:02 AM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I am not sure about your overall setup. Are your stores local (similar to RocksDB), or are you using one global remote store? If you use a global remote store, you would not need to back your changes with a changelog topic, as the store would not be lost in case of failure.
>>>>>>>>
>>>>>>>> Also (in case your stores are remote), did you consider using Kafka Connect to export your data into an external store like MySQL or MongoDB, instead of writing your own custom stores for Streams?
>>>>>>>>
>>>>>>>> If your stores are local, why do you write custom stores? I am curious to understand why RocksDB does not serve your needs.
>>>>>>>>
>>>>>>>> About your two comments:
>>>>>>>>
>>>>>>>> (1) Streams uses RocksDB by default, and the default implementation uses "checkpoint files" in the next release. Those checkpoint files track the changelog offsets of the data that got flushed to disk. This reduces startup time, as only the tail of the changelog needs to be read to bring the store up to date. For this, you would always (1) write to the changelog, (2) write to your store. Each time you need to flush, you know that all data is already in the changelog. After each flush, you can update the "local offset checkpoint file".
>>>>>>>>
>>>>>>>> I guess, if you use local stores, you can apply a similar pattern in your custom store implementation. (And as mentioned above, for a global remote store you would not need the changelog anyway. This also applies to your recovery question from below.)
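(A rough sketch of the write-changelog-first pattern Matthias describes, as it might look in a custom store; the dirty-entry tracking, saveToMongo, and writeLocalCheckpoint are hypothetical helpers, not Streams APIs, and AggregateHolder is the class from the sample code below:)

    import java.util.HashMap;
    import java.util.Map;

    class DatabaseBackedStore {
        private final Map<String, AggregateHolder> dirty = new HashMap<>();
        private long lastChangelogOffset;

        void put(String key, AggregateHolder value, long changelogOffset) {
            // By the time put() is called the change is already in the changelog,
            // so nothing is lost if we crash before the next flush.
            dirty.put(key, value);
            lastChangelogOffset = changelogOffset;
        }

        void flush() {
            dirty.forEach(this::saveToMongo);           // persist each dirty entry externally
            dirty.clear();
            writeLocalCheckpoint(lastChangelogOffset);  // mimics the "local offset checkpoint file"
        }

        private void saveToMongo(String key, AggregateHolder value) { /* hypothetical upsert */ }
        private void writeLocalCheckpoint(long offset) { /* hypothetical file write */ }
    }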
>>>>>>>> (2) You can configure standby tasks (via the StreamsConfig "num.standby.replicas"). This will set up standby tasks that passively replicate your stores to another instance. In the error case, state will be migrated to those "hot standbys", reducing recovery time significantly.
>>>>>>>>
>>>>>>>> About your questions:
>>>>>>>>
>>>>>>>> (1) Yes.
>>>>>>>> (2) Partly in parallel (i.e., if you run with multiple threads; cf. the StreamsConfig "num.stream.threads"). Each thread flushes all its stores sequentially.
>>>>>>>> (3) Yes. There will be a store for each partition (if the store is local).
>>>>>>>> (4) Yes. The overall processing loop is sequential (cf. https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Architecture). Also, the next commit point is computed after a successful commit; thus, if one commit is delayed, all subsequent commit points are "shifted" by this delay.
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 5/12/17 9:00 AM, João Peixoto wrote:
>>>>>>>>> On a stream definition I perform an "aggregate" which is configured with a state store.
>>>>>>>>>
>>>>>>>>> *Goal*: Persist the aggregation results into a database, e.g. MySQL or MongoDB.
>>>>>>>>>
>>>>>>>>> *Working approach*:
>>>>>>>>> I have created a custom StateStore backed by a changelog topic, like the built-in state stores. Whenever the store gets flushed I save to the database, mark the record as persisted, and log the change in the changelog.
>>>>>>>>>
>>>>>>>>> If something goes wrong during processing, the changelog guarantees that I do not lose data: it restores the state, and if some data point was not persisted, the next stream instance will persist it on its flush operation.
>>>>>>>>>
>>>>>>>>> 1. I cannot store too much data in the changelog; even with compaction, too much data would make bootstrapping a stream instance take a long time.
>>>>>>>>> 2. On the other hand, if I take too long to recover from a failure, I may lose data. So there seems to be a delicate tradeoff here.
>>>>>>>>>
>>>>>>>>> *Questions*:
>>>>>>>>>
>>>>>>>>> 1. Is this a reasonable use case?
>>>>>>>>> 2. In a scenario where my stream has a fanout (multiple sub-streams based on the same stream), each branch would perform a different "aggregate" operation, each with its own state store. Are state stores flushed in parallel or sequentially?
>>>>>>>>> 3. The above also applies per partition. As a stream definition is parallelized by partition, will one instance hold a different store instance for each partition?
>>>>>>>>> 4. Through synthetic sleeps I simulated flushes slower than the commit interval. The stream seems to be OK with it and didn't throw; I assume the Kafka consumer does not poll more records until all of the previous poll's records are committed, but I couldn't find documentation to back up this statement. Is there a timeout for "commit" operations?
>>>>>>>>>
>>>>>>>>> Sample code:
>>>>>>>>>
>>>>>>>>> public class AggregateHolder {
>>>>>>>>>
>>>>>>>>>     private Long commonKey;
>>>>>>>>>     private List<Double> rawValues = new ArrayList<>();
>>>>>>>>>     private boolean persisted;
>>>>>>>>>
>>>>>>>>>     // ...
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> And the stream definition:
>>>>>>>>>
>>>>>>>>> source.groupByKey(Serdes.String(), recordSerdes)
>>>>>>>>>       .aggregate(
>>>>>>>>>               AggregateHolder::new,
>>>>>>>>>               (aggKey, value, aggregate) -> aggregate.addValue(value.getValue()),
>>>>>>>>>               new DemoStoreSupplier<>(/* ... */))
>>>>>>>>>       .foreach((key, agg) -> log.debug("Aggregate: {}={} ({})",
>>>>>>>>>               key, agg.getAverage(), agg));
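(For reference, a minimal sketch of the StreamsConfig settings discussed in this thread; the application id, bootstrap servers, and all values are placeholders, not recommendations:)

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");             // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
    props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);    // hot standbys for faster failover
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);      // each thread flushes its stores sequentially
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000); // commit (and therefore flush) cadence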