Re: Possible bug clean up savepoint dump into filesystem and low network IO starting from savepoint

2020-07-20 Thread Till Rohrmann
Hi David, which S3 file system implementation are you using? If I'm not mistaken, the state backend should write directly to the target file system. If this results in temporary files on your TM, then it might be a problem with the file system implementation. Having access to the

Re: flink app crashed

2020-07-20 Thread Yang Wang
Could you check whether the Flink job has been submitted successfully? You should find a log line like the following in the JobManager log: Starting execution of job ... Also, it would help a lot if you could share the full JobManager and client logs. Best, Yang Rainie Li wrote on Thu, Jul 16, 2020 at 4:03 AM: > These

Handle idle kafka source in Flink 1.9

2020-07-20 Thread bat man
Hello, I have a pipeline which consumes data from a Kafka source. Since the topic is partitioned by device_id, if a group of devices is down some partitions will not get a normal flow of data. I understand from the documentation here[1] that in Flink 1.11 one can declare the source idle - Watermar
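For reference, a minimal sketch of the Flink 1.11 idleness API referenced above (not available in 1.9); the `Event` type, the deserialization schema, and the Kafka properties are placeholders:

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// Sketch only: Event, EventDeserializationSchema and the properties are assumptions.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "device-events");

FlinkKafkaConsumer<Event> consumer =
        new FlinkKafkaConsumer<>("device-events", new EventDeserializationSchema(), props);

consumer.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((event, recordTs) -> event.getEventTime())
                // Partitions that see no records for 1 minute are marked idle,
                // so they no longer hold back the overall watermark.
                .withIdleness(Duration.ofMinutes(1)));
```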

Re: How to get flink JobId in runtime

2020-07-20 Thread Congxian Qiu
Hi Sili, I'm not sure if there are other ways to get this value properly. Maybe you can try `RuntimeContext.getMetricGroup().getAllVariables().get("<job_id>")`. Best, Congxian Si-li Liu wrote on Mon, Jul 20, 2020 at 7:38 PM: > Hi > > I want to retrieve flink JobId in runtime, for example, during > RichFunction's
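A minimal sketch of that suggestion inside a rich function's open(); the `"<job_id>"` variable key is an assumption based on Flink's metric scope variables and may differ between versions:

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Sketch only: reads the job id from the metric group's scope variables in open().
public class JobIdAwareMapper extends RichMapFunction<String, String> {

    private transient String jobId;

    @Override
    public void open(Configuration parameters) {
        // The "<job_id>" key is an assumption; inspect getAllVariables() to see
        // which keys are actually available in your Flink version.
        jobId = getRuntimeContext()
                .getMetricGroup()
                .getAllVariables()
                .get("<job_id>");
    }

    @Override
    public String map(String value) {
        return jobId + ": " + value;
    }
}
```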

Re: How does TaskManager announce JobManager about available ResultPartitions?

2020-07-20 Thread Zhu Zhu
Hi Joseph, The availability of a pipelined result partition is notified to the JM via the scheduleOrUpdateConsumers RPC. Just want to mention that it's better to send such questions to the user mailing list. Thanks, Zhu Zhu Fork Joseph wrote on Tue, Jul 21, 2020 at 3:30 AM: > Hi, > > According to description in > > https

Re: Flink SQL - Join Lookup Table

2020-07-20 Thread Leonard Xu
Hi, Kelly Looks like you want to use a fact table (from Kafka) to join a dimension table (from the filesystem). A dimension table is one kind of temporal table; for the temporal table join syntax you can refer to Danny's post[1]. But `FileSystemTableSource` does not implement the `LookupTableSource` interface yet, wh

Re: Encoding problem in WHERE LIKE

2020-07-20 Thread Danny Chan
Hi, Dongwon ~ > Caused by: org.apache.calcite.sql.parser.SqlParseException: Lexical error at > line 1, column 96. Encountered The error did report the position; you can use it as a reference to see which syntax context caused the problem. Best, Danny Chan On Jul 20, 2020, 11:10 PM +0800, Dongwon Kim wrote:

Re: Flink SQL - Join Lookup Table

2020-07-20 Thread Danny Chan
Seems you want a temporal table join instead of a two-stream join; if that is what you need, you should use the syntax JOIN LookupTable FOR SYSTEM_TIME AS OF … See [1] for details. [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
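A hedged sketch of that syntax against Kelly's lookup table, assuming a StreamTableEnvironment `tableEnv`; the stream table `Events`, its `proctime` attribute, and the `enrichmentValue` column are made-up names, and the lookup source must implement `LookupTableSource`:

```java
// Sketch only: a processing-time lookup join (temporal table join) in Flink SQL.
Table enriched = tableEnv.sqlQuery(
    "SELECT e.*, l.`enrichmentValue` " +
    "FROM Events AS e " +
    "JOIN LookupTable FOR SYSTEM_TIME AS OF e.proctime AS l " +
    "ON e.`computeClass` = l.`computeClass`");
```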

Re: Flink SQL - Join Lookup Table

2020-07-20 Thread godfrey he
JIRA: https://issues.apache.org/jira/browse/FLINK-18651 godfrey he wrote on Tue, Jul 21, 2020 at 9:46 AM: > hi Kelly, > As the exception message mentioned: currently, we must cast the time > attribute to a regular TIMESTAMP type, > then we can do a regular join. Because the time attribute will be out of order > after

Re: Flink SQL - Join Lookup Table

2020-07-20 Thread godfrey he
hi Kelly, As the exception message mentioned: currently, we must cast the time attribute to a regular TIMESTAMP type, and then we can do a regular join. Because the time attribute will be out of order after a regular join, we can't do a window aggregate based on it afterwards. We can improve it tha
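A hedged sketch of this workaround, assuming a StreamTableEnvironment `tableEnv`; the table and column names (`Events`, `rowtime`, `LookupTable`, `enrichmentValue`) are assumptions:

```java
// Sketch only: cast away the time attribute so a regular join is allowed.
// Note: window aggregation on rowtime is no longer possible downstream of this.
tableEnv.createTemporaryView("EventsPlain", tableEnv.sqlQuery(
    "SELECT `computeClass`, CAST(rowtime AS TIMESTAMP(3)) AS ts FROM Events"));

Table joined = tableEnv.sqlQuery(
    "SELECT e.ts, e.`computeClass`, l.`enrichmentValue` " +
    "FROM EventsPlain AS e " +
    "JOIN LookupTable AS l ON e.`computeClass` = l.`computeClass`");
```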

Flink SQL - Join Lookup Table

2020-07-20 Thread Kelly Smith
Hi folks, I have a question about Flink SQL. What I want to do is this: * Join a simple lookup table (a few rows) to a stream of data to enrich the stream by adding a column from the lookup table. For example, a simple lookup table: CREATE TABLE LookupTable ( `computeClass` STRING, `m

Re: records-lag-max

2020-07-20 Thread Chen, Mason
I removed an unnecessary `.keyBy()` and I’m getting the metrics again. Is this a potential bug? From: "Chen, Mason" Date: Monday, July 20, 2020 at 12:41 PM To: "user@flink.apache.org" Subject: records-lag-max Hi all, I am having some trouble with the lag metric from the kafka connector. The

Re: Are files in savepoint still needed after restoring if turning on incremental checkpointing

2020-07-20 Thread Lu Niu
Thanks Yun! That's what I thought :) Best Lu On Sun, Jul 19, 2020 at 7:57 PM Yun Tang wrote: > Hi Lu > > Once a new checkpoint is completed when restoring from a savepoint, the > previous savepoint would be useless if you decide to restore from new > checkpoint. > In other words, new incrementa

records-lag-max

2020-07-20 Thread Chen, Mason
Hi all, I am having some trouble with the lag metric from the kafka connector. The gauge value is always reported as NaN although I’m producing events first and then starting the flink job. Anyone know how to fix this? ``` # TYPE flink_taskmanager_job_task_operator_records_lag_max gauge flink_t

Re: AllwindowStream and RichReduceFunction

2020-07-20 Thread Aljoscha Krettek
What are you trying to do in the ReduceFunction? Without knowing the code, maybe an aggregate(AggregateFunction) is the solution. Best, Aljoscha On 20.07.20 18:03, Flavio Pompermaier wrote: Thanks Aljosha for the reply. So what can I do in my reduce function that contains transient variables (
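For reference, a minimal sketch of the aggregate(AggregateFunction) alternative mentioned above, implementing a plain count; the `Event` input type and the usage snippet are assumptions:

```java
import org.apache.flink.api.common.functions.AggregateFunction;

// Sketch only: a simple count implemented as an AggregateFunction.
public class CountAggregate implements AggregateFunction<Event, Long, Long> {

    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(Event value, Long accumulator) {
        return accumulator + 1;
    }

    @Override
    public Long getResult(Long accumulator) {
        return accumulator;
    }

    @Override
    public Long merge(Long a, Long b) {
        return a + b;
    }
}

// Usage (hypothetical):
// stream.windowAll(TumblingProcessingTimeWindows.of(Time.minutes(1)))
//       .aggregate(new CountAggregate());
```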

Re: Accumulators in Table API

2020-07-20 Thread Dawid Wysakowicz
Hi Flavio. You don't have access to accumulators in Table API. A few other ways that come to my mind are: 1. Use existing metrics e.g. operator input/output records. 2. Use metrics in a UDF 3. Have a regular count (you can have multiple queries optimized into a single graph via TableEnvironmen
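A minimal sketch of option 2 (metrics inside a UDF), using a pass-through scalar function; the metric name and the eval signature are made up for illustration:

```java
import org.apache.flink.metrics.Counter;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

// Sketch only: a pass-through scalar function that counts the rows it sees.
public class CountingIdentity extends ScalarFunction {

    private transient Counter seenRows;

    @Override
    public void open(FunctionContext context) throws Exception {
        seenRows = context.getMetricGroup().counter("seenRows");
    }

    public String eval(String value) {
        seenRows.inc();
        return value;
    }
}
```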

Re: AllwindowStream and RichReduceFunction

2020-07-20 Thread Flavio Pompermaier
Thanks Aljosha for the reply. So what can I do in my reduce function that contains transient variables (i.e. not serializable)? On Mon, Jul 20, 2020 at 4:38 PM Aljoscha Krettek wrote: > Hi Flavio, > > the reason is that under the covers the ReduceFunction will be used as > the ReduceFunction of

Re: Global Hashmap & global static variable.

2020-07-20 Thread Piotr Nowojski
Hi Annemarie, You are missing some basic concepts in Flink, please take a look at [1]. > Weirdly enough it worked fine in my Intellij. It's completely normal. If you are accessing some static variable in your code and you are executing your Flink application in a testing local environment (Intel

Re: Is there a way to use stream API with this program?

2020-07-20 Thread Piotr Nowojski
Hi, I'm afraid there is no out-of-the-box way of doing this. I've created a ticket [1] to write down and document a discussion that we had about this issue in the past. The issue is that currently, untriggered processing-time timers are ignored at end of input, and it seems like there might

Re: Status of a job when a kafka source dies

2020-07-20 Thread Aljoscha Krettek
Hi, Flink doesn't do any special failure handling or retry logic, so it's up to how the KafkaConsumer is configured via properties. In general Flink doesn't try to be smart: when something fails, an exception will bubble up that will fail this execution of the job. If checkpoints are enabled t

Re: Encoding problem in WHERE LIKE

2020-07-20 Thread Dongwon Kim
Hi Leonard, You're right; I was missing a single quotation mark before the LIKE. There's no encoding problem at all! Sorry for the confusion. Thanks, Dongwon On Tue, Jul 21, 2020 at 12:00 AM Leonard Xu wrote: > Hi, Kim > > The clause ` LIKE '%양현마을%’ ` should work well, could you post the t

Re: Encoding problem in WHERE LIKE

2020-07-20 Thread Leonard Xu
Hi, Kim The clause ` LIKE '%양현마을%' ` should work well; could you post the entire query (or select clause)? Best Leonard Xu > On Jul 20, 2020 at 21:49, Dongwon Kim wrote: > > When I execute the following query in .sqlQuery(), > SELECT ... > FROM ... > WHERE location.goalName LIKE '%양현마을%'

Re: Beam flink runner job not keeping up with input rate after downscaling

2020-07-20 Thread Piotr Nowojski
Hi, maxParallelism = -1, the default value, is interpreted as described in the documentation you linked: > The default setting for the maximum parallelism is roughly operatorParallelism + (operatorParallelism / 2) with a lower bound of 128 and an upper bound of 32768. So maxParallelism should be
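A small sketch of that documented rule of thumb; Flink's actual internal computation also rounds the heuristic up to a power of two, so treat this only as an approximation:

```java
// Sketch only: the documented heuristic for the default maxParallelism.
static int defaultMaxParallelismApprox(int operatorParallelism) {
    int heuristic = operatorParallelism + operatorParallelism / 2;
    return Math.min(32768, Math.max(128, heuristic));
}
```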

Re: Flink Jobs are failing for running testcases when trying to build in Jenkins server

2020-07-20 Thread Piotr Nowojski
Hi, It looks like the Flink mini (test) cluster has trouble starting up on your Jenkins machine. Frankly, it's hard for me to guess what the issue could be. 1. Are you following this guideline? [1] 2. Maybe there are some other error messages somewhere else in the logs/stderr/stdout? This 100s

Re: AllwindowStream and RichReduceFunction

2020-07-20 Thread Aljoscha Krettek
Hi Flavio, the reason is that under the covers the ReduceFunction will be used as the ReduceFunction of a ReducingState. And those cannot be rich functions because we cannot provide all the required context "inside" the state backend. You can see how the ReduceFunction is used to create a R

Encoding problem in WHERE LIKE

2020-07-20 Thread Dongwon Kim
Hi, When I execute the following query in .sqlQuery(), > SELECT ... > FROM ... > WHERE location.goalName LIKE '%양현마을%' > I got the following error message > Caused by: org.apache.calcite.sql.parser.SqlParseException: Lexical error > at line 1, column 96. Encountered: "\uc591" (50577), after : "

Simple MDC logs don't show up

2020-07-20 Thread Manish G
Hi All, I have some very simple MDC logs in my flink job: MDC.put("methodName", new Object() {}.getClass().getEnclosingMethod().getName()); MDC.put("className", this.getClass().getSimpleName()); When I run flink job locally, I can see them in the application logs. But when I run the same job on

Re: Flink Cluster Java 11 support

2020-07-20 Thread Yangze Guo
Hi, AFAIK, there is no official image with Java 11. However, I think you could simply build a custom image by changing the base layer[1] to openjdk:11-jre. [1] https://github.com/apache/flink-docker/blob/949e445006c4fc288813900c264847d23d3e33d4/1.11/scala_2.12-debian/Dockerfile Best, Yangze Guo

How to get flink JobId in runtime

2020-07-20 Thread Si-li Liu
Hi I want to retrieve the Flink JobId at runtime, for example, during RichFunction's open method. Is there any way to do it? I checked the methods in RuntimeContext and ExecutionConfig, but it seems I can't get this information from them. Thanks! -- Best regards Sili Liu

Flink Cluster Java 11 support

2020-07-20 Thread Pedro Cardoso
Hello, Are there docker images available for Flink Clusters in Kubernetes that run on Java 11? Thank you. Regards Pedro Cardoso Research Data Engineer pedro.card...@feedzai.com

Re: Kafka Consumer consuming rate suddenly dropped

2020-07-20 Thread Jake
We would need some Flink Kafka consumer logs and Kafka server logs! > On Jul 20, 2020, at 5:45 PM, Mu Kong wrote: > > Hi, community > > I have a flink application consuming from a kafka topic with 60 partitions. > The parallelism of the source is set to 60, same with the topic partition > number. > The

Fwd: Re: [Table API] how to configure a nested timestamp field

2020-07-20 Thread Danny Chan
Best, Danny Chan -- Forwarded message -- From: Danny Chan Date: Jul 20, 2020, 4:51 PM +0800 To: Dongwon Kim Subject: Re: [Table API] how to configure a nested timestamp field > Or is it possible you pre-define a catalog there and register through the SQL > CLI yaml ? > > Best, > Danny Chan > On 2020-07

Re: [Table API] how to configure a nested timestamp field

2020-07-20 Thread Dongwon Kim
Hi Leonard, Unfortunately the answer is no: the YAML you defined will be parsed by the Table > API and then executed; the root cause of the error you posted is that the Table API does > not support computed columns yet, > there is a FLIP under discussion[1], and this should be ready in 1.12.0. BTW, > I think DDL is recommend

Re: [Table API] how to configure a nested timestamp field

2020-07-20 Thread Leonard Xu
Hi, Kim > Hi Leonard, > > Can I have a YAML definition corresponding to the DDL you suggested? Unfortunately the answer is no: the YAML you defined will be parsed by the Table API and then executed. The root cause of the error you posted is that the Table API does not support computed columns yet; there is a FLIP u