Hi Cam,
This blog post has some pointers on tuning RocksDB memory usage that
might be of help.
https://klaviyo.tech/flinkperf-c7bd28acc67
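In case it helps, on the 1.8-era API the usual approach is a custom
OptionsFactory that bounds the biggest consumers, the block cache and the
write buffers. A rough sketch (class name and sizes are placeholders to
tune, and note the limits apply per column family, i.e. per state, not
globally):

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class BoundedMemoryOptions implements OptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        return currentOptions
            .setWriteBufferSize(64 * 1024 * 1024)   // 64 MB per memtable
            .setMaxWriteBufferNumber(2)             // at most 2 memtables per state
            .setTableFormatConfig(
                new BlockBasedTableConfig()
                    .setBlockCacheSize(256 * 1024 * 1024)); // 256 MB block cache
    }
}

It can be registered on the backend with
rocksDBStateBackend.setOptions(new BoundedMemoryOptions()).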
Ning
On Wed, Aug 7, 2019 at 1:28 PM Cam Mach wrote:
>
> Hello everyone,
>
> What is the easiest and most efficient way to cap RocksDB's memory usage?
>
> Tha
Hi Andrey,
Thank you for the reply.
We are using incremental checkpointing.
Good to know that the incremental cleanup only applies to the heap state
backend. It looks like taking some downtime for a full savepoint and restoring
everything is inevitable.
Thanks,
--
Ning
On Wed, 15 May 2019 10:5
fixed right away. For future reference, here is
> the issue [1].
>
> I've also pulled in Stefan who knows these components very well.
>
> [1] https://issues.apache.org/jira/browse/FLINK-12296
>
> Cheers,
> Till
>
>> On Tue, Apr 23, 2019 at 5:24 PM Ning
Hi Congxian,
I think I have figured out the issue. It's related to the checkpoint directory
collision issue you responded to in the other thread. We reproduced this bug on
1.6.1 after unchaining the operators.
There are two stateful operators in the chain; one is a
CoBroadcastWithKeyedOperator, t
On Tue, 23 Apr 2019 10:53:52 -0400,
Congxian Qiu wrote:
> Sorry for the confusion. In the previous email, I just wanted to say that the
> problem is not caused by the UUID generation; it is caused by different
> operators sharing the same directory (because currently Flink uses JobVertex as
> the direc
Congxian,
We just did a test. Separating the two stateful operators from
chaining seems to have worked around the problem. The states for both
of them are successfully saved in the checkpoint.
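For reference, the separation is just a matter of breaking the chain on one
of the operators, roughly like this (function and type names are
illustrative):

DataStream<Result> out = events
    .keyBy(Event::getKey)
    .process(new FirstStatefulFunction())
    .map(new SecondStatefulMapper())
    .disableChaining(); // keep the mapper out of the previous chain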
Ning
On Tue, Apr 23, 2019 at 7:41 AM Ning Shi wrote:
>
> Congxian,
>
> Thank you for
Congxian,
Thank you for creating the ticket and providing the relevant code. I’m curious
why you don’t think the directory collision is a problem. What we observe
is that one of the operator states is not included in the checkpoint and data
is lost on restore. That’s a pretty serious probl
We have a Flink job using the RocksDB state backend. We found that one of the
RichMapFunction states was not being saved in checkpoints or savepoints. After
some digging, it seems that two operators in the same operator chain are
colliding with each other during checkpoint or savepoint, resulting in one
Congxian,
Thanks for the reply. I will try to get a minimal reproducer and post it to this
thread soon.
Ning
On Sun, 21 Apr 2019 09:27:12 -0400,
Congxian Qiu wrote:
>
> Hi,
> From the given error message, it seems Flink can't open RocksDB because
> of a column family count mismatch, do
worked in Flink 1.6 but not in Flink 1.8. Any
insights would be appreciated.
Ning
On Sat, Apr 20, 2019 at 10:28 PM Ning Shi wrote:
>
> When testing a job on Flink 1.8, we hit the following error during
> resuming from RocksDB checkpoint. This job has been working well on
> Flink 1.6
When testing a job on Flink 1.8, we hit the following error during
resuming from RocksDB checkpoint. This job has been working well on
Flink 1.6.1. The checkpoint was taken using the exact same job on 1.8.
The operator StreamMap_3c5866a6cc097b462de842b2ef91910d mentioned
in the error message is
Hi Stefan,
Thank you for the confirmation.
Doing a one-time cleanup with a full snapshot and upgrading to Flink 1.8
could work. However, in our case, the state is quite large (TBs).
Taking a savepoint takes over an hour, during which we have to pause
the job or it may process more events.
The Java
Just wondering if anyone has any insights into the new TTL state cleanup
feature mentioned below.
Thanks,
—
Ning
> On Mar 11, 2019, at 1:15 PM, Ning Shi wrote:
>
> It's exciting to see the TTL state cleanup feature in 1.8. I have a question
> regarding the migration of existing
It's exciting to see the TTL state cleanup feature in 1.8. I have a question
regarding the migration of existing TTL state to the newer version.
Looking at the Pull Request [1] that introduced this feature, it seems
that Flink is leveraging RocksDB's compaction filter to remove stale state.
I ass
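From my reading of the PR, wiring it up on 1.8 would look roughly like the
following (the TTL value is illustrative, and I believe the filter also has
to be switched on via state.backend.rocksdb.ttl.compaction.filter.enabled in
flink-conf.yaml):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))            // TTL, placeholder value
    .cleanupInRocksdbCompactFilter()     // drop expired entries during compaction
    .build();

MapStateDescriptor<String, Long> descriptor =
    new MapStateDescriptor<>("ttl-map", String.class, Long.class);
descriptor.enableTimeToLive(ttlConfig);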
ntries will expire later relative to their originally saved
> access timestamp.
>
> Best,
> Andrey
>
> > On 5 Dec 2018, at 04:20, Ning Shi wrote:
> >
> > I have a job using TTL map state and RocksDB state backend. If I
> > lengthen the TTL on the map state and
I have a job using TTL map state and RocksDB state backend. If I
lengthen the TTL on the map state and resume the job from savepoint (or
checkpoint assuming I don't change the state backend), will new values
added to that map have the new TTL or will the old TTL in the savepoint
override my changes
> On Nov 6, 2018, at 4:22 PM, Elias Levy wrote:
>
> Also note that there is a pending PR to allow the Cassandra sink to back
> pressure, so that the cluster does not get overwhelmed.
Yes, I’ve been following the development on that pull request. Unfortunately,
we have to go live very soon so
> for rate limiting, would quotas at the Kafka brokers help?
Thanks, Steven. This looks very promising. I'll try it out.
--
Ning
On Tue, Nov 06, 2018 at 07:44:50PM +0200, Nicos Maris wrote:
> Ning can you provide another example except for rate limiting?
Our main use case and concern is rate limiting because without it we
could potentially overwhelm downstream systems (Cassandra) when the job
plays catch-up or replays events
In the job I'm implementing, there are a couple of configuration
variables that I want to change at runtime, such as the rate limit at the
Kafka source. I know it's possible to use a control stream and join it
with the normal stream to configure things in certain operators, but
this doesn't work for th
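For the operators it does work for, the control-stream approach would look
roughly like this with broadcast state (Event, ConfigUpdate, and the
descriptor are illustrative):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

MapStateDescriptor<String, Long> configDescriptor =
    new MapStateDescriptor<>("config", String.class, Long.class);

BroadcastStream<ConfigUpdate> control = configSource.broadcast(configDescriptor);

events.connect(control)
    .process(new BroadcastProcessFunction<Event, ConfigUpdate, Event>() {
        @Override
        public void processElement(Event e, ReadOnlyContext ctx,
                                   Collector<Event> out) throws Exception {
            // read the latest setting, e.g. a rate limit, from broadcast state
            Long rateLimit = ctx.getBroadcastState(configDescriptor).get("rateLimit");
            // ... apply rateLimit to the element here ...
            out.collect(e);
        }

        @Override
        public void processBroadcastElement(ConfigUpdate u, Context ctx,
                                            Collector<Event> out) throws Exception {
            ctx.getBroadcastState(configDescriptor).put(u.key, u.value);
        }
    });

One limitation is that broadcast state is only visible inside downstream
operators, not in the source itself.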
I have the following questions regarding savepoint recovery.
- In my job, it takes over 30 minutes to take a savepoint of over 100GB
on 3 TMs. Most of the time is spent after the alignment. I assume it was
serialization and uploading to S3. However, when I resume a new job
from the savepoint, it only
Hi Andrey,
Thank you for the explanation. I think you are right. It is either
kStaleFile or kNoSpace. We found the cause of the issue, even though we
still don't know how to explain it.
We set the java.io.tmpdir to an EBS-backed drive instead of the
default and the exception started happening. Th
Hi,
We are doing some performance testing on a 12-node cluster with 8 task
slots per TM. Every 15 minutes or so, the job would run into the
following exception.
java.lang.IllegalArgumentException: Illegal value provided for SubCode.
at org.rocksdb.Status$SubCode.getSubCode(Status.java:109
Hi Neils,
Thanks for the response.
> I think your problem is that the Cassandra sink doesn't support exactly-once
> guarantees when the Cassandra query isn't idempotent. If possible, the
> cleanest solution would be implementing a new sink or extending the existing
> Cassandra sink with the
> https://
I'm implementing a streaming job that consumes events from Kafka and
writes results to Cassandra. The writes to Cassandra are not
idempotent. In preparation for planned maintenance events like Flink
version upgrades or job updates, I'd like to be able to shut down the job
cleanly.
I understand that c
Yun,
> Then I would like to share some experience about tuning RocksDB performance.
> Since you did not cache index and filter blocks in the block cache, there is
> no need to worry about competition between data blocks and index&filter
> blocks[1]. And to improve the read performance, you should increase your
> block cach
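For the archive: on this API, increasing the cache means a bigger
BlockBasedTableConfig inside the OptionsFactory's createColumnOptions(),
something like the following (the size is a placeholder; index and filter
caching is left at its default of false, matching the setup discussed above):

import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyOptions;

ColumnFamilyOptions columnOptions = currentOptions.setTableFormatConfig(
    new BlockBasedTableConfig()
        .setBlockCacheSize(512L * 1024 * 1024)); // 512 MB block cache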
Hi Yun,
> From your description, I think you are actually more concerned about the
> overall performance than the high disk IOPs. Maybe you should first confirm
> whether the job performance degradation is related to RocksDB's performance.
You are right that my main concern is the overall perfor
Hi,
I'm benchmarking a job with large state in various window sizes
(hourly, daily). I noticed that it would consistently slow down about 30
minutes into the benchmark due to high disk read IOPs. The first 30
minutes were fine, with close to 0 disk IOPs. Then after 30 minutes,
read IOPs would grad
Hi Andrey,
> the answer is yes, it is backed by the state backend (RocksDB, if you
> configure it),
> you can trace it through these method calls:
>
> sourceStream.keyBy(…)
> .timeWindow(Time.seconds(…))
> .trigger(CountTrigger.of(…))
> gives you WindowedStream,
> WindowedStream.aggregate(ne
> Back to my first question, is the accumulator state backed by RocksDB state
> backend? If so, I don’t need to use a rich function for the aggregate function.
I did some testing and code reading. To answer my own question, the
accumulator state seems to be managed by RocksDB if I use it as the
sta
Hi Vino,
> If you need access to the state API, you can consider using
> ProcessWindowFunction[1], which allows you to use the state API.
I was hoping that I could use the aggregate function to do incremental
aggregation. My understanding is that ProcessWindowFunction either has to loop
Since the aggregate() function on a keyed window stream does not allow
using rich functions, I can only use an AggregateFunction. Is the
accumulator state of the AggregateFunction backed by RocksDB and
persisted in checkpoints if I use the RocksDB backend? My job looks
like the following,
sourceSt
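A minimal sketch of that shape, with an illustrative counting aggregate
(Event and the window size are placeholders):

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

sourceStream
    .keyBy(Event::getKey)
    .timeWindow(Time.seconds(60))
    .aggregate(new AggregateFunction<Event, Long, Long>() {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(Event e, Long acc) { return acc + 1; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    });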
Hi Konstantin,
> could you replace the Kafka Source by a custom SourceFunction-implementation,
> which just produces the new events in a loop as fast as possible. This way we
> can rule out that the ingestion is responsible for the performance jump or
> the limit at 5000 events/s and can benchm
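A minimal generator along those lines might look like this (Event and
newRandomEvent are placeholders for the real POJO):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class GeneratorSource implements SourceFunction<Event> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Event> ctx) {
        // emit events as fast as possible to take Kafka out of the picture
        while (running) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(Event.newRandomEvent());
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}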
> If you have a window larger than hours then you need to rethink your
> architecture - this is not streaming anymore. Just because you receive events
> in a streamed fashion doesn’t mean you need to do all the processing in a streamed
> fashion.
Thanks for the thoughts, I’ll keep that in mind. Howeve
The application consumes from a single Kafka topic, deserializes the
JSON payload into POJOs, and uses a big keyed window (30+ days) for
deduplication, then emits the result for every single event to four
other keyed windows for aggregation. It looks roughly like the
following.
Source->KeyBy(A,B,C)