Steven,

I guess my last answer was not completely correct. You might start with a new store if the task gets moved to a different machine. Otherwise, we don't explicitly wipe out the store, but just reuse it in whatever state it is in on restart.
-Matthias

On 2/28/17 2:19 PM, Matthias J. Sax wrote:
> Sorry. Misunderstood your question.
>
> For a non-logged store, in case of failure, we wipe out the entire state (IIRC) -- thus, you will start with an empty state after recovery.
>
> -Matthias
>
> On 2/28/17 1:36 PM, Steven Schlansker wrote:
>> Thanks Matthias for this information. But it seems you are talking about a logged store, since you mention the changelog topic and replaying it and whatnot.
>>
>> But my question was specifically about *unlogged* state stores, where there is no such changelog topic available. Sorry if that wasn't clear before. Or am I misunderstanding?
>>
>>> On Feb 28, 2017, at 9:12 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>
>>> If a store is backed by a changelog topic, the changelog topic is responsible for holding the latest state of the store. Thus, the topic must store the latest value per key. For this, we use a compacted topic.
>>>
>>> In case of restore, the local RocksDB store is cleared so it is empty, and we read the complete changelog topic and apply those updates to the store.
>>>
>>> This allows a fast recovery, because no source-topic rewind and no reprocessing are required. Furthermore, because the changelog topic is compacted, it is roughly the size of the number of distinct keys in the store -- this also reduces recovery time, as you don't need to replay every update to the store.
>>>
>>> We are currently working on an optimization that allows us to only replay the tail of the changelog topic in certain cases to get the store back into a valid state. See https://issues.apache.org/jira/browse/KAFKA-4317
>>>
>>> Furthermore, changelog topics allow us to maintain StandbyTasks -- those tasks only apply the updates from the changelog topic (that are written by the main task maintaining the store) to a local copy of the store. Thus, in case of fail-over, those StandbyTasks can replace a failed task, and because they have a copy of the state, they can take over even more quickly than a newly created task that needs to replay the changelog to rebuild the state first.
>>>
>>> -Matthias
>>>
>>> On 2/28/17 8:17 AM, Steven Schlansker wrote:
>>>>
>>>>> On Feb 28, 2017, at 12:17 AM, Michael Noll <mich...@confluent.io> wrote:
>>>>>
>>>>> Sachin,
>>>>>
>>>>> disabling (change)logging for state stores disables the fault tolerance of the state store -- i.e., changes to the state store will not be backed up to Kafka, regardless of whether the store uses a RocksDB store, an in-memory store, or something else.
>>>>
>>>> One thing I've wanted is a more concrete description of this failure mode. What exactly is the process to recover from such a "failed" state store?
>>>>
>>>> Does Kafka Streams rewind the source topic and replay (including any Processors you may have wired up)? Does the state store remain faulted? Can an administrator fix it by resetting some offsets?
>>>>
>>>> I looked around both in the project and Confluent documentation and didn't really find an answer to how non-logged state stores fail or recover.
>>>>
>>>> Thanks for any insight!
>>>>
>>>>>> When disabling this in 0.10.2, what does this exactly mean?
>>>>>
>>>>> See above.
>>>>>
>>>>>> Does this mean no RocksDB state store would get created any longer?
>>>>>
>>>>> No, local state stores will still be created. By default, the storage engine is RocksDB, so if you disable changelogging then you will still have local RocksDB stores (as usual), but those stores will not be backed up to Kafka behind the scenes. If, in this situation, you lose a machine that has local RocksDB stores, then this state data is lost, too.
>>>>>
>>>>> So there are two different things at play here:
>>>>>
>>>>> 1. Whether you want to enable or disable (change)logging of state stores, and thus enable/disable fault-tolerant state stores.
>>>>>
>>>>> 2. Which storage engine you want to use for the state stores. The default is RocksDB.
>>>>>
>>>>> If, for (2), you do not want to have RocksDB state stores, you can switch the storage engine to e.g. the in-memory store. However, when you do switch from RocksDB to in-memory, then all your state store's data must fit into memory (obviously), otherwise you'll run OOM.
>>>>>
>>>>> In summary, you can have any of the following:
>>>>>
>>>>> a. RocksDB state stores with changelogging enabled (= fault-tolerant stores).
>>>>>
>>>>> b. RocksDB state stores with changelogging disabled (= stores are not fault-tolerant; you may suffer data loss during e.g. machine failures).
>>>>>
>>>>> c. In-memory state stores with changelogging enabled (= fault-tolerant stores). But careful: you may run OOM if the state data does not fit into the available memory.
>>>>>
>>>>> d. In-memory state stores with changelogging disabled (= stores are not fault-tolerant; you may suffer data loss during e.g. machine failures). But careful: you may run OOM if the state data does not fit into the available memory.
>>>>>
>>>>> Hope this helps,
>>>>> Michael
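(For concreteness, a minimal sketch of options (b) and (c) above, assuming the 0.10.2 Stores factory API -- the store names and serdes are made up for illustration:)

  import org.apache.kafka.streams.processor.StateStoreSupplier;
  import org.apache.kafka.streams.state.Stores;

  // (b) persistent RocksDB store with changelogging disabled:
  // fast local state, but lost if the machine fails
  StateStoreSupplier counts = Stores.create("counts")    // hypothetical name
      .withStringKeys()
      .withLongValues()
      .persistent()         // RocksDB-backed
      .disableLogging()     // no changelog topic is created
      .build();

  // (c) in-memory store with changelogging enabled (the default):
  // fault-tolerant, but all data must fit into memory
  StateStoreSupplier cache = Stores.create("cache")      // hypothetical name
      .withStringKeys()
      .withLongValues()
      .inMemory()
      .build();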
>>>>>
>>>>> On Tue, Feb 28, 2017 at 8:01 AM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>>
>>>>>> I had a question regarding http://docs.confluent.io/3.1.2/streams/developer-guide.html#enable-disable-state-store-changelogs
>>>>>>
>>>>>> When disabling this in 0.10.2, what does this exactly mean? Does this mean no RocksDB state store would get created any longer?
>>>>>>
>>>>>> On this subject: we had started with Spark Streaming, but we ran into memory issues, and the hardware we have got is not so fantastic as to support Spark Streaming.
>>>>>>
>>>>>> So we switched to the high-level DSL of Kafka Streams.
>>>>>>
>>>>>> I think if your source is Kafka queues, Kafka Streams is good and simple to use. However, you need to plan ahead and anticipate the (max) load, and create adequate partitions based on some key on which aggregations can be performed independently.
>>>>>>
>>>>>> Then you can run a cluster of stream threads (on the same and multiple machines), each processing a partition.
>>>>>>
>>>>>> Having said this, we ran into a lot of issues with frequent stream re-balances, especially when we have multiple instances of RocksDB running on a single machine. We don't know yet whether this is some bad VM configuration issue or some problem with the Kafka Streams / RocksDB integration; we are still working on that.
>>>>>>
>>>>>> So I would suggest: if you partition your data well enough, and have a single streams thread consuming only one partition, and not many instances of RocksDB created on a single machine, the overall application runs fine.
>>>>>>
>>>>>> Also, make sure not to create big time windows, and set a not-so-long retention time, so that the state store size stays limited.
>>>>>>
>>>>>> We use a sliding 5-minute window of size 10 minutes and retention of 30 minutes, and see overall performance much better than with, say, a 30-minute slide, size of 1 hour, and retention of 3 hours.
>>>>>>
>>>>>> So to conclude: if you can manage RocksDB, then Kafka Streams is good to start with; it's simple and very intuitive to use.
>>>>>>
>>>>>> Again, on the RocksDB side, is there a way to eliminate that, and is disableLogging for that?
>>>>>>
>>>>>> Thanks
>>>>>> Sachin
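(To make Sachin's window settings concrete -- 10-minute windows sliding every 5 minutes, kept for 30 minutes -- a minimal 0.10.2 DSL sketch; the topic and store names are made up:)

  import java.util.concurrent.TimeUnit;
  import org.apache.kafka.streams.kstream.KStream;
  import org.apache.kafka.streams.kstream.KStreamBuilder;
  import org.apache.kafka.streams.kstream.KTable;
  import org.apache.kafka.streams.kstream.TimeWindows;
  import org.apache.kafka.streams.kstream.Windowed;

  KStreamBuilder builder = new KStreamBuilder();
  KStream<String, String> events = builder.stream("events");   // hypothetical topic

  // 10-minute windows that advance every 5 minutes and are retained
  // for 30 minutes; the retention (until) caps the windowed store size
  KTable<Windowed<String>, Long> counts = events
      .groupByKey()
      .count(TimeWindows.of(TimeUnit.MINUTES.toMillis(10))
                        .advanceBy(TimeUnit.MINUTES.toMillis(5))
                        .until(TimeUnit.MINUTES.toMillis(30)),
             "windowed-counts");                               // hypothetical store name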
>>>>>>
>>>>>> On Mon, Feb 27, 2017 at 7:47 PM, Michael Noll <mich...@confluent.io> wrote:
>>>>>>
>>>>>>>> Also, is it possible to stop the syncing between state stores and brokers, if I am fine with failures?
>>>>>>>
>>>>>>> Yes, you can disable the syncing (or the "changelog" feature) of state stores: http://docs.confluent.io/current/streams/developer-guide.html#enable-disable-state-store-changelogs
>>>>>>>
>>>>>>>> I do have a Spark cluster, but I am not convinced how Spark Streaming can do this differently. Guozhang, could you comment anything regarding Kafka Streams vs Spark Streaming, especially in terms of aggregations/groupBys/joins implementation logic?
>>>>>>>
>>>>>>> As you are hinting at yourself, if you want fault-tolerant state, then this fault tolerance comes at a price (in Kafka Streams, it is achieved by changelog-ing state stores). Other tools such as Flink or Spark work in a similar fashion; there's no free lunch.
>>>>>>>
>>>>>>> One option, which you brought up above, is to disable the fault-tolerance functionality for state by disabling the changelogs of state stores (see above). Another option is to leverage Kafka Streams' record caching, which lowers the amount of data that is sent across the network (from your app's state store changelogs to the Kafka cluster and vice versa), though you may need to tune some parameters in your situation, because your key space has high cardinality and the message volume per key is relatively low (= you don't benefit as much from record caching as most other users/use cases).
>>>>>>>
>>>>>>> -Michael
>>>>>>>
>>>>>>> On Mon, Feb 27, 2017 at 2:42 PM, Tianji Li <skyah...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Guozhang and Kohki,
>>>>>>>>
>>>>>>>> Thanks for your replies.
>>>>>>>>
>>>>>>>> I think I know how to deal with partitioning now, but I am still not sure how to deal with the traffic between the hidden state stores and the Kafka brokers (same as Kohki).
>>>>>>>>
>>>>>>>> I feel like the easiest thing to do is to set a larger commit window, so that the state stores are synced to the brokers more slowly than by default.
>>>>>>>>
>>>>>>>> I do have a Spark cluster, but I am not convinced how Spark Streaming can do this differently. Guozhang, could you comment anything regarding Kafka Streams vs Spark Streaming, especially in terms of aggregations/groupBys/joins implementation logic?
>>>>>>>>
>>>>>>>> Also, is it possible to stop the syncing between state stores and brokers, if I am fine with failures?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Tianji
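(The two knobs discussed here -- the record cache and the commit interval -- are plain StreamsConfig settings; a sketch with illustrative values, assuming the 0.10.2 config names:)

  import java.util.Properties;
  import org.apache.kafka.streams.StreamsConfig;

  Properties props = new Properties();
  // a larger record cache coalesces repeated updates to the same key before
  // they are forwarded downstream and to the changelog (default: 10 MB)
  props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 100L * 1024 * 1024);
  // a longer commit interval flushes the caches and state less frequently
  // (default: 30000 ms)
  props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 60000);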
>>>>>>>>
>>>>>>>> On 2017-02-26 23:52 (-0500), Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello Tianji,
>>>>>>>>>
>>>>>>>>> As Kohki mentioned, in Streams, joins and aggregations are always done pre-partitioned, and hence locally. So there won't be any inter-node communication needed to execute the joins/aggregations. Also, they can be hosted as persistent local state stores, so you don't need to keep them in memory. So, for example, if you partition your data with K1 / K2, then data with the same values in combo (K1, K2) will always go to the same partition, and hence this is good for aggregations/joins on either K1, K2, or combo(K1, K2), but not sufficient for combo(K1, K2, K3, K4), as data with the same values of K3 / K4 might still go to different partitions processed by different Streams instances.
>>>>>>>>>
>>>>>>>>> So what you want is really to partition based on the "maximum superset" of all the involved keys. Note that with the superset of all the keys, one thing to watch out for is the even distribution of the partitions. If it is not evenly distributed, then some instances might become hot spots. This can be tackled by customizing the "PartitionGrouper" interface in Streams, which indicates which set of partitions will be assigned to each of the tasks (by default, each partition from the source topics will form a task, and a task is the unit of parallelism in Streams).
>>>>>>>>>
>>>>>>>>> Hope this helps.
>>>>>>>>>
>>>>>>>>> Guozhang
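(A minimal sketch of the PartitionGrouper customization Guozhang mentions, assuming the 0.10.2 interface; the class name is made up, and the body simply delegates to the default one-task-per-partition grouping as a starting point:)

  import java.util.Map;
  import java.util.Set;
  import org.apache.kafka.common.Cluster;
  import org.apache.kafka.common.TopicPartition;
  import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
  import org.apache.kafka.streams.processor.PartitionGrouper;
  import org.apache.kafka.streams.processor.TaskId;

  public class EvenLoadPartitionGrouper implements PartitionGrouper {
      @Override
      public Map<TaskId, Set<TopicPartition>> partitionGroups(
              Map<Integer, Set<String>> topicGroups, Cluster metadata) {
          // start from the default grouping; a real implementation would
          // rearrange the groups here to spread known-hot partitions out
          return new DefaultPartitionGrouper().partitionGroups(topicGroups, metadata);
      }
  }

  // registered via (hypothetical property setup):
  // props.put(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG,
  //           EvenLoadPartitionGrouper.class.getName());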
>>>>>>>>>
>>>>>>>>> On Sun, Feb 26, 2017 at 10:57 AM, Kohki Nishio <tarop...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Tianji,
>>>>>>>>>>
>>>>>>>>>> KStream is indeed Append mode as long as I do stateless processing, but when you do aggregation, that is a stateful operation, and it turns into a KTable, and that does Update mode.
>>>>>>>>>>
>>>>>>>>>> In regard to your aggregation, I believe Kafka's aggregation works on a single partition, not over multiple partitions. Are you doing 100 different aggregations against the record key? Then you should have a single data object for those 100 values. Anyway, it sounds like we have a similar problem.
>>>>>>>>>>
>>>>>>>>>> -Kohki
>>>>>>>>>>
>>>>>>>>>> On Sat, Feb 25, 2017 at 1:11 PM, Tianji Li <skyah...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Kohki,
>>>>>>>>>>>
>>>>>>>>>>> Thanks very much for providing your investigation results. Regarding 'append' mode with Kafka Streams, isn't KStream the thing you want?
>>>>>>>>>>>
>>>>>>>>>>> Hi Guozhang,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the pointers to the two blogs. I read one of them before and just had a look at the other one.
>>>>>>>>>>>
>>>>>>>>>>> What I am hoping to do is below; can you help me decide if Kafka Streams is a good fit?
>>>>>>>>>>>
>>>>>>>>>>> We have a few data sources, and we are hoping to correlate these sources and then do aggregations, as *a stream in real-time*.
>>>>>>>>>>>
>>>>>>>>>>> The number of aggregations is around 100, which means, if using Kafka Streams, we need to maintain around 100 state stores with 100 changelog topics behind the scenes when joining and aggregating.
>>>>>>>>>>>
>>>>>>>>>>> The number of unique entries in each of these state stores is expected to be at the level of < 100M. The size of each record is around 1K bytes, and so each state store is expected to be ~100G bytes in size. The total number of bytes in all these state stores is thus around 10T bytes.
>>>>>>>>>>>
>>>>>>>>>>> If keeping all these stores in memory, this translates into around 50 machines with 256G bytes for this purpose alone.
>>>>>>>>>>>
>>>>>>>>>>> Plus, the incoming raw data rate could reach 10M records per second in peak hours. So, during aggregation, data movement between Kafka Streams instances will be heavy, i.e., 10M records per second in the cluster for joining and aggregations.
>>>>>>>>>>>
>>>>>>>>>>> Is Kafka Streams good for this? My gut feeling is Kafka Streams is fine. But I'd like to run this by you.
>>>>>>>>>>>
>>>>>>>>>>> And, I am hoping to minimize data movement (to save bandwidth) during joins/groupBys. If I partition the raw data with the minimum subset of aggregation keys (say K1 and K2), then I wonder if the following joins/groupBys (say on keys K1, K2, K3, K4) happen on local data, if using the DSL?
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Tianji
>>>>>>>>>>>
>>>>>>>>>>> On 2017-02-25 13:49 (-0500), Guozhang Wang <w...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello Kohki,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the email. I'd like to learn what your concern about the size of the state store is. From your description it's a bit hard to figure out, but I'd guess you have lots of state stores while each of them is relatively small?
>>>>>>>>>>>>
>>>>>>>>>>>> Hello Tianji,
>>>>>>>>>>>>
>>>>>>>>>>>> Regarding your question about the maturity and users of Streams, you can take a look at a bunch of blog posts written about production Streams usage, for example:
>>>>>>>>>>>>
>>>>>>>>>>>> http://engineering.skybettingandgaming.com/2017/01/23/streaming-architectures/
>>>>>>>>>>>>
>>>>>>>>>>>> http://developers.linecorp.com/blog/?p=3960
>>>>>>>>>>>>
>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Feb 25, 2017 at 7:52 AM, Kohki Nishio <ta...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I did a bit of research on that matter recently; the comparison is between Spark Structured Streaming (SSS) and Kafka Streams.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Both are relatively new (~1y) and trying to solve similar problems. However, if you go with Spark, you have to go with a cluster; if your environment already has a cluster, then it's good.
>>>>>>>>>>>>> However, our team doesn't do any Spark, so the initial cost would be very high. On the other hand, Kafka Streams is a Java library; since we have a service framework, doing streaming inside a service is super easy.
>>>>>>>>>>>>>
>>>>>>>>>>>>> However, for some reason, people see SSS as more mature and Kafka Streams as not so mature (like Beta). But old-fashioned streaming is mature enough in both (in my opinion); I didn't see any difference between DStream (Spark) and KStream (Kafka).
>>>>>>>>>>>>>
>>>>>>>>>>>>> DataFrame (Structured Streaming) and KTable, though, I found quite different. Kafka's model is more like a changelog, which means you need to see the latest entry to make a final decision. I would call this an 'Update' model, whereas Spark does an 'Append' model and doesn't support the 'Update' model yet (it's coming in 2.2).
>>>>>>>>>>>>>
>>>>>>>>>>>>> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes
>>>>>>>>>>>>>
>>>>>>>>>>>>> I wanted to have an 'Append' model with Kafka, but it seems it's not an easy thing to do. Also, Kafka Streams uses an internal topic to keep state changes for the fail-over scenario, but I'm dealing with lots of tiny pieces of information, and I have a big concern about the size of the state store / topic, so my decision is that I'm going with my own handling of the Kafka API.
>>>>>>>>>>>>>
>>>>>>>>>>>>> If you do stateless operations and don't have a Spark cluster, yeah, Kafka Streams is perfect.
>>>>>>>>>>>>> If you do stateful, complicated operations and happen to have a Spark cluster, give Spark a try.
>>>>>>>>>>>>> Else, you have to write code which is optimized for your use case.
>>>>>>>>>>>>>
>>>>>>>>>>>>> thanks
>>>>>>>>>>>>> -Kohki
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Feb 24, 2017 at 6:22 PM, Tianji Li <sk...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi there,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Can anyone give a good explanation of in what cases Kafka Streams is preferred, and in what cases Spark Streaming is better?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> Tianji
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Kohki Nishio
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Kohki Nishio
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> -- Guozhang
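(To make the 'Update' model Kohki describes concrete, a small 0.10.2 DSL sketch; the topic and store names are made up. Each input record produces a revised value for its key downstream, rather than appending to an immutable result:)

  import org.apache.kafka.common.serialization.Serdes;
  import org.apache.kafka.streams.kstream.KStream;
  import org.apache.kafka.streams.kstream.KStreamBuilder;
  import org.apache.kafka.streams.kstream.KTable;

  KStreamBuilder builder = new KStreamBuilder();
  KStream<String, String> clicks = builder.stream("clicks");     // hypothetical topic

  // counting is stateful, so the result is a KTable with update semantics:
  // inputs (u1,_), (u1,_), (u1,_) emit counts 1, 2, 3 for key u1
  KTable<String, Long> totals = clicks.groupByKey().count("totals-store");

  totals.toStream()
        .to(Serdes.String(), Serdes.Long(), "totals-output");    // hypothetical topic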