Hi Marco,
FYI this has now been merged to trunk and 0.10.2. If you wanted to try it
out i'd probably suggest checking out building the 0.10.2 branch.
Thanks,
Damian
On Tue, 7 Mar 2017 at 17:57 Damian Guy wrote:
> Hi Marco,
>
> Absolutely no problem at all. I'm glad it work it out. For referenc
Hi Marco,
Absolutely no problem at all. I'm glad it work it out. For reference here
is a PR that fixes the problem:
https://github.com/apache/kafka/pull/2645 and the associated JIRA
https://issues.apache.org/jira/browse/KAFKA-4851
Thanks,
Damian
On Tue, 7 Mar 2017 at 17:48 Marco Abitabile
wrote
Hello Damian,
Thanks a lot for your precious support.
I confirm you that your workaround is perfectly working for my use case.
I'll be glad to support you to test the original code whenever the issue
you've spotted will be solved.
Thanks a lot again.
Marco.
Il 06/mar/2017 16:03, "Damian Guy"
Hi Marco,
I've done some testing and found that there is a performance issue when
caching is enabled. I suspect his might be what you are hitting. It looks
to me that you can work around this by doing something like:
final StateStoreSupplier sessionStore =
Stores.create(*"session-store-name"*)
Thanks Damian,
sure, you are right, these details are modified to be compliant with my
company rules. However the main points are unchanged.
The producer of the original data is a "data ingestor" that attach few
extra fields and produces a message such as:
row = new JsonObject({
"id" : 1234565
Hi Marco,
Can you try setting StreamsConfig.CACHE_MAX_BYTES_BUFFER_CONFIG to 0 and
see if that resolves the issue?
Thanks,
Damian
On Mon, 6 Mar 2017 at 10:59 Damian Guy wrote:
> Hi Marco,
>
> Your config etc look ok.
>
> 1. It is pretty hard to tell what is going on from just your code below,
Hi Marco,
Your config etc look ok.
1. It is pretty hard to tell what is going on from just your code below,
unfortunately. But the behaviour doesn't seem to be inline with what I'm
reading in the streams code. For example your MySession::new function
should be called once per record. The merger a
Hello,
I'm playing around with the brand new SessionWindows. I have a simple
topology such as:
KStream sess =
builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC);
sess
.map(MySession::enhanceWithUserId_And_PutUserIdAsKey)
.groupByKey(stringSerde, jsonSerde)
.aggregate(
MySes