I had a question regarding
http://docs.confluent.io/3.1.2/streams/developer-guide.html#enable-disable-state-store-changelogs

When this is disabled in 0.10.2, what exactly does it mean?
Does it mean that no RocksDB state store gets created any more?

On this subject: we started with Spark Streaming, but we ran into memory
issues, and the hardware we have is not good enough to support Spark
Streaming.

So we switched to the high-level DSL of Kafka Streams.

I think if your source is Kafka topics, Kafka Streams is good and simple
to use. However, you need to plan ahead: anticipate the (max) load and
create enough partitions, based on some key on which aggregations can be
performed independently.

Then you can run a cluster of stream threads (on the same machine or across
multiple machines), each processing a partition.
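
To illustrate why the choice of key matters, here is a minimal sketch of how
a key ends up on a partition. It is not the Kafka code: Kafka's default
partitioner uses a murmur2 hash of the serialized key, while this sketch
uses Arrays.hashCode to stay dependency-free. The class name, the helper
partitionFor and the partition count of 12 are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionSketch {

    // Hypothetical helper: maps a record key to a partition index.
    // Assumption: a plain array hash stands in for Kafka's murmur2 hash.
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(bytes);
        // Mask the sign bit so the modulo result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 12; // assumed value, sized for the expected peak load
        String[] keys = {"customer-1", "customer-2", "customer-1"};
        for (String key : keys) {
            // The same key always maps to the same partition, so one stream
            // thread sees every record needed to aggregate that key.
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

Because the partition is a pure function of the key and the partition count,
changing the number of partitions later moves keys around, which is why it
helps to size the topic for the maximum load up front.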

Having said this, we have run into a lot of issues with frequent stream
rebalances, especially when we have multiple instances of RocksDB running
on a single machine.
We don't yet know whether this is a bad VM configuration or a problem with
the Kafka Streams/RocksDB integration; we are still working on that.

So I would suggest that if you partition your data well enough, have each
stream thread consume only one partition, and avoid creating many RocksDB
instances on a single machine, the overall application runs fine.
Also make sure not to create big time windows, and keep the retention time
short, so that the state store size is limited.

We use a 10-minute window that slides every 5 minutes, with a retention of
30 minutes, and see much better overall performance than with, say, a
1-hour window that slides every 30 minutes with a retention of 3 hours.
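
A rough way to compare the two window settings is sketched below. It
computes which windows a record timestamp falls into for a given size and
advance, using the usual "round down to a multiple of the advance" rule for
overlapping windows. The class and method names are made up, and the
retention comparison is only a back-of-the-envelope estimate, not a
measurement of RocksDB size.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowSketch {

    // Returns the start times (ms) of all windows [start, start + size)
    // that contain the given timestamp, for windows advancing by advanceMs.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest window that can still contain the timestamp, aligned to the advance.
        long start = (Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    static long minutes(long m) {
        return m * 60_000L;
    }

    public static void main(String[] args) {
        long t = minutes(62); // an arbitrary record timestamp

        // Setting A: size 10 min, advance 5 min, retention 30 min.
        // Setting B: size 1 h, advance 30 min, retention 3 h.
        System.out.println("A windows: " + windowStartsFor(t, minutes(10), minutes(5)).size());
        System.out.println("B windows: " + windowStartsFor(t, minutes(60), minutes(30)).size());

        // Each record is written to the same number of windows in both settings,
        // but B keeps data around for a retention period several times longer.
        System.out.println("retention ratio B/A: " + minutes(180) / minutes(30));
    }
}
```

In both settings a record lands in size/advance windows, so the write
amplification is the same; the difference is that the second setting keeps
a longer span of data in the store, which for a high-cardinality key space
can mean a correspondingly larger state store.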

So to conclude: if you can manage RocksDB, then Kafka Streams is good to
start with; it's simple and very intuitive to use.

Again on the RocksDB side: is there a way to eliminate it, and is

disableLogging

for that?

Thanks
Sachin



On Mon, Feb 27, 2017 at 7:47 PM, Michael Noll <mich...@confluent.io> wrote:

> > Also, is it possible to stop the syncing between state stores to brokers,
> if I am fine with failures?
>
> Yes, you can disable the syncing (or the "changelog" feature) of state
> stores:
> http://docs.confluent.io/current/streams/developer-guide.html#enable-disable-state-store-changelogs
>
> > I do have a Spark Cluster, but I am not convince how Spark Streaming can
> do this differently.
> > Guozhang, could you comment anything regarding Kafka Streams vs Spark
> Streaming, especially
> > in terms of aggregations/groupbys/joins implementation logic?
>
> As you are hinting at yourself, if you want fault-tolerant state, then this
> fault tolerance comes at a price (in Kafka Streams, this is achieved by
> changelog-ing state stores).  Other tools such as Flink or Spark work in a
> similar fashion, there's no free lunch.
>
> One option, which you brought up above, is to disable the fault tolerance
> functionality for state by disabling the changelogs of state stores (see
> above).  Another option is to leverage Kafka's record caching for Kafka
> Streams, which does lower the amount of data that is sent across the
> network (from your app's state store changelogs to the Kafka cluster and
> vice versa), though you may need to tune some parameters in your situation
> because your key space has high cardinality and message volume per key is
> relatively low (= you don't benefit as much from record caching as most
> other users/use cases).
>
>
> -Michael
>
>
>
>
> On Mon, Feb 27, 2017 at 2:42 PM, Tianji Li <skyah...@gmail.com> wrote:
>
> > Hi Guozhang and Kohki,
> >
> > Thanks for your replies.
> >
> > I think I know how to deal with partitioning now, but I am still not sure
> > how to deal with the traffic between the hidden state store sizes and
> Kafka
> > Brokers (same as Kohki).
> >
> > I feel like the easiest thing to do is to set a larger commit window, so
> > that the state stores are synced to brokers slower than default.
> >
> > I do have a Spark Cluster, but I am not convince how Spark Streaming can
> > do this differently. Guozhang, could you comment anything regarding Kafka
> > Streams vs Spark Streaming, especially in terms of
> > aggregations/groupbys/joins implementation logic?
> >
> > Also, is it possible to stop the syncing between state stores to brokers,
> > if I am fine with failures?
> >
> > Thanks
> > Tianji
> >
> >
> > On 2017-02-26 23:52 (-0500), Guozhang Wang <wangg...@gmail.com> wrote:
> > > Hello Tianji,
> > >
> > > As Kohki mentioned, in Streams joins and aggregations are always done
> > > pre-partitioned, and hence locally. So there won't be any inter-node
> > > communications needed to execute the join / aggregations. Also they can
> > be
> > > hosted as persistent local state stores so you don't need to keep them
> in
> > > memory. So for example if you partition your data with K1 / K2, then
> data
> > > with the same values in combo (K1, K2) will always goes to the same
> > > partition, and hence good for aggregations / joins on either K1, K2, or
> > > combo(K1, K2), but not sufficient for combo(K1, K2, K3, K4), as data
> with
> > > the same values of K3 / K4 might still goes to different partitions
> > > processed by different Streams instances.
> > >
> > > So what you want is really to partition based on the "maximum superset"
> > of
> > > all the involved keys. Note that with the superset of all the keys one
> > > thing to watch out is the even distribution of the partitions. If it is
> > not
> > > evenly distributed, then some instance might become hot points. This
> can
> > be
> > > tackled by customizing the "PartitionGrouper" interface in Streams,
> which
> > > indicates which set of partitions will be assigned to each of the tasks
> > (by
> > > default each one partition from the source topics will form a task, and
> > > task is the unit of parallelism in Streams).
> > >
> > > Hope this helps.
> > >
> > > Guozhang
> > >
> > >
> > > On Sun, Feb 26, 2017 at 10:57 AM, Kohki Nishio <tarop...@gmail.com>
> > wrote:
> > >
> > > > Tianji,
> > > > KStream is indeed Append mode as long as I do stateless processing,
> but
> > > > when you do aggregation that is a stateful operation and it turns to
> > KTable
> > > > and that does Update mode.
> > > >
> > > > In regard to your aggregation, I believe Kafka's aggregation works
> for
> > a
> > > > single partition not over multiple partitions, are you doing 100
> > > > different aggregation against record key ? Then you should have a
> > single
> > > > data object for those 100 values, anyway it sounds like we have
> similar
> > > > problem ..
> > > >
> > > > -Kohki
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Sat, Feb 25, 2017 at 1:11 PM, Tianji Li <skyah...@gmail.com>
> wrote:
> > > >
> > > > > Hi Kohki,
> > > > >
> > > > > Thanks very much for providing your investigation results.
> Regarding
> > > > > 'append' mode with Kafka Streams, isn't KStream the thing you want?
> > > > >
> > > > > Hi Guozhang,
> > > > >
> > > > > Thanks for the pointers to the two blogs. I read one of them before
> > and
> > > > > just had a look at the other one.
> > > > >
> > > > > What I am hoping to do is below, can you help me decide if Kafka
> > Stream
> > > > is
> > > > > a good fit?
> > > > >
> > > > > We have a few data sources, and we are hoping to correlate these
> > sources,
> > > > > and then do aggregations, as *a stream in real-time*.
> > > > >
> > > > > The number of aggregations is around 100 which means, if using
> Kafka
> > > > > Streams, we need to maintain around 100 state stores with 100
> > change-log
> > > > > topics behind
> > > > > the scene when joining and aggregations.
> > > > >
> > > > > The number of unique entries in each of these state stores is
> > expected to
> > > > > be at the level of < 100M. The size of each record is around 1K
> > bytes and
> > > > > so,
> > > > > each state is expected to be ~100G bytes in size. The total number
> of
> > > > > bytes in all these state stores is thus around 10T bytes.
> > > > >
> > > > > If keeping all these stores in memory, this translates into around
> 50
> > > > > machines with 256Gbytes for this purpose alone.
> > > > >
> > > > > Plus, the incoming raw data rate could reach 10M records per second
> > in
> > > > > peak hours. So, during aggregation, data movement between Kafka
> > Streams
> > > > > instances
> > > > > will be heavy, i.e., 10M records per second in the cluster for
> > joining
> > > > and
> > > > > aggregations.
> > > > >
> > > > > Is Kafka Streams good for this? My gut feeling is Kafka Streams is
> > fine.
> > > > > But I'd like to run this by you.
> > > > >
> > > > > And, I am hoping to minimize data movement (to saving bandwidth)
> > during
> > > > > joins/groupBys. If I partition the raw data with the minimum subset
> > of
> > > > > aggregation keys (say K1 and K2),  then I wonder if the following
> > > > > joins/groupBys (say on keys K1, K2, K3, K4) happen on local data,
> if
> > > > using
> > > > > DSL?
> > > > >
> > > > > Thanks
> > > > > Tianji
> > > > >
> > > > >
> > > > > On 2017-02-25 13:49 (-0500), Guozhang Wang <w...@gmail.com> wrote:
> > > > > > Hello Kohki,>
> > > > > >
> > > > > > Thanks for the email. I'd like to learn what's your concern of
> the
> > size
> > > > > of>
> > > > > > the state store? From your description it's a bit hard to figure
> > out
> > > > but>
> > > > > > I'd guess you have lots of state stores while each of them are
> > > > > relatively>
> > > > > > small?>
> > > > > >
> > > > > > Hello Tianji,>
> > > > > >
> > > > > > Regarding your question about maturity and users of Streams, you
> > can
> > > > > take a>
> > > > > > look at a bunch of the blog posts written about their Streams
> > usage in>
> > > > > > production, for example:>
> > > > > >
> > > > > > > http://engineering.skybettingandgaming.com/2017/01/23/streaming-architectures/
> > > > > >
> > > > > > > http://developers.linecorp.com/blog/?p=3960
> > > > > >
> > > > > > Guozhang>
> > > > > >
> > > > > >
> > > > > > On Sat, Feb 25, 2017 at 7:52 AM, Kohki Nishio <ta...@gmail.com>
> > > > wrote:>
> > > > > >
> > > > > > > I did a bit of research on that matter recently, the comparison
> > is
> > > > > between>
> > > > > > > Spark Structured Streaming(SSS) and Kafka Streams,>
> > > > > > >>
> > > > > > > Both are relatively new (~1y) and trying to solve similar
> > problems,
> > > > > however>
> > > > > > > if you go with Spark, you have to go with a cluster, if your
> > > > > environment>
> > > > > > > already have a cluster, then it's good. However our team
> doesn't
> > do
> > > > > any>
> > > > > > > Spark, so the initial cost would be very high. On the other
> hand,
> > > > > Kafka>
> > > > > > > Streams is a java library, since we have a service framework,
> > doing
> > > > > stream>
> > > > > > > inside a service is super easy.>
> > > > > > >>
> > > > > > > However for some reason, people see SSS is more mature and
> Kafka
> > > > > Streams is>
> > > > > > > not so mature (like Beta). But old fashion stream is both
> mature
> > > > > enough (in>
> > > > > > > my opinion), I didn't see any difference in DStream(Spark) and>
> > > > > > > KStream(Kafka)>
> > > > > > >>
> > > > > > > DataFrame (Structured Streaming) and KTable, I found it quite
> > > > > different.>
> > > > > > > Kafka's model is more like a change log, that means you need to
> > see
> > > > > the>
> > > > > > > latest entry to make a final decision. I would call this as
> > 'Update'
> > > > > model,>
> > > > > > > whereas Spark does 'Append' model and it doesn't support
> 'Update'
> > > > > model>
> > > > > > > yet. (it's coming to 2.2)>
> > > > > > >>
> > > > > > > > http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes
> > > > > > >>
> > > > > > > I wanted to have 'Append' model with Kafka, but it seems it's
> not
> > > > easy>
> > > > > > > thing to do, also Kafka Streams uses an internal topic to keep
> > state>
> > > > > > > changes for fail-over scenario, but I'm dealing with a lots of
> > tiny>
> > > > > > > information and I have a big concern about the size of the
> state
> > > > store
> > > > > />
> > > > > > > topic, so my decision is that I'm going with my own handling of
> > Kafka
> > > > > API>
> > > > > > > ..>
> > > > > > >>
> > > > > > > If you do stateless operation and don't have a spark cluster,
> > yeah
> > > > > Kafka>
> > > > > > > Streams is perfect.>
> > > > > > > If you do stateful complicated operation and happen to have a
> > spark>
> > > > > > > cluster, give Spark a try>
> > > > > > > else you have to write a code which is optimized for your use
> > case>
> > > > > > >>
> > > > > > >>
> > > > > > > thanks>
> > > > > > > -Kohki>
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > > On Fri, Feb 24, 2017 at 6:22 PM, Tianji Li <sk...@gmail.com>
> > wrote:>
> > > > > > >>
> > > > > > > > Hi there,>
> > > > > > > >>
> > > > > > > > Can anyone give a good explanation in what cases Kafka
> Streams
> > is>
> > > > > > > > preferred, and in what cases Sparking Streaming is better?>
> > > > > > > >>
> > > > > > > > Thanks>
> > > > > > > > Tianji>
> > > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > > -->
> > > > > > > Kohki Nishio>
> > > > > > >>
> > > > > >
> > > > > >
> > > > > >
> > > > > > -- >
> > > > > > -- Guozhang>
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Kohki Nishio
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
