Re: Capping RocksDB memory usage

2019-08-08 Thread Ning Shi
Hi Cam, This blog post has some pointers on tuning RocksDB memory usage that might be of help. https://klaviyo.tech/flinkperf-c7bd28acc67 Ning On Wed, Aug 7, 2019 at 1:28 PM Cam Mach wrote: > > Hello everyone, > > What is the easiest and most efficient way to cap RocksDB's memory usage? > > Tha
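
For reference, a minimal sketch of the kind of tuning the linked post covers, capping RocksDB's write buffers and block cache through an OptionsFactory on the RocksDB state backend (the sizes and checkpoint URI below are placeholder assumptions, not recommendations):

    import org.apache.flink.contrib.streaming.state.OptionsFactory;
    import org.rocksdb.BlockBasedTableConfig;
    import org.rocksdb.ColumnFamilyOptions;
    import org.rocksdb.DBOptions;

    public class CappedRocksDBOptions implements OptionsFactory {
        @Override
        public DBOptions createDBOptions(DBOptions currentOptions) {
            // No DBOptions changes are needed for the memory cap itself.
            return currentOptions;
        }

        @Override
        public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
            return currentOptions
                // Cap memtables: at most 2 x 32 MB write buffers per column family.
                .setWriteBufferSize(32 * 1024 * 1024)
                .setMaxWriteBufferNumber(2)
                // Cap the block cache at 128 MB per column family.
                .setTableFormatConfig(
                    new BlockBasedTableConfig().setBlockCacheSize(128 * 1024 * 1024));
        }
    }

    // Wiring it up:
    // RocksDBStateBackend backend = new RocksDBStateBackend("s3://bucket/checkpoints", true);
    // backend.setOptions(new CappedRocksDBOptions());
    // env.setStateBackend(backend);

Note that these limits apply per column family (roughly, per registered state) and per RocksDB instance, so the effective total per TaskManager scales with the number of states and slots.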

Re: Migrating Existing TTL State to 1.8

2019-05-15 Thread Ning Shi
Hi Andrey, Thank you for the reply. We are using incremental checkpointing. Good to know that the incremental cleanup only applies to the heap state backend. Looks like taking some downtime to take a full savepoint and restore everything is inevitable. Thanks, -- Ning On Wed, 15 May 2019 10:5
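
For reference, the full-snapshot cleanup mentioned here is configured on the state descriptor, roughly as below (descriptor name and TTL duration are assumptions); expired entries are then filtered out when a savepoint or full checkpoint is written, while the local RocksDB instance keeps them until they expire by other means:

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;

    StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(7))
        // Drop expired entries whenever a full (non-incremental) snapshot is taken.
        .cleanupFullSnapshot()
        .build();

    ValueStateDescriptor<Long> lastSeen =
        new ValueStateDescriptor<>("lastSeen", Long.class);
    lastSeen.enableTimeToLive(ttlConfig);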

Re: Missing state in RocksDB checkpoints

2019-04-24 Thread Ning Shi
fixed right away. For future reference here is > the issue [1]. > > I've also pulled in Stefan who knows these components very well. > > [1] https://issues.apache.org/jira/browse/FLINK-12296 > > Cheers, > Till > >> On Tue, Apr 23, 2019 at 5:24 PM Ning

Re: Error restoring from checkpoint on Flink 1.8

2019-04-23 Thread Ning Shi
Hi Congxian, I think I have figured out the issue. It's related to the checkpoint directory collision issue you responded to in the other thread. We reproduced this bug on 1.6.1 after unchaining the operators. There are two stateful operators in the chain, one is a CoBroadcastWithKeyedOperator, t

Re: Missing state in RocksDB checkpoints

2019-04-23 Thread Ning Shi
On Tue, 23 Apr 2019 10:53:52 -0400, Congxian Qiu wrote: > Sorry for the misleading, in the previous email, I just want to say the > problem is not caused by the UUID generation, it is caused by different > operators sharing the same directory (because currently Flink uses JobVertex as > the direc

Re: Missing state in RocksDB checkpoints

2019-04-23 Thread Ning Shi
Congxian, We just did a test. Separating the two stateful operators from chaining seems to have worked around the problem. The states for both of them are successfully saved in the checkpoint. Ning On Tue, Apr 23, 2019 at 7:41 AM Ning Shi wrote: > > Congxian, > > Thank you for
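
A rough sketch of that workaround, assuming two chained stateful map operators (the function and uid names are made up); startNewChain() (or disableChaining()) forces the second operator out of the first one's chain:

    // events is the upstream DataStream<Event>; Event, DedupeFunction and
    // EnrichFunction are hypothetical stand-ins for the real operators.
    DataStream<Event> out = events
        .keyBy(Event::getId)
        .map(new DedupeFunction()).uid("dedupe")    // stateful operator #1
        .map(new EnrichFunction()).uid("enrich")    // stateful operator #2
        .startNewChain();                           // break the chain between the two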

Re: Missing state in RocksDB checkpoints

2019-04-23 Thread Ning Shi
Congxian, Thank you for creating the ticket and providing the relevant code. I'm curious why you don't consider the directory collision a problem. What we observe is that one of the operator states is not included in the checkpoint and data is lost on restore. That's a pretty serious probl

Missing state in RocksDB checkpoints

2019-04-22 Thread Ning Shi
We have a Flink job using the RocksDB state backend. We found that one of the RichMapFunction states was not being saved in checkpoints or savepoints. After some digging, it seems that two operators in the same operator chain are colliding with each other during checkpoint or savepoint, resulting in one

Re: Error restoring from checkpoint on Flink 1.8

2019-04-22 Thread Ning Shi
Congxian, Thanks for the reply. I will try to get a minimal reproducer and post it to this thread soon. Ning On Sun, 21 Apr 2019 09:27:12 -0400, Congxian Qiu wrote: > > Hi, > From the given error message, it seems Flink can't open RocksDB because > the number of column families does not match, do

Re: Error restoring from checkpoint on Flink 1.8

2019-04-20 Thread Ning Shi
worked in Flink 1.6 but not in Flink 1.8. Any insights would be appreciated. Ning On Sat, Apr 20, 2019 at 10:28 PM Ning Shi wrote: > > When testing a job on Flink 1.8, we hit the following error during > resuming from RocksDB checkpoint. This job has been working well on > Flink 1.6

Error restoring from checkpoint on Flink 1.8

2019-04-20 Thread Ning Shi
When testing a job on Flink 1.8, we hit the following error during resuming from RocksDB checkpoint. This job has been working well on Flink 1.6.1. The checkpoint was taken using the exact same job on 1.8. The operator StreamMap_3c5866a6cc097b462de842b2ef91910d mentioned in the error message is

Re: Migrating Existing TTL State to 1.8

2019-03-15 Thread Ning Shi
Hi Stefan, Thank you for the confirmation. Doing a one time cleanup with full snapshot and upgrading to Flink 1.8 could work. However, in our case, the state is quite large (TBs). Taking a savepoint takes over an hour, during which we have to pause the job or it may process more events. The Java

Re: Migrating Existing TTL State to 1.8

2019-03-13 Thread Ning Shi
Just wondering if anyone has any insights into the new TTL state cleanup feature mentioned below. Thanks, — Ning > On Mar 11, 2019, at 1:15 PM, Ning Shi wrote: > > It's exciting to see TTL state cleanup feature in 1.8. I have a question > regarding the migration of existing

Migrating Existing TTL State to 1.8

2019-03-11 Thread Ning Shi
It's exciting to see the TTL state cleanup feature in 1.8. I have a question regarding the migration of existing TTL state to the newer version. Looking at the Pull Request [1] that introduced this feature, it seems that Flink is leveraging RocksDB's compaction filter to remove stale state. I ass
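
Based on the 1.8 docs, the compaction-filter cleanup is enabled per state descriptor roughly like this (descriptor name and TTL are placeholders); in 1.8 the filter itself also appears to need switching on, e.g. state.backend.rocksdb.ttl.compaction.filter.enabled: true in flink-conf.yaml:

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.time.Time;

    StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(30))
        // Let RocksDB's compaction filter drop expired entries as SST files are compacted.
        .cleanupInRocksdbCompactFilter()
        .build();

    MapStateDescriptor<String, Long> seen =
        new MapStateDescriptor<>("seen", String.class, Long.class);
    seen.enableTimeToLive(ttlConfig);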

Re: TTL state migration

2018-12-05 Thread Ning Shi
ntries will expire later relative to their originally saved > access timestamp. > > Best, > Andrey > > > On 5 Dec 2018, at 04:20, Ning Shi wrote: > > > > I have a job using TTL map state and RocksDB state backend. If I > > lengthen the TTL on the map state and

TTL state migration

2018-12-04 Thread Ning Shi
I have a job using TTL map state and RocksDB state backend. If I lengthen the TTL on the map state and resume the job from savepoint (or checkpoint assuming I don't change the state backend), will new values added to that map have the new TTL or will the old TTL in the savepoint override my changes

Re: Live configuration change

2018-11-06 Thread Ning Shi
> On Nov 6, 2018, at 4:22 PM, Elias Levy wrote: > > Also note that there is a pending PR to allow the Cassandra sink to back > pressure, so that the cluster does not get overwhelmed. Yes, I’ve been following the development on that pull request. Unfortunately, we have to go live very soon so

Re: Live configuration change

2018-11-06 Thread Ning Shi
> for rate limiting, would quota at Kafka brokers help? Thanks, Steven. This looks very promising. I'll try it out. -- Ning

Re: Live configuration change

2018-11-06 Thread Ning Shi
On Tue, Nov 06, 2018 at 07:44:50PM +0200, Nicos Maris wrote: > Ning can you provide another example except for rate limiting? Our main use case and concern is rate limiting because without it we could potentially overwhelm downstream systems (Cassandra) when the job plays catch-up or replays events

Live configuration change

2018-11-06 Thread Ning Shi
In the job I'm implementing, there are a couple of configuration variables that I want to change at runtime, such as the rate limit at the Kafka source. I know it's possible to use a control stream and join it with the normal stream to configure things in certain operators, but this doesn't work for th
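
One stopgap for the source side (not from this thread): throttle inside the deserialization schema with a Guava RateLimiter. The rate is still fixed per deployment rather than changeable at runtime, and it applies per source subtask. A sketch, with the wrapper class name assumed:

    import java.io.IOException;

    import com.google.common.util.concurrent.RateLimiter;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;

    public class ThrottledSchema<T> implements DeserializationSchema<T> {
        private final DeserializationSchema<T> inner;
        private final double permitsPerSecond;          // per source subtask
        private transient RateLimiter limiter;          // not serializable, create lazily

        public ThrottledSchema(DeserializationSchema<T> inner, double permitsPerSecond) {
            this.inner = inner;
            this.permitsPerSecond = permitsPerSecond;
        }

        @Override
        public T deserialize(byte[] message) throws IOException {
            if (limiter == null) {
                limiter = RateLimiter.create(permitsPerSecond);
            }
            limiter.acquire();                          // blocks to enforce the rate
            return inner.deserialize(message);
        }

        @Override
        public boolean isEndOfStream(T nextElement) {
            return inner.isEndOfStream(nextElement);
        }

        @Override
        public TypeInformation<T> getProducedType() {
            return inner.getProducedType();
        }
    }

    // Usage (topic, inner schema and properties are placeholders):
    // new FlinkKafkaConsumer010<>("events", new ThrottledSchema<>(new EventSchema(), 1000.0), props)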

Questions about Savepoints

2018-11-04 Thread Ning Shi
I have the following questions regarding savepoint recovery. - In my job, it takes over 30 minutes to take a savepoint of over 100GB on 3 TMs. Most of the time is spent after the alignment. I assume it was serialization and uploading to S3. However, when I resume a new job from the savepoint, it only

Re: RocksDB State Backend Exception

2018-10-25 Thread Ning Shi
Hi Andrey, Thank you for the explanation. I think you are right. It is either kStaleFile or kNoSpace. We found the cause of the issue, even though we still don't know how to explain it. We set the java.io.tmpdir to an EBS-backed drive instead of the default and the exception started happening. Th
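
For reference, a sketch of pointing RocksDB's working directories at local storage explicitly instead of relying on java.io.tmpdir (the paths and checkpoint URI are examples); the state.backend.rocksdb.localdir setting in flink-conf.yaml should achieve the same thing:

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // The constructor may throw IOException; call this from a method that declares it.
    RocksDBStateBackend backend = new RocksDBStateBackend("s3://bucket/checkpoints", true);
    backend.setDbStoragePaths("/mnt/nvme0/rocksdb", "/mnt/nvme1/rocksdb");
    env.setStateBackend(backend);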

RocksDB State Backend Exception

2018-10-24 Thread Ning Shi
Hi, We are doing some performance testing on a 12 node cluster with 8 task slots per TM. Every 15 minutes or so, the job would run into the following exception. java.lang.IllegalArgumentException: Illegal value provided for SubCode. at org.rocksdb.Status$SubCode.getSubCode(Status.java:109

Re: Clean shutdown of streaming job

2018-10-24 Thread Ning Shi
Hi Neils, Thanks for the response. > I think your problem is that the Cassandra sink doesn't support exactly > once guarantees when the Cassandra query isn't idempotent. If possible, the > cleanest solution would be implementing a new or extending the existing > Cassandra sink with the > https://

Clean shutdown of streaming job

2018-10-21 Thread Ning Shi
I'm implementing a streaming job that consumes events from Kafka and writes results to Cassandra. The writes to Cassandra are not idempotent. In preparation for planned maintenance events like Flink version upgrade or job update, I'd like to be able to shutdown the job cleanly. I understand that c

Re: RocksDB Read IOPs

2018-09-27 Thread Ning Shi
Yun, > Then I would share some experience about tuning RocksDB performance. Since > you did not cache index and filter in block cache, there's no worry about the > competition between data blocks and index & filter blocks [1]. And to improve > the read performance, you should increase your block cach
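
The block cache sits in the table format config, so this tuning goes through the same OptionsFactory shown further up in this list; a sketch of createColumnOptions with a larger cache (the 256 MB figure and the bloom filter are illustrative guesses, not advice from this thread):

    import org.rocksdb.BlockBasedTableConfig;
    import org.rocksdb.BloomFilter;
    import org.rocksdb.ColumnFamilyOptions;

    // Inside OptionsFactory#createColumnOptions(ColumnFamilyOptions currentOptions):
    return currentOptions.setTableFormatConfig(
        new BlockBasedTableConfig()
            .setBlockCacheSize(256 * 1024 * 1024)    // bigger block cache for the read path
            .setFilter(new BloomFilter(10, false))); // bloom filter to cut point-lookup I/O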

Re: RocksDB Read IOPs

2018-09-26 Thread Ning Shi
Hi Yun, > From your description, I think you actually concern more about the overall > performance instead of the high disk IOPs. Maybe you should first ensure > whether the job performance degradation is related to RocksDB's performance. You are right that my main concern is the overall perfor

RocksDB Read IOPs

2018-09-25 Thread Ning Shi
Hi, I'm benchmarking a job with large state in various window sizes (hourly, daily). I noticed that it would consistently slow down after 30 minutes into the benchmark due to high disk read IOPs. The first 30 minutes were fine, with close to 0 disk IOPs. Then after 30 minutes, read IOPs would grad

Re: Aggregator State in Keyed Windowed Stream

2018-09-10 Thread Ning Shi
Hi Andrey, > the answer is yes, it is backed by state backend (should be RocksDB if you > configure it), > you can trace it through these method calls: > > sourceStream.keyBy(…) > .timeWindow(Time.seconds(…)) > .trigger(CountTrigger.of(…)) > gives you WindowedStream, > WindowedStream.aggregate(ne

Re: Aggregator State in Keyed Windowed Stream

2018-09-10 Thread Ning Shi
> Back to my first question, is the accumulator state backed by RocksDB state > backend? If so, I don't need to use a rich function for the aggregate function. I did some testing and code reading. To answer my own question, the accumulator state seems to be managed by RocksDB if I use it as the sta

Re: Aggregator State in Keyed Windowed Stream

2018-09-10 Thread Ning Shi
Hi Vino, > If you need access to the state API, you can consider using > ProcessWindowFunction[1], which allows you to use ProcessWindowFunction. I was hoping that I could use the aggregate function to do incremental aggregation. My understanding is that ProcessWindowFunction either has to loop
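
The middle ground is passing both functions to aggregate(): the AggregateFunction does the incremental work and the ProcessWindowFunction only sees the single pre-aggregated value plus the window metadata, rather than every element. A sketch (Event, Result and CountAgg are placeholders; CountAgg is the kind of AggregateFunction sketched under the original question further down):

    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    sourceStream
        .keyBy(Event::getKey)
        .timeWindow(Time.hours(1))
        .aggregate(new CountAgg(),
            new ProcessWindowFunction<Long, Result, String, TimeWindow>() {
                @Override
                public void process(String key, Context ctx, Iterable<Long> counts,
                                    Collector<Result> out) {
                    // Exactly one element arrives here: the accumulator result for this window.
                    out.collect(new Result(key, ctx.window().getEnd(), counts.iterator().next()));
                }
            });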

Aggregator State in Keyed Windowed Stream

2018-09-09 Thread Ning Shi
Since the aggregate() function on a keyed window stream does not allow using rich functions, I can only use an AggregateFunction. Is the accumulator state of the AggregateFunction backed by RocksDB and persisted in checkpoints if I use the RocksDB backend? My job looks like the following, sourceSt
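
Roughly the shape being described (Event, the key selector, the window size and the counting logic are placeholders). The accumulator, a Long here, lives in window state, so it is managed by whichever state backend is configured:

    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class CountAgg implements AggregateFunction<Event, Long, Long> {
        @Override public Long createAccumulator()        { return 0L; }
        @Override public Long add(Event value, Long acc) { return acc + 1; }
        @Override public Long getResult(Long acc)        { return acc; }
        @Override public Long merge(Long a, Long b)      { return a + b; }
    }

    sourceStream
        .keyBy(Event::getKey)
        .timeWindow(Time.hours(1))
        .aggregate(new CountAgg());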

Re: Low Performance in High Cardinality Big Window Application

2018-08-31 Thread Ning Shi
Hi Konstantin, > could you replace the Kafka Source by a custom SourceFunction-implementation, > which just produces the new events in a loop as fast as possible. This way we > can rule out that the ingestion is responsible for the performance jump or > the limit at 5000 events/s and can benchm
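
A sketch of such a generator source (Event and its factory method are placeholders): it emits synthetic events in a tight loop so Kafka and deserialization are taken out of the measurement:

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class GeneratorSource implements SourceFunction<Event> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Event> ctx) {
            long i = 0;
            while (running) {
                Event e = Event.random(i++);             // hypothetical factory
                synchronized (ctx.getCheckpointLock()) { // keep emission checkpoint-safe
                    ctx.collect(e);
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }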

Re: Low Performance in High Cardinality Big Window Application

2018-08-27 Thread Ning Shi
> If you have a window larger than hours then you need to rethink your > architecture - this is not streaming anymore. Just because you receive events > in a streamed fashion doesn't mean you need to do all the processing in a > streamed fashion. Thanks for the thoughts, I’ll keep that in mind. Howeve

Low Performance in High Cardinality Big Window Application

2018-08-26 Thread Ning Shi
The application consumes from a single Kafka topic, deserializes the JSON payload into POJOs and uses a big keyed window (30+ days) for deduplication, then emits the result for every single event to four other keyed windows for aggregation. It looks roughly like the following. Source->KeyBy(A,B,C)
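
A rough reconstruction of the topology described above (key fields, window sizes, triggers and function names are guesses based on this description and the earlier aggregator thread):

    // 30+ day keyed window for dedup, firing on every element via CountTrigger.of(1).
    DataStream<Event> deduped = events
        .keyBy(e -> e.getA() + "|" + e.getB() + "|" + e.getC())
        .timeWindow(Time.days(30))
        .trigger(CountTrigger.of(1))
        .aggregate(new DedupAgg());

    // Fan out to four aggregation windows (two shown; the other two look the same).
    deduped.keyBy(Event::getA).timeWindow(Time.hours(1)).aggregate(new HourlyAgg()).addSink(hourlySink);
    deduped.keyBy(Event::getB).timeWindow(Time.days(1)).aggregate(new DailyAgg()).addSink(dailySink);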