Just to add to this:

Streams creates re-partitioning topics lazily: when the key is
(potentially) changed, we only set a flag but don't add the
re-partitioning topic. On groupByKey() or join() we check whether this
"re-partitioning required" flag is set and, if so, add the topic.

Thus, if you have a

stream.map().map().selectKey().groupBy()

we only re-partition on groupBy, but not after the first two map() steps.
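
As a rough illustration of the flag logic (a toy model in plain Java, not
the actual Streams code; the class LazyRepartitionSketch, its plan() list
and the "repartition-topic" marker are made up for this example, and
groupByKey() stands in for the grouping step):

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of lazy re-partitioning. Hypothetical names throughout;
// this is NOT the Kafka Streams implementation.
final class LazyRepartitionSketch {
    private final boolean repartitionRequired; // raised by key-changing steps
    private final List<String> plan;           // steps added so far

    private LazyRepartitionSketch(boolean repartitionRequired, List<String> plan) {
        this.repartitionRequired = repartitionRequired;
        this.plan = plan;
    }

    static LazyRepartitionSketch source() {
        return new LazyRepartitionSketch(false, new ArrayList<>());
    }

    // Key may change: only raise the flag, add no topic yet.
    LazyRepartitionSketch map() { return step("map", true); }
    LazyRepartitionSketch selectKey() { return step("selectKey", true); }

    // Key cannot change: pass the flag through untouched.
    LazyRepartitionSketch mapValues() { return step("mapValues", repartitionRequired); }

    // Key-based operation: add the topic only if the flag is set, then clear it.
    LazyRepartitionSketch groupByKey() {
        List<String> next = new ArrayList<>(plan);
        if (repartitionRequired) {
            next.add("repartition-topic");
        }
        next.add("groupByKey");
        return new LazyRepartitionSketch(false, next);
    }

    List<String> plan() { return plan; }

    private LazyRepartitionSketch step(String name, boolean flag) {
        List<String> next = new ArrayList<>(plan);
        next.add(name);
        return new LazyRepartitionSketch(flag, next);
    }

    public static void main(String[] args) {
        // prints [map, map, selectKey, repartition-topic, groupByKey]
        System.out.println(source().map().map().selectKey().groupByKey().plan());
    }
}
```

In this model a chain that never changes the key, e.g.
source().mapValues().groupByKey(), ends up with no re-partitioning step
at all.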


-Matthias

On 5/16/17 8:18 AM, Eno Thereska wrote:
> (it's preferred to create another email thread for a different topic to make 
> it easier to look back)
> 
> Yes, there could be room for optimizations, e.g., see this: 
> http://mail-archives.apache.org/mod_mbox/kafka-users/201705.mbox/%3cCAJikTEUHR=r0ika6vlf_y+qajxg8f_q19og_-s+q-gozpqb...@mail.gmail.com%3e
> 
> Eno
> 
>> On 16 May 2017, at 16:01, João Peixoto <joao.harti...@gmail.com> wrote:
>>
>> Follow up doubt (let me know if a new question should be created).
>>
>> Do we always need a repartitioning topic?
>>
>> If I'm reading things correctly, when we change the key of a record we need
>> to make sure the new key falls on the same partition that we are processing.
>> This makes a lot of sense if, after such a change, we need to join on some
>> other stream/table, or in cases where we sink to a topic.
>> However, in cases where none of these things happen, the repartition topic
>> does nothing? If this is true, can we somehow not create it?
>>
>> On Sun, May 14, 2017 at 7:58 PM João Peixoto <joao.harti...@gmail.com>
>> wrote:
>>
>>> Very useful links, thank you.
>>>
>>> Part of my original misunderstanding was that the at-least-once guarantee
>>> was considered fulfilled if the record reached a sink node.
>>>
>>> Thanks for all the feedback, you may consider my question answered.
>>> Feel free to ask further questions about the use case if found interesting.
>>> On Sun, May 14, 2017 at 4:31 PM Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> Yes.
>>>>
>>>> It is basically "documented", as Streams guarantees at-least-once
>>>> semantics. Thus, we make sure you will not lose any data in case of
>>>> failure. (I.e., the overall guarantee is documented.)
>>>>
>>>> To achieve this, we always flush before we commit offsets. (This is not
>>>> explicitly documented as it's an implementation detail.)
>>>>
>>>> There are some docs in the wiki:
>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Internal+Data+Management#KafkaStreamsInternalDataManagement-Commits
>>>>
>>>> This might also help in case you want to dig into the code:
>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Architecture
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 5/14/17 4:07 PM, João Peixoto wrote:
>>>>> I think I now understand what Matthias meant when he said "If you use a
>>>>> global remote store, you would not need to back your changes in a
>>>>> changelog topic, as the store would not be lost in case of failure".
>>>>>
>>>>> I had the misconception that if a state store threw an exception during
>>>>> "flush", all messages received between now and the previous flush would
>>>>> be "lost", hence the need for a changelog topic. However, it seems that
>>>>> the "repartition" topic actually solves this problem.
>>>>>
>>>>> There's very little information about the latter, at least that I could
>>>>> find, but an entry seems to be added whenever a record enters the
>>>>> "aggregate", while the state store "consumer" of this topic only updates
>>>>> its offset after the flush completes, meaning that the repartition topic
>>>>> will be replayed! It seems this problem is already solved for me. I'd
>>>>> appreciate it if someone could point me to the documentation or code that
>>>>> backs up the above.
>>>>>
>>>>>
>>>>> On Sat, May 13, 2017 at 3:11 PM João Peixoto <joao.harti...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Replies in line as well
>>>>>>
>>>>>>
>>>>>> On Sat, May 13, 2017 at 3:25 AM Eno Thereska <eno.there...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi João,
>>>>>>>
>>>>>>> Some answers inline:
>>>>>>>
>>>>>>>> On 12 May 2017, at 18:27, João Peixoto <joao.harti...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Thanks for the comments, here are some clarifications:
>>>>>>>>
>>>>>>>> I did look at interactive queries; if I understood them correctly, it
>>>>>>>> means that my state store must hold all the results in order for it to
>>>>>>>> be queried, either in memory or on disk (RocksDB).
>>>>>>>
>>>>>>> Yes, that's correct.
>>>>>>>
>>>>>>>
>>>>>>>> 1. The retention policy on my aggregate operations, in my case, is 90
>>>>>>>> days, which is way too much data to hold in memory.
>>>>>>>
>>>>>>> It will depend on how much data/memory you have, but perhaps it could
>>>>>>> be too much to hold in memory for that long (especially because some
>>>>>>> failure is bound to happen in 90 days).
>>>>>>>
>>>>>>>> 2. My stream instances do not have access to disk. Even if they did,
>>>>>>>> wouldn't it mean I'd need almost twice the disk space to hold the
>>>>>>>> same data? I.e., Kafka brokers holding the topics + RocksDB holding
>>>>>>>> the state?
>>>>>>>
>>>>>>> That's interesting, is there a reason why the streams instances don't
>>>>>>> have access to a local file system? I'm curious
>>>>>>> what kind of deployment you have.
>>>>>>>
>>>>>>
>>>>>> My instances are deployed on Kubernetes
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> It is true that twice the disk space is needed to hold the data in
>>>>>>> RocksDB as well as in the Kafka changelog topic;
>>>>>>> however, that is no different than the current situation where the
>>>>>>> data is stored in a remote database, right?
>>>>>>> I understand your point that you might not have access to local disk
>>>>>>> though.
>>>>>>>
>>>>>>
>>>>>> Not quite. In my scenario the state store would be backed by the
>>>>>> changelog AND MongoDB. On bootstrap we would fetch the state stored in
>>>>>> the changelog into a structure like a LoadingCache (from Guava), which
>>>>>> would load additional fields from MongoDB if needed. Therefore the
>>>>>> changelog could hold, say, 12 hours of records, while the full 90 days
>>>>>> are stored in MongoDB only. That's exactly the strategy I'm trying to
>>>>>> validate. The implication would be that in case of failure we'd need to
>>>>>> recover within 12 hours, or else we'd need to replay from the source
>>>>>> topics.
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>> 3. Because a crash may happen between the time an entry is added to
>>>>>>>> the changelog and the time the data store is flushed, I need to get
>>>>>>>> all the changes every time if I want to guarantee that all data is
>>>>>>>> eventually persisted. This is why checkpoint files may not work for me.
>>>>>>>
>>>>>>> The upcoming exactly-once support in 0.11 will help with these kinds of
>>>>>>> guarantees.
>>>>>>>
>>>>>>
>>>>>> Not sure I understand how exactly-once would help in this case.
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> Standby tasks look great, I forgot about those.
>>>>>>>>
>>>>>>>> I'm at the design phase, so this is all tentative. Answering Matthias's
>>>>>>>> questions:
>>>>>>>>
>>>>>>>> My state stores are local. As mentioned above, I do not have access to
>>>>>>>> disk, therefore I need to recover all data from somewhere; in this
>>>>>>>> case I'm thinking about the changelog.
>>>>>>>
>>>>>>> So this is where I get confused a bit, since you mention that your
>>>>>>> state stores are "local", i.e., the streams instance
>>>>>>> does have access to a local file system.
>>>>>>>
>>>>>>
>>>>>> When I said "local" I meant that the state stores are partial for each
>>>>>> instance, i.e. they only have the partitions the task is responsible
>>>>>> for (normal behavior) rather than a global store.
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>> I read about Kafka Connect but have never used it; maybe that'll
>>>>>>>> simplify things, but I need to do some studying there.
>>>>>>>>
>>>>>>>> The reason why, even though my stores are local, I still want to
>>>>>>>> store them in a database and not use straight-up RocksDB (or global
>>>>>>>> stores) is that this would allow me to migrate my current processing
>>>>>>>> pipeline to Kafka Streams without needing to change the frontend part
>>>>>>>> of the application, which fetches data from MongoDB.
>>>>>>>
>>>>>>> Makes sense.
>>>>>>>
>>>>>>>>
>>>>>>>> PS When you mention Global State Stores I'm thinking of
>>>>>>>> http://docs.confluent.io/3.2.0/streams/developer-guide.html#querying-remote-state-stores-for-the-entire-application
>>>>>>>> , is this correct?
>>>>>>>
>>>>>>> No, I think Matthias means a remote server somewhere where you store
>>>>>>> all your data (like a shared file system).
>>>>>>> This is not something Kafka would provide.
>>>>>>>
>>>>>>> Eno
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, May 12, 2017 at 10:02 AM Matthias J. Sax <matth...@confluent.io>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I am not sure about your overall setup. Are your stores local
>>>>>>>>> (similar to RocksDB) or are you using one global remote store? If you
>>>>>>>>> use a global remote store, you would not need to back your changes in
>>>>>>>>> a changelog topic, as the store would not be lost in case of failure.
>>>>>>>>>
>>>>>>>>> Also (in case your stores are remote), did you consider using Kafka
>>>>>>>>> Connect to export your data into an external store like MySQL or
>>>>>>>>> MongoDB instead of writing your own custom stores for Streams?
>>>>>>>>>
>>>>>>>>> If your stores are local, why do you write custom stores? I am
>>>>>>>>> curious to understand why RocksDB does not serve your needs.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> About your two comments:
>>>>>>>>>
>>>>>>>>> (1) Streams uses RocksDB by default, and the default implementation
>>>>>>>>> will use "checkpoint files" in the next release. Those checkpoint
>>>>>>>>> files track the changelog offsets of the data that got flushed to
>>>>>>>>> disk. This reduces startup time, as only the tail of the changelog
>>>>>>>>> needs to be read to bring the store up to date. For this, you would
>>>>>>>>> always (1) write to the changelog, (2) write to your store. Each time
>>>>>>>>> you need to flush, you know that all data is in the changelog
>>>>>>>>> already. After each flush, you can update the "local offset
>>>>>>>>> checkpoint file".
>>>>>>>>>
>>>>>>>>> I guess, if you use local stores, you can apply a similar pattern in
>>>>>>>>> your custom store implementation. (And as mentioned above, for a
>>>>>>>>> global remote store you would not need the changelog anyway. This
>>>>>>>>> also applies to your recovery question from below.)
>>>>>>>>>
>>>>>>>>> (2) You can configure standby tasks (via StreamsConfig
>>>>>>>>> "num.standby.replicas"). This will set up standby tasks that
>>>>>>>>> passively replicate your stores to another instance. In case of an
>>>>>>>>> error, state will be migrated to those "hot standbys", reducing
>>>>>>>>> recovery time significantly.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> About your questions:
>>>>>>>>>
>>>>>>>>> (1) Yes.
>>>>>>>>> (2) Partly parallel (i.e., if you run with multiple threads; cf.
>>>>>>>>> StreamsConfig "num.stream.threads"). Each thread flushes all its
>>>>>>>>> stores sequentially.
>>>>>>>>> (3) Yes. There will be a store for each partition (if the store is
>>>>>>>>> local).
>>>>>>>>> (4) Yes. The overall processing loop is sequential (cf.
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Architecture
>>>>>>>>> ).
>>>>>>>>> Also, the next commit point is computed after a successful commit --
>>>>>>>>> thus, if one commit is delayed, all consecutive commit points are
>>>>>>>>> "shifted" by this delay.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 5/12/17 9:00 AM, João Peixoto wrote:
>>>>>>>>>> On a stream definition I perform an "aggregate" which is configured
>>>>>>>>>> with a state store.
>>>>>>>>>>
>>>>>>>>>> *Goal*: Persist the aggregation results into a database, e.g. MySQL
>>>>>>>>>> or MongoDB
>>>>>>>>>>
>>>>>>>>>> *Working approach*:
>>>>>>>>>> I have created a custom StateStore backed by a changelog topic like
>>>>>>>>>> the builtin state stores. Whenever the store gets flushed I save to
>>>>>>>>>> the database, mark the record as being persisted and log the change
>>>>>>>>>> in the changelog.
>>>>>>>>>>
>>>>>>>>>> If something goes wrong during processing, the changelog guarantees
>>>>>>>>>> that I do not lose data and restores the state; if some data point
>>>>>>>>>> was not persisted, the next stream instance will persist it on its
>>>>>>>>>> flush operation.
>>>>>>>>>>
>>>>>>>>>> 1. I cannot store too much data in the changelog, even with
>>>>>>>>>> compaction: if I have too much data, bootstrapping a stream instance
>>>>>>>>>> would take a long time.
>>>>>>>>>> 2. On the other hand, if I take too long to recover from a failure, I
>>>>>>>>>> may lose data. So there seems to be a delicate tradeoff here.
>>>>>>>>>>
>>>>>>>>>> *Questions*:
>>>>>>>>>>
>>>>>>>>>> 1. Is this a reasonable use case?
>>>>>>>>>> 2. In a scenario where my stream would have a fanout (multiple
>>>>>>>>>> sub-streams based on the same stream), each branch would perform
>>>>>>>>>> different "aggregate" operations, each with its own state store. Are
>>>>>>>>>> state stores flushed in parallel or sequentially?
>>>>>>>>>> 3. The above also applies per-partition. As a stream definition is
>>>>>>>>>> parallelized by partition, will one instance hold different store
>>>>>>>>>> instances for each one?
>>>>>>>>>> 4. Through synthetic sleeps I simulated slow flushes, slower than the
>>>>>>>>>> commit interval. The stream seems to be ok with it and didn't throw.
>>>>>>>>>> I assume the Kafka consumer does not poll more records until all of
>>>>>>>>>> the previous poll's are committed, but I couldn't find documentation
>>>>>>>>>> to back this statement. Is there a timeout for "commit" operations?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Sample code
>>>>>>>>>>
>>>>>>>>>> public class AggregateHolder {
>>>>>>>>>>
>>>>>>>>>>   private Long commonKey;
>>>>>>>>>>   private List<Double> rawValues = new ArrayList<>();
>>>>>>>>>>   private boolean persisted;
>>>>>>>>>>
>>>>>>>>>> // ...
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> And stream definition
>>>>>>>>>>
>>>>>>>>>> source.groupByKey(Serdes.String(), recordSerdes)
>>>>>>>>>>             .aggregate(
>>>>>>>>>>                     AggregateHolder::new,
>>>>>>>>>>                     (aggKey, value, aggregate) -> aggregate.addValue(value.getValue()),
>>>>>>>>>>                     new DemoStoreSupplier<>(/* ... */)
>>>>>>>>>>             )
>>>>>>>>>>             .foreach((key, agg) -> log.debug("Aggregate: {}={} ({})",
>>>>>>>>>>                     key, agg.getAverage(), agg));
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>>
>>>>
> 
> 
