Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-09-08 Thread Eno Thereska
Hi, Users have many options for buffering in the Processor API and it doesn't seem right we should prescribe a particular one. Also, there is value in continuing to keep the Processor API simple. As such, we'll remove the ".enableCaching" for a store used in the processor API from the KIP and

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-09-07 Thread Damian Guy
Guozhang, Some points about what you have mentioned: 1. You can't just call context.forward() on the flush listener. You have to set some other contextual information (currently ProcessorRecordContext) prior to doing this, otherwise the nodes you are forwarding to are undetermined, i.e., this can be
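Damian's first point above can be sketched in plain Java. This is a hypothetical, minimal stand-in (not the actual Kafka Streams internals): the flush listener restores the record context that was captured when the record was cached, forwards, then puts the old context back; forwarding with no context in place is an error.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of the issue: a cache flush listener cannot simply
// call forward(); it must first restore the record context captured when
// the record entered the cache, otherwise downstream routing is undefined.
public class FlushContextSketch {
    // Stand-in for ProcessorRecordContext: just the originating node name.
    record RecordContext(String currentNode) {}

    static RecordContext activeContext = null;
    static final Deque<String> forwardedFrom = new ArrayDeque<>();

    static void forward(String key, String value) {
        if (activeContext == null) {
            throw new IllegalStateException("no record context set");
        }
        forwardedFrom.add(activeContext.currentNode());
    }

    // Correct flush: restore the context captured at put() time, then forward.
    static void onFlush(String key, String value, RecordContext captured) {
        RecordContext previous = activeContext;
        activeContext = captured;       // restore before forwarding
        try {
            forward(key, value);
        } finally {
            activeContext = previous;   // put the old context back
        }
    }

    public static void main(String[] args) {
        onFlush("k", "v", new RecordContext("aggregate-node"));
        System.out.println(forwardedFrom.peek()); // aggregate-node
    }
}
```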

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-09-06 Thread Guozhang Wang
Hi Matthias, I agree with your concerns of coupling with record forwarding with record storing in the state store, and my understanding is that this can (and should) be resolved with the current interface. Here are my thoughts: 1. The global cache, MemoryLRUCacheBytes, although is currently defin

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-09-06 Thread Eno Thereska
A small update to the KIP: the deduping of records using the cache does not affect the .to operator since we'd have already deduped the KTable before the operator. Adjusting KIP. Thanks Eno > On 5 Sep 2016, at 12:43, Eno Thereska wrote: > > Hi Matthias, > > The motivation for KIP-63 was prim

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-09-05 Thread Eno Thereska
Hi Matthias, The motivation for KIP-63 was primarily aggregates and reducing the load on "both" state stores and downstream. I think there is agreement that for the DSL the motivation and design make sense. For the Processor API: caching is a major component in any system, and it is difficult

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-09-04 Thread Matthias J. Sax
> Processor code should always work, independently of whether caching is enabled or not. If we want to get this, I guess we need a quite different design (see (1)). The point is that we want to dedup the output, and not state updates. It just happens that our starting point was KTable, for which state

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-09-04 Thread Matthias J. Sax
Sure, you can use a non-cached state. However, if you write code like below for a non-cached state, and learn about caching later on, and think, caching is a cool feature, I want to use it, you would simply want to enable caching (without breaking your code). Processor code should always work inde

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-09-04 Thread Damian Guy
Thanks for clarifying On 4 September 2016 at 17:36, Matthias J. Sax wrote: > Sorry for not being precise. What I meant by "completely" is for a > single processor. Assume I want to have the following pattern: > > process(...) { > if (someCondition) { > state.put(...) > context.

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-09-04 Thread Eno Thereska
Hi Matthias, Thanks for the good questions. There is still the option of not using cached state. If one uses cached state it will dedup for stores and forwarding further. But you can always disable caching and do what you say. Eno > On 4 Sep 2016, at 17:36, Matthias J. Sax wrote: > > Sorry

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-09-04 Thread Matthias J. Sax
Sorry for not being precise. What I meant by "completely" is for a single processor. Assume I want to have the following pattern: process(...) { if (someCondition) { state.put(...) context.forward(...); } else { context.forward(...); } Ie, for some record I do update
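The pattern Matthias describes, some records updating the store and being forwarded while others are only forwarded, can be sketched as runnable Java. The store and context types here are hypothetical mocks, not the actual Kafka Streams `Processor` API; the point is only the control flow whose behavior must not change when caching is enabled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal stand-ins for the state store and forwarding context
// (hypothetical, not the actual Kafka Streams API).
class MockStore {
    final Map<String, String> data = new HashMap<>();
    void put(String k, String v) { data.put(k, v); }
}

class MockContext {
    final List<String> forwarded = new ArrayList<>();
    void forward(String k, String v) { forwarded.add(k + "=" + v); }
}

public class ConditionalForward {
    static final MockStore state = new MockStore();
    static final MockContext context = new MockContext();

    // The pattern from the discussion: some records update the store AND
    // are forwarded, others are forwarded without touching the store.
    static void process(String key, String value) {
        boolean someCondition = key.startsWith("a");
        if (someCondition) {
            state.put(key, value);
            context.forward(key, value);
        } else {
            context.forward(key, value);
        }
    }

    public static void main(String[] args) {
        process("apple", "1");   // stored and forwarded
        process("banana", "2");  // forwarded only
        System.out.println(context.forwarded.size()); // 2
        System.out.println(state.data.size());        // 1
    }
}
```

If forwarding is deferred to cache eviction, the two branches can no longer be distinguished, which is exactly the coupling concern raised in the thread.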

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-09-04 Thread Damian Guy
Hi Matthias, Thanks for bringing the conversation across to the thread. I think a main limitation would be, that you cannot mix the 4 patterns > within a single application anymore (iff you use a "caches state"). If > you have processor with a "cached state" this disables direct usage of > contex

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-09-04 Thread Matthias J. Sax
I think a main limitation would be, that you cannot mix the 4 patterns within a single application anymore (iff you use a "caches state"). If you have processor with a "cached state" this disables direct usage of context.forward() completely -- if I understand the design correctly. Thus, if a "cach

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-09-04 Thread Matthias J. Sax
We had a recent discussion about KIP-63, and I just c&p from the JIRA discussion: Damian: > During the code walk-through, Matthias raised a very good point about the use > of context().forward being coupled to whether or not caching is enabled. Now > that I've had the chance to think about it I

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-08-24 Thread Eno Thereska
Hi folks, We've been working on a proof-of-concept for KIP-63 and that can now be found at the main JIRA (https://issues.apache.org/jira/browse/KAFKA-3776) under PR https://github.com/apache/kafka/pull/1752. It is still work in progress, however we are confident that the basic structure is there.

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-06-21 Thread Roger Hoover
Thanks, Eno. On Tue, Jun 21, 2016 at 2:22 AM, Eno Thereska wrote: > Hi Roger, > > I realised I never got back to you on this one, sorry. Some answers inline: > > > On 3 Jun 2016, at 22:48, Roger Hoover wrote: > > > > Hi Eno, > > > > Does this mean that Kafka Streams will disable the RocksDB wri

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-06-21 Thread Eno Thereska
Hi Roger, I realised I never got back to you on this one, sorry. Some answers inline: > On 3 Jun 2016, at 22:48, Roger Hoover wrote: > > Hi Eno, > > Does this mean that Kafka Streams will disable the RocksDB write buffer? For the purposes of this KIP we might not want to change the current Ro

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-06-19 Thread Neha Narkhede
I'm in favor of a global config that is then evenly divided amongst the threads of a Kafka Streams instance. On Mon, Jun 13, 2016 at 6:23 PM, Guozhang Wang wrote: > Although this KIP is not mainly for memory management of Kafka Streams, > since it touches on quite some part of it I think it is g

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-06-13 Thread Guozhang Wang
Although this KIP is not mainly for memory management of Kafka Streams, since it touches on quite some part of it I think it is good to first think of what we would REALLY want as an end goal for memory usage in order to make sure that whatever we proposed in this KIP aligns with that long-term pla

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-06-13 Thread Matthias J. Sax
I am just catching up on this thread. From my point of view, easy tuning for the user is the most important thing, because Kafka Streams is a library. Thus, a global cache size parameter should be the best. About dividing the memory vs a single global cache. I would argue that in the first place

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-06-08 Thread Henry Cai
One more thing for this KIP: Currently RocksDBWindowStore serializes the key/value before it puts them into the in-memory cache; I think we should delay this serialization/deserialization until it needs to flush to the DB. For a simple countByKey over 100 records, this would trigger 100 serialization/deserial
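Henry's suggestion can be illustrated with a hypothetical sketch (not the actual RocksDBWindowStore code): keep plain objects in the in-memory cache and serialize only when an entry is flushed to the backing store, so a key updated 100 times is serialized once rather than 100 times.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: updates accumulate as plain Longs in the cache; serialization
// happens once per distinct key at flush time, not once per update.
public class LazySerializingCache {
    static int serializations = 0;
    static final Map<String, Long> cache = new LinkedHashMap<>();

    static byte[] serialize(long v) {
        serializations++;
        return Long.toString(v).getBytes(StandardCharsets.UTF_8);
    }

    static void flush() {
        for (Map.Entry<String, Long> e : cache.entrySet()) {
            serialize(e.getValue());
            // (a real flush would also write the bytes to RocksDB
            //  and clear the cache entry)
        }
    }

    public static void main(String[] args) {
        // countByKey over 100 records with the same key:
        // no serialization per update.
        for (int i = 0; i < 100; i++) {
            cache.merge("key", 1L, Long::sum);
        }
        flush(); // one serialization for the single distinct key
        System.out.println(cache.get("key"));  // 100
        System.out.println(serializations);    // 1
    }
}
```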

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-06-06 Thread Ismael Juma
On Mon, Jun 6, 2016 at 6:48 PM, Guozhang Wang wrote: > > About using Instrumentation.getObjectSize, yeah we were worried a lot about > its efficiency as well as accuracy when discussing internally, but no > better solution was proposed. So if people have better ideas, please throw > them here,

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-06-06 Thread Guozhang Wang
About the distribution of memory: I think it is good to evenly distribute the memory across threads, while what I was calling out for consideration is the distribution of memory within a thread, to its tasks / caches of the task, as the number of tasks and the number of caches in the task can vary fr

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-06-06 Thread Jay Kreps
For the threads, I think it might be reasonable to just give each thread cache_bytes/num_threads for simplicity? Presumably you wouldn't want to have to make the cache threadsafe so you'd want to avoid sharing? Giving each thread the same cache isn't quite right, as you point out, but depending on
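Jay's simple scheme is just an even split of the global budget, so each thread owns a private, non-thread-safe cache. A one-method sketch (the names here are illustrative, not actual Kafka Streams configuration API):

```java
// Even division of the global cache budget across stream threads.
public class CacheBudget {
    static long perThread(long totalCacheBytes, int numThreads) {
        return totalCacheBytes / numThreads;
    }

    public static void main(String[] args) {
        // e.g. a 100 MB global cache and 4 stream threads -> 25 MB each
        System.out.println(perThread(100L * 1024 * 1024, 4)); // 26214400
    }
}
```

The trade-off discussed in the thread: this avoids making the cache thread-safe, at the cost of idle threads holding budget that busy threads cannot use.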

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-06-06 Thread Henry Cai
If we are going with a global setting, can the cache just be one gigantic LRU cache instead of many smaller ones? (partition_name, task_name) can be part of cache key. If there are many smaller caches, it's hard to achieve efficient global resource utilization (some caches are busy, some caches a
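Henry's alternative can be sketched with a hypothetical implementation built on `LinkedHashMap`'s access-order eviction: one large LRU shared by the whole instance, keyed on (task, store, record key), so busy caches naturally take space from idle ones. A real implementation would evict by bytes rather than entry count; the tiny cap here is only to demonstrate eviction.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// One global LRU cache keyed on (task, store, record key).
public class GlobalLruCache {
    record CacheKey(String taskName, String storeName, String key) {}

    static final int MAX_ENTRIES = 2; // tiny cap, to demonstrate eviction

    static final Map<CacheKey, String> cache =
        new LinkedHashMap<>(16, 0.75f, true) { // true = access order (LRU)
            @Override
            protected boolean removeEldestEntry(Map.Entry<CacheKey, String> e) {
                return size() > MAX_ENTRIES;
            }
        };

    public static void main(String[] args) {
        cache.put(new CacheKey("0_0", "countStore", "a"), "1");
        cache.put(new CacheKey("0_1", "countStore", "b"), "2");
        cache.get(new CacheKey("0_0", "countStore", "a"));    // touch "a"
        cache.put(new CacheKey("1_0", "joinStore", "c"), "3"); // evicts "b"

        System.out.println(cache.size()); // 2
    }
}
```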

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-06-06 Thread Eno Thereska
Hi Guozhang, About your first point: the alternative is not knowing how much memory a KafkaStreams instance will consume, since, as you mention, M and N can change. I agree the implementation is slightly harder since each cache now can change size dynamically (and the Kafka Streams instance nee

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-06-05 Thread Gwen Shapira
So you were listing the difficulties in order to provide context for the upcoming design change and discussion? It's all good then :) On Sun, Jun 5, 2016 at 11:26 PM, Guozhang Wang wrote: > Don't get me wrong Gwen :) I'm definitely for removing as much burden as > possible from users. All I'm sayi

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-06-05 Thread Guozhang Wang
Don't get me wrong Gwen :) I'm definitely for removing as much burden as possible from users. All I'm saying is that it is not straightforward to do so, and we'd better at least have a concrete implementation design on the KIP page rather than just a one-line change of the config semantics. On Sun, Jun 5

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-06-05 Thread Gwen Shapira
If it is hard for you, just imagine how much fun the users will have. Machines have X GB available RAM. Someone has to figure out how to divide it for processors and rocksDB. The user doesn't have any special knowledge that you don't in this case, so there's no point in pushing the decision to the

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-06-05 Thread Guozhang Wang
There are some details needed to be figured out if we go global: A KafkaStreams instance could have M threads, and each thread could have a varying number of tasks (let's say N, but in practice it may differ from thread to thread), and each task contains a sub-topology with P caches (again in practice

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-06-04 Thread Eno Thereska
Hi Jay, We can make it global instead of per-processor, sounds good. Thanks Eno > On 3 Jun 2016, at 23:15, Jay Kreps wrote: > > Hey Eno, > > Should the config be the global memory use rather than the per-processor? > That is, let’s say I know I have fixed a 1GB heap because that is what I >

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-06-03 Thread Jay Kreps
Hey Eno, Should the config be the global memory use rather than the per-processor? That is, let’s say I know I have fixed a 1GB heap because that is what I set for Java, and want to use 100MB for caching, it seems like right now I’d have to do some math that depends on my knowing a bit about how c

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-06-03 Thread Roger Hoover
Hi Eno, Does this mean that Kafka Streams will disable the RocksDB write buffer? Is it currently safe to recover a Kafka Streams application after SIGKILL on the same machine? If not, will this make it safe to do so? If RocksDB is not flushed before offsets are committed in Kafka and is killed w

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-06-03 Thread Eno Thereska
Hi Gwen, Yes. As an example, if cache.max.bytes.buffering is set to X, and if users have A aggregation operators and T KTable.to() operators, then X*(A + T) total bytes will be allocated for caching. Eno > On 3 Jun 2016, at 21:37, Gwen Shapira wrote: > > Just to clarify: "cache.max.bytes.buffer
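Eno's per-processor sizing rule as stated here is a simple product; later in the thread the semantics shift to a single global budget. A sketch of the arithmetic (illustrative names only):

```java
// Per-processor cache semantics: total bytes = X * (A + T), where X is
// cache.max.bytes.buffering, A is the number of aggregation operators,
// and T the number of KTable.to() operators.
public class CacheSizing {
    static long totalBytes(long x, int aggregations, int toOperators) {
        return x * (aggregations + toOperators);
    }

    public static void main(String[] args) {
        // e.g. X = 10 MB, 3 aggregations, 1 to() -> 40 MB total
        System.out.println(totalBytes(10L * 1024 * 1024, 3, 1)); // 41943040
    }
}
```

This is the math Jay objects to in his follow-up: the user has to know A and T (implementation details of the topology) to predict total memory use, which motivates the global config.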

Re: [DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-06-03 Thread Gwen Shapira
Just to clarify: "cache.max.bytes.buffering" is per processor? On Thu, Jun 2, 2016 at 11:30 AM, Eno Thereska wrote: > Hi there, > > I have created KIP-63: Unify store and downstream caching in streams > https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+cachin

[DISCUSS] KIP-63: Unify store and downstream caching in streams

2016-06-02 Thread Eno Thereska
Hi there, I have created KIP-63: Unify store and downstream caching in streams https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams Fee