Re: Recommended approach to debug this

2019-09-23 Thread Biao Liu
Hi Zili, Thanks for pointing that out. I didn't realize that it's a REST API based case. Debasish's case has been discussed not only in this thread... It's really hard to analyze the case without the full picture. I think the reason why `ProgramAbortException` is not caught is that he did som

Re: Flink job manager doesn't remove stale checkmarks

2019-09-23 Thread Biao Liu
Hi Clay, Sorry, I don't get your point. I'm not sure what "stale checkmarks" exactly means. Do you mean the HA storage and checkpoint directory left after shutting down the cluster? Thanks, Biao /'bɪ.aʊ/ On Tue, 24 Sep 2019 at 03:12, Clay Teeter wrote: > I'm trying to get my standalone cluster to remove

Re: [SURVEY] How many people are using customized RestartStrategy(s)

2019-09-23 Thread Zhu Zhu
Steven, In my mind, a Flink Counter only stores its accumulated count and reports that value. Are you using an external counter directly? Maybe Flink's Meter/MeterView is what you need? It stores the count and calculates the rate, and it will report its "count" as well as "rate" to external metric ser
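As an illustration of the Meter/MeterView suggestion above, a minimal sketch; the class name, metric name, and 60-second window are assumptions for the example, not anything from this thread:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;

// Registers a MeterView inside a rich function so that both the accumulated
// "count" and the per-second "rate" are reported to the metric reporters.
public class EventRateMapper extends RichMapFunction<String, String> {

    private transient Meter eventMeter;

    @Override
    public void open(Configuration parameters) {
        // MeterView wraps an internal counter and computes the rate over a 60-second window.
        eventMeter = getRuntimeContext()
                .getMetricGroup()
                .meter("myEvents", new MeterView(60));
    }

    @Override
    public String map(String value) {
        eventMeter.markEvent();
        return value;
    }
}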

Re: Recommended approach to debug this

2019-09-23 Thread Zili Chen
Hi Biao, The log below already indicates that the job was submitted via REST API, and I don't think it matters. at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126) at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.l

Re: Recommended approach to debug this

2019-09-23 Thread Biao Liu
> We submit the code through Kubernetes Flink Operator which uses the REST API to submit the job to the Job Manager So you are submitting the job through the REST API, not the Flink client? Could you explain more about this? Thanks, Biao /'bɪ.aʊ/ On Tue, 24 Sep 2019 at 03:44, Debasish Ghosh wrote: > Hi

Re: Is there a lifecycle listener that gets notified when a topology starts/stops on a task manager

2019-09-23 Thread Zhu Zhu
Hi Stephen, I think disposing static components in the closing stage of a task is required. This is because your code (operators/UDFs) is part of the task, meaning it can only be executed when the task is not disposed. Thanks, Zhu Zhu Stephen Connolly wrote on Tue, Sep 24, 2019 at 2:13 AM: > Currently the

Re: Is there a lifecycle listener that gets notified when a topology starts/stops on a task manager

2019-09-23 Thread Dian Fu
AFAIK, RichFunction is the only approach you could take for this purpose. It's designed for life cycle management of functions. Regards, Dian > On Sep 24, 2019, at 2:13 AM, Stephen Connolly wrote: > > Currently the best I can see is to make *everything* a Rich... and hook into > the open and close methods...
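A minimal sketch of the RichFunction life-cycle approach discussed here, assuming a hypothetical third-party singleton (SharedResource and its initialize/process/shutdown calls are placeholders, not a real API):

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// A reference-counted holder initializes the shared resource for the first task
// in the TaskManager JVM and tears it down when the last task closes. Static
// fields are scoped to the user-code classloader of the job.
public class SingletonAwareMapper extends RichMapFunction<String, String> {

    private static final AtomicInteger REF_COUNT = new AtomicInteger();

    @Override
    public void open(Configuration parameters) {
        if (REF_COUNT.getAndIncrement() == 0) {
            SharedResource.initialize();
        }
    }

    @Override
    public String map(String value) {
        return SharedResource.process(value);
    }

    @Override
    public void close() {
        if (REF_COUNT.decrementAndGet() == 0) {
            SharedResource.shutdown();
        }
    }

    // Stand-in for the third-party singleton; hypothetical, replace with the real library.
    static final class SharedResource {
        static void initialize() { /* acquire connections, native handles, ... */ }
        static String process(String value) { return value; }
        static void shutdown() { /* release everything acquired in initialize() */ }
    }
}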

Re: [SURVEY] How many people are using customized RestartStrategy(s)

2019-09-23 Thread Steven Wu
When we set up an alert like "fullRestarts > 1" for some rolling window, we want to use a counter. If it is a Gauge, "fullRestarts" will never go below 1 after the first full restart, so the alert condition will always be true after the first job restart. If we can apply a derivative to the Gauge value, I guess al

Re: Per Key Grained Watermark Support

2019-09-23 Thread Sameer Wadkar
You could still handle late data. Just keep state around longer (within a predefined lateness interval). Say your time window is a tumbling window of 5 minutes and your events for a key are allowed to arrive 30 minutes late; keep events around for 35 minutes before evicting them from state. It means y
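A simplified sketch of this idea using a KeyedProcessFunction, with the 5-minute window and 30-minute lateness from the example above; the counting logic and the one-open-window-per-key assumption are simplifications for illustration:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Buffer events per key, fire a timer at window end + allowed lateness, then emit
// and evict. Events are modelled as Tuple2<key, eventTimestampMillis>.
public class LateTolerantCount
        extends KeyedProcessFunction<String, Tuple2<String, Long>, Long> {

    private static final long WINDOW_MS = 5 * 60 * 1000L;    // 5-minute tumbling window
    private static final long LATENESS_MS = 30 * 60 * 1000L; // 30 minutes allowed lateness

    private transient ListState<Long> bufferedTimestamps;

    @Override
    public void open(Configuration parameters) {
        bufferedTimestamps = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-timestamps", Types.LONG));
    }

    @Override
    public void processElement(Tuple2<String, Long> event, Context ctx, Collector<Long> out)
            throws Exception {
        bufferedTimestamps.add(event.f1);
        // Align the cleanup timer to the end of the event's window plus the lateness.
        long windowEnd = event.f1 - (event.f1 % WINDOW_MS) + WINDOW_MS;
        ctx.timerService().registerEventTimeTimer(windowEnd + LATENESS_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out)
            throws Exception {
        long count = 0;
        for (Long ignored : bufferedTimestamps.get()) {
            count++;
        }
        out.collect(count);          // emit the late-inclusive result for the window
        bufferedTimestamps.clear();  // evict the buffered events from state
    }
}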

Re: Recommended approach to debug this

2019-09-23 Thread Debasish Ghosh
Hi Dian - We submit one job through the operator. We just use the following to complete a promise when the job completes .. Try { createLogic.executeStreamingQueries(ctx.env) }.fold( th ⇒ completionPromise.tryFailure(th), _ ⇒ completionPromise.trySuccess(Dun)

Flink job manager doesn't remove stale checkmarks

2019-09-23 Thread Clay Teeter
I'm trying to get my standalone cluster to remove stale checkmarks. The cluster is composed of a single job and task manager backed by rocksdb with high availability. The configuration on both the job and task manager is: state.backend: rocksdb state.checkpoints.dir: file:///opt/ha/49/checkpoin
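For reference only, and not a diagnosis of the issue above: a hedged sketch of the checkpoint settings that usually govern how completed checkpoints are retained and cleaned up, assuming a Flink 1.9-era API; the paths and intervals are placeholders:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB state backend writing incremental checkpoints to a shared directory.
        env.setStateBackend(new RocksDBStateBackend("file:///opt/flink/checkpoints", true));

        env.enableCheckpointing(60_000); // checkpoint every minute

        // Decide whether externalized checkpoints survive cancellation; with
        // RETAIN_ON_CANCELLATION the old checkpoint directories must be cleaned up manually.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

        // The number of completed checkpoints that are kept is controlled by
        // "state.checkpoints.num-retained" in flink-conf.yaml (default 1).
    }
}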

Flink child job running on a kerberized cluster

2019-09-23 Thread Imami,Taariq
I am trying to run a Flink application through Oozie on a Kerberized Hadoop cluster (Flink version 1.7.2 and the Hadoop jar we run with is hadoop-common-2.6.0-cdh5.14.0.jar). We are getting a GSS exception when a child job is launched. We confirmed through shell actions that we have valid Kerber

Re: Is there a lifecycle listener that gets notified when a topology starts/stops on a task manager

2019-09-23 Thread Stephen Connolly
Currently the best I can see is to make *everything* a Rich... and hook into the open and close methods... but it feels very ugly. On Mon 23 Sep 2019 at 15:45, Stephen Connolly < stephen.alan.conno...@gmail.com> wrote: > We are using a 3rd party library that allocates some resources in one of > ou

Re: Recommended approach to debug this

2019-09-23 Thread Dian Fu
Hi Debasish, In which case does the exception occur? Does it occur when you submit one job at a time or when multiple jobs are submitted at the same time? I'm asking because I noticed that you used a Future to execute the job in a non-blocking way. I guess ThreadLocal doesn't work well in this case.

Re: Recommended approach to debug this

2019-09-23 Thread Debasish Ghosh
Hi tison - Please find my response below in >>. regards. On Mon, Sep 23, 2019 at 6:20 PM Zili Chen wrote: > Hi Debasish, > > The OptimizerPlanEnvironment.ProgramAbortException should be caught at > OptimizerPlanEnvironment#getOptimizedPlan > in its catch (Throwable t) branch. > >> true but wh

Re: How to prevent from launching 2 jobs at the same time

2019-09-23 Thread David Morin
Thanks Till, perfect. I'm going to use RestClusterClient with listJobs. It should work perfectly for my need. Cheers, David On 2019/09/23 12:36:46, Till Rohrmann wrote: > Hi David, > > you could use Flink's RestClusterClient and call #listJobs to obtain the > list of jobs being executed on the cluste

Is there a lifecycle listener that gets notified when a topology starts/stops on a task manager

2019-09-23 Thread Stephen Connolly
We are using a 3rd party library that allocates some resources in one of our topologies. Is there a listener or something that gets notified when the topology starts / stops running in the Task Manager's JVM? The 3rd party library uses a singleton, so I need to initialize the singleton when the f

Can I cross talk between environments

2019-09-23 Thread srikanth flink
Hi, I'm using Java code to read from Kafka as a source, convert the stream to a table, and register the table. I understand that I have started the StreamExecutionEnvironment and its execution. Is there a way I could access the registered table/temporal function from the SQL client? Thanks Srikanth

Re: What is the right way to use the physical partitioning strategy in Data Streams?

2019-09-23 Thread Biao Liu
Wow, that's really cool! You have indeed done a lot of work. IMO it's somewhat beyond the scope of the user group. Just one small concern: I'm not sure I have fully understood your way of "tackle data skew by altering the way Flink partition keys using KeyedStream". From my understanding, ke

Re: Per Key Grained Watermark Support

2019-09-23 Thread bupt_ljy
Hi Congxian, Thanks, but by doing that we will lose some features like the late data output. Original Message Sender: Congxian Qiu Recipient: Lasse Nedergaard Cc: 廖嘉逸; user@flink.apache.org; d...@flink.apache.org Date: Monday, Sep 23, 2019 19:56 Subject: Re: Per Key Grained Watermark Supp

Re: What is the right way to use the physical partitioning strategy in Data Streams?

2019-09-23 Thread Felipe Gutierrez
I've implemented a combiner [1] in Flink by extending OneInputStreamOperator. I call my operator using "transform". It works well, and I guess it would be useful if I integrated this operator into DataStream.java. I just need to check further whether I need to touch other parts of the source code. But now
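A rough sketch of the general shape of such a transform()-based combiner (not Felipe's actual implementation; the flush threshold and the Tuple2<String, Long> types are assumptions):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Map-side combiner: partial sums are kept in operator memory and flushed
// downstream once a size threshold is reached.
public class CombinerOperator
        extends AbstractStreamOperator<Tuple2<String, Long>>
        implements OneInputStreamOperator<Tuple2<String, Long>, Tuple2<String, Long>> {

    private static final int FLUSH_THRESHOLD = 1000;

    private transient Map<String, Long> partialSums;

    @Override
    public void open() throws Exception {
        super.open();
        partialSums = new HashMap<>();
    }

    @Override
    public void processElement(StreamRecord<Tuple2<String, Long>> element) {
        Tuple2<String, Long> value = element.getValue();
        partialSums.merge(value.f0, value.f1, Long::sum);
        if (partialSums.size() >= FLUSH_THRESHOLD) {
            flush();
        }
    }

    private void flush() {
        for (Map.Entry<String, Long> entry : partialSums.entrySet()) {
            output.collect(new StreamRecord<>(Tuple2.of(entry.getKey(), entry.getValue())));
        }
        partialSums.clear();
    }
}

Plugging it in would look roughly like stream.transform("combiner", outTypeInfo, new CombinerOperator()) ahead of the keyBy (outTypeInfo being the output TypeInformation); a production version would also need to flush on checkpoints and watermarks.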

Re: Recommended approach to debug this

2019-09-23 Thread Zili Chen
Hi Debasish, The OptimizerPlanEnvironment.ProgramAbortException should be caught at OptimizerPlanEnvironment#getOptimizedPlan in its catch (Throwable t) branch. It should always throw a ProgramInvocationException instead of OptimizerPlanEnvironment.ProgramAbortException if any exception thrown in

Re: What is the right way to use the physical partitioning strategy in Data Streams?

2019-09-23 Thread Biao Liu
Hi Felipe, If I understand correctly, you want to solve data skew caused by imbalanced keys? There is a common strategy to solve this kind of problem, pre-aggregation, like the combiner in MapReduce. But sadly, AFAIK Flink does not support pre-aggregation currently. I'm afraid you have to implement it

Re: How to prevent from launching 2 jobs at the same time

2019-09-23 Thread Till Rohrmann
Hi David, you could use Flink's RestClusterClient and call #listJobs to obtain the list of jobs being executed on the cluster (note that it will also report finished jobs). By providing a properly configured Configuration (e.g. loading flink-conf.yaml via GlobalConfiguration#loadConfiguration) it
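A hedged sketch of this approach, assuming a Flink 1.7-1.9 style client API; the cluster id string and the job-name check are illustrative assumptions:

import java.util.Collection;

import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.client.JobStatusMessage;

public class RunningJobCheck {
    public static void main(String[] args) throws Exception {
        // Load flink-conf.yaml (FLINK_CONF_DIR must point at the configuration directory).
        Configuration config = GlobalConfiguration.loadConfiguration();

        RestClusterClient<String> client = new RestClusterClient<>(config, "myCluster");
        try {
            Collection<JobStatusMessage> jobs = client.listJobs().get();
            // listJobs also reports finished jobs, so filter out terminal states.
            boolean alreadyRunning = jobs.stream()
                    .filter(job -> !job.getJobState().isGloballyTerminalState())
                    .anyMatch(job -> job.getJobName().equals("my-job-name"));
            if (alreadyRunning) {
                System.out.println("Job is already running, skipping submission.");
            }
        } finally {
            // Release the client's resources when done (shutdown() in 1.7-1.9).
            client.shutdown();
        }
    }
}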

Re: Recommended approach to debug this

2019-09-23 Thread Debasish Ghosh
This is the complete stack trace which we get from execution on Kubernetes using the Flink Kubernetes operator. The boxed error comes from the fact that we complete a Promise with Success when it returns a JobExecutionResult and with Failure when we get an exception. And here we are getting an exce

Re: Recommended approach to debug this

2019-09-23 Thread Dian Fu
Regarding the code you pasted, personally I think nothing is wrong. The problem is how it's executed. As you can see from the implementation of StreamExecutionEnvironment.getExecutionEnvironment, it may create different StreamExecutionEnvironment implementations under different scenarios.

Re: Per Key Grained Watermark Support

2019-09-23 Thread Congxian Qiu
Hi, There was a discussion about this issue [1]. As that discussion said, at the moment this is not supported out of the box by Flink; I think you can try a keyed process function as Lasse said. [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Watermark-for-each-key-td274

Re: What is the right way to use the physical partitioning strategy in Data Streams?

2019-09-23 Thread Felipe Gutierrez
Thanks Biao, I see. To achieve what I want to do I need to work with KeyedStream. I downloaded the Flink source code to learn from it and alter KeyedStream to my needs. I am not sure, but it is a lot of work because, as far as I understood, the key-groups have to be predictable [1], and altering this tou

Approach to match join streams to create unique streams.

2019-09-23 Thread srikanth flink
Hi there, I have two streams sourced from Kafka. Stream1 is continuous data and stream2 is a periodic update. Stream2 contains only one column. *Use case*: Every entry from stream1 should be checked against stream2 for a match. The matched and unmatched records should be separated into new unique streams

Re: Recommended approach to debug this

2019-09-23 Thread Debasish Ghosh
Can it be the case that the threadLocal stuff in https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1609 does not behave deterministically when we submit a job through a Kubernetes Flink opera

Re: Recommended approach to debug this

2019-09-23 Thread Debasish Ghosh
ah .. Ok .. I get the Throwable part. I am using import org.apache.flink.streaming.api.scala._ val env = StreamExecutionEnvironment.getExecutionEnvironment How can this lead to a wrong StreamExecutionEnvironment ? Any suggestion ? regards. On Mon, Sep 23, 2019 at 3:53 PM Dian Fu wrote: > Hi D

Re: Recommended approach to debug this

2019-09-23 Thread Dian Fu
Hi Debasish, As I said before, the exception is caught in [1]. It catches the Throwable and so it could also catch "OptimizerPlanEnvironment.ProgramAbortException". Regarding the cause of this exception, I have the same feeling as Tison, and I also think that the wrong StreamExecutionEnviro

Re: Recommended approach to debug this

2019-09-23 Thread Debasish Ghosh
Hi Tison - This is the code that builds the computation graph. readStream reads from Kafka and writeStream writes to Kafka. override def buildExecutionGraph = { val rides: DataStream[TaxiRide] = readStream(inTaxiRide) .filter { ride ⇒ ride.getIsStart().booleanValue }

Job name in logs

2019-09-23 Thread Gaël Renoux
Hello everyone, Is there a way to specify the job name in the logging pattern (since the logging configuration is global for the cluster)? We have two different jobs running on our Flink cluster, and when there's a message it's not obvious which of the two is logging. We can figure it out most of

Question about reading ORC file in Flink

2019-09-23 Thread ShuQi
Hi Guys, The Flink version is 1.9.0. I use OrcTableSource to read an ORC file in HDFS and the job is executed successfully, without any exception or error. But some fields (such as tagIndustry) are always null, although these fields are actually not null. I can read these fields by reading the file directly. Below is m

Re: How to use thin JAR instead of fat JAR when submitting Flink job?

2019-09-23 Thread Dian Fu
Hi Qi Kang, You don't need to, and also should not, package the dependencies of Flink into the job jar. Only application-specific dependencies are enough. Regards, Dian > On Sep 23, 2019, at 5:17 PM, Qi Kang wrote: > > Hi, > > According to the documentation of Flink, it seems that fat JAR is recommended > wh

How to use thin JAR instead of fat JAR when submitting Flink job?

2019-09-23 Thread Qi Kang
Hi, According to the documentation of Flink, it seems that fat JAR is recommended when submitting a Flink job. However, the Flink dependencies (as well as other dependencies like Hadoop) are too big in size, thus producing a fat JAR which exceeds 100MB. Is there some way to separate the 'common

Re: NoClassDefFoundError in failing-restarting job that uses url classloader

2019-09-23 Thread Dian Fu
Hi Subbu, The issue you encountered is very similar to the issue which has been fixed in FLINK-10455 [1]. Could you check if that fix could solve your problem? The root cause of that issue is that the method close() did not close everything. After the method "close()" is called, the classload

Re: What is the right way to use the physical partitioning strategy in Data Streams?

2019-09-23 Thread Biao Liu
Hi Felipe, Flink job graph is DAG based. It seems that you set an "edge property" (partitioner) several times. Flink does not support multiple partitioners on one edge. The later one overrides the priors. That means the "keyBy" overrides the "rebalance" and "partitionByPartial". You could insert
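A small illustration of that point on an assumed pipeline (not Felipe's code): only the last partitioner set on an edge takes effect, so the keyBy's hash partitioning decides how records reach the summing operator and the preceding rebalance() is overridden:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionerOverrideExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> words = env
                .fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("a", 3));

        words
                .rebalance()   // overridden by the keyBy below on the same edge
                .keyBy(0)      // hash partitioning by the key is what actually applies
                .sum(1)
                .print();

        env.execute("partitioner-override-example");
    }
}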

RE: NoClassDefFoundError in failing-restarting job that uses url classloader

2019-09-23 Thread Subramanyam Ramanathan
Hi, I was able to simulate the issue again and understand the cause a little better. The issue occurs when: - one of the RichMapFunction transformations uses a third-party library in the open() method that spawns a thread; - the thread doesn't get properly closed in the close()
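An illustrative sketch of the fix being discussed: whatever thread or pool open() starts must be stopped in close(), otherwise it can outlive the task, keep referencing the user-code classloader, and surface as NoClassDefFoundError after restarts. The executor here stands in for the third-party library's thread:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class BackgroundWorkMapper extends RichMapFunction<String, String> {

    private transient ExecutorService backgroundExecutor;

    @Override
    public void open(Configuration parameters) {
        backgroundExecutor = Executors.newSingleThreadExecutor();
        backgroundExecutor.submit(() -> {
            // placeholder for the background work done by the third-party library
        });
    }

    @Override
    public String map(String value) {
        return value;
    }

    @Override
    public void close() throws Exception {
        // Stop the background thread so it does not hold on to the user-code
        // classloader across job restarts.
        if (backgroundExecutor != null) {
            backgroundExecutor.shutdownNow();
            backgroundExecutor.awaitTermination(10, TimeUnit.SECONDS);
        }
    }
}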

Re: How to prevent from launching 2 jobs at the same time

2019-09-23 Thread David Morin
Hi, Thanks for your replies. Yes, it could be useful to have a way to define the job id. Thus, I would have been able to define the job id based on the name, for example. At the moment we do not use the REST API but the CLI to submit our jobs on Yarn. Nevertheless, I can implement a little trick: at sta