Sorry, I misunderstood your question.

For a non-logged store, in case of failure, we wipe out the entire state
(IIRC) -- thus, you will start with an empty state after recovery.
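
To make the difference concrete, here is a minimal sketch in plain Java. It
is a toy model, not Kafka Streams code: the class StoreRecoverySketch and all
of its methods are made up for illustration. It only assumes the behavior
described in this thread: a logged store is rebuilt by replaying its
compacted changelog, and a non-logged store comes back empty.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only. These class and method names are hypothetical and are
// not part of the Kafka Streams API.
final class StoreRecoverySketch {

    // One changelog record: a value written for a key.
    static final class Update {
        final String key;
        final long value;

        Update(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    // Models log compaction: only the most recent update per key survives.
    static List<Update> compact(List<Update> changelog) {
        Map<String, Update> latest = new LinkedHashMap<>();
        for (Update u : changelog) {
            latest.remove(u.key); // re-insert so order follows the last write
            latest.put(u.key, u);
        }
        return new ArrayList<>(latest.values());
    }

    // Logged store: start from an empty local store and replay the changelog.
    static Map<String, Long> restoreLogged(List<Update> changelog) {
        Map<String, Long> store = new HashMap<>();
        for (Update u : changelog) {
            store.put(u.key, u.value);
        }
        return store;
    }

    // Non-logged store: there is nothing to replay, so the store stays empty.
    static Map<String, Long> restoreNonLogged() {
        return new HashMap<>();
    }

    public static void main(String[] args) {
        List<Update> changelog = List.of(
                new Update("a", 1), new Update("b", 5), new Update("a", 3));
        List<Update> compacted = compact(changelog);
        System.out.println("records to replay: " + compacted.size());
        System.out.println("logged store:      " + restoreLogged(compacted));
        System.out.println("non-logged store:  " + restoreNonLogged());
    }
}
```

In this model, replaying the compacted log yields the same store contents as
replaying every update, just with fewer records to read.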


-Matthias


On 2/28/17 1:36 PM, Steven Schlansker wrote:
> Thanks Matthias for this information.  But it seems you are talking about a
> logged store, since you mention the changelog topic and replaying it and 
> whatnot.
> 
> But my question specifically was about *unlogged* state stores, where there
> is no such changelog topic available.  Sorry if that wasn't clear before.
> Or am I misunderstanding?
> 
>> On Feb 28, 2017, at 9:12 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>>
>> If a store is backed by a changelog topic, the changelog topic is
>> responsible for holding the latest state of the store. Thus, the topic must
>> store the latest value per key. For this, we use a compacted topic.
>>
>> In case of a restore, the local RocksDB store is cleared so it is empty,
>> and we read the complete changelog topic and apply those updates to the
>> store.
>>
>> This allows a fast recovery, because no source topic rewind and no
>> reprocessing is required. Furthermore, because the changelog topic is
>> compacted, its size is roughly proportional to the number of distinct keys
>> in the store -- this also reduces recovery time as you don't need to replay
>> every update to the store.
>>
>> We are currently working on an optimization that allows us to only
>> replay the tail of the changelog topic in certain cases to get the store
>> back into a valid state: See
>> https://issues.apache.org/jira/browse/KAFKA-4317
>>
>> Furthermore, changelog topics allow us to maintain StandbyTasks -- those
>> tasks only apply all updates to the changelog topic (that are written by
>> the main task maintaining the store) to a local copy of the store. Thus,
>> in case of fail-over those StandbyTasks can replace a failed task and,
>> because they have a copy of the state, they can take over even more
>> quickly than a newly created task that needs to replay the changelog to
>> rebuild the state first.
>>
>>
>>
>> -Matthias
>>
>> On 2/28/17 8:17 AM, Steven Schlansker wrote:
>>>
>>>> On Feb 28, 2017, at 12:17 AM, Michael Noll <mich...@confluent.io> wrote:
>>>>
>>>> Sachin,
>>>>
>>>> disabling (change)logging for state stores disables the fault-tolerance of
>>>> the state store -- i.e. changes to the state store will not be backed up to
>>>> Kafka, regardless of whether the store uses a RocksDB store, an in-memory
>>>> store, or something else
>>>
>>> One thing I've wanted is a more concrete description of this failure mode.
>>> What exactly is the process to recover from such a "failed" state store?
>>>
>>> Does Kafka Streams rewind the source topic and replay?  (Including any 
>>> Processors you may have wired up?)
>>> Does the state store remain faulted?  Can an administrator fix it by 
>>> resetting some offsets?
>>>
>>> I looked around both in the project and Confluent documentation and didn't
>>> really find an answer to how non-logged state stores fail or recover.
>>>
>>> Thanks for any insight!
>>>
>>>>
>>>>
>>>>> When disabling this in 0.10.2, what exactly does this mean?
>>>>
>>>> See above.
>>>>
>>>>
>>>>> Does this mean that no RocksDB state store would be created any longer?
>>>>
>>>> No, local state stores will still be created.  By default, the storage
>>>> engine is RocksDB, so if you disable changelogging then you will still have
>>>> local RocksDB stores (as usual) but those stores will not be backed up to
>>>> Kafka behind the scenes.  If, in this situation, you lose a machine that
>>>> has local RocksDB stores, then this state data is lost, too.
>>>>
>>>> So there are two different things at play here:
>>>>
>>>> 1. Whether you want to enable or disable (change)logging of state store,
>>>> and thus to enable/disable fault-tolerant state stores.
>>>>
>>>> 2. Which storage engine you want to use for the state stores.  The default
>>>> is RocksDB.
>>>>
>>>> If, for (2), you do not want to have RocksDB state stores, you can switch
>>>> the storage engine to e.g. the in-memory store.  However, when you do
>>>> switch from RocksDB to in-memory then all your state store's data must fit
>>>> into memory (obviously), otherwise you'll run OOM.
>>>>
>>>> In summary, you can have either of the following:
>>>>
>>>> a. RocksDB state stores with changelogging enabled (= fault-tolerant
>>>> stores).
>>>>
>>>> b. RocksDB state stores with changelogging disabled (= stores are not
>>>> fault-tolerant, you may suffer from data loss during e.g. machine 
>>>> failures).
>>>>
>>>> c. In-memory state stores with changelogging enabled (= fault-tolerant
>>>> stores). But careful: you may run OOM if the state data does not fit into
>>>> the available memory.
>>>>
>>>> d. In-memory state stores with changelogging disabled (= stores are not
>>>> fault-tolerant, you may suffer from data loss during e.g. machine
>>>> failures). But careful: you may run OOM if the state data does not fit into
>>>> the available memory.
>>>>
>>>>
>>>> Hope this helps,
>>>> Michael
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Feb 28, 2017 at 8:01 AM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>
>>>>> I had a question regarding
>>>>> http://docs.confluent.io/3.1.2/streams/developer-guide.html#enable-disable-state-store-changelogs
>>>>>
>>>>> When disabling this in 0.10.2, what exactly does this mean?
>>>>> Does this mean that no RocksDB state store would be created any longer?
>>>>>
>>>>> On this subject, we had started with Spark Streaming, but we ran into
>>>>> memory issues, and the hardware we have is not good enough to support
>>>>> Spark Streaming.
>>>>>
>>>>> So we switched to the high-level DSL of Kafka Streams.
>>>>>
>>>>> I think if your source is Kafka topics, Kafka Streams is good and simple
>>>>> to use. However, you need to plan ahead, anticipate the (max) load, and
>>>>> create adequate partitions based on some key on which aggregations can be
>>>>> performed independently.
>>>>>
>>>>> Then you can run a cluster of stream threads (on the same or multiple
>>>>> machines), each processing a partition.
>>>>>
>>>>> Having said this, we do however run into a lot of issues with frequent
>>>>> stream rebalances, especially when we have multiple instances of RocksDB
>>>>> running on a single machine.
>>>>> Now we don't know if this is some bad VM configuration issue or some
>>>>> problem with the Kafka Streams/RocksDB integration; we are still working
>>>>> on that.
>>>>>
>>>>> So I would suggest that if you partition your data well enough, have a
>>>>> single stream thread consuming only one partition, and do not create many
>>>>> instances of RocksDB on a single machine, the overall application runs
>>>>> fine.
>>>>> Also make sure not to create big time windows, and set a not-so-long
>>>>> retention time, so that the state store size is limited.
>>>>>
>>>>> We use a sliding 5-minute window of size 10 minutes and a retention of 30
>>>>> minutes, and see much better overall performance than with, say, a
>>>>> 30-minute sliding window of size 1 hour and a retention of 3 hours.
>>>>>
>>>>> So to conclude, if you can manage RocksDB, then Kafka Streams is good to
>>>>> start with; it's simple and very intuitive to use.
>>>>>
>>>>> Again, on the RocksDB side, is there a way to eliminate it, and is
>>>>>
>>>>> disableLogging
>>>>>
>>>>> for that?
>>>>>
>>>>> Thanks
>>>>> Sachin
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Feb 27, 2017 at 7:47 PM, Michael Noll <mich...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>>> Also, is it possible to stop the syncing between state stores to
>>>>>>> brokers, if I am fine with failures?
>>>>>>
>>>>>> Yes, you can disable the syncing (or the "changelog" feature) of state
>>>>>> stores:
>>>>>> http://docs.confluent.io/current/streams/developer-guide.html#enable-disable-state-store-changelogs
>>>>>>
>>>>>>> I do have a Spark Cluster, but I am not convinced how Spark Streaming
>>>>>>> can do this differently.
>>>>>>> Guozhang, could you comment anything regarding Kafka Streams vs Spark
>>>>>>> Streaming, especially
>>>>>>> in terms of aggregations/groupbys/joins implementation logic?
>>>>>>
>>>>>> As you are hinting at yourself, if you want fault-tolerant state, then
>>>>>> this fault tolerance comes at a price (in Kafka Streams, this is achieved
>>>>>> by changelog-ing state stores).  Other tools such as Flink or Spark work
>>>>>> in a similar fashion; there's no free lunch.
>>>>>>
>>>>>> One option, which you brought up above, is to disable the fault tolerance
>>>>>> functionality for state by disabling the changelogs of state stores (see
>>>>>> above).  Another option is to leverage Kafka's record caching for Kafka
>>>>>> Streams, which does lower the amount of data that is sent across the
>>>>>> network (from your app's state store changelogs to the Kafka cluster and
>>>>>> vice versa), though you may need to tune some parameters in your
>>>>>> situation because your key space has high cardinality and message volume
>>>>>> per key is relatively low (= you don't benefit as much from record
>>>>>> caching as most other users/use cases).
>>>>>>
>>>>>>
>>>>>> -Michael
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Feb 27, 2017 at 2:42 PM, Tianji Li <skyah...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Guozhang and Kohki,
>>>>>>>
>>>>>>> Thanks for your replies.
>>>>>>>
>>>>>>> I think I know how to deal with partitioning now, but I am still not
>>>>>>> sure how to deal with the traffic between the hidden state stores and
>>>>>>> the Kafka brokers (same as Kohki).
>>>>>>>
>>>>>>> I feel like the easiest thing to do is to set a larger commit window,
>>>>>>> so that the state stores are synced to the brokers more slowly than by
>>>>>>> default.
>>>>>>>
>>>>>>> I do have a Spark cluster, but I am not convinced how Spark Streaming
>>>>>>> can do this differently. Guozhang, could you comment anything regarding
>>>>>>> Kafka Streams vs Spark Streaming, especially in terms of
>>>>>>> aggregations/groupbys/joins implementation logic?
>>>>>>>
>>>>>>> Also, is it possible to stop the syncing between state stores to
>>>>>>> brokers, if I am fine with failures?
>>>>>>>
>>>>>>> Thanks
>>>>>>> Tianji
>>>>>>>
>>>>>>>
>>>>>>> On 2017-02-26 23:52 (-0500), Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>> Hello Tianji,
>>>>>>>>
>>>>>>>> As Kohki mentioned, in Streams joins and aggregations are always done
>>>>>>>> pre-partitioned, and hence locally. So there won't be any inter-node
>>>>>>>> communications needed to execute the join / aggregations. Also they
>>>>>>>> can be hosted as persistent local state stores so you don't need to
>>>>>>>> keep them in memory. So for example if you partition your data with
>>>>>>>> K1 / K2, then data with the same values in combo (K1, K2) will always
>>>>>>>> go to the same partition, and hence good for aggregations / joins on
>>>>>>>> either K1, K2, or combo(K1, K2), but not sufficient for
>>>>>>>> combo(K1, K2, K3, K4), as data with the same values of K3 / K4 might
>>>>>>>> still go to different partitions processed by different Streams
>>>>>>>> instances.
>>>>>>>>
>>>>>>>> So what you want is really to partition based on the "maximum
>>>>>>>> superset" of all the involved keys. Note that with the superset of all
>>>>>>>> the keys, one thing to watch out for is the even distribution of the
>>>>>>>> partitions. If it is not evenly distributed, then some instances might
>>>>>>>> become hot spots. This can be tackled by customizing the
>>>>>>>> "PartitionGrouper" interface in Streams, which indicates which set of
>>>>>>>> partitions will be assigned to each of the tasks (by default each one
>>>>>>>> partition from the source topics will form a task, and task is the
>>>>>>>> unit of parallelism in Streams).
>>>>>>>>
>>>>>>>> Hope this helps.
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, Feb 26, 2017 at 10:57 AM, Kohki Nishio <tarop...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Tianji,
>>>>>>>>> KStream is indeed Append mode as long as I do stateless processing,
>>>>>>>>> but when you do aggregation, that is a stateful operation and it
>>>>>>>>> turns into a KTable, which does Update mode.
>>>>>>>>>
>>>>>>>>> In regard to your aggregation, I believe Kafka's aggregation works
>>>>>>>>> for a single partition, not over multiple partitions. Are you doing
>>>>>>>>> 100 different aggregations against the record key? Then you should
>>>>>>>>> have a single data object for those 100 values. Anyway, it sounds
>>>>>>>>> like we have a similar problem ..
>>>>>>>>>
>>>>>>>>> -Kohki
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sat, Feb 25, 2017 at 1:11 PM, Tianji Li <skyah...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Kohki,
>>>>>>>>>>
>>>>>>>>>> Thanks very much for providing your investigation results.
>>>>>>>>>> Regarding 'append' mode with Kafka Streams, isn't KStream the thing
>>>>>>>>>> you want?
>>>>>>>>>>
>>>>>>>>>> Hi Guozhang,
>>>>>>>>>>
>>>>>>>>>> Thanks for the pointers to the two blogs. I read one of them before
>>>>>>>>>> and just had a look at the other one.
>>>>>>>>>>
>>>>>>>>>> What I am hoping to do is below. Can you help me decide if Kafka
>>>>>>>>>> Streams is a good fit?
>>>>>>>>>>
>>>>>>>>>> We have a few data sources, and we are hoping to correlate these
>>>>>>>>>> sources, and then do aggregations, as *a stream in real-time*.
>>>>>>>>>>
>>>>>>>>>> The number of aggregations is around 100, which means, if using
>>>>>>>>>> Kafka Streams, we need to maintain around 100 state stores with 100
>>>>>>>>>> change-log topics behind the scenes when joining and aggregating.
>>>>>>>>>>
>>>>>>>>>> The number of unique entries in each of these state stores is
>>>>>>>>>> expected to be at the level of < 100M. The size of each record is
>>>>>>>>>> around 1K bytes and so each state store is expected to be ~100G
>>>>>>>>>> bytes in size. The total number of bytes in all these state stores
>>>>>>>>>> is thus around 10T bytes.
>>>>>>>>>>
>>>>>>>>>> If keeping all these stores in memory, this translates into around
>>>>>>>>>> 50 machines with 256Gbytes for this purpose alone.
>>>>>>>>>>
>>>>>>>>>> Plus, the incoming raw data rate could reach 10M records per second
>>>>>>>>>> in peak hours. So, during aggregation, data movement between Kafka
>>>>>>>>>> Streams instances will be heavy, i.e., 10M records per second in the
>>>>>>>>>> cluster for joining and aggregations.
>>>>>>>>>>
>>>>>>>>>> Is Kafka Streams good for this? My gut feeling is Kafka Streams is
>>>>>>>>>> fine. But I'd like to run this by you.
>>>>>>>>>>
>>>>>>>>>> And, I am hoping to minimize data movement (to save bandwidth)
>>>>>>>>>> during joins/groupBys. If I partition the raw data with the minimum
>>>>>>>>>> subset of aggregation keys (say K1 and K2), then I wonder if the
>>>>>>>>>> following joins/groupBys (say on keys K1, K2, K3, K4) happen on
>>>>>>>>>> local data, if using DSL?
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Tianji
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 2017-02-25 13:49 (-0500), Guozhang Wang <w...@gmail.com> wrote:
>>>>>>>>>>> Hello Kohki,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the email. I'd like to learn what your concern is about
>>>>>>>>>>> the size of the state store? From your description it's a bit hard
>>>>>>>>>>> to figure out, but I'd guess you have lots of state stores while
>>>>>>>>>>> each of them is relatively small?
>>>>>>>>>>>
>>>>>>>>>>> Hello Tianji,
>>>>>>>>>>>
>>>>>>>>>>> Regarding your question about maturity and users of Streams, you
>>>>>>>>>>> can take a look at a bunch of the blog posts written about their
>>>>>>>>>>> Streams usage in production, for example:
>>>>>>>>>>>
>>>>>>>>>>> http://engineering.skybettingandgaming.com/2017/01/23/streaming-architectures/
>>>>>>>>>>>
>>>>>>>>>>> http://developers.linecorp.com/blog/?p=3960
>>>>>>>>>>>
>>>>>>>>>>> Guozhang
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Feb 25, 2017 at 7:52 AM, Kohki Nishio <ta...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I did a bit of research on that matter recently. The comparison is
>>>>>>>>>>>> between Spark Structured Streaming (SSS) and Kafka Streams.
>>>>>>>>>>>>
>>>>>>>>>>>> Both are relatively new (~1y) and trying to solve similar problems.
>>>>>>>>>>>> However, if you go with Spark, you have to go with a cluster; if
>>>>>>>>>>>> your environment already has a cluster, then it's good. However,
>>>>>>>>>>>> our team doesn't do any Spark, so the initial cost would be very
>>>>>>>>>>>> high. On the other hand, Kafka Streams is a Java library; since we
>>>>>>>>>>>> have a service framework, doing streaming inside a service is
>>>>>>>>>>>> super easy.
>>>>>>>>>>>>>
>>>>>>>>>>>> However, for some reason, people see SSS as more mature and Kafka
>>>>>>>>>>>> Streams as not so mature (like Beta). But the old-fashioned streams
>>>>>>>>>>>> are both mature enough (in my opinion); I didn't see any
>>>>>>>>>>>> difference between DStream (Spark) and KStream (Kafka).
>>>>>>>>>>>>
>>>>>>>>>>>> DataFrame (Structured Streaming) and KTable, I found quite
>>>>>>>>>>>> different. Kafka's model is more like a change log, which means you
>>>>>>>>>>>> need to see the latest entry to make a final decision. I would call
>>>>>>>>>>>> this the 'Update' model, whereas Spark does the 'Append' model and
>>>>>>>>>>>> it doesn't support the 'Update' model yet (it's coming in 2.2).
>>>>>>>>>>>>
>>>>>>>>>>>> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes
>>>>>>>>>>>>>
>>>>>>>>>>>> I wanted to have the 'Append' model with Kafka, but it seems it's
>>>>>>>>>>>> not an easy thing to do. Also, Kafka Streams uses an internal topic
>>>>>>>>>>>> to keep state changes for the fail-over scenario, but I'm dealing
>>>>>>>>>>>> with lots of tiny pieces of information and I have a big concern
>>>>>>>>>>>> about the size of the state store / topic, so my decision is that
>>>>>>>>>>>> I'm going with my own handling of the Kafka API ..
>>>>>>>>>>>>
>>>>>>>>>>>> If you do stateless operations and don't have a Spark cluster,
>>>>>>>>>>>> yeah, Kafka Streams is perfect.
>>>>>>>>>>>> If you do stateful, complicated operations and happen to have a
>>>>>>>>>>>> Spark cluster, give Spark a try;
>>>>>>>>>>>> else you have to write code which is optimized for your use case.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> thanks
>>>>>>>>>>>> -Kohki
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Feb 24, 2017 at 6:22 PM, Tianji Li <sk...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi there,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Can anyone give a good explanation in what cases Kafka Streams is
>>>>>>>>>>>>> preferred, and in what cases Spark Streaming is better?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Tianji
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Kohki Nishio
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Kohki Nishio
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>>
> 
