Thanks Matthias for this information. But it seems you are talking about a logged store, since you mention the changelog topic and replaying it and whatnot.
But my question specifically was about *unlogged* state stores, where there is no such changelog topic available. Sorry if that wasn't clear before. Or am I misunderstanding?

> On Feb 28, 2017, at 9:12 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>
> If a store is backed by a changelog topic, the changelog topic is responsible for holding the latest state of the store. Thus, the topic must store the latest value per key. For this, we use a compacted topic.
>
> In case of a restore, the local RocksDB store is cleared so it is empty, and we read the complete changelog topic and apply those updates to the store.
>
> This allows a fast recovery, because no source-topic rewind and no reprocessing are required. Furthermore, because the changelog topic is compacted, it is roughly the size of the number of distinct keys in the store -- this also reduces recovery time, as you don't need to replay every update to the store.
>
> We are currently working on an optimization that allows us to only replay the tail of the changelog topic in certain cases to get the store back into a valid state. See https://issues.apache.org/jira/browse/KAFKA-4317
>
> Furthermore, changelog topics allow us to maintain StandbyTasks -- those tasks apply all updates from the changelog topic (written by the main task maintaining the store) to a local copy of the store. Thus, in case of fail-over, those StandbyTasks can replace a failed task, and because they have a copy of the state they can take over even more quickly than a newly created task that needs to replay the changelog to rebuild the state first.
>
> -Matthias
>
> On 2/28/17 8:17 AM, Steven Schlansker wrote:
>>
>>> On Feb 28, 2017, at 12:17 AM, Michael Noll <mich...@confluent.io> wrote:
>>>
>>> Sachin,
>>>
>>> disabling (change)logging for state stores disables the fault tolerance of the state store -- i.e., changes to the state store will not be backed up to Kafka, regardless of whether the store uses a RocksDB store, an in-memory store, or something else.
>>
>> One thing I've wanted is a more concrete description of this failure mode. What exactly is the process to recover from such a "failed" state store?
>>
>> Does Kafka Streams rewind the source topic and replay? (Including any Processors you may have wired up?) Does the state store remain faulted? Can an administrator fix it by resetting some offsets?
>>
>> I looked around both in the project and Confluent documentation and didn't really find an answer to how non-logged state stores fail or recover.
>>
>> Thanks for any insight!
>>
>>>> When disabling this in 0.10.2, what does this exactly mean?
>>>
>>> See above.
>>>
>>>> Does this mean no RocksDB state store would get created any longer?
>>>
>>> No, local state stores will still be created. By default, the storage engine is RocksDB, so if you disable changelogging then you will still have local RocksDB stores (as usual), but those stores will not be backed up to Kafka behind the scenes. If, in this situation, you lose a machine that has local RocksDB stores, then this state data is lost, too.
>>>
>>> So there are two different things at play here:
>>>
>>> 1. Whether you want to enable or disable (change)logging of a state store, and thus to enable/disable fault-tolerant state stores.
>>>
>>> 2. Which storage engine you want to use for the state stores. The default is RocksDB.
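For concreteness, here is a rough sketch of those two knobs against the 0.10.x low-level Stores builder (the store names are invented, and the exact builder method names may differ slightly across versions):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.processor.StateStoreSupplier;
    import org.apache.kafka.streams.state.Stores;

    public class StoreOptionsSketch {
        public static void main(String[] args) {
            // (1) Persistent RocksDB store with the changelog disabled:
            // state stays local only and is lost if the machine dies (case "b" below).
            StateStoreSupplier unloggedRocksDb = Stores.create("counts")
                    .withKeys(Serdes.String())
                    .withValues(Serdes.Long())
                    .persistent()
                    .disableLogging()
                    .build();

            // (2) In-memory store with the changelog left enabled (case "c" below):
            // fault-tolerant, but the entire store must fit into the JVM heap.
            StateStoreSupplier inMemoryLogged = Stores.create("counts-mem")
                    .withKeys(Serdes.String())
                    .withValues(Serdes.Long())
                    .inMemory()
                    .build();
        }
    }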
>>>
>>> If, for (2), you do not want to have RocksDB state stores, you can switch the storage engine to e.g. the in-memory store. However, when you do switch from RocksDB to in-memory, then all your state store's data must fit into memory (obviously); otherwise you'll run OOM.
>>>
>>> In summary, you can have either of the following:
>>>
>>> a. RocksDB state stores with changelogging enabled (= fault-tolerant stores).
>>>
>>> b. RocksDB state stores with changelogging disabled (= stores are not fault-tolerant; you may suffer data loss during e.g. machine failures).
>>>
>>> c. In-memory state stores with changelogging enabled (= fault-tolerant stores). But careful: you may run OOM if the state data does not fit into the available memory.
>>>
>>> d. In-memory state stores with changelogging disabled (= stores are not fault-tolerant; you may suffer data loss during e.g. machine failures). But careful: you may run OOM if the state data does not fit into the available memory.
>>>
>>> Hope this helps,
>>> Michael
>>>
>>> On Tue, Feb 28, 2017 at 8:01 AM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>
>>>> I had a question regarding http://docs.confluent.io/3.1.2/streams/developer-guide.html#enable-disable-state-store-changelogs
>>>>
>>>> When disabling this in 0.10.2, what does this exactly mean? Does this mean no RocksDB state store would get created any longer?
>>>>
>>>> On this subject: we had started with Spark Streaming, but we ran into memory issues, and the hardware we have is not fantastic enough to support Spark Streaming.
>>>>
>>>> So we switched to the high-level Kafka Streams DSL.
>>>>
>>>> I think if your source is Kafka topics, Kafka Streams is good and simple to use. However, you need to plan ahead and anticipate the (max) load, and create adequate partitions based on some key on which aggregations can be performed independently.
>>>>
>>>> Then you can run a cluster of stream threads (on the same or multiple machines), each processing a partition.
>>>>
>>>> Having said this, we did however run into a lot of issues with frequent stream re-balances, especially when we have multiple instances of RocksDB running on a single machine. We don't know yet whether this is some bad VM configuration issue or some problem with the Kafka Streams / RocksDB integration; we are still working on that.
>>>>
>>>> So I would suggest: if you partition your data well enough, have a single streams thread consuming only one partition, and don't have many instances of RocksDB created on a single machine, the overall application runs fine. Also make sure not to create big time windows, and set a not-so-long retention time, so that the state store size is limited.
>>>>
>>>> We use a sliding window of size 10 minutes advancing every 5 minutes with a retention of 30 minutes, and see overall performance much better than with, say, a window of size 1 hour sliding every 30 minutes with a retention of 3 hours.
>>>>
>>>> So to conclude: if you can manage RocksDB, then Kafka Streams is good to start with; it's simple and very intuitive to use.
>>>>
>>>> Again, on the RocksDB side, is there a way to eliminate that, and is disableLogging for that?
>>>>
>>>> Thanks
>>>> Sachin
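That window shape maps directly onto the DSL's TimeWindows; a minimal sketch against the 0.10.x API (topic and store names here are made up, and starting the KafkaStreams instance is omitted):

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;

    public class WindowSizingSketch {
        public static void main(String[] args) {
            KStreamBuilder builder = new KStreamBuilder();
            KStream<String, String> events =
                    builder.stream(Serdes.String(), Serdes.String(), "events"); // hypothetical topic

            // Hopping window as described above: size 10 min, advance 5 min, retention 30 min.
            TimeWindows windows = TimeWindows
                    .of(TimeUnit.MINUTES.toMillis(10))
                    .advanceBy(TimeUnit.MINUTES.toMillis(5))
                    .until(TimeUnit.MINUTES.toMillis(30));

            // Windowed count backed by a windowed state store named "windowed-counts".
            KTable<Windowed<String>, Long> counts = events
                    .groupByKey(Serdes.String(), Serdes.String())
                    .count(windows, "windowed-counts");
        }
    }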
>>>> On Mon, Feb 27, 2017 at 7:47 PM, Michael Noll <mich...@confluent.io> wrote:
>>>>
>>>>>> Also, is it possible to stop the syncing between state stores to brokers, if I am fine with failures?
>>>>>
>>>>> Yes, you can disable the syncing (or the "changelog" feature) of state stores: http://docs.confluent.io/current/streams/developer-guide.html#enable-disable-state-store-changelogs
>>>>>
>>>>>> I do have a Spark cluster, but I am not convinced how Spark Streaming can do this differently.
>>>>>> Guozhang, could you comment anything regarding Kafka Streams vs Spark Streaming, especially in terms of aggregations/groupbys/joins implementation logic?
>>>>>
>>>>> As you are hinting at yourself, if you want fault-tolerant state, then this fault tolerance comes at a price (in Kafka Streams, this is achieved by changelogging state stores). Other tools such as Flink or Spark work in a similar fashion; there's no free lunch.
>>>>>
>>>>> One option, which you brought up above, is to disable the fault-tolerance functionality for state by disabling the changelogs of state stores (see above). Another option is to leverage Kafka Streams' record caching, which does lower the amount of data that is sent across the network (from your app's state store changelogs to the Kafka cluster and vice versa), though you may need to tune some parameters in your situation, because your key space has high cardinality and the message volume per key is relatively low (= you don't benefit as much from record caching as most other users/use cases).
>>>>>
>>>>> -Michael
>>>>>
>>>>> On Mon, Feb 27, 2017 at 2:42 PM, Tianji Li <skyah...@gmail.com> wrote:
>>>>>
>>>>>> Hi Guozhang and Kohki,
>>>>>>
>>>>>> Thanks for your replies.
>>>>>>
>>>>>> I think I know how to deal with partitioning now, but I am still not sure how to deal with the traffic between the hidden state stores and the Kafka brokers (same as Kohki).
>>>>>>
>>>>>> I feel like the easiest thing to do is to set a larger commit window, so that the state stores are synced to the brokers less often than by default.
>>>>>>
>>>>>> I do have a Spark cluster, but I am not convinced how Spark Streaming can do this differently. Guozhang, could you comment anything regarding Kafka Streams vs Spark Streaming, especially in terms of aggregations/groupbys/joins implementation logic?
>>>>>>
>>>>>> Also, is it possible to stop the syncing between state stores to brokers, if I am fine with failures?
>>>>>>
>>>>>> Thanks
>>>>>> Tianji
>>>>>>
>>>>>> On 2017-02-26 23:52 (-0500), Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>> Hello Tianji,
>>>>>>>
>>>>>>> As Kohki mentioned, in Streams, joins and aggregations are always done pre-partitioned, and hence locally. So there won't be any inter-node communication needed to execute the joins / aggregations. Also, they can be hosted as persistent local state stores, so you don't need to keep them in memory. So for example, if you partition your data by K1 / K2, then data with the same values in combo (K1, K2) will always go to the same partition, and hence this is good for aggregations / joins on either K1, K2, or combo(K1, K2), but not sufficient for combo(K1, K2, K3, K4), as data with the same values of K3 / K4 might still go to different partitions processed by different Streams instances.
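As an illustration of that co-partitioning point (and of the "maximum superset" advice that follows), here is a rough sketch of re-keying on a combined (K1, K2) key in the 0.10.x DSL; the topic name, the CSV value layout, and the store name are all invented:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    public class CompositeKeyRepartitionSketch {
        public static void main(String[] args) {
            KStreamBuilder builder = new KStreamBuilder();

            // Hypothetical input topic whose String values look like "k1,k2,metric".
            KStream<String, String> events =
                    builder.stream(Serdes.String(), Serdes.String(), "events");

            // Re-key every record on the combination (k1, k2). The DSL repartitions
            // once on this new key, so the aggregation below runs against local state;
            // it would not be sufficient for grouping on some unrelated k3/k4.
            KTable<String, Long> countsByK1K2 = events
                    .selectKey((key, value) -> {
                        String[] fields = value.split(",");
                        return fields[0] + "|" + fields[1];
                    })
                    .groupByKey(Serdes.String(), Serdes.String())
                    .count("counts-by-k1-k2");
        }
    }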
>>>>>>>
>>>>>>> So what you want is really to partition based on the "maximum superset" of all the involved keys. Note that with the superset of all the keys, one thing to watch out for is the even distribution of the partitions. If it is not evenly distributed, then some instances might become hot spots. This can be tackled by customizing the "PartitionGrouper" interface in Streams, which indicates which set of partitions will be assigned to each of the tasks (by default, each partition from the source topics forms a task, and a task is the unit of parallelism in Streams).
>>>>>>>
>>>>>>> Hope this helps.
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>> On Sun, Feb 26, 2017 at 10:57 AM, Kohki Nishio <tarop...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Tianji,
>>>>>>>> KStream is indeed append mode as long as I do stateless processing, but when you do an aggregation, that is a stateful operation; it turns into a KTable, and that follows the update mode.
>>>>>>>>
>>>>>>>> In regard to your aggregation, I believe Kafka's aggregation works within a single partition, not over multiple partitions. Are you doing 100 different aggregations against the record key? Then you should have a single data object for those 100 values. Anyway, it sounds like we have a similar problem...
>>>>>>>>
>>>>>>>> -Kohki
>>>>>>>>
>>>>>>>> On Sat, Feb 25, 2017 at 1:11 PM, Tianji Li <skyah...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Kohki,
>>>>>>>>>
>>>>>>>>> Thanks very much for providing your investigation results. Regarding 'append' mode with Kafka Streams, isn't KStream the thing you want?
>>>>>>>>>
>>>>>>>>> Hi Guozhang,
>>>>>>>>>
>>>>>>>>> Thanks for the pointers to the two blogs. I read one of them before and just had a look at the other one.
>>>>>>>>>
>>>>>>>>> What I am hoping to do is below; can you help me decide if Kafka Streams is a good fit?
>>>>>>>>>
>>>>>>>>> We have a few data sources, and we are hoping to correlate these sources and then do aggregations, as *a stream in real-time*.
>>>>>>>>>
>>>>>>>>> The number of aggregations is around 100, which means, if using Kafka Streams, we need to maintain around 100 state stores with 100 changelog topics behind the scenes for the joins and aggregations.
>>>>>>>>>
>>>>>>>>> The number of unique entries in each of these state stores is expected to be at the level of < 100M. The size of each record is around 1K bytes, and so each state store is expected to be ~100G bytes in size. The total number of bytes in all these state stores is thus around 10T bytes.
>>>>>>>>>
>>>>>>>>> If keeping all these stores in memory, this translates into around 50 machines with 256G bytes for this purpose alone.
>>>>>>>>>
>>>>>>>>> Plus, the incoming raw data rate could reach 10M records per second in peak hours. So, during aggregation, data movement between Kafka Streams instances will be heavy, i.e., 10M records per second in the cluster for joining and aggregations.
>>>>>>>>>
>>>>>>>>> Is Kafka Streams good for this? My gut feeling is Kafka Streams is fine. But I'd like to run this by you.
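The "larger commit window" and the record caching mentioned earlier in the thread are plain StreamsConfig settings; a minimal sketch with made-up values (a bigger cache and a longer commit interval both collapse more per-key updates before they are flushed downstream and to the changelog topics):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class ChangelogTrafficTuningSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");          // hypothetical
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // hypothetical

            // Record cache: a larger cache means more updates per key are
            // collapsed before being forwarded / written out.
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 512 * 1024 * 1024L);

            // Commit interval: committing less often batches more updates
            // together ("a larger commit window").
            props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 60000);
        }
    }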
>>>>>>>>>
>>>>>>>>> And, I am hoping to minimize data movement (to save bandwidth) during joins/groupBys. If I partition the raw data with the minimum subset of aggregation keys (say K1 and K2), then I wonder whether the following joins/groupBys (say on keys K1, K2, K3, K4) happen on local data, if using the DSL?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Tianji
>>>>>>>>>
>>>>>>>>> On 2017-02-25 13:49 (-0500), Guozhang Wang <w...@gmail.com> wrote:
>>>>>>>>>> Hello Kohki,
>>>>>>>>>>
>>>>>>>>>> Thanks for the email. I'd like to learn what your concern is about the size of the state store? From your description it's a bit hard to figure out, but I'd guess you have lots of state stores while each of them is relatively small?
>>>>>>>>>>
>>>>>>>>>> Hello Tianji,
>>>>>>>>>>
>>>>>>>>>> Regarding your question about maturity and users of Streams, you can take a look at a bunch of the blog posts written about their Streams usage in production, for example:
>>>>>>>>>>
>>>>>>>>>> http://engineering.skybettingandgaming.com/2017/01/23/streaming-architectures/
>>>>>>>>>>
>>>>>>>>>> http://developers.linecorp.com/blog/?p=3960
>>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>>
>>>>>>>>>> On Sat, Feb 25, 2017 at 7:52 AM, Kohki Nishio <ta...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I did a bit of research on that matter recently; the comparison is between Spark Structured Streaming (SSS) and Kafka Streams.
>>>>>>>>>>>
>>>>>>>>>>> Both are relatively new (~1y) and trying to solve similar problems. However, if you go with Spark, you have to go with a cluster; if your environment already has a cluster, then it's good. However, our team doesn't do any Spark, so the initial cost would be very high. On the other hand, Kafka Streams is a Java library; since we have a service framework, doing streaming inside a service is super easy.
>>>>>>>>>>>
>>>>>>>>>>> However, for some reason, people see SSS as more mature and Kafka Streams as not so mature (like beta). But plain old streaming is mature enough in both (in my opinion); I didn't see any difference between DStream (Spark) and KStream (Kafka).
>>>>>>>>>>>
>>>>>>>>>>> DataFrame (Structured Streaming) and KTable, however, I found quite different. Kafka's model is more like a changelog; that means you need to see the latest entry to make a final decision. I would call this the 'Update' model, whereas Spark does the 'Append' model and doesn't support the 'Update' model yet (it's coming in 2.2): http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes
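To make that 'Update' model concrete, a small sketch against the 0.10.x DSL (topic names are invented): the aggregation result is a KTable, and reading it back as a stream yields a change record per key update rather than an append-only result set.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    public class UpdateModelSketch {
        public static void main(String[] args) {
            KStreamBuilder builder = new KStreamBuilder();

            KStream<String, String> words =
                    builder.stream(Serdes.String(), Serdes.String(), "words"); // hypothetical topic

            // The aggregation result is a KTable: per key it holds the *latest*
            // value, and every new input emits an updated record for that key.
            KTable<String, Long> counts = words
                    .groupByKey(Serdes.String(), Serdes.String())
                    .count("word-counts");

            // Reading the table as a stream yields its changelog: a sequence of
            // per-key updates, not an append-only sequence of immutable results.
            counts.toStream().to(Serdes.String(), Serdes.Long(), "word-counts-output"); // hypothetical topic
        }
    }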
>>>>>>>>>>>
>>>>>>>>>>> I wanted to have the 'Append' model with Kafka, but it seems it's not an easy thing to do. Also, Kafka Streams uses an internal topic to keep state changes for the fail-over scenario, but I'm dealing with lots of tiny pieces of information and I have a big concern about the size of the state store / topic, so my decision is that I'm going with my own handling of the Kafka API...
>>>>>>>>>>>
>>>>>>>>>>> If you do stateless operations and don't have a Spark cluster, yeah, Kafka Streams is perfect.
>>>>>>>>>>> If you do stateful, complicated operations and happen to have a Spark cluster, give Spark a try.
>>>>>>>>>>> Otherwise, you have to write code which is optimized for your use case.
>>>>>>>>>>>
>>>>>>>>>>> thanks
>>>>>>>>>>> -Kohki
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Feb 24, 2017 at 6:22 PM, Tianji Li <sk...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi there,
>>>>>>>>>>>>
>>>>>>>>>>>> Can anyone give a good explanation of in what cases Kafka Streams is preferred, and in what cases Spark Streaming is better?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Tianji
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Kohki Nishio
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> -- Guozhang
>>>>>>>>
>>>>>>>> --
>>>>>>>> Kohki Nishio
>>>>>>>
>>>>>>> --
>>>>>>> -- Guozhang