Hello everyone,
after upgrading to Kafka Streams 2.8 we have one streams app that's
stuck trying to restore a store changelog topic. This is the debug log
of the app:
https://gist.github.com/alex88/f31593aaabbd282b21f89a0252a28745
I would like to avoid having to delete and recreate the topic.
Is there any reason why this happens?
On 7/20/21 2:09 PM, Alessandro Tagliapietra wrote:
Hello everyone,
after upgrading to Kafka Streams 2.8 we have one streams app that's
stuck trying to restore a store changelog topic. This is the debug
log of the app:
https://gist.github.com/alex88/f31593aaabbd282b21f89a0252a28745
I've tried to restart the streams application using the at_least_once
processing guarantee and it worked, then restarted it again with
exactly_once_beta and it worked too.
Is there any reason why?
I would like to avoid having to delete and recreate the topic. Is there
anything I can try to solve this? Because it seems that the replicator is
moving messages, but the rate is < 1 MB/s, so it's probably going very
slowly compared to what it should.
Thank you in advance
--
Alessandro Tagliapietra
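For context, a minimal sketch of how the processing guarantee mentioned above
is switched in the Streams configuration (the constants are from the public
StreamsConfig API; the application id and bootstrap servers are placeholders,
and the guarantee that originally got stuck is not shown in the thread):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ProcessingGuaranteeSketch {
    static Properties streamsConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // placeholder
        // per the messages above, restarting with AT_LEAST_ONCE and then with
        // EXACTLY_ONCE_BETA both got past the stuck changelog restore
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA);
        return props;
    }
}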
keys are the machine IDs) ends up in the same streams instance,
which is instead guaranteed when using the intermediate topic?
Thanks!
--
Alessandro Tagliapietra
On Tue, May 12, 2020 at 7:16 AM Bill Bejeck wrote:
> Hi Alessandro,
>
> For merging the three streams, have you considered the `KStr
f the "last
seen" of a machine is older than some time
To "merge' the 3 streams I was thinking to just map them into a single
intermediate topic and have the ValueTransformer read from that.
Is there a better way? Maybe without using an intermediate topic?
Thank you in advance
--
Alessandro Tagliapietra
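For reference, a rough sketch of merging the three streams with KStream#merge
instead of going through an intermediate topic (topic names and value types
are placeholders; this assumes the three input topics are co-partitioned,
i.e. keyed by machine ID with the same partition count, so the downstream
per-key state still sees every record of a machine on one instance):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class MergeSketch {
    static KStream<String, String> mergedMetrics(StreamsBuilder builder) {
        KStream<String, String> s1 = builder.stream("machine-metrics-a");
        KStream<String, String> s2 = builder.stream("machine-metrics-b");
        KStream<String, String> s3 = builder.stream("machine-metrics-c");

        // merge() simply interleaves the streams; there is no repartition step,
        // so records keep the partition of their input topic
        return s1.merge(s2).merge(s3);
        // the existing ValueTransformer and its state store can then be attached
        // to the merged stream exactly as they are attached to the
        // intermediate-topic stream today
    }
}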
such an exception? Or is the configuration wrong?
--
Alessandro Tagliapietra
On Mon, Jan 6, 2020 at 10:31 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:
> Hello everyone,
>
> I'm trying to run Kafka Connect to dump Avro messages to S3, and I'm having
use our custom EnvironmentTopicNameStrategy, but I get:
"java.lang.RuntimeException:
com.myapp..serializers.subject.EnvironmentTopicNameStrategy is not an
instance of
io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy" even
though the class extends TopicNameStrategy
- just adding value.converter (and the auth/schema registry url configs)
results in the same error as above, just with the default class:
"java.lang.RuntimeException:
io.confluent.kafka.serializers.subject.TopicNameStrategy is not an instance
of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy"
Any idea? It seems that adding value.converter to the connector breaks even
the default serialization class.
Thanks in advance
--
Alessandro Tagliapietra
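For comparison, the converter section of an S3 sink connector config usually
looks roughly like the following (URLs, credentials and the strategy class are
placeholders; this is only a sketch of the usual keys, not a verified fix for
the error above):

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "https://schema-registry.example.com",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.schema.registry.basic.auth.user.info": "<api-key>:<api-secret>",
"value.converter.value.subject.name.strategy": "com.example.EnvironmentTopicNameStrategy"

A frequent cause of "is not an instance of" errors in Connect is plugin
classloader isolation, i.e. the strategy class and the Avro converter being
loaded by different classloaders, which usually means the custom jar has to
sit in the same plugin directory as the converter; without the full setup
that is only a guess, though.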
n and the cache problem in that
case was probably what you mentioned)
Regards
--
Alessandro Tagliapietra
On Fri, Dec 13, 2019 at 2:17 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:
> Hi Sophie,
>
> thanks for explaining that.
> So yeah it seems that since I
state store, is it possible to find out in some other way? Just to
keep track of it?
Regards
--
Alessandro Tagliapietra
On Thu, Dec 12, 2019 at 3:14 PM Sophie Blee-Goldman
wrote:
> Thanks for collecting all these metrics. It might be that as the length of
> the lists
> increases
the changelog topics increase
(full album https://imgur.com/a/tXlJJEO)
Any other metric that might be important?
It seems that the issue is between the aggregate and KTable.toStream().
After a restart, as expected, usage goes back to normal values.
--
Alessandro Tagliapietra
On Mon, Dec 9, 20
ng it in our pipeline decreased
the sent bytes from 30-40MB/minute to 400KB/minute?
I'll look into the custom state store tho.
Thanks
--
Alessandro Tagliapietra
On Mon, Dec 9, 2019 at 7:02 PM Sophie Blee-Goldman
wrote:
> Alright, well I see why you have so much data being sent t
te. Most
of the records are also generated by simulators with very predictable
output rate.
In the meantime I've enabled reporting of debug metrics (so including cache
hit ratio) to hopefully get better insights the next time it happens.
Thank you in advance
--
Alessandro Tagliapietra
On Mon,
starts working again? We're using EOS so I believe the commit interval
is every 100 ms.
Regards
--
Alessandro Tagliapietra
On Mon, Dec 9, 2019 at 12:15 PM Sophie Blee-Goldman
wrote:
> It might be that the cache appears to "stop working" because it gets full,
> and each
ntime :)
--
Alessandro Tagliapietra
On Sun, Dec 8, 2019 at 12:54 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:
> It seems that even with caching enabled, after a while the sent bytes still
> go up
>
> [image: Screen Shot 2019-12-08 at 12.52.31 PM.png]
>
> you
It seems that even with caching enabled, after a while the sent bytes still
go up.
[image: Screen Shot 2019-12-08 at 12.52.31 PM.png]
You can see the deploy where I enabled caching, but it looks like it's
still only a temporary solution.
--
Alessandro Tagliapietra
On Sat, Dec 7, 2019 a
g pops out
Thanks
--
Alessandro Tagliapietra
On Sat, Dec 7, 2019 at 10:02 AM John Roesler wrote:
> Hmm, that’s a good question. Now that we’re talking about caching, I
> wonder if the cache was just too small. It’s not very big by default.
>
> On Sat, Dec 7, 2019, at 11:16, Alessan
more memory available? Any other
possible reason?
Thank you so much for your help
--
Alessandro Tagliapietra
On Sat, Dec 7, 2019 at 9:14 AM John Roesler wrote:
> Ah, yes. Glad you figured it out!
>
> Caching does not reduce EOS guarantees at all. I highly recommend using
> it. You might ev
Never mind, I've found out I can use `.withCachingEnabled` on the store
builder to achieve the same thing as the windowing example, since
`Materialized.as` turns that on by default.
Does caching in any way reduce the EOS guarantees?
--
Alessandro Tagliapietra
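For reference, a sketch of the two knobs discussed here (store name, serdes
and the numbers are placeholders):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class CachingSketch {
    static StoreBuilder<KeyValueStore<String, String>> lastValueStore() {
        return Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("LastValueStore"),
                Serdes.String(), Serdes.String())
            .withCachingEnabled()   // batch updates per key before forwarding/logging
            .withLoggingEnabled(Collections.emptyMap());
    }

    static Properties cacheConfig() {
        Properties props = new Properties();
        // total cache across all threads of an instance; 0 disables caching entirely
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 64 * 1024 * 1024L);
        // with EOS the commit interval defaults to 100 ms, and every commit also
        // flushes the cache, which bounds how much batching it can do
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
        return props;
    }
}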
On Sat, Dec 7, 2019 at 1:
dn't every time it aggregates send a
record to the changelog?
- is the windowing doing some internal caching, like not sending every
aggregation record until the window time has passed? (if so, where can I
find that code, since I would like to use it for our new
implementation too)
Thank you
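For what it's worth, the built-in way to hold results back until a window
closes is suppress(); a sketch, with placeholder names and a simple count
standing in for the real aggregation:

import java.time.Duration;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class SuppressSketch {
    static KStream<Windowed<String>, Long> perMinuteCounts(KStream<String, String> input) {
        return input
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ZERO))
            .count(Materialized.as("per-minute-counts"))
            // nothing is emitted for a window until stream time passes its end plus grace
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream();
    }
}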
prometheus-jmx
monitoring so we can keep better track of what's going on :)
--
Alessandro Tagliapietra
On Tue, Dec 3, 2019 at 9:12 PM John Roesler wrote:
> Oh, yeah, I remember that conversation!
>
> Yes, then, I agree, if you're only storing state of the most recent window
read a single value
per key (if the cleanup just happened) right?
--
Alessandro Tagliapietra
On Tue, Dec 3, 2019 at 8:51 PM John Roesler wrote:
> Hey Alessandro,
>
> That sounds also like it would work. I'm wondering if it would actually
> change what you observe w.r.t. recovery be
compact-only changelog topic should keep the rebuild data to
the minimum, as it would have only the last value for each key.
Hope that makes sense
Thanks again
--
Alessandro Tagliapietra
On Tue, Dec 3, 2019 at 3:04 PM John Roesler wrote:
> Hi Alessandro,
>
> To take a stab at your quest
doesn't find the checkpoint and then it does find it?
It seems it loaded about 2.7M records (sum of the offset differences in the
"restoring partition" messages), right?
Should I maybe try to reduce the checkpoint interval?
Regards
--
Alessandro Tagliapietra
On Mon, Dec 2, 2019 at 9:
s (a custom suppressor basically), is there a way to keep only
the last window so that the store rebuilding goes faster, without
rebuilding old windows too? Or should I create a custom window using the
original key as the key, so that compaction keeps only the last window's data?
Thank you
--
Alessandro Tagliapietra
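One possible shape of the "original key as key" idea, sketched with
placeholder names and serdes: a plain key-value store keyed by the machine ID,
whose compacted changelog only keeps the newest entry per key, so a restore
replays one value per machine instead of every historical window.

import java.util.Collections;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class LastWindowStoreSketch {
    static StoreBuilder<KeyValueStore<String, String>> lastWindowPerKey() {
        // the value serde is a placeholder; it would hold whatever the custom
        // suppressor currently keeps per machine (the latest window's aggregate)
        return Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("last-window-per-key"),
                Serdes.String(), Serdes.String())
            .withLoggingEnabled(Collections.singletonMap("cleanup.policy", "compact"));
    }
}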
You can achieve exactly-once on a consumer by enabling read_committed and
manually committing the offset as soon as you receive a message. That way
you know that at the next poll you won't get the old message again.
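A minimal sketch of that consumer setup (broker, group and topic names are
placeholders; max.poll.records is set to 1 only to make the per-message
commit literal, which trades throughput for the behaviour described above):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadCommittedConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // skip aborted transactional records
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");       // offsets are committed by hand below
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");             // one record per poll

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    handle(record);        // placeholder for the real processing
                    consumer.commitSync(); // committed right away, so the next poll never re-delivers it
                }
            }
        }
    }

    static void handle(ConsumerRecord<String, String> record) { /* ... */ }
}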
On Fri, Sep 27, 2019, 6:24 AM christopher palm wrote:
> I had a similar question, and ju
Without the manual seek it wasn't obvious why it was skipping messages. Now
that I know I have full control over the flow, I can add the exception
handling logic, idempotency, or whatever I need, because I know for sure
that I won't lose messages.
Thanks
--
Alessandro Tagliapietra
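A sketch of what the "manual seek" mentioned above can look like (the handler
is a placeholder and offset commits are assumed to be handled elsewhere): on a
failure the partition is rewound to the failed record and the rest of that
partition's batch is skipped, so the record comes back on the next poll
instead of being silently skipped.

import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekOnErrorSketch {
    static void pollLoop(KafkaConsumer<String, String> consumer) {
        while (true) {
            Set<TopicPartition> failed = new HashSet<>();
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                if (failed.contains(tp)) {
                    continue; // this partition was rewound; its records will be re-polled
                }
                try {
                    handle(record);
                } catch (Exception e) {
                    consumer.seek(tp, record.offset()); // re-fetch the failed record next poll
                    failed.add(tp);
                }
            }
        }
    }

    static void handle(ConsumerRecord<String, String> record) { /* placeholder */ }
}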
On
commits the next offset.
In fact, you can see that value 4 is reprocessed multiple times without a
restart, and value 20, which throws an exception, is reprocessed after a
restart because nothing has been committed.
Now, kafka gurus, is this the best way to achieve this? Wouldn't it be
better to have a
The only way this works is if I don't catch the exception, let the consumer
crash and fully restart it.
Maybe the consumer has an internal state that always gets updated when it
receives a message during poll?
--
Alessandro Tagliapietra
On Wed, Sep 25, 2019 at 7:37 PM Alessandro Tagliap
"Committed
offset " part
--
Alessandro Tagliapietra
On Wed, Sep 25, 2019 at 7:09 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:
> It's still in the topic because it's weeks away from the deletion threshold
> (today's message with a 4 weeks retent
with value 16
it seems that the offset just automatically increases; with another client
I also see all the values in the topic, so they're not deleted.
Shouldn't it have retried the message with value 3?
--
Alessandro Tagliapietra
On Wed, Sep 25, 2019 at 6:19 PM Steve Howard
wrote:
use kafka.
That's why I'm wondering if there's something wrong with my code.
--
Alessandro Tagliapietra
On Wed, Sep 25, 2019 at 4:29 PM M. Manna wrote:
> How long is your message retention set for ? Perhaps you want to increase
> that to a large enough value.
>
> I
weeks retention
set for the changelog topic (which also uses compact as its policy, not delete)
- the stream correctly processed data from that key over and over, so I
don't think the producer ID has expired multiple times within minutes
Any help is very appreciated
--
Alessandro Tagliapietra
this way so that I can
confirm that I don't lose messages if I don't manually commit them, which
doesn't seem to be the case
Any help is really appreciated
--
Alessandro Tagliapietra
On Wed, Sep 25, 2019 at 2:20 PM M. Manna wrote:
> Hi,
>
> How are you managing your offset com
again.
Instead it seems that after some retries the message is skipped and it
goes on with the next one.
What could be the reason? If in the next loop iteration it gets a message
from another partition, would it also commit the offset of the other,
failed partition?
Thank you
--
Alessandro Tagliapietra
Thanks a lot Bruno I'll check that!
--
Alessandro Tagliapietra
On Wed, Sep 18, 2019 at 4:20 PM Bruno Cadonna wrote:
> Hi Alessandro,
>
> If you want to get each update to an aggregate, you need to disable
> the cache. Otherwise, an update will only be emitted when the
> agg
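A sketch of what disabling the cache on the aggregate's store looks like
(store name and serdes are placeholders); the global alternative is setting
cache.max.bytes.buffering to 0:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class NoCacheSketch {
    static Materialized<String, Long, KeyValueStore<Bytes, byte[]>> uncachedStore() {
        return Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("my-aggregate-store")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.Long())
            .withCachingDisabled(); // every single aggregate update is forwarded downstream
    }
}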
line is never logged.
Since between these lines there's just the groupByKey and windowedBy, is
there any logic in those two that could stop the flow of data? Since I
don't have any window-closing mechanism or suppression, shouldn't it just
go through?
Thank you in advance
--
Alessandro Tagliapietra
you mean by “timestamp is per topic and not per key”. Can you
> please elaborate?
>
>
>
>
> > On Sep 11, 2019, at 10:13 PM, Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
> >
> > Did you ever push any data with a greater timestamp tha
Did you ever push any data with a greater timestamp than the current one
you're producing?
One thing that took me a while to find out is that the suppress timestamp
is per topic and not per key
--
Alessandro Tagliapietra
On Wed, Sep 11, 2019 at 8:06 PM Thameem Ansari wrote:
> Yes I am abl
make sure I emit a window only when I receive a
new one for each key.
--
Alessandro Tagliapietra
On Wed, Jul 17, 2019 at 9:25 AM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:
> Seems that completely removing the grace period fixes the problem,
>
> is it ex
It seems that completely removing the grace period fixes the problem.
Is that expected? Is the grace period per key or global?
--
Alessandro Tagliapietra
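For reference, the grace period is declared on the window definition itself,
not per key; window closing is driven by stream time, which advances per
task/partition. A sketch with placeholder sizes:

import java.time.Duration;
import org.apache.kafka.streams.kstream.TimeWindows;

public class GraceSketch {
    static TimeWindows oneMinuteWindows() {
        // a window closes once the task's stream time passes window end + grace
        return TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofSeconds(30));
    }
}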
On Wed, Jul 17, 2019 at 12:07 AM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:
> I've added a reproductio
Alessandro Tagliapietra
On Tue, Jul 16, 2019 at 10:36 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:
> Actually suppress doesn't matter, it happens later in the code, I've also
> tried to remove that and add a grace period to the window function but the
>
Actually suppress doesn't matter since it happens later in the code; I've
also tried to remove it and add a grace period to the window function, but
the issue persists.
--
Alessandro Tagliapietra
On Tue, Jul 16, 2019 at 10:17 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gma
e, plus all messages are ordered and the timestamp extractor
reads the timestamp from the message.
Does anyone have any idea what the problem could be?
--
Alessandro Tagliapietra
both things, I think everything should be working fine now.
Thank you again for your help!
--
Alessandro Tagliapietra
On Wed, Jul 10, 2019 at 5:37 AM Patrik Kleindl wrote:
> Hi
> Regarding the I/O, RocksDB has something called write amplification which
> writes the data to multiple l
see the impact on our storage usage.
--
Alessandro Tagliapietra
On Tue, Jul 9, 2019 at 6:06 AM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:
> Hi Bruno,
>
> Oh I see, I'll try to add a persistent disk where the local stores are.
> I've other quest
Hi John,
thanks a lot for the great explanation and the links.
After I've sent the question I've researched a bit more about EOS and I'm
currently testing that out.
I'll read those links and see what I come up with!
Thanks and have a great day!
--
Alessandro Tagliapietra
> > don't persist the aggregate store
> > changelog as I do with the "LastValueStore" store which has
> > "withLoggingEnabled()", but even that store has:
> >
> > Resetting offset for partition myapp-id-LastValueStore-changelog-0 to
> > offset 403910
>
store which has
"withLoggingEnabled()", but even that store has:
Resetting offset for partition myapp-id-LastValueStore-changelog-0 to
offset 403910
Restoring task 0_0's state store LastValueStore from beginning of the
changelog myapp-id-LastValueStore-changelog-0
Thank you everyone in advance
if my topology starts from a stream and uses multiple stores before
windowing, and there's an error in the windowing step, what happens to the
stores' changes? When the message is reprocessed, will the store be in the
state it was in after it processed the message on the first try?
Thank you in advance
gging to better understand what's going on.
Have a great day
--
Alessandro Tagliapietra
On Thu, Jun 6, 2019 at 11:27 AM Guozhang Wang wrote:
> Honestly I cannot think of an issue that fixed in 2.2.1 but not in 2.2.0
> which could be correlated to your observations:
>
>
> ht
Yes, that's right,
could that be the problem? Anyway, so far after upgrading from 2.2.0 to
2.2.1 we haven't experienced that problem anymore.
Regards
--
Alessandro Tagliapietra
On Thu, Jun 6, 2019 at 10:50 AM Guozhang Wang wrote:
> That's right, but local state is used as a
s to bootstrap from
> scratch. E.g. if you are using K8s for cluster management, you'd better use
> stateful sets to make sure local states are preserved across re-deployment.
>
>
> Guozhang
>
> On Wed, Jun 5, 2019 at 4:52 PM Alessandro Tagliapietra <
> tagliapietra.alessan...
windows it
already processed.
In the meantime I'm trying to upgrade to kafka 2.2.1 to see if I get any
improvement.
--
Alessandro Tagliapietra
On Wed, Jun 5, 2019 at 4:45 PM Guozhang Wang wrote:
> Hello Alessandro,
>
> What did you do for `restarting the app online`? I'm not sur
to the Confluent Cloud kafka broker.
The problem is that locally everything works properly: if I restart
the streams app it just continues where it left off, but if I restart the
app online it reprocesses the whole topic.
That shouldn't happen, right?
Thanks in advance
--
Alessandro Tagliapietra
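Two Streams settings usually matter here, sketched below with placeholder
values: keeping the state directory on storage that survives a redeploy (the
stateful-set advice quoted earlier in the thread), and keeping a standby
replica so a restarted instance does not have to replay the whole changelog.

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StateRetentionSketch {
    static Properties stateConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/data/kafka-streams"); // persistent volume mount
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);          // warm replica on another instance
        return props;
    }
}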
https://gist.github.com/alex88/43b72e23bda9e15657b008855e1904db is
the one with the most information and the logs of what I was seeing.
Thank you for your help
--
Alessandro Tagliapietra
On Wed, May 8, 2019 at 2:18 AM Bruno Cadonna wrote:
> Hi Alessandro,
>
> Apologies for the late reply.
>
> I tried the c
Hi John,
on Slack Matthias suggested to have my own transform to window the data
myself, I'll have a look into it and the Windows implementation as you
suggested and see what I can do!
Thanks for the advice!
--
Alessandro Tagliapietra
On Tue, May 7, 2019 at 8:45 AM John Roesler wrote:
remove 1 second from
the message timestamp, but it's not an option I'd like.
Thank you
--
Alessandro Tagliapietra
tion": 3}
S1 with computed metric {"timestamp": 6, "production": 4}
S1 with computed metric {"timestamp": 6, "production": 5}
S1 with computed metric {"timestamp": 6, "production": 6}
S1 with computed metric {"times
able. Are there any snapshot builds
available?
In the meantime I'm trying to create a custom docker image from kafka
source.
Thanks
--
Alessandro Tagliapietra
On Tue, Apr 23, 2019 at 8:52 AM Bruno Cadonna wrote:
> Hi Alessandro,
>
> It seems that the behaviour you described reg
Thanks Matthias, one less thing to worry about in the future :)
--
Alessandro Tagliapietra
On Sat, Apr 20, 2019 at 11:23 AM Matthias J. Sax
wrote:
> Just a side note. There is currently work in progress on
> https://issues.apache.org/jira/browse/KAFKA-3729 that should fix the
> conf
valueSpecificAvroSerde.configure(serdeConfig, false);
and then in aggregate()
Materialized.with(Serdes.String(), valueSpecificAvroSerde)
fixed the issue.
Thanks in advance for the windowing help, very appreciated.
In the meantime I'll try to make some progress on the rest.
Have a great week
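Put together, the fix described above looks roughly like this (the schema
registry URL is a placeholder and the serde is kept generic over the Avro
record type):

import java.util.Collections;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.avro.specific.SpecificRecord;

public class AvroSerdeSketch {
    static <T extends SpecificRecord> SpecificAvroSerde<T> valueSerde(String schemaRegistryUrl) {
        SpecificAvroSerde<T> serde = new SpecificAvroSerde<>();
        serde.configure(
            Collections.singletonMap("schema.registry.url", schemaRegistryUrl),
            false); // false = configure as a value serde, not a key serde
        return serde;
    }
    // then in the topology:
    //   .aggregate(initializer, aggregator,
    //              Materialized.with(Serdes.String(), valueSerde("http://schema-registry:8081")))
}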
"production": 1}
S1 with filtered metric{"timestamp": 161000, "production": 1}
S1 with computed metric {"timestamp": 16, "production": 10}
S1 with filtered metric{"timestamp": 162000, "production": 1}
as you can see, window fo
Hi Bruno,
I'm using the confluent docker images 5.2.1, so kafka 2.2.
Anyway I'll try to make a small reproduction repo with all the different
cases soon.
Thank you
--
Alessandro Tagliapietra
On Tue, Apr 16, 2019 at 1:02 PM Bruno Cadonna wrote:
> Hi Alessandro,
>
> What
find out the cause of these
issues, probably starting from scratch with a simpler example
Thank you for your help!
--
Alessandro Tagliapietra
On Mon, Apr 15, 2019 at 10:08 PM Bruno Cadonna wrote:
> Hi Alessandro,
>
> Have a look at this Kafka Usage Pattern for computing averag
that has a field of
type array so I can use the regular avro serializer to implement this.
Should I create my own serdes instead or is this the right way?
Thank you in advance
--
Alessandro Tagliapietra
On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gma
I've just been having some issues with message types, since I'm changing
the data type when aggregating the window, but I think it's an easy problem.
Thank you again
Best
--
Alessandro Tagliapietra
On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna wrote:
> Hi Alessandro,
>
> the `Transfor
> to your processor topology
> <https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology>
> .
Is it? Doesn't `transform` need a TransformerSupplier while `addProcessor`
uses a ProcessorSupplier?
Thank you again for your help
--
Alessandro Tagliapietra
On
sage of the processor but `.process(...)` returns void, so I
cannot have a KStream from a processor?
Thank you all in advance
--
Alessandro Tagliapietra
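For reference, the difference boils down to this (types and the toy length
computation are placeholders): process() is terminal and returns void, while
transform() takes a TransformerSupplier and hands back a KStream that can
keep flowing through the topology.

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class TransformVsProcessSketch {
    static KStream<String, Long> withTransformer(KStream<String, String> input) {
        // input.process(...) would return void and end the branch here;
        // transform(...) returns a KStream, so more operators can follow it
        return input.transform(() -> new Transformer<String, String, KeyValue<String, Long>>() {
            private ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                this.context = context; // gives access to state stores and punctuators
            }

            @Override
            public KeyValue<String, Long> transform(String key, String value) {
                return KeyValue.pair(key, (long) value.length());
            }

            @Override
            public void close() {}
        });
    }
}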