Sorry, I misunderstood your question. For a non-logged store, in case of failure, we wipe out the entire state (IIRC) -- thus, you will start with an empty state after recovery.

-Matthias
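For readers wondering what such a non-logged store looks like in code, here is a minimal sketch. It uses the newer StoreBuilder API rather than the 0.10.2-era Stores.create(...) fluent API this thread discusses, and the store name is invented for illustration. Because withLoggingDisabled() suppresses the changelog topic, a task that loses this store has nothing to restore from -- hence the empty-state recovery described above.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class UnloggedStoreExample {
    public static void main(String[] args) {
        // A non-logged store: no changelog topic backs it up, so after a
        // failure the local RocksDB directory is wiped and the task
        // restarts with an empty store.
        StoreBuilder<KeyValueStore<String, Long>> unloggedStore =
                Stores.keyValueStoreBuilder(
                                Stores.persistentKeyValueStore("my-counts"), // RocksDB-backed
                                Serdes.String(),
                                Serdes.Long())
                        .withLoggingDisabled(); // no changelog topic, no fault tolerance

        System.out.println("Store '" + unloggedStore.name() + "' has no changelog topic.");
    }
}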
On 2/28/17 1:36 PM, Steven Schlansker wrote:
> Thanks Matthias for this information. But it seems you are talking about a
> logged store, since you mention the changelog topic and replaying it and
> whatnot.
>
> But my question specifically was about *unlogged* state stores, where there
> is no such changelog topic available. Sorry if that wasn't clear before.
> Or am I misunderstanding?
>
>> On Feb 28, 2017, at 9:12 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>>
>> If a store is backed by a changelog topic, the changelog topic is
>> responsible for holding the latest state of the store. Thus, the topic
>> must store the latest value per key. For this, we use a compacted topic.
>>
>> In case of restore, the local RocksDB store is cleared so it is empty,
>> and we read the complete changelog topic and apply those updates to the
>> store.
>>
>> This allows a fast recovery, because no source topic rewind and no
>> reprocessing is required. Furthermore, because the changelog topic is
>> compacted, it is roughly the size of the number of distinct keys in the
>> store -- this also reduces recovery time, as you don't need to replay
>> every update to the store.
>>
>> We are currently working on an optimization that allows us to only
>> replay the tail of the changelog topic in certain cases to get the store
>> back into a valid state. See
>> https://issues.apache.org/jira/browse/KAFKA-4317
>>
>> Furthermore, changelog topics allow maintaining StandbyTasks -- those
>> tasks only apply all updates from the changelog topic (that are written
>> by the main task maintaining the store) to a local copy of the store.
>> Thus, in case of fail-over those StandbyTasks can replace a failed task,
>> and because they have a copy of the state, they can take over even more
>> quickly than a newly created task that needs to replay the changelog to
>> rebuild the state first.
>>
>> -Matthias
>>
>> On 2/28/17 8:17 AM, Steven Schlansker wrote:
>>>
>>>> On Feb 28, 2017, at 12:17 AM, Michael Noll <mich...@confluent.io> wrote:
>>>>
>>>> Sachin,
>>>>
>>>> disabling (change)logging for state stores disables the fault-tolerance
>>>> of the state store -- i.e. changes to the state store will not be backed
>>>> up to Kafka, regardless of whether the store uses a RocksDB store, an
>>>> in-memory store, or something else.
>>>
>>> One thing I've wanted is a more concrete description of this failure mode.
>>> What exactly is the process to recover from such a "failed" state store?
>>>
>>> Does Kafka Streams rewind the source topic and replay? (Including any
>>> Processors you may have wired up?)
>>> Does the state store remain faulted? Can an administrator fix it by
>>> resetting some offsets?
>>>
>>> I looked around both in the project and Confluent documentation and
>>> didn't really find an answer to how non-logged state stores fail or
>>> recover.
>>>
>>> Thanks for any insight!
>>>
>>>>> When disabling this in 0.10.2, what does this exactly mean?
>>>>
>>>> See above.
>>>>
>>>>> Does this mean no RocksDB state store would get created any longer?
>>>>
>>>> No, local state stores will still be created. By default, the storage
>>>> engine is RocksDB, so if you disable changelogging then you will still
>>>> have local RocksDB stores (as usual), but those stores will not be
>>>> backed up to Kafka behind the scenes. If, in this situation, you lose a
>>>> machine that has local RocksDB stores, then this state data is lost, too.
>>>>
>>>> So there are two different things at play here:
>>>>
>>>> 1. Whether you want to enable or disable (change)logging of a state
>>>> store, and thus enable/disable fault-tolerant state stores.
>>>>
>>>> 2. Which storage engine you want to use for the state stores. The
>>>> default is RocksDB.
>>>>
>>>> If, for (2), you do not want to have RocksDB state stores, you can
>>>> switch the storage engine to e.g. the in-memory store. However, when
>>>> you do switch from RocksDB to in-memory, then all your state store's
>>>> data must fit into memory (obviously), otherwise you'll run OOM.
>>>>
>>>> In summary, you can have either of the following:
>>>>
>>>> a. RocksDB state stores with changelogging enabled (= fault-tolerant
>>>> stores).
>>>>
>>>> b. RocksDB state stores with changelogging disabled (= stores are not
>>>> fault-tolerant; you may suffer from data loss during e.g. machine
>>>> failures).
>>>>
>>>> c. In-memory state stores with changelogging enabled (= fault-tolerant
>>>> stores). But careful: you may run OOM if the state data does not fit
>>>> into the available memory.
>>>>
>>>> d. In-memory state stores with changelogging disabled (= stores are not
>>>> fault-tolerant; you may suffer from data loss during e.g. machine
>>>> failures). But careful: you may run OOM if the state data does not fit
>>>> into the available memory.
>>>>
>>>> Hope this helps,
>>>> Michael
>>>>
>>>> On Tue, Feb 28, 2017 at 8:01 AM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>
>>>>> I had a question regarding
>>>>> http://docs.confluent.io/3.1.2/streams/developer-guide.html#enable-disable-state-store-changelogs
>>>>>
>>>>> When disabling this in 0.10.2, what does this exactly mean?
>>>>> Does this mean no RocksDB state store would get created any longer?
>>>>>
>>>>> On this subject: we had started with Spark Streaming, but we ran into
>>>>> memory issues, and the hardware we have is not fantastic enough to
>>>>> support Spark Streaming.
>>>>>
>>>>> So we switched to the high-level DSL of Kafka Streams.
>>>>>
>>>>> I think if your source is Kafka topics, Kafka Streams is good and
>>>>> simple to use. However, you need to plan ahead and anticipate the
>>>>> (max) load, and create adequate partitions based on some key on which
>>>>> aggregations can be performed independently.
>>>>>
>>>>> Then you can run a cluster of stream threads (on the same or multiple
>>>>> machines), each processing a partition.
>>>>>
>>>>> Having said this, we ran into a lot of issues with frequent stream
>>>>> rebalances, especially when we have multiple instances of RocksDB
>>>>> running on a single machine.
>>>>> Now we don't know if this is some bad VM configuration issue or some
>>>>> problem with the Kafka Streams/RocksDB integration; we are still
>>>>> working on that.
>>>>>
>>>>> So I would suggest: if you partition your data well enough, have a
>>>>> single streams thread consuming only one partition, and not many
>>>>> instances of RocksDB created on a single machine, the overall
>>>>> application runs fine.
>>>>> Also make sure not to create big time windows, and set a not-so-long
>>>>> retention time, so that the state store size is limited.
>>>>>
>>>>> We use a window of size 10 minutes sliding every 5 minutes with a
>>>>> retention of 30 minutes, and see overall performance much better
>>>>> than, say, a window of size 1 hour sliding every 30 minutes with a
>>>>> retention of 3 hours.
>>>>>
>>>>> So to conclude: if you can manage RocksDB, then Kafka Streams is good
>>>>> to start with; it's simple and very intuitive to use.
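To make Michael's four combinations (a)-(d) above concrete, here is a sketch of the two knobs -- storage engine and changelogging -- expressed with the newer StoreBuilder API (the 0.10.2 API spells this differently via Stores.create(...); the store names and serdes here are illustrative):

import java.util.Collections;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class StoreVariants {
    public static void main(String[] args) {
        // (a) RocksDB + changelog: fault-tolerant, spills to disk.
        StoreBuilder<KeyValueStore<String, Long>> a =
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store-a"),
                                Serdes.String(), Serdes.Long())
                        .withLoggingEnabled(Collections.<String, String>emptyMap());

        // (b) RocksDB, no changelog: local-only state; lost on machine failure.
        StoreBuilder<KeyValueStore<String, Long>> b =
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store-b"),
                                Serdes.String(), Serdes.Long())
                        .withLoggingDisabled();

        // (c) In-memory + changelog: fault-tolerant, but data must fit in RAM.
        StoreBuilder<KeyValueStore<String, Long>> c =
                Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store-c"),
                                Serdes.String(), Serdes.Long())
                        .withLoggingEnabled(Collections.<String, String>emptyMap());

        // (d) In-memory, no changelog: fastest, but neither durable nor
        //     fault-tolerant, and data must fit in RAM.
        StoreBuilder<KeyValueStore<String, Long>> d =
                Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store-d"),
                                Serdes.String(), Serdes.Long())
                        .withLoggingDisabled();

        System.out.println("Built store builders: " + a.name() + ", " + b.name()
                + ", " + c.name() + ", " + d.name());
    }
}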
>>>>>
>>>>> Again, on the RocksDB side: is there a way to eliminate it, and is
>>>>> disableLogging meant for that?
>>>>>
>>>>> Thanks
>>>>> Sachin
>>>>>
>>>>> On Mon, Feb 27, 2017 at 7:47 PM, Michael Noll <mich...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>>> Also, is it possible to stop the syncing of state stores to
>>>>>>> brokers, if I am fine with failures?
>>>>>>
>>>>>> Yes, you can disable the syncing (or the "changelog" feature) of
>>>>>> state stores:
>>>>>> http://docs.confluent.io/current/streams/developer-guide.html#enable-disable-state-store-changelogs
>>>>>>
>>>>>>> I do have a Spark cluster, but I am not convinced that Spark
>>>>>>> Streaming can do this differently.
>>>>>>> Guozhang, could you comment on anything regarding Kafka Streams vs
>>>>>>> Spark Streaming, especially in terms of aggregations/groupBys/joins
>>>>>>> implementation logic?
>>>>>>
>>>>>> As you are hinting at yourself, if you want fault-tolerant state,
>>>>>> then this fault tolerance comes at a price (in Kafka Streams, this is
>>>>>> achieved by changelog-ing state stores). Other tools such as Flink or
>>>>>> Spark work in a similar fashion; there's no free lunch.
>>>>>>
>>>>>> One option, which you brought up above, is to disable the fault
>>>>>> tolerance functionality for state by disabling the changelogs of
>>>>>> state stores (see above). Another option is to leverage Kafka's
>>>>>> record caching for Kafka Streams, which does lower the amount of data
>>>>>> that is sent across the network (from your app's state store
>>>>>> changelogs to the Kafka cluster and vice versa), though you may need
>>>>>> to tune some parameters in your situation because your key space has
>>>>>> high cardinality and message volume per key is relatively low (= you
>>>>>> don't benefit as much from record caching as most other users/use
>>>>>> cases).
>>>>>>
>>>>>> -Michael
>>>>>>
>>>>>> On Mon, Feb 27, 2017 at 2:42 PM, Tianji Li <skyah...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Guozhang and Kohki,
>>>>>>>
>>>>>>> Thanks for your replies.
>>>>>>>
>>>>>>> I think I know how to deal with partitioning now, but I am still not
>>>>>>> sure how to deal with the traffic between the hidden state stores
>>>>>>> and the Kafka brokers, given the state store sizes (same concern as
>>>>>>> Kohki's).
>>>>>>>
>>>>>>> I feel like the easiest thing to do is to set a larger commit
>>>>>>> window, so that the state stores are synced to brokers more slowly
>>>>>>> than by default.
>>>>>>>
>>>>>>> I do have a Spark cluster, but I am not convinced that Spark
>>>>>>> Streaming can do this differently. Guozhang, could you comment on
>>>>>>> anything regarding Kafka Streams vs Spark Streaming, especially in
>>>>>>> terms of aggregations/groupBys/joins implementation logic?
>>>>>>>
>>>>>>> Also, is it possible to stop the syncing of state stores to brokers,
>>>>>>> if I am fine with failures?
>>>>>>>
>>>>>>> Thanks
>>>>>>> Tianji
>>>>>>>
>>>>>>> On 2017-02-26 23:52 (-0500), Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>> Hello Tianji,
>>>>>>>>
>>>>>>>> As Kohki mentioned, in Streams, joins and aggregations are always
>>>>>>>> done pre-partitioned, and hence locally. So there won't be any
>>>>>>>> inter-node communication needed to execute the joins /
>>>>>>>> aggregations. Also, they can be hosted as persistent local state
>>>>>>>> stores so you don't need to keep them in memory.
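Below is a sketch of the co-partitioned aggregation Guozhang is describing (his concrete K1/K2 example continues in the quote right after). It uses the modern Streams DSL -- names like Grouped and Materialized post-date the 0.10.2-era thread -- and the topic name and key extractors are invented for illustration. The point: after the selectKey/groupByKey repartition, records sharing a (K1, K2) combo land in the same partition, so the count itself runs locally.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class CoPartitionedAggregation {

    // Hypothetical key extractors -- stand-ins for real parsing logic.
    static String k1(String value) { return value.split(",")[0]; }
    static String k2(String value) { return value.split(",")[1]; }

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> events =
                builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()));

        // Re-keying on the combo (K1, K2) makes groupByKey() insert a single
        // repartition step; afterwards all records with the same (K1, K2)
        // sit in the same partition, so the count runs locally in each task,
        // with no further inter-node communication.
        KTable<String, Long> countsByK1K2 = events
                .selectKey((key, value) -> k1(value) + "|" + k2(value))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                .count(Materialized.as("counts-by-k1-k2"));

        // Print the topology; note the single repartition topic before the
        // aggregation.
        System.out.println(builder.build().describe());
    }
}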
>>>>>>>> So for example, if you partition your data with K1 / K2, then
>>>>>>>> data with the same values in combo (K1, K2) will always go to the
>>>>>>>> same partition, which is good for aggregations / joins on either
>>>>>>>> K1, K2, or combo(K1, K2), but not sufficient for combo(K1, K2, K3,
>>>>>>>> K4), as data with the same values of K3 / K4 might still go to
>>>>>>>> different partitions processed by different Streams instances.
>>>>>>>>
>>>>>>>> So what you want is really to partition based on the "maximum
>>>>>>>> superset" of all the involved keys. Note that with the superset of
>>>>>>>> all the keys, one thing to watch out for is the even distribution
>>>>>>>> of the partitions. If the partitions are not evenly distributed,
>>>>>>>> then some instances might become hot spots. This can be tackled by
>>>>>>>> customizing the "PartitionGrouper" interface in Streams, which
>>>>>>>> indicates which set of partitions will be assigned to each of the
>>>>>>>> tasks (by default, each partition from the source topics will form
>>>>>>>> a task, and a task is the unit of parallelism in Streams).
>>>>>>>>
>>>>>>>> Hope this helps.
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>> On Sun, Feb 26, 2017 at 10:57 AM, Kohki Nishio <tarop...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Tianji,
>>>>>>>>> KStream is indeed Append mode as long as I do stateless
>>>>>>>>> processing, but when you do aggregation, that is a stateful
>>>>>>>>> operation; it turns into a KTable, and that does Update mode.
>>>>>>>>>
>>>>>>>>> In regard to your aggregation, I believe Kafka's aggregation works
>>>>>>>>> on a single partition, not over multiple partitions. Are you doing
>>>>>>>>> 100 different aggregations against the record key? Then you should
>>>>>>>>> have a single data object for those 100 values. Anyway, it sounds
>>>>>>>>> like we have a similar problem ..
>>>>>>>>>
>>>>>>>>> -Kohki
>>>>>>>>>
>>>>>>>>> On Sat, Feb 25, 2017 at 1:11 PM, Tianji Li <skyah...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Kohki,
>>>>>>>>>>
>>>>>>>>>> Thanks very much for providing your investigation results.
>>>>>>>>>> Regarding 'append' mode with Kafka Streams, isn't KStream the
>>>>>>>>>> thing you want?
>>>>>>>>>>
>>>>>>>>>> Hi Guozhang,
>>>>>>>>>>
>>>>>>>>>> Thanks for the pointers to the two blogs. I read one of them
>>>>>>>>>> before and just had a look at the other one.
>>>>>>>>>>
>>>>>>>>>> What I am hoping to do is below; can you help me decide if Kafka
>>>>>>>>>> Streams is a good fit?
>>>>>>>>>>
>>>>>>>>>> We have a few data sources, and we are hoping to correlate these
>>>>>>>>>> sources and then do aggregations, as *a stream in real-time*.
>>>>>>>>>>
>>>>>>>>>> The number of aggregations is around 100, which means, if using
>>>>>>>>>> Kafka Streams, we need to maintain around 100 state stores with
>>>>>>>>>> 100 changelog topics behind the scenes for the joins and
>>>>>>>>>> aggregations.
>>>>>>>>>>
>>>>>>>>>> The number of unique entries in each of these state stores is
>>>>>>>>>> expected to be at the level of < 100M. The size of each record is
>>>>>>>>>> around 1K bytes, and so each state store is expected to be ~100G
>>>>>>>>>> bytes in size. The total number of bytes in all these state
>>>>>>>>>> stores is thus around 10T bytes.
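As a sanity check on Tianji's numbers, here is the back-of-the-envelope arithmetic. Note the raw fit is about 40 machines; the ~50-machine figure quoted next presumably adds headroom for the JVM, OS, and replication -- that interpretation is mine, not the thread's.

public class StateSizing {
    public static void main(String[] args) {
        long stores = 100L;                   // ~100 aggregations/state stores
        long entriesPerStore = 100_000_000L;  // < 100M unique keys per store
        long bytesPerRecord = 1_000L;         // ~1 KB per record

        long bytesPerStore = entriesPerStore * bytesPerRecord; // 1e11 = ~100 GB
        long totalBytes = stores * bytesPerStore;              // 1e13 = ~10 TB

        long ramPerMachine = 256L * 1_000_000_000L;            // 256 GB
        double machinesRawFit = (double) totalBytes / ramPerMachine; // ~39

        System.out.printf("per store: %d GB, total: %d TB, raw fit: %.0f machines%n",
                bytesPerStore / 1_000_000_000L,
                totalBytes / 1_000_000_000_000L,
                machinesRawFit);
    }
}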
>>>>>>>>>>
>>>>>>>>>> If we keep all these stores in memory, this translates into
>>>>>>>>>> around 50 machines with 256 GB each for this purpose alone.
>>>>>>>>>>
>>>>>>>>>> Plus, the incoming raw data rate could reach 10M records per
>>>>>>>>>> second in peak hours. So, during aggregation, data movement
>>>>>>>>>> between Kafka Streams instances will be heavy, i.e., 10M records
>>>>>>>>>> per second in the cluster for the joins and aggregations.
>>>>>>>>>>
>>>>>>>>>> Is Kafka Streams good for this? My gut feeling is Kafka Streams
>>>>>>>>>> is fine. But I'd like to run this by you.
>>>>>>>>>>
>>>>>>>>>> And I am hoping to minimize data movement (to save bandwidth)
>>>>>>>>>> during joins/groupBys. If I partition the raw data with the
>>>>>>>>>> minimum subset of aggregation keys (say K1 and K2), then I wonder
>>>>>>>>>> if the following joins/groupBys (say on keys K1, K2, K3, K4)
>>>>>>>>>> happen on local data, when using the DSL?
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Tianji
>>>>>>>>>>
>>>>>>>>>> On 2017-02-25 13:49 (-0500), Guozhang Wang <w...@gmail.com> wrote:
>>>>>>>>>>> Hello Kohki,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the email. I'd like to learn what your concern about
>>>>>>>>>>> the size of the state store is. From your description it's a bit
>>>>>>>>>>> hard to figure out, but I'd guess you have lots of state stores
>>>>>>>>>>> while each of them is relatively small?
>>>>>>>>>>>
>>>>>>>>>>> Hello Tianji,
>>>>>>>>>>>
>>>>>>>>>>> Regarding your question about maturity and users of Streams, you
>>>>>>>>>>> can take a look at a bunch of the blog posts written about their
>>>>>>>>>>> Streams usage in production, for example:
>>>>>>>>>>>
>>>>>>>>>>> http://engineering.skybettingandgaming.com/2017/01/23/streaming-architectures/
>>>>>>>>>>>
>>>>>>>>>>> http://developers.linecorp.com/blog/?p=3960
>>>>>>>>>>>
>>>>>>>>>>> Guozhang
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Feb 25, 2017 at 7:52 AM, Kohki Nishio <ta...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I did a bit of research on that matter recently; the comparison
>>>>>>>>>>>> is between Spark Structured Streaming (SSS) and Kafka Streams.
>>>>>>>>>>>>
>>>>>>>>>>>> Both are relatively new (~1y) and trying to solve similar
>>>>>>>>>>>> problems; however, if you go with Spark, you have to go with a
>>>>>>>>>>>> cluster. If your environment already has a cluster, then it's
>>>>>>>>>>>> good. However, our team doesn't do any Spark, so the initial
>>>>>>>>>>>> cost would be very high. On the other hand, Kafka Streams is a
>>>>>>>>>>>> Java library; since we have a service framework, doing
>>>>>>>>>>>> streaming inside a service is super easy.
>>>>>>>>>>>>
>>>>>>>>>>>> However, for some reason people see SSS as more mature and
>>>>>>>>>>>> Kafka Streams as not so mature (like beta). But old-fashioned
>>>>>>>>>>>> streaming is mature enough in both (in my opinion); I didn't
>>>>>>>>>>>> see any difference between DStream (Spark) and KStream (Kafka).
>>>>>>>>>>>>
>>>>>>>>>>>> DataFrame (Structured Streaming) and KTable, however, I found
>>>>>>>>>>>> quite different. Kafka's model is more like a changelog; that
>>>>>>>>>>>> means you need to see the latest entry to make a final
>>>>>>>>>>>> decision. I would call this the 'Update' model, whereas Spark
>>>>>>>>>>>> does the 'Append' model and doesn't support the 'Update' model
>>>>>>>>>>>> yet (it's coming in 2.2).
>>>>>>>>>>>>
>>>>>>>>>>>> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes
>>>>>>>>>>>>
>>>>>>>>>>>> I wanted to have the 'Append' model with Kafka, but it seems
>>>>>>>>>>>> that's not an easy thing to do. Also, Kafka Streams uses an
>>>>>>>>>>>> internal topic to keep state changes for the fail-over
>>>>>>>>>>>> scenario, but I'm dealing with lots of tiny pieces of
>>>>>>>>>>>> information and I have a big concern about the size of the
>>>>>>>>>>>> state store / topic, so my decision is that I'm going with my
>>>>>>>>>>>> own handling of the Kafka API ..
>>>>>>>>>>>>
>>>>>>>>>>>> If you do stateless operations and don't have a Spark cluster,
>>>>>>>>>>>> yeah, Kafka Streams is perfect.
>>>>>>>>>>>> If you do stateful, complicated operations and happen to have a
>>>>>>>>>>>> Spark cluster, give Spark a try.
>>>>>>>>>>>> Else, you have to write code that is optimized for your use
>>>>>>>>>>>> case.
>>>>>>>>>>>>
>>>>>>>>>>>> thanks
>>>>>>>>>>>> -Kohki
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Feb 24, 2017 at 6:22 PM, Tianji Li <sk...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi there,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Can anyone give a good explanation of the cases in which Kafka
>>>>>>>>>>>>> Streams is preferred, and the cases in which Spark Streaming
>>>>>>>>>>>>> is better?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Tianji
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Kohki Nishio
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> -- Guozhang
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Kohki Nishio
>>>>>>>>
>>>>>>>> --
>>>>>>>> -- Guozhang