Just an update: on staging with the new code the issue might have been on
our side. I'm still not sure, but I'm now monitoring the aggregation list
size to confirm that incoming data, even without a change in rate, was the
real issue.
(This however wasn't happening in production and the cache proble
Hi Sophie,
thanks for explaining that.
So yeah, it seems that since I'm using the default grace period of 24 hours,
that might cause the records to be sent to the changelog after ~24 hours.
However, I'm switching the regular windowing system to the custom one, and
while for the regular windows n
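A minimal sketch of what pinning the grace period explicitly might look like, assuming Kafka Streams 2.1+ and the one-minute windows from the original topology; the five-minute value is only an illustration, not a recommendation:

import java.time.Duration
import org.apache.kafka.streams.kstream.TimeWindows

// Explicit grace period instead of the ~24h default, so closed windows stop being
// updated (and re-sent to the changelog) much sooner.
val windows = TimeWindows.of(Duration.ofMinutes(1))
    .grace(Duration.ofMinutes(5))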
Thanks for collecting all these metrics. It might be that as the length of
the lists
increases over time, the cache is able to hold fewer unique keys and
eventually has to
start evicting things. This would explain why the cache hit rate starts to
decrease, and
likely why latency starts to go up. Wh
Just an update since it has been happening again now and I have some more
metrics to show, the topology is this:
Topologies:
  Sub-topology: 0
    Source: KSTREAM-SOURCE-00 (topics: [sensors])
      --> KSTREAM-TRANSFORMVALUES-01
    Processor: KSTREAM-TRANSFORMVALUES-01 (
You're saying that with a 100ms commit interval, caching won't help because
it would still send the compacted changes to the changelog every 100ms?
Regarding the custom state store I'll look into that because I didn't go
much further than transformers and stores in my kafka experience so I'll
need
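For reference, a hedged sketch of the settings involved here: under EOS the commit interval defaults to 100 ms and the record cache is flushed on every commit, so raising it trades end-to-end latency for fewer, more compacted changelog writes. The 10-second value is just an example, not a recommendation from this thread:

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = Properties()
props[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = StreamsConfig.EXACTLY_ONCE
// Defaults to 100 ms when exactly-once is enabled; the record cache is flushed
// (and compacted updates are written to the changelog) on every commit.
props[StreamsConfig.COMMIT_INTERVAL_MS_CONFIG] = 10_000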
Alright, well I see why you have so much data being sent to the changelog
if each update involves appending to a list and then writing the whole list
back out. And with 340 records/minute I'm actually not sure how the cache
could really help at all when it's being flushed every 100ms.
Here's kind of a w
Hi Sophie,
Just to give a better context, yes we use EOS and the problem happens in
our aggregation store.
Basically when windowing data we append each record into a list that's
stored in the aggregation store.
We have 2 versions: in production we use the Kafka Streams windowing API,
in staging we
It's an LRU cache, so once it gets full new records will cause older ones
to be evicted (and thus sent
downstream). Of course this should only apply to records of a different
key, otherwise it will just cause
an update of that key in the cache.
I missed that you were using EOS, given the short com
Hi Sophie,
thanks for helping.
By eviction of older records you mean they get flushed to the changelog
topic?
Or the cache is just full and so all new records go to the changelog topic
until the old ones are evicted?
Regarding the timing, what timing do you mean? Between when the cache stops
and
It might be that the cache appears to "stop working" because it gets full,
and each
new update causes an eviction (of some older record). This would also
explain the
opposite behavior, that it "starts working" again after some time without
being restarted,
since the cache is completely flushed on c
And it seems that for some reason after a while caching works again without
a restart of the streams application
[image: Screen Shot 2019-12-08 at 11.59.30 PM.png]
I'll try to enable debug metrics and see if I can find something useful
there.
Any idea is appreciated in the meantime :)
--
Alessandro Tagliapietra
It seems that even with caching enabled, after a while the sent bytes still
go up
[image: Screen Shot 2019-12-08 at 12.52.31 PM.png]
You can see the deploy where I enabled caching, but it looks like it's
still a temporary solution.
--
Alessandro Tagliapietra
On Sat, Dec 7, 2019 at 10:08 AM Ale
Could be, but since we have a limited number of input keys (~30), and
windowing generates new keys but old ones are never touched again since the
data per key is in order, I assume it shouldn't be a big deal for it to
handle 30 keys.
I'll have a look at cache metrics and see if something pops out
Thanks
Hmm, that’s a good question. Now that we’re talking about caching, I wonder if
the cache was just too small. It’s not very big by default.
On Sat, Dec 7, 2019, at 11:16, Alessandro Tagliapietra wrote:
> Ok I'll check on that!
>
> Now I can see that with caching we went from 3-4MB/s to 400KB/s,
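A hedged sketch of bumping the cache size John is referring to; the default for cache.max.bytes.buffering is about 10 MB shared across all stream threads, and the 100 MB value below is just an illustration:

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = Properties()
// Larger record cache so more unique keys can stay cached between commits (default ~10 MB).
props[StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG] = 100L * 1024 * 1024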
Ok I'll check on that!
Now I can see that with caching we went from 3-4MB/s to 400KB/s, which will
help with the bill.
Last question, any reason why after a while the regular windowed stream
starts sending every update instead of caching?
Could it be because it doesn't have any more memory availab
Ah, yes. Glad you figured it out!
Caching does not reduce EOS guarantees at all. I highly recommend using it. You
might even want to take a look at the caching metrics to make sure you have a
good hit ratio.
-John
On Sat, Dec 7, 2019, at 10:51, Alessandro Tagliapietra wrote:
> Never mind I've
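A small sketch (not from the thread) of reading the record-cache hit ratio John mentions at runtime; these are DEBUG-level metrics, so metrics.recording.level=DEBUG must be set, and the exact metric names vary slightly across Streams versions, hence filtering by group only:

import org.apache.kafka.streams.KafkaStreams

fun printCacheHitRatios(streams: KafkaStreams) {
    streams.metrics()
        .filterKeys { it.group() == "stream-record-cache-metrics" }
        .forEach { (name, metric) ->
            println("${name.name()} ${name.tags()} = ${metric.metricValue()}")
        }
}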
Never mind, I've found out I can use `.withCachingEnabled` on the store
builder to achieve the same thing as the windowing example, since
`Materialized.as` turns that on by default.
Does caching in any way reduce the EOS guarantees?
--
Alessandro Tagliapietra
On Sat, Dec 7, 2019 at 1:12 AM Alessand
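A minimal sketch of the same idea on a hand-built store; the store name and value serde here are placeholders, not the actual ones from this topology:

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.state.Stores

val aggregationStore = Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("aggregation-store"),  // placeholder name
        Serdes.String(),
        Serdes.ByteArray()                                     // placeholder for the real list serde
    )
    .withCachingEnabled()  // same effect Materialized.as enables by default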
Seems my journey with this isn't done just yet.
This seems very complicated to me, but I'll try to explain it as best I can.
To better understand the streams network usage I've used Prometheus with
the JMX exporter to export Kafka metrics.
To check the amount of data we use I'm looking at the incre
Oh, good!
On Tue, Dec 3, 2019, at 23:29, Alessandro Tagliapietra wrote:
> Testing on staging shows that a restart on exception is much faster and the
> stream starts right away which I think means we're reading way less data
> than before!
>
> What I was referring to is that, in Streams, the keys
Testing on staging shows that a restart on exception is much faster and the
stream starts right away which I think means we're reading way less data
than before!
What I was referring to is that, in Streams, the keys for window
> aggregation state is actually composed of both the window itself and
Oh, yeah, I remember that conversation!
Yes, then, I agree, if you're only storing state of the most recent window for
each key, and the key you use for that state is actually the key of the
records, then an aggressive compaction policy plus your custom transformer
seems like a good way forward
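A rough sketch of the "aggressive compaction" part for a custom store's changelog; the store name, serdes, and the specific values are assumptions, not recommendations from this thread:

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.state.Stores

// Changelog topic overrides: compact eagerly so only the latest window state per key survives.
val changelogConfig = mapOf(
    "min.cleanable.dirty.ratio" to "0.1",  // compact sooner than the 0.5 default
    "segment.ms" to "600000"               // smaller segments so compaction can kick in earlier
)

val lastWindowStore = Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("last-window-store"),
        Serdes.String(),
        Serdes.ByteArray()                 // placeholder for the real window-state serde
    )
    .withLoggingEnabled(changelogConfig)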
Hi John,
afaik the grace period uses stream time
https://kafka.apache.org/21/javadoc/org/apache/kafka/streams/kstream/Windows.html
which is per partition. Unfortunately we process data that's not in sync
between keys, so each key needs to be independent and a key can have much
older data than the others.
Hey Alessandro,
That also sounds like it would work. I'm wondering if it would actually change
what you observe w.r.t. recovery behavior, though. Streams already sets the
retention time on the changelog to equal the retention time of the windows, for
windowed aggregations, so you shouldn't be l
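For completeness, a hedged sketch of setting the windowed-store retention (and therefore the changelog retention) explicitly via Materialized, available since 2.1; the store name and the two-hour value are placeholders, and MetricSequenceList is the aggregate class from the original snippet:

import java.time.Duration
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.state.WindowStore

val materialized = Materialized
    .`as`<String, MetricSequenceList, WindowStore<Bytes, ByteArray>>("minute-aggregates")
    .withRetention(Duration.ofHours(2))  // changelog retention follows the store retention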
Thanks John for the explanation,
I thought that with EOS enabled (which we have) it would in the worst case
find a valid checkpoint and start the restore from there until it reached
the last committed status, not completely from scratch. What you say
definitely makes sense now.
Since we don't real
Hi Alessandro,
To take a stab at your question, maybe it first doesn't find it, but then
restores some data, writes the checkpoint, and then later on, it has to
re-initialize the task for some reason, and that's why it does find a
checkpoint then?
More to the heart of the issue, if you have EO
Hi John,
thanks a lot for helping, regarding your message:
- no we only have 1 instance of the stream application, and it always
re-uses the same state folder
- yes, we're seeing most issues when restarting non-gracefully due to an exception
I've enabled trace logging and filtering by a single state s
Hi Alessandro,
I'm sorry to hear that.
The restore process only takes one factor into account: the current offset
position of the changelog topic is stored in a local file alongside the state
stores. On startup, the app checks if the recorded position lags the latest
offset in the changelog. I
Hello everyone,
we're having a problem with bandwidth usage on streams application startup.
Our current setup does this:
...
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
    .aggregate(
        { MetricSequenceList(ArrayList()) },
        { key, value, aggregate ->
            agg
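A hedged guess at how the truncated aggregator above might continue; the `values` field on MetricSequenceList is an assumption, not the actual code:

.aggregate(
    { MetricSequenceList(ArrayList()) },
    { key, value, aggregate ->
        aggregate.values.add(value)  // append each record; the whole list is re-serialized on every update
        aggregate
    }
)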