Just to add to this: Streams creates re-partitioning topics "lazily", meaning that when the key (potentially) changes we only set a flag but don't add the re-partitioning topic right away. On groupByKey() or join() we check whether this "re-partitioning required" flag is set and, if so, add the topic.

Thus, if you have a stream.map().map().selectKey().groupBy(), we only re-partition on the groupBy(), but not after the first two map() steps.
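
For illustration, a minimal sketch of such a topology (assuming the 0.10.2-era DSL; the class, topic, and store names are hypothetical): the key may change three times, but only one re-partitioning topic is created.

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class LazyRepartitionSketch {

    public static void main(final String[] args) {
        final KStreamBuilder builder = new KStreamBuilder();

        // "events" is a hypothetical input topic with String keys and values.
        final KStream<String, String> source =
                builder.stream(Serdes.String(), Serdes.String(), "events");

        source
                // Both map() calls could change the key, so Streams only flags the
                // stream as "re-partitioning required" here; no topic is added yet.
                .map((key, value) -> KeyValue.pair(key, value.trim()))
                .map((key, value) -> KeyValue.pair(key, value.toLowerCase()))
                .selectKey((key, value) -> value)
                // Only here, where records must be grouped by the new key,
                // is a single re-partitioning topic added to the topology.
                .groupByKey(Serdes.String(), Serdes.String())
                .count("value-counts")
                .toStream()
                .to(Serdes.String(), Serdes.Long(), "value-counts-output");

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "lazy-repartition-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        new KafkaStreams(builder, new StreamsConfig(props)).start();
    }
}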

-Matthias

On 5/16/17 8:18 AM, Eno Thereska wrote:
> (it's preferred to create another email thread for a different topic to make it easier to look back)
>
> Yes, there could be room for optimizations, e.g., see this:
> http://mail-archives.apache.org/mod_mbox/kafka-users/201705.mbox/%3cCAJikTEUHR=r0ika6vlf_y+qajxg8f_q19og_-s+q-gozpqb...@mail.gmail.com%3e
>
> Eno
>
>> On 16 May 2017, at 16:01, João Peixoto <joao.harti...@gmail.com> wrote:
>>
>> Follow up doubt (let me know if a new question should be created).
>>
>> Do we always need a repartitioning topic?
>>
>> If I'm reading things correctly, when we change the key of a record we need to make sure the new key falls on the same partition that we are processing. This makes a lot of sense if after such a change we'd need to join on some other stream/table, or in cases where we sink to a topic. However, in cases where we do none of these things, the repartition topic does nothing? If this is true, can we somehow not create it?
>>
>> On Sun, May 14, 2017 at 7:58 PM João Peixoto <joao.harti...@gmail.com> wrote:
>>
>>> Very useful links, thank you.
>>>
>>> Part of my original misunderstanding was that the at-least-once guarantee was considered fulfilled if the record reached a sink node.
>>>
>>> Thanks for all the feedback, you may consider my question answered. Feel free to ask further questions about the use case if found interesting.
>>>
>>> On Sun, May 14, 2017 at 4:31 PM Matthias J. Sax <matth...@confluent.io> wrote:
>>>
>>>> Yes.
>>>>
>>>> It is basically "documented", as Streams guarantees at-least-once semantics. Thus, we make sure you will not lose any data in case of failure. (I.e., the overall guarantee is documented.)
>>>>
>>>> To achieve this, we always flush before we commit offsets. (This is not explicitly documented as it's an implementation detail.)
>>>>
>>>> There are some docs in the wiki:
>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Internal+Data+Management#KafkaStreamsInternalDataManagement-Commits
>>>>
>>>> This might also help in case you want to dig into the code:
>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Architecture
>>>>
>>>> -Matthias
>>>>
>>>> On 5/14/17 4:07 PM, João Peixoto wrote:
>>>>> I think I now understand what Matthias meant when he said "If you use a global remote store, you would not need to back your changes in a changelog topic, as the store would not be lost in case of failure".
>>>>>
>>>>> I had the misconception that if a state store threw an exception during "flush", all messages received between now and the previous flush would be "lost", hence the need for a changelog topic. However, it seems that the "repartition" topic actually solves this problem.
>>>>>
>>>>> There's very little information about the latter, at least that I could find, but an entry seems to be added whenever a record enters the "aggregate", and the state store "consumer" of this topic only updates its offset after the flush completes, meaning that the repartition topic will be replayed! It seems this problem is already solved for me; I'd appreciate it if someone could point me to the documentation or code that backs up the above.
>>>>>
>>>>> On Sat, May 13, 2017 at 3:11 PM João Peixoto <joao.harti...@gmail.com> wrote:
>>>>>
>>>>>> Replies in line as well
>>>>>>
>>>>>> On Sat, May 13, 2017 at 3:25 AM Eno Thereska <eno.there...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi João,
>>>>>>>
>>>>>>> Some answers inline:
>>>>>>>
>>>>>>>> On 12 May 2017, at 18:27, João Peixoto <joao.harti...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Thanks for the comments, here are some clarifications:
>>>>>>>>
>>>>>>>> I did look at interactive queries; if I understood them correctly it means that my state store must hold all the results in order for it to be queried, either in memory or through disk (RocksDB).
>>>>>>>
>>>>>>> Yes, that's correct.
>>>>>>>
>>>>>>>> 1. The retention policy on my aggregate operations, in my case, is 90 days, which is way too much data to hold in memory
>>>>>>>
>>>>>>> It will depend on how much data/memory you have, but perhaps it could be too much to hold in memory for that long (especially because some failure is bound to happen in 90 days).
>>>>>>>
>>>>>>>> 2. My stream instances do not have access to disk; even if they did, wouldn't it mean I'd need almost twice the disk space to hold the same data? I.e. Kafka brokers holding the topics + RocksDB holding the state?
>>>>>>>
>>>>>>> That's interesting, is there a reason why the streams instances don't have access to a local file system? I'm curious what kind of deployment you have.
>>>>>>
>>>>>> My instances are deployed on Kubernetes.
>>>>>>
>>>>>>> It is true that twice the disk space is needed to hold the data on RocksDB as well as in the Kafka changelog topic, however that is no different than the current situation where the data is stored to a remote database, right? I understand your point that you might not have access to local disk though.
>>>>>>
>>>>>> Not quite. In my scenario the state store would be backed by the changelog AND MongoDB. On bootstrap we would fetch the state stored in the changelog into a structure like a LoadingCache (from Guava), which would load additional fields from MongoDB if needed. Therefore the changelog could hold, say, 12 hours of records while the full 90 days are stored in MongoDB only. That's exactly the whole strategy I'm trying to validate. The implication would be that in case of failure we'd need to recover within 12 hours, or else we'd need to replay from the source topics.
>>>>>>
>>>>>>>> 3. Because a crash may happen between an entry being added to the changelog and the data store being flushed, I need to get all the changes every time if I want to guarantee that all data is eventually persisted. This is why checkpoint files may not work for me.
>>>>>>>
>>>>>>> The upcoming exactly-once support in 0.11 will help with these kinds of guarantees.
>>>>>>
>>>>>> Not sure I understand how exactly-once would help in this case.
>>>>>>
>>>>>>>> Standby tasks look great, I forgot about those.
>>>>>>>>
>>>>>>>> I'm at the design phase so this is all tentative. Answering Matthias' questions:
>>>>>>>>
>>>>>>>> My state stores are local. As mentioned above I do not have access to disk, therefore I need to recover all data from somewhere, in this case I'm thinking about the changelog.
>>>>>>>
>>>>>>> So this is where I get confused a bit, since you mention that your state stores are "local", i.e., the streams instance does have access to a local file system.
>>>>>>
>>>>>> When I said "local" I meant that the state stores are partial for each instance, i.e. they only have the partitions the task is responsible for (normal behavior) rather than a global store.
>>>>>>
>>>>>>>> I read about Kafka Connect but have never used it; maybe that'll simplify things, but I need to do some studying there.
>>>>>>>>
>>>>>>>> The reason why, even though my stores are local, I still want to store them in a database and not use straight-up RocksDB (or global stores) is that this would allow me to migrate my current processing pipeline to Kafka Streams without needing to change the frontend part of the application, which fetches data from MongoDB.
>>>>>>>
>>>>>>> Makes sense.
>>>>>>>
>>>>>>>> PS When you mention Global State Stores I'm thinking of http://docs.confluent.io/3.2.0/streams/developer-guide.html#querying-remote-state-stores-for-the-entire-application , is this correct?
>>>>>>>
>>>>>>> No, I think Matthias is saying that you could have a remote server somewhere where you store all your data (like a shared file system). This is not something Kafka would provide.
>>>>>>>
>>>>>>> Eno
>>>>>>>
>>>>>>>> On Fri, May 12, 2017 at 10:02 AM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I am not sure about your overall setup. Are your stores local (similar to RocksDB) or are you using one global remote store? If you use a global remote store, you would not need to back your changes in a changelog topic, as the store would not be lost in case of failure.
>>>>>>>>>
>>>>>>>>> Also (in case that your stores are remote), did you consider using Kafka Connect to export your data into an external store like MySQL or MongoDB instead of writing your own custom stores for Streams?
>>>>>>>>>
>>>>>>>>> If your stores are local, why do you write custom stores? I am curious to understand why RocksDB does not serve your needs.
>>>>>>>>>
>>>>>>>>> About your two comments:
>>>>>>>>>
>>>>>>>>> (1) Streams uses RocksDB by default and the default implementation is using "checkpoint files" in the next release. Those checkpoint files track the changelog offsets of the data that got flushed to disc. This allows us to reduce the startup time, as only the tail of the changelog needs to be read to bring the store up to date. For this, you would always (1) write to the changelog, (2) write to your store. Each time you need to flush, you know that all data is in the changelog already. After each flush, you can update the "local offset checkpoint file".
>>>>>>>>>
>>>>>>>>> I guess, if you use local stores, you can apply a similar pattern in your custom store implementation. (And as mentioned above, for a global remote store you would not need the changelog anyway. -- This also applies to your recovery question from below.)
>>>>>>>>>
>>>>>>>>> (2) You can configure standby tasks (via StreamsConfig "num.standby.replicas"). This will set up standby tasks that passively replicate your stores to another instance. In the error case, state will be migrated to those "hot standbys", reducing recovery time significantly.
>>>>>>>>>
>>>>>>>>> About your questions:
>>>>>>>>>
>>>>>>>>> (1) Yes.
>>>>>>>>> (2) Partly parallel (i.e., if you run with multiple threads -- cf. StreamsConfig "num.stream.threads"). Each thread flushes all its stores sequentially.
>>>>>>>>> (3) Yes. There will be a store for each partition. (If the store is local.)
>>>>>>>>> (4) Yes. The overall processing loop is sequential (cf. https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Architecture). Also, the next commit point is computed after a successful commit -- thus, if one commit is delayed, all consecutive commit points are "shifted" by this delay.
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>> On 5/12/17 9:00 AM, João Peixoto wrote:
>>>>>>>>>> On a stream definition I perform an "aggregate" which is configured with a state store.
>>>>>>>>>>
>>>>>>>>>> *Goal*: Persist the aggregation results into a database, e.g. MySQL or MongoDB.
>>>>>>>>>>
>>>>>>>>>> *Working approach*:
>>>>>>>>>> I have created a custom StateStore backed by a changelog topic, like the builtin state stores. Whenever the store gets flushed I save to the database, mark the record as being persisted and log the change in the changelog.
>>>>>>>>>>
>>>>>>>>>> If something goes wrong during processing, the changelog guarantees that I do not lose data, restores the state, and if some data point was not persisted, the next stream instance will persist it on its flush operation.
>>>>>>>>>>
>>>>>>>>>> 1. I cannot store too much data in the changelog; even with compaction, if I have too much data, bootstrapping a stream instance would take a long time
>>>>>>>>>> 2. On the other hand, if I take too long to recover from a failure, I may lose data. So there seems to be a delicate tradeoff here
>>>>>>>>>>
>>>>>>>>>> *Questions*:
>>>>>>>>>>
>>>>>>>>>> 1. Is this a reasonable use case?
>>>>>>>>>> 2. In a scenario where my stream would have a fanout (multiple sub-streams based on the same stream), each branch would perform different "aggregate" operations, each with its own state store. Are state stores flushed in parallel or sequentially?
>>>>>>>>>> 3. The above also applies per partition. As a stream definition is parallelized by partition, will one instance hold different store instances for each one?
>>>>>>>>>> 4. Through synthetic sleeps I simulated slow flushes, slower than the commit interval. The stream seems to be OK with it and didn't throw; I assume the Kafka consumer does not poll more records until all of the previous poll's records are committed, but I couldn't find documentation to back this statement. Is there a timeout for "commit" operations?
>>>>>>>>>>
>>>>>>>>>> Sample code
>>>>>>>>>>
>>>>>>>>>> public class AggregateHolder {
>>>>>>>>>>
>>>>>>>>>>     private Long commonKey;
>>>>>>>>>>     private List<Double> rawValues = new ArrayList<>();
>>>>>>>>>>     private boolean persisted;
>>>>>>>>>>
>>>>>>>>>>     // ...
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> And stream definition
>>>>>>>>>>
>>>>>>>>>> source.groupByKey(Serdes.String(), recordSerdes)
>>>>>>>>>>         .aggregate(
>>>>>>>>>>                 AggregateHolder::new,
>>>>>>>>>>                 (aggKey, value, aggregate) -> aggregate.addValue(value.getValue()),
>>>>>>>>>>                 new DemoStoreSupplier<>(/* ... */)
>>>>>>>>>>         )
>>>>>>>>>>         .foreach((key, agg) -> log.debug("Aggregate: {}={} ({})", key, agg.getAverage(), agg));