Re:Streaming

2018-06-27 Thread sihua zhou
Hi aitozi, I think it can be implemented with or without a window, but it cannot be implemented without keyBy(). A general approach to implement this is as follows. {code} process(Record records) { for (Record record : records) { if (!isFilter(record)) { agg(record);

Re: Multiple kafka consumers

2018-06-25 Thread sihua zhou
Hi Amol, I think if you set the parallelism of the source node equal to the number of partitions of the Kafka topic, you could have one Kafka consumer per partition in your job. But if the number of partitions of the Kafka topic is dynamic, the 1:1 relationship might break. I think maybe @Gor
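The 1:1 relationship described above can be sketched in plain Java. This is only a simplified round-robin model with hypothetical names (PartitionAssignmentSketch, assign); it is not the Kafka connector's actual assignment code, which may distribute partitions differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model (assumption): partition p is read by subtask p % parallelism.
final class PartitionAssignmentSketch {

    // Returns, for every source subtask, the list of partitions it would read.
    static Map<Integer, List<Integer>> assign(int numPartitions, int parallelism) {
        Map<Integer, List<Integer>> bySubtask = new TreeMap<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            bySubtask.put(subtask, new ArrayList<>());
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            bySubtask.get(partition % parallelism).add(partition);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        // Parallelism equals the partition count: one partition per subtask.
        System.out.println(assign(4, 4)); // {0=[0], 1=[1], 2=[2], 3=[3]}
        // Two partitions were added later: the 1:1 relationship breaks.
        System.out.println(assign(6, 4)); // {0=[0, 4], 1=[1, 5], 2=[2], 3=[3]}
    }
}
```

In this model, growing the topic from 4 to 6 partitions while the source parallelism stays at 4 leaves two subtasks reading two partitions each.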

Re: Some doubts related to Rocksdb state backed and checkpointing!

2018-06-24 Thread sihua zhou
Hi Ashwin, I think the questions here are a bit general, which makes it hard to offer an answer that meets your expectations exactly. Could you please briefly outline your use case and relate it to the questions? That would definitely make it easier to offer better help

Re: Cleaning of state snapshot in state backend(HDFS)

2018-06-21 Thread sihua zhou
r() state for that key. [0] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/state.html Please let me know. Thanks, On Thu, Jun 21, 2018 at 1:19 PM sihua zhou wrote: Hi Garvit, Let's say you clear the state at timestamp t1, then the checkpoints c

Re: Cleaning of state snapshot in state backend(HDFS)

2018-06-21 Thread sihua zhou
nly clear a key-value pair of the state currently; you can't clear the whole state. Best, Sihua On 06/21/2018 15:41,Garvit Sharma wrote: So, would it delete all the files in HDFS associated with the cleared state? On Thu, Jun 21, 2018 at 12:58 PM sihua zhou wrote: Hi Garvit,

Re: Questions regarding to Flink 1.5.0 REST API change

2018-06-21 Thread sihua zhou
I just created a JIRA for this: https://issues.apache.org/jira/browse/FLINK-9633 On 06/21/2018 15:10,Chesnay Schepler wrote: That's quite weird that it tries to use the local file-system. Maybe it derives the FS from state.backend.fs.checkpointdir, but uses it for savepoints.dir? What happens

Re:Cleaning of state snapshot in state backend(HDFS)

2018-06-21 Thread sihua zhou
Hi Garvit, > Now, let's say, we clear the state. Would the state data be removed from HDFS > too? The state data would not be removed from HDFS immediately if you clear the state in your job. But after you clear the state in your job, later completed checkpoints won't contain the stat

Re: Questions regarding to Flink 1.5.0 REST API change

2018-06-21 Thread sihua zhou
Hi Yow, I had a look at the related code, and I think this looks like a bug. Flink uses the checkpoint path's FileSystem to create the output stream for the savepoint, but in your case the checkpoint & savepoint are not using the same file system. A workaround is to use the same file system for both ch

Re: Ordering of stream from different kafka partitions

2018-06-20 Thread sihua zhou
n the order but only for out of orderness period of time which also increases latency. Cheers, Andrey On 19 Jun 2018, at 14:12, sihua zhou wrote: Hi Amol, I'm not sure whether this is impossible, especially when you need to operate on the records with a parallelism greater than one. IMO, in theory, we can

Re:How to get past "bad" Kafka message, restart, keep state

2018-06-19 Thread sihua zhou
Hi, Flink will reset the Kafka offset to that of the latest successful checkpoint on recovery, but the "bad" message will always raise an exception and trigger recovery, so it will never be covered by any successful checkpoint, and your job will never get past that "bad" message. I think you may
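The snippet above is cut off before the suggestion, so the following is not necessarily what the reply goes on to propose. One common way out of such a restart loop is to make parsing non-throwing and drop records that cannot be decoded; Flink's Kafka consumer documentation describes returning null from the deserialization schema for this purpose. The plain-Java sketch below only models that idea, and every name in it (SkipBadRecordSketch, parseOrNull, consume) is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Models "skip what cannot be parsed" instead of failing the whole job.
final class SkipBadRecordSketch {

    // Hypothetical parser: returns null for a malformed payload instead of throwing.
    static Long parseOrNull(byte[] raw) {
        try {
            return Long.parseLong(new String(raw, StandardCharsets.UTF_8).trim());
        } catch (NumberFormatException e) {
            return null; // the "bad" message is reported as null, not as an exception
        }
    }

    // Consumes all messages and keeps only those that parsed successfully.
    static List<Long> consume(List<byte[]> messages) {
        List<Long> parsed = new ArrayList<>();
        for (byte[] message : messages) {
            Long value = parseOrNull(message);
            if (value != null) {
                parsed.add(value);
            }
        }
        return parsed;
    }
}
```

Because the bad record no longer raises an exception, the offset can move past it and a later checkpoint can cover it. In a real job the skipped records would usually be counted or logged rather than dropped silently.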

Re:Ordering of stream from different kafka partitions

2018-06-19 Thread sihua zhou
Hi Amol, I'm not sure whether this is impossible, especially when you need to operate on the records with a parallelism greater than one. IMO, in theory, we can only get an ordered stream when there is a single Kafka partition and it is processed with a parallelism of one in Flink. Even in this case, if you on

Re:Checkpoint/ Savepoint usage

2018-06-13 Thread sihua zhou
Hi Rinat, > are my assumptions about checkpoint/ savepoint state usage correct ? Not quite: you can also restore the job from a checkpoint. By default, the checkpoint data will be removed when the job finishes (e.g. is canceled by the user), but you can configure Flink to retain the checkp

Re:Checkpointing Large State to S3

2018-06-09 Thread sihua zhou
Hi Gregory, could you share the TaskManager's log with us? It would be helpful to diagnose the problem. And which version are you using? Best, Sihua On 06/7/2018 06:42,Gregory Fee wrote: Hello Everyone! I am running some streaming Flink jobs using SQL and the Table API. I enabled incremen

Re: [DISCUSS] Flink 1.6 features

2018-06-09 Thread sihua zhou
Hi Stephan, Thanks very much for your response! That gave me the confidence to continue to work on the Elastic Filter. But even though we have implemented it (based on 1.3.2) and used it in production for several months, if there's a committer willing to guide me (since it's not a very tri

Re:State life-cycle for different state-backend implementations

2018-06-09 Thread sihua zhou
Hi Rinat, I think there is one configuration {{state.checkpoints.num-retained}} to control the maximum number of completed checkpoints to retain, the default value is 1. So the risk you mentioned should not happen. Refer to https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config
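The effect of a retention limit such as {{state.checkpoints.num-retained}} can be modeled in a few lines of plain Java. This is a minimal sketch under the assumption that the oldest completed checkpoint is discarded first; the class and method names are hypothetical, and it is not Flink's own checkpoint store.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Keeps at most maxRetained completed checkpoints, discarding the oldest first.
final class RetainedCheckpointsSketch {
    private final int maxRetained;
    private final Deque<Long> completed = new ArrayDeque<>();

    RetainedCheckpointsSketch(int maxRetained) {
        if (maxRetained < 1) {
            throw new IllegalArgumentException("maxRetained must be >= 1");
        }
        this.maxRetained = maxRetained;
    }

    // Registers a newly completed checkpoint and returns the ids that were discarded.
    List<Long> addCompleted(long checkpointId) {
        completed.addLast(checkpointId);
        List<Long> discarded = new ArrayList<>();
        while (completed.size() > maxRetained) {
            discarded.add(completed.removeFirst());
        }
        return discarded;
    }

    // Returns the retained checkpoint ids, oldest first.
    List<Long> retained() {
        return new ArrayList<>(completed);
    }
}
```

With a limit of 1, completing checkpoints 1, 2 and 3 in turn leaves only checkpoint 3 retained, so older checkpoints do not accumulate.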

Re:[DISCUSS] Flink 1.6 features

2018-06-04 Thread sihua zhou
Hi Stephan, could you please also consider the "Elastic Filter" feature discussed in thread http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/PROPOSAL-Introduce-Elastic-Bloom-Filter-For-Flink-td22430.html ? Best, Sihua On 06/4/2018 17:21,Stephan Ewen wrote: Hi Flink Comm

Re: ConcurrentModificationException while accessing managed keyed state

2018-06-02 Thread sihua zhou
Hi Garvit, thanks for your feedback, I see you are using 1.4.1 with the heap state backend, and there are actually two bugs in 1.4.1 related to the Kryo serializer and DefaultOperatorStateBackend which may cause the ConcurrentModificationException (when checkpointing), they both have been fixed

Re: ConcurrentModificationException while accessing managed keyed state

2018-06-01 Thread sihua zhou
Hi Garvit, this is unexpected, could you please provide more information about this? - which Flink version are you using? - what state backend are you using? - are you using incremental checkpoints? (in case you used the RocksDB backend) - did you create a custom thread to operate on the sta

Re:TimerService/Watermarks and Checkpoints

2018-05-29 Thread sihua zhou
Hi Nara, yes, the watermark in TimerService is not covered by the checkpoint, every time the job is restarted from a previous checkpoint, it is reset to Long.MIN_VALUE. I can see it being a bit tricky to include it in the checkpoint, especially when we need to support rescaling(it seems not like a pu
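The behaviour described above can be illustrated with a small plain-Java model. It assumes that registered event-time timers are part of the snapshot while the current watermark is not; the class and its methods are hypothetical and do not mirror Flink's internal timer service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Toy timer service: timers are snapshotted, the watermark is not.
final class TimerServiceSketch {
    private long currentWatermark = Long.MIN_VALUE;
    private final TreeSet<Long> timers = new TreeSet<>();

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    long currentWatermark() {
        return currentWatermark;
    }

    // Advances the watermark and returns the timers that fire, in timestamp order.
    List<Long> advanceWatermark(long watermark) {
        currentWatermark = watermark;
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }

    // The snapshot contains only the pending timers.
    TreeSet<Long> snapshot() {
        return new TreeSet<>(timers);
    }

    // A restored instance starts again from Long.MIN_VALUE.
    static TimerServiceSketch restore(TreeSet<Long> snapshot) {
        TimerServiceSketch restored = new TimerServiceSketch();
        restored.timers.addAll(snapshot);
        return restored;
    }
}
```

After a restore, code that reads the current watermark sees Long.MIN_VALUE until the first new watermark arrives, while pending timers still fire once the watermark passes them.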

Re: does Flink call FullGC to reclaim direct memory mainly occupied by RocksDB

2018-05-25 Thread sihua zhou
Hi, I think each time when canceling the job, flink will close the RocksDB to release the resource held by it. You can find this in RocksDBKeyedStateBackend. Best, Sihua On 05/25/2018 19:27, makeyang wrote: each time when cancel Job does Flink call FullGC to reclaim direct memory mainly occup

Re: Regarding Keyed State of Connected Stream

2018-05-24 Thread sihua zhou
te: Hi Sihua, Thanks for the quick response. Could you please let me know where I can find more details about it? Thanks, On Fri, May 25, 2018 at 11:04 AM, sihua zhou wrote: Hi Garvit, I think you don't need to lock it, they are executed synchronously in the same thread. Best, Sihua

Re: Regarding Keyed State of Connected Stream

2018-05-24 Thread sihua zhou
Hi Garvit, I think you don't need to lock it, they are executed synchronously in the same thread. Best, Sihua On 05/25/2018 10:26, Garvit Sharma wrote: Hi, Let's consider, I have two keyed streams one for rules and another for data and I have created a connected stream. I am maintaining a managed keyed

Re:Kryo Exception

2018-05-24 Thread sihua zhou
reported issue was caused by https://issues.apache.org/jira/browse/FLINK-9263 (which has a fix included in 1.5.0)? Cheers, Gordon On 25 May 2018 at 11:39:03 AM, sihua zhou (summerle...@163.com) wrote: Hi, this looks like the bug "when duplicating a KryoSerializer does not duplicate regis

Re:Kryo Exception

2018-05-24 Thread sihua zhou
Hi, this looks like the bug "when duplicating a KryoSerializer does not duplicate registered default serializers", and this has been fixed on the branch master, 1.5.0, and 1.4.x. But, unfortunately not included in 1.4.2(because this bug was discovered after 1.4.2 release). @Stefan plz correct me

Re:Efficient Stateful Processing with Time-Series Data and Enrichments

2018-05-23 Thread sihua zhou
Hi Mike, if I understand correctly, you are doing aggregation for every device on the stream. You mentioned that you want to use MapState to store the state for each device ID? This is a bit confusing to me, I think what you need may be a ValueState. In Flink, every keyed state (Value, MapStat
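Why a ValueState can be enough for a per-device aggregate follows from keyed state being scoped to the current key. The plain-Java sketch below models that scoping with an ordinary map; KeyedValueStateSketch and its methods are hypothetical stand-ins, not Flink's ValueState API.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a keyed ValueState: one value per key, selected by the current key.
final class KeyedValueStateSketch {
    private final Map<String, Long> valuePerKey = new HashMap<>();
    private String currentKey;

    // In this model the runtime sets the key before each element is processed.
    void setCurrentKey(String key) {
        currentKey = key;
    }

    // Reads the value for the current key, or null if nothing was stored yet.
    Long value() {
        return valuePerKey.get(currentKey);
    }

    // Writes the value for the current key only.
    void update(long newValue) {
        valuePerKey.put(currentKey, newValue);
    }

    // User logic: keep a running sum per device without managing a map by hand.
    void processElement(String deviceId, long reading) {
        setCurrentKey(deviceId);
        Long sum = value();
        update((sum == null ? 0L : sum) + reading);
    }

    long sumFor(String deviceId) {
        setCurrentKey(deviceId);
        Long sum = value();
        return sum == null ? 0L : sum;
    }
}
```

The user logic in processElement never indexes by device ID itself; the per-key separation comes from the state scoping.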

Re: Missing MapState when Timer fires after restored state

2018-05-20 Thread sihua zhou
018 08:19,sihua zhou wrote: Sorry for the incorrect information, that's not the case. Best, Sihua On 05/19/2018 07:58,sihua zhou wrote: Hi Juho & Stefan, just a quick summary: the good news is that I think I found the bug, the bad news is that we are currently at RC4..

Re: Missing MapState when Timer fires after restored state

2018-05-18 Thread sihua zhou
Sorry for the incorrect information, that's not the case. Best, Sihua On 05/19/2018 07:58,sihua zhou wrote: Hi Juho & Stefan, just a quick summary: the good news is that I think I found the bug, the bad news is that we are currently at RC4... The bug is here. try (RocksIterat

Re: Missing MapState when Timer fires after restored state

2018-05-18 Thread sihua zhou
start key group is bigger than _state.keyGroupRange.getStartKeyGroup()_. Then, data is lost. @Stefan, this still needs your double check, plz correct me if I'm wrong. Best, Sihua On 05/18/2018 17:29,sihua zhou wrote: Hi Juho, thanks for trying this out. I'm running out of myself now... Let

Re: Missing MapState when Timer fires after restored state

2018-05-18 Thread sihua zhou
alveOutputHandler.handleWatermark(StreamInputProcessor.java:262) ... 7 more On Fri, May 18, 2018 at 11:06 AM, Juho Autio wrote: Thanks Sihua, I'll give that RC a try. On Fri, May 18, 2018 at 10:58 AM, sihua zhou wrote: Hi Juho, would you like to try out the latest RC(http://people.apac

Re: Missing MapState when Timer fires after restored state

2018-05-18 Thread sihua zhou
Hi Juho, would you like to try out the latest RC (http://people.apache.org/~trohrmann/flink-1.5.0-rc4/) to rescale the job from the "problematic" checkpoint? The latest RC includes a fix for the potential silent data loss. If that's the reason, you will see a different exception when you tryin

Re:are there any ways to test the performance of rocksdb state backend?

2018-05-17 Thread sihua zhou
Hi makeyang, there are some cases under _org.apache.flink.contrib.streaming.state.benchmark.*_ that you can refer to. But I'm not sure whether it's possible to upgrade RocksDB to any higher version because of the regression in the merge operator, the comments in this PR https://github.com/apa

Re: Missing MapState when Timer fires after restored state

2018-05-16 Thread sihua zhou
int without worrying about if we have a recent enough savepoint available – which we quite often may not have especially when there's a problem that requires upscaling. On Wed, May 16, 2018 at 12:30 PM, sihua zhou wrote: Hi, Juho > If restoring + rescaling a checkpoint is not suppor

Re: Missing MapState when Timer fires after restored state

2018-05-16 Thread sihua zhou
h Stefan only. On Wed, May 16, 2018 at 5:22 AM, sihua zhou wrote: Hi Juho, if I understand correctly, you said you're rescaling the job from the checkpoint? If yes, I think that behavior is not guaranteed yet, you can find this in the docs https://ci.apache.org/projects/flink/flin

Re: Missing MapState when Timer fires after restored state

2018-05-15 Thread sihua zhou
est, Sihua On 05/16/2018 10:22,sihua zhou wrote: Hi Juho, if I understand correctly, you said you're rescaling the job from the checkpoint? If yes, I think that behavior is not guaranteed yet, you can find this in the docs https://ci.apache.org/projects/flink/flink-docs-release-1

Re: Missing MapState when Timer fires after restored state

2018-05-15 Thread sihua zhou
Hi Juho, if I understand correctly, you said you're rescaling the job from the checkpoint? If yes, I think that behavior is not guaranteed yet, you can find this in the docs https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/checkpoints.html#difference-to-savepoints. So, I no

Re: Missing MapState when Timer fires after restored state

2018-05-14 Thread sihua zhou
Hi Juho, in fact, from your code I can't see any way that the MapState could be inconsistent with the timer. It looks like a bug to me, because once the checkpoint is complete, and as long as you haven't queried the state asynchronously from a custom thread, the result of the checkpoint should be consisten