Re: ProcessFunction collect and close, when to use?

2019-11-28 Thread bupt_ljy
Hi Shuwen, > When to call close()? After every element is processed? Or in ProcessFunction.close()? Or never use it? IMO, the #close() function is used to manage the lifecycle of the #Collector rather than of a single element. I think it should not be called in a user function unless you have som
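To illustrate the lifecycle point, here is a minimal sketch built on hypothetical stand-ins (`SimpleCollector`, `ListCollector`, `runTask`), not Flink's real `Collector` or `ProcessFunction` classes. User logic only calls `collect()` for each output record, while the surrounding "runtime" calls `close()` exactly once when the task ends.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins used only to show who owns the collector's lifecycle.
class CollectorLifecycleSketch {

    interface SimpleCollector<T> {
        void collect(T record);
        void close();
    }

    // Framework-owned collector: it accepts records until the task shuts down.
    static final class ListCollector<T> implements SimpleCollector<T> {
        final List<T> out = new ArrayList<>();
        boolean closed = false;

        @Override
        public void collect(T record) {
            if (closed) {
                throw new IllegalStateException("collector already closed");
            }
            out.add(record);
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // User logic: emits zero or more records per input element and never closes the collector.
    static void processElement(String line, SimpleCollector<String> out) {
        for (String word : line.split(" ")) {
            if (!word.isEmpty()) {
                out.collect(word);
            }
        }
    }

    // The "runtime": calls processElement for every element, then closes the collector once.
    static List<String> runTask(List<String> input) {
        ListCollector<String> collector = new ListCollector<>();
        for (String line : input) {
            processElement(line, collector);
        }
        collector.close();
        return collector.out;
    }

    public static void main(String[] args) {
        System.out.println(runTask(List.of("a b", "c")));
    }
}
```

If `processElement` closed the collector itself, the next element would fail, which is why closing belongs to whoever created the collector.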

Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

2019-11-26 Thread bupt_ljy
Hi, I ran into exactly the same problem recently and solved it Piotr’s way. @zhijiang, I didn’t see any OOM error thrown by the JVM (I’m not sure one can be thrown if YARN forcibly kills it). According to our monitoring system, the excess memory usage comes from JVM direct memo
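The post is cut off, but since it points at JVM direct memory, here is a generic JDK technique (not taken from the thread) for checking direct-buffer usage from inside a JVM via the standard `BufferPoolMXBean`. Direct memory is not part of heap metrics, which is one reason a container can exceed its YARN limit without the JVM reporting an OutOfMemoryError.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

class DirectMemoryProbe {

    // Returns the bytes currently reported by the JVM's "direct" buffer pool,
    // or -1 if this JVM does not expose such a pool.
    static long directMemoryUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long before = directMemoryUsed();
        // Off-heap allocation: invisible to heap metrics, but counted in the "direct" pool.
        ByteBuffer buffer = ByteBuffer.allocateDirect(8 * 1024 * 1024);
        long after = directMemoryUsed();
        System.out.printf("direct before=%d after=%d (capacity %d)%n",
                before, after, buffer.capacity());
    }
}
```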

Re: RocksDB state on HDFS seems not being cleanned up

2019-11-05 Thread bupt_ljy
This should be sent to the user mailing list. Moving it here... Original Message Sender: bupt_ljy Recipient: dev Date: Tuesday, Nov 5, 2019 21:13 Subject: Re: RocksDB state on HDFS seems not being cleanned up Hi Shuwen, “Shared” means that the state files are shared among multiple
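Because the explanation is truncated, here is a simplified, hypothetical model of what "shared" implies for cleanup: a state file is deleted only when no retained checkpoint still references it, so files that look old on HDFS may still be live. The class and method names are invented for illustration and are not Flink's API.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical reference-counting model of state files shared across checkpoints.
class SharedStateRefCountSketch {

    private final Map<String, Integer> refCounts = new HashMap<>();
    private final Map<Long, List<String>> filesByCheckpoint = new HashMap<>();
    private final Set<String> deleted = new HashSet<>();

    // Register a completed checkpoint and every file it references.
    void addCheckpoint(long id, List<String> files) {
        filesByCheckpoint.put(id, files);
        for (String f : files) {
            refCounts.merge(f, 1, Integer::sum);
        }
    }

    // Discard a subsumed checkpoint; a file is deleted only when its count drops to zero.
    void discardCheckpoint(long id) {
        List<String> files = filesByCheckpoint.remove(id);
        if (files == null) {
            return;
        }
        for (String f : files) {
            int remaining = refCounts.merge(f, -1, Integer::sum);
            if (remaining == 0) {
                refCounts.remove(f);
                deleted.add(f);
            }
        }
    }

    Set<String> deletedFiles() {
        return deleted;
    }

    public static void main(String[] args) {
        SharedStateRefCountSketch reg = new SharedStateRefCountSketch();
        reg.addCheckpoint(1, List.of("sst-1", "sst-2"));
        reg.addCheckpoint(2, List.of("sst-2", "sst-3")); // sst-2 is shared
        reg.discardCheckpoint(1);
        System.out.println(reg.deletedFiles()); // prints [sst-1]
    }
}
```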

Re: Finite source without blocking save-points

2019-11-04 Thread bupt_ljy
Oh, the parallelism problem didn’t bother me because we used to set the parallelism of the rule source to one :o). Maybe a more elegant way is to distribute the emitted rules by hashing them against #RuntimeContext#getIndexOfThisSubtask. Best, Jiayi Liao Original Message Sender: Gaël Renoux Recipient: bupt_ljy Cc
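A minimal sketch of the hashing idea, assuming each rule is identified by a string and every parallel source instance sees the full rule list: each subtask keeps only the rules whose hash maps to its own index, so every rule is emitted exactly once regardless of parallelism. The helper name is hypothetical; in a Flink source the two integers would come from `getRuntimeContext().getIndexOfThisSubtask()` and `getRuntimeContext().getNumberOfParallelSubtasks()`.

```java
import java.util.ArrayList;
import java.util.List;

class RuleShardingSketch {

    // Returns the subset of rules that the subtask with the given index should emit.
    static List<String> rulesForSubtask(List<String> allRules, int subtaskIndex, int parallelism) {
        List<String> mine = new ArrayList<>();
        for (String rule : allRules) {
            // floorMod keeps the bucket non-negative even for negative hash codes.
            if (Math.floorMod(rule.hashCode(), parallelism) == subtaskIndex) {
                mine.add(rule);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<String> rules = List.of("rule-a", "rule-b", "rule-c", "rule-d");
        int parallelism = 3;
        for (int i = 0; i < parallelism; i++) {
            System.out.println("subtask " + i + " -> " + rulesForSubtask(rules, i, parallelism));
        }
    }
}
```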

Re: Finite source without blocking save-points

2019-11-04 Thread bupt_ljy
Hi Gael, I had a similar situation before. Actually, you don’t need to do this in such a complicated way. I guess you already have a rules source, and you can send the rules in the #open function at startup if your rules source inherits from #RichParallelSourceFunction. Best, Jiayi Liao O
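A rough sketch of the startup idea using a hypothetical stand-in class rather than Flink's `RichParallelSourceFunction`. It assumes the rules are loaded once in `open()` and emitted at the start of `run()`, because in Flink's `SourceFunction` API records are emitted through the `SourceContext` passed to `run()`; here a plain `Consumer` plays that role.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class StartupRulesSketch {

    // Hypothetical stand-in for a rich source: open() runs once, before run().
    static final class RulesSource {
        private final List<String> initialRules = new ArrayList<>();

        // Load the startup rules once (from a database or file in a real job).
        void open(List<String> loadedRules) {
            initialRules.addAll(loadedRules);
        }

        // Emit the startup rules first, then any updates that arrive later.
        void run(List<String> updates, Consumer<String> emit) {
            initialRules.forEach(emit);
            updates.forEach(emit);
        }
    }

    static List<String> simulate(List<String> startup, List<String> updates) {
        RulesSource source = new RulesSource();
        List<String> emitted = new ArrayList<>();
        source.open(startup);
        source.run(updates, emitted::add);
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(simulate(List.of("r1", "r2"), List.of("r3")));
    }
}
```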

Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-31 Thread bupt_ljy
Hi all, First, thanks @tison for bringing this up, and a strong +1 for the overall design. I’d like to add one more example of "multiple jobs in one program" from what I’m currently working on. I’m trying to run a TPC-DS benchmark (including tens of SQL query jobs) on Flink and sufferrin

Re: Setting environment variables of the taskmanagers (yarn)

2019-09-24 Thread bupt_ljy
Hi Richard, You can use dynamic properties to add your environment variables. Set jobmanager env: e.g. -Dcontainerized.master.env.GOOGLE_APPLICATION_CREDENTIALS=xyz Set taskmanager env: e.g. -Dcontainerized.taskmanager.env.GOOGLE_APPLICATION_CREDENTIALS=xyz Best Regards, Jiayi Liao Origi
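To make the naming convention concrete: everything after the `containerized.taskmanager.env.` (or `containerized.master.env.`) prefix becomes the variable name in the container. The helper below is a hypothetical illustration of that mapping, not Flink's own parsing code. Inside a task, the value is then read with the ordinary `System.getenv`.

```java
import java.util.HashMap;
import java.util.Map;

class ContainerEnvSketch {

    static final String TM_ENV_PREFIX = "containerized.taskmanager.env.";

    // Hypothetical helper: turns every "-D<prefix><NAME>=<value>" argument into NAME -> value.
    static Map<String, String> envFromDynamicProperties(String[] args, String prefix) {
        Map<String, String> env = new HashMap<>();
        for (String arg : args) {
            if (!arg.startsWith("-D")) {
                continue;
            }
            String property = arg.substring(2);
            int eq = property.indexOf('=');
            if (eq < 0) {
                continue;
            }
            String key = property.substring(0, eq);
            if (key.startsWith(prefix)) {
                env.put(key.substring(prefix.length()), property.substring(eq + 1));
            }
        }
        return env;
    }

    public static void main(String[] args) {
        String[] cli = {
            "-Dcontainerized.master.env.GOOGLE_APPLICATION_CREDENTIALS=xyz",
            "-Dcontainerized.taskmanager.env.GOOGLE_APPLICATION_CREDENTIALS=xyz"
        };
        System.out.println(envFromDynamicProperties(cli, TM_ENV_PREFIX));
        // In user code running on a TaskManager, the variable is read as usual.
        System.out.println(System.getenv("GOOGLE_APPLICATION_CREDENTIALS"));
    }
}
```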

Re: Per Key Grained Watermark Support

2019-09-23 Thread bupt_ljy
Hi Congxian, Thanks, but by doing that we would lose some features, such as the output of late data. Original Message Sender: Congxian Qiu Recipient: Lasse Nedergaard Cc: 廖嘉逸; user@flink.apache.org; d...@flink.apache.org Date: Monday, Sep 23, 2019 19:56 Subject: Re: Per Key Grained Watermark Supp

Re: [ANNOUNCE] Zili Chen becomes a Flink committer

2019-09-11 Thread bupt_ljy
Congratulations! Best, Jiayi Liao Original Message Sender: Till Rohrmann Recipient: dev; user Date: Wednesday, Sep 11, 2019 17:22 Subject: [ANNOUNCE] Zili Chen becomes a Flink committer Hi everyone, I'm very happy to announce that Zili Chen (some of you might also know him as Tison Kun)

[flink-cep] - What is the difference between GroupPattern and Pattern?

2019-02-15 Thread bupt_ljy
Hi all, I notice that we have GroupPattern, which is a subclass of Pattern in the flink-cep module. I’m not sure about the meaning of GroupPattern or why it is necessary. I would appreciate it if anyone could list some examples to help me understand this. Thanks! Best Regards, Jiayi Liao

Re: Singleton in a taskmanager

2018-12-11 Thread bupt_ljy
Hi Chen, They will not share the same singleton. Firstly, a class is referenced through its classloader, and the classloader is bound to the task. Therefore, different jobs’ slots have different classloaders, which means the class references in different tasks are different. Please correct me

Re: Something wrong with the until condition FLINK-CEP

2018-12-10 Thread bupt_ljy
Sorry, it seems that I misunderstood the concept of composing the until condition with oneOrMore. Original Message Sender: bupt_ljy <bupt_...@163.com> Recipient: user <u...@flink.apache.org> Date: Tuesday, Dec 11, 2018 14:00 Subject: Something wrong with the until condition FLINK-CEP Hi all, I

Something wrong with the until condition FLINK-CEP

2018-12-10 Thread bupt_ljy
Hi all, I seem to have found a problem with the until condition in testGreedyUntilZeroOrMoreWithDummyEventsAfterQuantifier in GreedyITCase.java. I modified the unit test a little bit like this: @Test public void testGreedyUntilZeroOrMoreWithDummyEventsAfterQuantifier() throws Exception { List<StreamRecord

Re: Flink operator UUID and serialVersionUID

2018-11-28 Thread bupt_ljy
t the MapState or all the operators? If I have a map operator which converts DataStream<MyObject> to DataStream<Tuple2<String, MyObject>>, will this fail to recover as well if a field is added to MyObject? Jayant Ameta On Wed, Nov 28, 2018 at 3:08 PM bupt_ljy bupt_...@163.com wrote: Hi, It’ll fail be

Re: Flink operator UUID and serialVersionUID

2018-11-28 Thread bupt_ljy
add a field in "MyObject" class. Will the restore fail? If so, how should such scenarios be handled? Should I convert the "MyObject" instance to JSON and store the string? Jayant Ameta On Wed, Nov 28, 2018 at 1:26 PM bupt_ljy bupt_...@163.com wrote: Hi Jayant, If you change the

Re: Flink operator UUID and serialVersionUID

2018-11-27 Thread bupt_ljy
"MyObject" class, would it help to have a serialVersionUID defined? Thanks, Jayant On Wed, Nov 28, 2018 at 12:52 PM bupt_ljy bupt_...@163.com wrote: Hi, Jayant 1. The uuid is an unique identifier for a specific operator, which means that Flink uses the uuid to recognize the ope

Re: Flink operator UUID and serialVersionUID

2018-11-27 Thread bupt_ljy
Hi, Jayant 1. The uuid is a unique identifier for a specific operator, which means that Flink uses the uuid to recognize the operator when restoring. 2. The operator already implements the Serializable interface, so you don’t need to do it explicitly. 3. The type information of “MyObject”

Re: Queryable state when key is UUID - getting Kyro Exception

2018-11-13 Thread bupt_ljy
nWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ... 1 more Jayant Ameta On Tue, Nov 13, 2018 at 11:35 AM bupt_ljy bupt_...@163.com wrote: Hi, Jayant The key you specified in the getKvState function should be the key of the keyed stream inste
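The reply is truncated, but the distinction it draws can be modeled in plain Java. In this hypothetical sketch, keyed state is partitioned by the key of the keyed stream (assumed to be a String here), and each stream key owns its own UUID-to-pattern map, so a lookup by a UUID finds nothing. In the real queryable-state client, the corresponding argument is the key passed to getKvState.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

class KeyedMapStateSketch {

    // Hypothetical model: state is partitioned by the keyed stream's key,
    // and every stream key owns its own map of UUID -> pattern.
    private final Map<String, Map<UUID, String>> stateByStreamKey = new HashMap<>();

    void put(String streamKey, UUID ruleId, String pattern) {
        stateByStreamKey.computeIfAbsent(streamKey, k -> new HashMap<>()).put(ruleId, pattern);
    }

    // A lookup is addressed by the stream key and returns that key's whole map;
    // passing a map key (the UUID) instead finds nothing.
    Map<UUID, String> lookup(Object key) {
        return stateByStreamKey.get(key);
    }

    public static void main(String[] args) {
        KeyedMapStateSketch state = new KeyedMapStateSketch();
        UUID ruleId = UUID.randomUUID();
        state.put("user-42", ruleId, "pattern-A");
        System.out.println(state.lookup("user-42")); // the whole map for user-42
        System.out.println(state.lookup(ruleId));    // null: a UUID is not the stream key
    }
}
```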

Re: Queryable state when key is UUID - getting Kyro Exception

2018-11-12 Thread bupt_ljy
. Cheers, Till On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy bupt_...@163.com wrote: Hi, Jayant Your code looks good to me. I’ve also tried serializing/deserializing the UUID class with Kryo, and it all looks okay. I’m not sure about this problem. Maybe you can write a very simple demo to try

Re: Queryable state when key is UUID - getting Kyro Exception

2018-10-30 Thread bupt_ljy
Jayant Ameta wittyam...@gmail.com wrote: MapStateDescriptor<UUID, String> descriptor = new MapStateDescriptor<>("rulePatterns", UUID.class, String.class); Jayant Ameta On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy bupt_...@163.com wrote: Hi, Can you show us the descriptor in the c

Re: Unbalanced Kafka consumer consumption

2018-10-30 Thread bupt_ljy
Hi, If I understand your problem correctly, there is a similar JIRA issue, FLINK-10348, reported by me. Maybe you can take a look at it. Best, Jiayi Liao Original Message Sender: Gerard garcia <ger...@talaia.io> Recipient: fearsome.lucidity <fearsome.lucid...@gmail.com> Cc: user <u...@flink.apache.org> Date

Re: Queryable state when key is UUID - getting Kyro Exception

2018-10-25 Thread bupt_ljy
.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) I am not using any custom serializer as mentioned by Jiayi. Jay

Re: Queryable state when key is UUID - getting Kyro Exception

2018-10-25 Thread bupt_ljy
Ameta On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy bupt_...@163.com wrote: Hi, It seems that your code is right. Are you sure that you’re using the same serializer as the Flink program does? Could you show the serializer in the descriptor? Best, Jiayi Liao Original Message Sender: Jayant ameta wi

Re: Queryable state when key is UUID - getting Kyro Exception

2018-10-25 Thread bupt_ljy
Hi, It seems that your code is right. Are you sure that you’re using the same serializer as the Flink program does? Could you show the serializer in the descriptor? Best, Jiayi Liao Original Message Sender: Jayant ameta <wittyam...@gmail.com> Recipient: user <u...@flink.apache.org> Date: Thursday, Oct 2

Re: How do I initialize the window state on first run?

2018-10-18 Thread bupt_ljy
? Hi Jiayi, This topic has been discussed by others; take a look here for some options by Lyft: https://youtu.be/WdMcyN5QZZQ Rafi On Fri, Oct 12, 2018, 16:51 bupt_ljy bupt_...@163.com wrote: Yes… that’s an option, but it’ll be very complicated because of our storage and business. Now I’m

Re: How do I initialize the window state on first run?

2018-10-12 Thread bupt_ljy
ink's state, then use Flink to aggregate the data in real time. bupt_ljy bupt_...@163.com wrote on Friday, Oct 12, 2018 at 3:33 PM: Hi, vino, My Flink program aggregates the data of a whole day. Assume we start this program at 6:00 am; the default state in the window should be the aggregated result of 0:

Re: How do I initialize the window state on first run?

2018-10-12 Thread bupt_ljy
Cc: user <u...@flink.apache.org> Date: Friday, Oct 12, 2018 15:13 Subject: Re: How do I initialize the window state on first run? Hi Jiayi, If you don't mind, I would like to ask what kind of scenario you have. Thanks, vino. bupt_ljy bupt_...@163.com wrote on Friday, Oct 12, 2018, PM 1

How do I initialize the window state on first run?

2018-10-11 Thread bupt_ljy
Hi, I’m going to run a new Flink program with some initialized window states. I don’t see an official way to do this, right? I’ve tried the bravo project, but it doesn’t support FsStateBackend, and it would take too much work to add a new StateBackend to it. Any good ideas about this?

Re: Can rocksDBBackend handle rescaling?

2018-09-13 Thread bupt_ljy
Thanks for your answer, but I still have some questions. Could you tell me why a checkpoint cannot be used for rescaling? From my perspective, the difference between a checkpoint and a savepoint is the nullable externalSavepointLocation. And from this doc: https://flink.apache.org/features/2017/07/04/f

Re: What is the right way to add classpath?

2018-09-12 Thread bupt_ljy
dependencies using maven-shade-plugin or maven-assembly-plugin. Copy the dependency jars to the local ${FLINK_HOME}/lib folder. Submit the job with the -yt, --yarnship args; please refer to https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/cli.html#usage Best, Yun Tang From: bupt_ljy bupt_...@163

What is the right way to add classpath?

2018-09-12 Thread bupt_ljy
Hi all, My program needs some dependencies before it’s submitted to YARN, like:
```
stream.filter(new FilterService()).print();
env.execute();
```
I use an external dependency in FilterService, and the program reports NoClassDefFoundError at org.apache.flink.client.program.PackagedProgram.callM

Re: Deadlock in SafetyNetCloseableRegistry?

2018-09-11 Thread bupt_ljy
Hi all, Sorry for attaching this again. The Flink version is 1.6 and the deadlock stack is "CloseableReaperThread" #54 daemon prio=5 os_prio=0 tid=0x7f4d6d3af000 nid=0x32f6 in Object.wait() [0x7f4d3fdfe000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(N

Deadlock in SafetyNetCloseableRegistry?

2018-09-11 Thread bupt_ljy
Hi all, I started a Flink program running on YARN. At first it didn’t acquire enough resources, so this was thrown: “org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate all requires slots within timeout of 30 ms. Slots required: 16, slots allocated

Re: Cannot set classpath which can be used before job is submitted to yarn.

2018-09-06 Thread bupt_ljy
Not sure about the answer. Do you mean that you can't read the file from the local FS? Have you tried loading the file through a full path? Or maybe you chose the wrong classloader. Best, Hequn On Thu, Sep 6, 2018 at 11:01 PM bupt_ljy bupt_...@163.com wrote: Hi all, I’m using “bin/flink run -m y
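Hequn's two suggestions (a full path, or the right classloader) can be combined. This is a minimal sketch that assumes the conf file is a Java properties file; the resource name and the helper are hypothetical. It first asks the thread's context classloader for the resource and, if the classpath does not contain it (for example on the client before submission), falls back to an explicit full path.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

class ConfLoaderSketch {

    // Try the context classloader first; otherwise read the file from an explicit full path.
    static Properties load(String resourceName, Path fullPath) throws IOException {
        Properties props = new Properties();
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        // A null resource in try-with-resources is allowed and simply not closed.
        try (InputStream in = (cl == null) ? null : cl.getResourceAsStream(resourceName)) {
            if (in != null) {
                props.load(in);
                return props;
            }
        }
        try (InputStream in = Files.newInputStream(fullPath)) {
            props.load(in);
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        Path conf = Files.createTempFile("my-app", ".properties");
        Files.writeString(conf, "answer=42\n");
        Properties props = load("my-app-missing.properties", conf);
        System.out.println(props.getProperty("answer"));
    }
}
```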

Cannot set classpath which can be used before job is submitted to yarn.

2018-09-06 Thread bupt_ljy
Hi all, I’m using “bin/flink run -m yarn-cluster” to run my program on YARN. However, it seems that I can’t add my own files to the classpath before the job is submitted to YARN. For example, I have a conf file, which is located in my own conf directory, and I need to load the file from the conf dire