Thanks Matthias for this information.  But it seems you are talking about a
logged store, since you mention the changelog topic and replaying it and 
whatnot.

But my question specifically was about *unlogged* state stores, where there is
no such changelog topic available.  Sorry if that wasn't clear before.  Or am I
misunderstanding?

> On Feb 28, 2017, at 9:12 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> 
> If a store is backed by a changelog topic, the changelog topic is
> responsible for holding the latest state of the store. Thus, the topic must
> store the latest value per key. For this, we use a compacted topic.
> 
> In case of restore, the local RocksDB store is cleared so it is empty,
> and we read the complete changelog topic and apply those updates to the
> store.
> 
> This allows a fast recovery, because no source topic rewind and no
> reprocessing is required. Furthermore, because the changelog topic is
> compacted, it is roughly the size of the number of distinct keys in the
> store -- this also reduces recovery time, as you don't need to replay
> every update to the store.
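> 
> (As a rough sketch of how this surfaces in the API -- exact builder methods
> differ across versions, and the store name and config value here are made up:)
> 
>     import java.util.HashMap;
>     import java.util.Map;
>     import org.apache.kafka.common.serialization.Serdes;
>     import org.apache.kafka.streams.state.KeyValueStore;
>     import org.apache.kafka.streams.state.StoreBuilder;
>     import org.apache.kafka.streams.state.Stores;
> 
>     // extra configs are applied to the auto-created changelog topic,
>     // which is a compacted topic by default
>     Map<String, String> changelogConfig = new HashMap<>();
>     changelogConfig.put("min.cleanable.dirty.ratio", "0.3");
> 
>     StoreBuilder<KeyValueStore<String, Long>> countsStore =
>         Stores.keyValueStoreBuilder(
>                 Stores.persistentKeyValueStore("counts"),
>                 Serdes.String(),
>                 Serdes.Long())
>             .withLoggingEnabled(changelogConfig);   // logging on => changelog topic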
> 
> We are currently working on an optimization that allows us to only
> replay the tail of the changelog topic in certain cases to get the store
> back into a valid state: See
> https://issues.apache.org/jira/browse/KAFKA-4317
> 
> Furthermore, changelog topics allow us to maintain StandbyTasks -- those
> tasks simply apply all updates from the changelog topic (written by the
> main task maintaining the store) to a local copy of the store. Thus,
> in case of fail-over those StandbyTasks can replace a failed task, and
> because they have a copy of the state, they can take over even more
> quickly than a newly created task that needs to replay the changelog to
> rebuild the state first.
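> 
> (A minimal sketch -- StandbyTasks are opt-in via the num.standby.replicas
> config, which defaults to 0; the application id and broker address below are
> made up:)
> 
>     import java.util.Properties;
>     import org.apache.kafka.streams.StreamsConfig;
> 
>     Properties props = new Properties();
>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
>     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
>     // keep one standby copy of each state store on another instance
>     props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);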
> 
> 
> 
> -Matthias
> 
> On 2/28/17 8:17 AM, Steven Schlansker wrote:
>> 
>>> On Feb 28, 2017, at 12:17 AM, Michael Noll <mich...@confluent.io> wrote:
>>> 
>>> Sachin,
>>> 
>>> disabling (change)logging for state stores disables the fault-tolerance of
>>> the state store -- i.e. changes to the state store will not be backed up to
>>> Kafka, regardless of whether the store uses a RocksDB store, an in-memory
>>> store, or something else.
>> 
>> One thing I've wanted is a more concrete description of this failure mode.
>> What exactly is the process to recover from such a "failed" state store?
>> 
>> Does Kafka Streams rewind the source topic and replay?  (Including any 
>> Processors you may have wired up?)
>> Does the state store remain faulted?  Can an administrator fix it by 
>> resetting some offsets?
>> 
>> I looked around both in the project and Confluent documentation and didn't 
>> really find
>> an answer to how non-logged state stores fail or recover.
>> 
>> Thanks for any insight!
>> 
>>> 
>>> 
>>>> When disabling this in 0.10.2, what does this exactly mean?
>>> 
>>> See above.
>>> 
>>> 
>>>> Does this mean no RocksDB state store would get created any longer?
>>> 
>>> No, local state stores will still be created.  By default, the storage
>>> engine is RocksDB, so if you disable changelogging then you will still have
>>> local RocksDB stores (as usual) but those stores will not be backed up to
>>> Kafka behind the scenes.  If, in this situation, you lose a machine that
>>> has local RocksDB stores, then this state data is lost, too.
>>> 
>>> So there are two different things at play here:
>>> 
>>> 1. Whether you want to enable or disable (change)logging of state stores,
>>> and thus enable/disable fault-tolerant state stores.
>>> 
>>> 2. Which storage engine you want to use for the state stores.  The default
>>> is RocksDB.
>>> 
>>> If, for (2), you do not want to have RocksDB state stores, you can switch
>>> the storage engine to e.g. the in-memory store.  However, when you do
>>> switch from RocksDB to in-memory then all your state store's data must fit
>>> into memory (obviously), otherwise you'll run OOM.
>>> 
>>> In summary, you can have either of the following (a rough config sketch
>>> follows the list):
>>> 
>>> a. RocksDB state stores with changelogging enabled (= fault-tolerant
>>> stores).
>>> 
>>> b. RocksDB state stores with changelogging disabled (= stores are not
>>> fault-tolerant, you may suffer from data loss during e.g. machine failures).
>>> 
>>> c. In-memory state stores with changelogging enabled (= fault-tolerant
>>> stores). But careful: you may run OOM if the state data does not fit into
>>> the available memory.
>>> 
>>> d. In-memory state stores with changelogging disabled (= stores are not
>>> fault-tolerant, you may suffer from data loss during e.g. machine
>>> failures). But careful: you may run OOM if the state data does not fit into
>>> the available memory.
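>>> 
>>> As a rough sketch of how (a)-(d) map to the API (this uses the newer
>>> Materialized/Stores style; exact method names differ across versions, and
>>> the store name "counts" plus the key/value types are made up):
>>> 
>>>     import org.apache.kafka.common.utils.Bytes;
>>>     import org.apache.kafka.streams.kstream.Materialized;
>>>     import org.apache.kafka.streams.state.KeyValueStore;
>>>     import org.apache.kafka.streams.state.Stores;
>>> 
>>>     // (b): RocksDB store, changelogging disabled -- local state only, no backup
>>>     Materialized<String, Long, KeyValueStore<Bytes, byte[]>> notFaultTolerant =
>>>         Materialized.<String, Long>as(Stores.persistentKeyValueStore("counts"))
>>>             .withLoggingDisabled();
>>> 
>>>     // (a): same, but keep logging enabled (the default behavior).
>>>     // (c)/(d): swap in Stores.inMemoryKeyValueStore("counts") as the supplier.
>>>     // The Materialized instance is then passed into an aggregation,
>>>     // e.g. groupedStream.count(notFaultTolerant).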
>>> 
>>> 
>>> Hope this helps,
>>> Michael
>>> 
>>> 
>>> 
>>> 
>>> On Tue, Feb 28, 2017 at 8:01 AM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>> 
>>>> I had a question regarding
>>>> http://docs.confluent.io/3.1.2/streams/developer-guide.
>>>> html#enable-disable-state-store-changelogs
>>>> 
>>>> When disabling this in 0.10.2, what does this exactly mean?
>>>> Does this mean no RocksDB state store would get created any longer?
>>>> 
>>>> On this subject, we had started with Spark Streaming, but we ran into memory
>>>> issues and the hardware we have is not fantastic enough to support Spark
>>>> Streaming.
>>>> 
>>>> So we switched to the high-level Kafka Streams DSL.
>>>> 
>>>> I think if your source is Kafka topics, Kafka Streams is good and simple
>>>> to use. However, you need to plan ahead and anticipate the (max) load and
>>>> create adequate partitions based on some key on which aggregations can be
>>>> performed independently.
>>>> 
>>>> Then you can run a cluster of stream threads (on the same or multiple
>>>> machines), each processing a partition.
>>>> 
>>>> Having said this, we did run into a lot of issues with frequent stream
>>>> rebalances, especially when we have multiple instances of RocksDB running
>>>> on a single machine.
>>>> We don't know yet whether this is a bad VM configuration issue or some
>>>> problem with the Kafka Streams / RocksDB integration; we are still working
>>>> on that.
>>>> 
>>>> So I would suggest: if you partition your data well enough, have a single
>>>> streams thread consuming only one partition, and not many instances of
>>>> RocksDB created on a single machine, the overall application runs fine.
>>>> Also make sure not to create big time windows, and set a not-so-long
>>>> retention time, so that the state store size is limited.
>>>> 
>>>> We use a window of size 10 minutes that advances every 5 minutes, with a
>>>> retention of 30 minutes, and see much better overall performance than, say,
>>>> a window of size 1 hour advancing every 30 minutes with a retention of 3 hours.
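>>>> 
>>>> For illustration (a sketch only -- it assumes a KStream<String, Event>
>>>> named "events", and exact method signatures vary by version), such a
>>>> hopping window looks roughly like:
>>>> 
>>>>     // (imports from org.apache.kafka.streams.kstream and java.util.concurrent.TimeUnit)
>>>>     // 10-minute windows advancing every 5 minutes, kept for 30 minutes
>>>>     TimeWindows windows = TimeWindows
>>>>         .of(TimeUnit.MINUTES.toMillis(10))
>>>>         .advanceBy(TimeUnit.MINUTES.toMillis(5))
>>>>         .until(TimeUnit.MINUTES.toMillis(30));
>>>> 
>>>>     KTable<Windowed<String>, Long> counts =
>>>>         events.groupByKey()
>>>>               .count(windows, "counts-per-window");   // store name is made up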
>>>> 
>>>> So to conclude: if you can manage RocksDB, then Kafka Streams is good to
>>>> start with; it's simple and very intuitive to use.
>>>> 
>>>> Again, on the RocksDB side, is there a way to eliminate it, and is
>>>> 
>>>> disableLogging
>>>> 
>>>> meant for that?
>>>> 
>>>> Thanks
>>>> Sachin
>>>> 
>>>> 
>>>> 
>>>> On Mon, Feb 27, 2017 at 7:47 PM, Michael Noll <mich...@confluent.io>
>>>> wrote:
>>>> 
>>>>>> Also, is it possible to stop the syncing of state stores to brokers,
>>>>>> if I am fine with failures?
>>>>> 
>>>>> Yes, you can disable the syncing (or the "changelog" feature) of state
>>>>> stores:
>>>>> http://docs.confluent.io/current/streams/developer-
>>>>> guide.html#enable-disable-state-store-changelogs
>>>>> 
>>>>>> I do have a Spark cluster, but I am not convinced how Spark Streaming can
>>>>>> do this differently.
>>>>>> Guozhang, could you comment on anything regarding Kafka Streams vs Spark
>>>>>> Streaming, especially in terms of aggregations/groupBys/joins
>>>>>> implementation logic?
>>>>> 
>>>>> As you are hinting at yourself, if you want fault-tolerant state, then this
>>>>> fault tolerance comes at a price (in Kafka Streams, this is achieved by
>>>>> changelog-ing state stores).  Other tools such as Flink or Spark work in a
>>>>> similar fashion; there's no free lunch.
>>>>> 
>>>>> One option, which you brought up above, is to disable the fault tolerance
>>>>> functionality for state by disabling the changelogs of state stores (see
>>>>> above).  Another option is to leverage Kafka Streams' record caching,
>>>>> which does lower the amount of data that is sent across the network (from
>>>>> your app's state store changelogs to the Kafka cluster and vice versa),
>>>>> though you may need to tune some parameters in your situation because your
>>>>> key space has high cardinality and message volume per key is relatively low
>>>>> (= you don't benefit as much from record caching as most other users/use
>>>>> cases).
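>>>>> 
>>>>> The relevant knobs, as a rough sketch (the values are made up and need
>>>>> tuning for your workload):
>>>>> 
>>>>>     import java.util.Properties;
>>>>>     import org.apache.kafka.streams.StreamsConfig;
>>>>> 
>>>>>     Properties props = new Properties();
>>>>>     // total record-cache size across all threads of one instance, in bytes
>>>>>     props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 100 * 1024 * 1024L);
>>>>>     // commit (and flush the cache) less often => fewer changelog writes
>>>>>     props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000L);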
>>>>> 
>>>>> 
>>>>> -Michael
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> On Mon, Feb 27, 2017 at 2:42 PM, Tianji Li <skyah...@gmail.com> wrote:
>>>>> 
>>>>>> Hi Guozhang and Kohki,
>>>>>> 
>>>>>> Thanks for your replies.
>>>>>> 
>>>>>> I think I know how to deal with partitioning now, but I am still not sure
>>>>>> how to deal with the traffic between the hidden state stores and Kafka
>>>>>> brokers (same as Kohki).
>>>>>> 
>>>>>> I feel like the easiest thing to do is to set a larger commit interval, so
>>>>>> that the state stores are synced to brokers less often than by default.
>>>>>> 
>>>>>> I do have a Spark cluster, but I am not convinced how Spark Streaming can
>>>>>> do this differently. Guozhang, could you comment on anything regarding
>>>>>> Kafka Streams vs Spark Streaming, especially in terms of
>>>>>> aggregations/groupBys/joins implementation logic?
>>>>>> 
>>>>>> Also, is it possible to stop the syncing of state stores to brokers,
>>>>>> if I am fine with failures?
>>>>>> 
>>>>>> Thanks
>>>>>> Tianji
>>>>>> 
>>>>>> 
>>>>>> On 2017-02-26 23:52 (-0500), Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>> Hello Tianji,
>>>>>>> 
>>>>>>> As Kohki mentioned, in Streams joins and aggregations are always done
>>>>>>> pre-partitioned, and hence locally. So there won't be any inter-node
>>>>>>> communication needed to execute the joins / aggregations. Also, they can
>>>>>>> be hosted as persistent local state stores, so you don't need to keep
>>>>>>> them in memory. So for example, if you partition your data by K1 / K2,
>>>>>>> then data with the same values in combo (K1, K2) will always go to the
>>>>>>> same partition, and hence this is good for aggregations / joins on either
>>>>>>> K1, K2, or combo(K1, K2), but not sufficient for combo(K1, K2, K3, K4),
>>>>>>> as data with the same values of K3 / K4 might still go to different
>>>>>>> partitions processed by different Streams instances.
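>>>>>>> 
>>>>>>> (A rough sketch of what that looks like in the DSL -- the stream name,
>>>>>>> value type, and getters here are made up:)
>>>>>>> 
>>>>>>>     // re-key on the combined (K1, K2); Streams inserts a repartition topic
>>>>>>>     // because the key changed, then the aggregation runs locally per partition
>>>>>>>     KTable<String, Long> countsByK1K2 = events
>>>>>>>         .selectKey((key, value) -> value.getK1() + "|" + value.getK2())
>>>>>>>         .groupByKey()
>>>>>>>         .count("counts-by-k1-k2");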
>>>>>>> 
>>>>>>> So what you really want is to partition based on the "maximum superset"
>>>>>>> of all the involved keys. Note that with the superset of all the keys,
>>>>>>> one thing to watch out for is the even distribution of the partitions.
>>>>>>> If it is not evenly distributed, then some instances might become hot
>>>>>>> spots. This can be tackled by customizing the "PartitionGrouper"
>>>>>>> interface in Streams, which indicates which set of partitions will be
>>>>>>> assigned to each of the tasks (by default, each partition from the
>>>>>>> source topics will form a task, and a task is the unit of parallelism
>>>>>>> in Streams).
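>>>>>>> 
>>>>>>> (Plugging in a custom grouper is a config setting -- a sketch, where
>>>>>>> MyPartitionGrouper is a hypothetical class implementing the
>>>>>>> org.apache.kafka.streams.processor.PartitionGrouper interface:)
>>>>>>> 
>>>>>>>     Properties props = new Properties();
>>>>>>>     props.put(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, MyPartitionGrouper.class);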
>>>>>>> 
>>>>>>> Hope this helps.
>>>>>>> 
>>>>>>> Guozhang
>>>>>>> 
>>>>>>> 
>>>>>>> On Sun, Feb 26, 2017 at 10:57 AM, Kohki Nishio <tarop...@gmail.com>
>>>>>> wrote:
>>>>>>> 
>>>>>>>> Tianji,
>>>>>>>> KStream is indeed Append mode as long as I do stateless processing, but
>>>>>>>> when you do aggregation, that is a stateful operation and it turns into
>>>>>>>> a KTable, and that uses Update mode.
>>>>>>>> 
>>>>>>>> In regard to your aggregation, I believe Kafka's aggregation works on a
>>>>>>>> single partition, not over multiple partitions. Are you doing 100
>>>>>>>> different aggregations against the record key? Then you should have a
>>>>>>>> single data object for those 100 values. Anyway, it sounds like we have
>>>>>>>> a similar problem ..
>>>>>>>> 
>>>>>>>> -Kohki
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Sat, Feb 25, 2017 at 1:11 PM, Tianji Li <skyah...@gmail.com>
>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Hi Kohki,
>>>>>>>>> 
>>>>>>>>> Thanks very much for providing your investigation results. Regarding
>>>>>>>>> 'append' mode with Kafka Streams, isn't KStream the thing you want?
>>>>>>>>> 
>>>>>>>>> Hi Guozhang,
>>>>>>>>> 
>>>>>>>>> Thanks for the pointers to the two blogs. I read one of them before and
>>>>>>>>> just had a look at the other one.
>>>>>>>>> 
>>>>>>>>> What I am hoping to do is below; can you help me decide if Kafka
>>>>>>>>> Streams is a good fit?
>>>>>>>>> 
>>>>>>>>> We have a few data sources, and we are hoping to correlate these
>>>>>>>>> sources, and then do aggregations, as *a stream in real-time*.
>>>>>>>>> 
>>>>>>>>> The number of aggregations is around 100, which means, if using Kafka
>>>>>>>>> Streams, we need to maintain around 100 state stores with 100 changelog
>>>>>>>>> topics behind the scenes for the joins and aggregations.
>>>>>>>>> 
>>>>>>>>> The number of unique entries in each of these state stores is expected
>>>>>>>>> to be at the level of < 100M. The size of each record is around 1K
>>>>>>>>> bytes, and so each state store is expected to be ~100G bytes in size.
>>>>>>>>> The total number of bytes in all these state stores is thus around 10T
>>>>>>>>> bytes.
>>>>>>>>> 
>>>>>>>>> If keeping all these stores in memory, this translates into around 50
>>>>>>>>> machines with 256Gbytes for this purpose alone.
>>>>>>>>> 
>>>>>>>>> Plus, the incoming raw data rate could reach 10M records per second in
>>>>>>>>> peak hours. So, during aggregation, data movement between Kafka Streams
>>>>>>>>> instances will be heavy, i.e., 10M records per second in the cluster
>>>>>>>>> for joining and aggregations.
>>>>>>>>> 
>>>>>>>>> Is Kafka Streams good for this? My gut feeling is Kafka Streams is
>>>>>>>>> fine. But I'd like to run this by you.
>>>>>>>>> 
>>>>>>>>> And, I am hoping to minimize data movement (to save bandwidth) during
>>>>>>>>> joins/groupBys. If I partition the raw data with the minimum subset of
>>>>>>>>> aggregation keys (say K1 and K2), then I wonder if the following
>>>>>>>>> joins/groupBys (say on keys K1, K2, K3, K4) happen on local data, if
>>>>>>>>> using the DSL?
>>>>>>>>> 
>>>>>>>>> Thanks
>>>>>>>>> Tianji
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On 2017-02-25 13:49 (-0500), Guozhang Wang <w...@gmail.com> wrote:
>>>>>>>>>> Hello Kohki,
>>>>>>>>>> 
>>>>>>>>>> Thanks for the email. I'd like to learn what your concern about the
>>>>>>>>>> size of the state store is? From your description it's a bit hard to
>>>>>>>>>> figure out, but I'd guess you have lots of state stores while each of
>>>>>>>>>> them is relatively small?
>>>>>>>>>> 
>>>>>>>>>> Hello Tianji,
>>>>>>>>>> 
>>>>>>>>>> Regarding your question about maturity and users of Streams, you can
>>>>>>>>>> take a look at a bunch of the blog posts written about their Streams
>>>>>>>>>> usage in production, for example:
>>>>>>>>>> 
>>>>>>>>>> http://engineering.skybettingandgaming.com/2017/01/23/streaming-architectures/
>>>>>>>>>> 
>>>>>>>>>> http://developers.linecorp.com/blog/?p=3960
>>>>>>>>>> 
>>>>>>>>>> Guozhang
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On Sat, Feb 25, 2017 at 7:52 AM, Kohki Nishio <ta...@gmail.com> wrote:
>>>>>>>>>> 
>>>>>>>>>>> I did a bit of research on that matter recently; the comparison is
>>>>>>>>>>> between Spark Structured Streaming (SSS) and Kafka Streams.
>>>>>>>>>>> 
>>>>>>>>>>> Both are relatively new (~1y) and trying to solve similar problems.
>>>>>>>>>>> However, if you go with Spark, you have to go with a cluster; if your
>>>>>>>>>>> environment already has a cluster, then it's good. However, our team
>>>>>>>>>>> doesn't do any Spark, so the initial cost would be very high. On the
>>>>>>>>>>> other hand, Kafka Streams is a Java library; since we have a service
>>>>>>>>>>> framework, doing streaming inside a service is super easy.
>>>>>>>>>>> 
>>>>>>>>>>> However, for some reason, people see SSS as more mature and Kafka
>>>>>>>>>>> Streams as not so mature (like Beta). But old-fashioned streaming is
>>>>>>>>>>> mature enough in both (in my opinion); I didn't see any difference
>>>>>>>>>>> between DStream (Spark) and KStream (Kafka).
>>>>>>>>>>> 
>>>>>>>>>>> DataFrame (Structured Streaming) and KTable I found quite different.
>>>>>>>>>>> Kafka's model is more like a change log, which means you need to see
>>>>>>>>>>> the latest entry to make a final decision. I would call this the
>>>>>>>>>>> 'Update' model, whereas Spark does the 'Append' model and doesn't
>>>>>>>>>>> support the 'Update' model yet (it's coming in 2.2).
>>>>>>>>>>> 
>>>>>>>>>>> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes
>>>>>>>>>>> 
>>>>>>>>>>> I wanted to have the 'Append' model with Kafka, but it seems it's not
>>>>>>>>>>> an easy thing to do. Also, Kafka Streams uses an internal topic to
>>>>>>>>>>> keep state changes for the fail-over scenario, but I'm dealing with
>>>>>>>>>>> lots of tiny pieces of information and I have a big concern about the
>>>>>>>>>>> size of the state store / topic, so my decision is that I'm going
>>>>>>>>>>> with my own handling of the Kafka API ..
>>>>>>>>>>> 
>>>>>>>>>>> If you do stateless operations and don't have a Spark cluster, yeah,
>>>>>>>>>>> Kafka Streams is perfect.
>>>>>>>>>>> If you do stateful, complicated operations and happen to have a Spark
>>>>>>>>>>> cluster, give Spark a try.
>>>>>>>>>>> Else, you have to write code which is optimized for your use case.
>>>>>>>>>>> 
>>>>>>>>>>> thanks
>>>>>>>>>>> -Kohki
>>>>>>>>>>> 
>>>>>>>>>>> On Fri, Feb 24, 2017 at 6:22 PM, Tianji Li <sk...@gmail.com> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> Hi there,
>>>>>>>>>>>> 
>>>>>>>>>>>> Can anyone give a good explanation of in what cases Kafka Streams is
>>>>>>>>>>>> preferred, and in what cases Spark Streaming is better?
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Tianji
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> --
>>>>>>>>>>> Kohki Nishio
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> --
>>>>>>>>>> -- Guozhang
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> --
>>>>>>>> Kohki Nishio
>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> --
>>>>>>> -- Guozhang
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> 
> 
