Re: Does Flink streaming SQL support two-level GROUP BY aggregation

2020-04-17 Thread Benchao Li
Under the current design, this should not count as a bug; it is by design. The main issue is that with two levels of agg, the retract from the first-level agg causes the second-level agg to recompute and re-emit its result. dixingxing85 wrote on Sat, Apr 18, 2020 at 11:38 AM: > Thanks Benchao, > This job is expected to produce only one result per day, and that result should keep growing, e.g.: > 20200417,86 > 20200417,90 > 20200417,130 > 20200417,131 > > It should not fluctuate up and down; results that shrink are something the downstream consumers definitely cannot accept: > 20200417,90 > 202004

Re: Flink Conf "yarn.flink-dist-jar" Question

2020-04-17 Thread tison
Hi Yang, Name filtering & schema special handling make sense to me. We can enrich them later if there is a requirement, without breaking the interface. For #1, from my perspective, your first proposal is to have an option that specifies a remote flink/lib; we then turn off auto-uploading of the local flink/lib and regi

Re: Does Flink streaming SQL support two-level GROUP BY aggregation

2020-04-17 Thread dixingxing85
Thanks Benchao. This job is expected to produce only one result per day, and that result should keep growing, e.g.: 20200417,86 20200417,90 20200417,130 20200417,131. It should not fluctuate up and down, with numbers going from larger to smaller; the downstream consumers definitely cannot accept such results: 20200417,90 20200417,86 20200417,130 20200417,86 20200417,131. My question is: does the retract stream produced by the inner GROUP BY affect the sink? I am logging on the sink side. If Flink does support this kind of two-level GROUP BY, shouldn't these shrinking results count as a bug? Sent

Re: Does Flink streaming SQL support two-level GROUP BY aggregation

2020-04-17 Thread Benchao Li
Hi, this is supported. What you are seeing happens because GROUP BY produces retract results, i.e. it first sends -[old] and then +[new]. With two levels, this becomes: first level -[old], second level -[cur], +[old]; first level +[new], second level -[old], +[new]. dixingxin...@163.com wrote on Sat, Apr 18, 2020 at 2:11 AM: > > Hi all: > > One of our streaming SQL jobs produces incorrect results: the data arriving at the sink fluctuates up and down. *We would like to confirm whether this is a bug, > or whether Flink does not yet support this kind of SQL*. > The concrete scenario is: first gr
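
To make these retract messages visible, the two-level aggregation from this thread can be converted to a retract stream. Below is a minimal sketch against the Flink 1.10 blink planner, assuming the source table streaming_log_event from the original question is already registered; the boolean flag on each emitted Tuple2 distinguishes retractions (-[old]) from accumulations (+[new]), which is exactly why values logged at the sink appear to shrink and grow:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class RetractDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

            // Two-level aggregation: the inner COUNT DISTINCT retracts its old
            // result on every update, which makes the outer SUM recompute.
            Table result = tEnv.sqlQuery(
                "SELECT dt, SUM(a.uv) AS uv FROM ("
                    + " SELECT dt, pvareaid, COUNT(DISTINCT cuid) AS uv"
                    + " FROM streaming_log_event GROUP BY dt, pvareaid"
                    + ") a GROUP BY dt");

            // f0 = false marks a retraction (-[old]), f0 = true an accumulation (+[new]).
            tEnv.toRetractStream(result, Row.class).print();
            env.execute("two-level-groupby-retract-demo");
        }
    }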

Re: Checkpoints for kafka source sometimes get 55 GB size (instead of 2 MB) and flink job fails during restoring from such checkpoint

2020-04-17 Thread Jacob Sevart
This sounds a lot like an issue I just went through ( http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Very-large-metadata-file-td33356.html). Are you using a union list state anywhere? You could also use the debugging steps mentioned in that thread to inspect the contents of th
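For reference, union list state is declared through OperatorStateStore#getUnionListState; on restore, every parallel instance receives the complete state of all subtasks, which is how checkpoint metadata can balloon. A minimal sketch (the class and state names are illustrative, not from the thread):

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

    public class UnionStateExample implements CheckpointedFunction {
        private transient ListState<Long> offsets;

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Long> descriptor =
                new ListStateDescriptor<>("offsets", Long.class);
            // UNION redistribution: on restore, every parallel instance gets the
            // full list from all subtasks (getListState would split it instead).
            offsets = context.getOperatorStateStore().getUnionListState(descriptor);
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // snapshot logic omitted
        }
    }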

Re: Akka Error

2020-04-17 Thread tison
If you run a program using "flink run" in dist/bin, dependencies should be taken care of. Could you describe in detail how you "start a flink program"? Did you write an entrypoint, compile it, and run it with "java YourProgram"? If so, you have to configure the classpath yourself. Best, tison. Alexande

Re: multi-sql checkpoint fail

2020-04-17 Thread tison
Hi, Could you share the stack traces? Best, tison. forideal wrote on Sat, Apr 18, 2020 at 12:33 AM: > Hello friend > I have two SQL jobs; checkpointing fails all the time. One task opens a > sliding window of one hour, and then another task consumes the output data > of the previous task. There will be no prob

Re: Flink 1.10 Out of memory

2020-04-17 Thread Zahid Rahman
https://betsol.com/java-memory-management-for-java-virtual-machine-jvm/ Backbutton.co.uk ¯\_(ツ)_/¯ ♡۶Java♡۶RMI ♡۶ Make Use Method {MUM} makeuse.org On Fri, 17 Apr 2020 at 14:07, Lasse Nedergaard < lassenedergaardfl...@gmail.com> wrote: > Hi. > > We have migrated to

Akka Error

2020-04-17 Thread Alexander Borgschulze
When I try to start a flink program, I get the following exception:   com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.version'         at com.typesafe.config.impl.SimpleConfig.findKeyOrNull(SimpleConfig.java:152)         at com.typesafe.config.impl.Simple
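This particular ConfigException usually indicates that Akka's reference.conf is missing from the classpath, e.g. when a shaded jar is built without merging reference.conf files, which fits tison's question (in the reply above) about how the program is started. A small diagnostic sketch using the Typesafe Config API directly (the class name is made up):

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    public class AkkaConfigCheck {
        public static void main(String[] args) {
            // ConfigFactory.load() merges every reference.conf found on the
            // classpath; if Akka's is absent, this throws the same
            // ConfigException$Missing for the key 'akka.version'.
            Config config = ConfigFactory.load();
            System.out.println("akka.version = " + config.getString("akka.version"));
        }
    }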

Re: Debug Slowness in Async Checkpointing

2020-04-17 Thread Robert Metzger
Hi, did you check the TaskManager logs for retries by the s3a file system during checkpointing? I'm not aware of any metrics in Flink that could be helpful in this situation. Best, Robert On Tue, Apr 14, 2020 at 12:02 AM Lu Niu wrote: > Hi, Flink users > > We notice sometimes async ch

Re: Checkpoint Space Usage Debugging

2020-04-17 Thread Yun Tang
Hi Kent, You can view the checkpoint details via the web UI to see how much checkpointed data is uploaded for each operator, and you can compare the state sizes over time to see whether operators upload checkpointed data within a stable range. Best, Yun Tang. From: Kent Murra Sen

Does Flink streaming SQL support two-level GROUP BY aggregation

2020-04-17 Thread dixingxin...@163.com
Hi all: One of our streaming SQL jobs produces incorrect results: the data arriving at the sink fluctuates up and down. We would like to confirm whether this is a bug, or whether Flink does not yet support this kind of SQL. The concrete scenario is: first GROUP BY the two dimensions A and B to compute a UV count, then GROUP BY A and SUM up the UVs of dimension B. The corresponding SQL is as follows: (A -> dt, B -> pvareaid) SELECT dt, SUM(a.uv) AS uv FROM ( SELECT dt, pvareaid, COUNT(DISTINCT cuid) AS uv FROM streaming_log_event

Checkpoint Space Usage Debugging

2020-04-17 Thread Kent Murra
I'm looking into a situation where our checkpoint sizes are automatically growing over time. I'm unable to pinpoint exactly why this is happening, and it would be great if there was a way to figure out how much checkpoint space is attributable to each operator so I can narrow it down. Are there a

Re: Flink upgrade to 1.10: function

2020-04-17 Thread seeksst
Hi, Thank you for the reply. I found it is caused by SqlStdOperatorTable and have tried many ways to change it, but failed. Finally, I decided to copy it and rename it. Another thing that caught my attention is that I also define a last_value function whose args are the same as SqlStdOperatorTable's, and

multi-sql checkpoint fail

2020-04-17 Thread forideal
Hello friends, I have two SQL jobs, and checkpointing fails all the time. One task opens a sliding window of one hour, and then another task consumes the output data of the previous task. There is no problem with the two tasks submitted separately. -- first Calculation -- second Write the calculatio

Re: Flink Conf "yarn.flink-dist-jar" Question

2020-04-17 Thread Yang Wang
Hi tison, For #3, if you mean registering a remote HDFS file as a local resource, we should make "-yt/--yarnship" support remote directories. I think that is the right direction. For #1, if users could ship a remote directory, then they could also specify something like "-yt hdfs://hdpdev/flink/relea

Re: Flink upgrade to 1.10: function

2020-04-17 Thread Jark Wu
Hi, I guess you are already close to the truth. Since Flink 1.10 we upgraded Calcite to 1.21, which reserves JSON_VALUE as a keyword, so a user-defined function can't use this name anymore. That's why JSON_VALUE(...) will always be parsed as Calcite's builtin function definition. Currently, I
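
A sketch of the workaround seeksst describes above (copying and renaming): register and call the function under a non-reserved name, since the parser will always resolve JSON_VALUE to the builtin. All names here are hypothetical:

    import org.apache.flink.table.functions.ScalarFunction;

    // Stand-in for the user's function; the point is only the non-reserved name.
    public class MyJsonValue extends ScalarFunction {
        public String eval(String json, String path) {
            return null; // JSON extraction logic omitted
        }
    }

    // Registration and use, e.g.:
    //   tEnv.registerFunction("JSON_VALUE_EX", new MyJsonValue());
    //   tEnv.sqlQuery("SELECT JSON_VALUE_EX(payload, '$.id') FROM events");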

Re: Can I use Joda-Time in Flink?

2020-04-17 Thread tison
Hi Alexander, What do you mean exactly? Could you describe it in pseudo code? I'm not quite sure where Java-Time is used in the env. Best, tison. Alexander Borgschulze wrote on Fri, Apr 17, 2020 at 9:21 PM: > Can I use Joda-Time instead of Java-Time and set it up in the > StreamExecutionEnvironment? >

Can I use Joda-Time in Flink?

2020-04-17 Thread Alexander Borgschulze
Can I use Joda-Time instead of Java-Time and set it up in the StreamExecutionEnvironment?
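
The excerpt does not show a resolution, but Flink's documentation on custom serializers covers exactly this case: Joda types have no built-in TypeInformation, so they fall back to Kryo, and a dedicated serializer from the de.javakaffee:kryo-serializers artifact can be registered on the environment. A minimal sketch:

    import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.joda.time.DateTime;

    public class JodaTimeSetup {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Joda's DateTime is serialized via Kryo; registering this serializer
            // avoids Kryo's slow generic fallback.
            env.getConfig().registerTypeWithKryoSerializer(
                DateTime.class, JodaDateTimeSerializer.class);
        }
    }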

Flink 1.10 Out of memory

2020-04-17 Thread Lasse Nedergaard
Hi. We have migrated to Flink 1.10 and are facing an out-of-memory exception; hopefully someone can point us in the right direction. We have a job that uses broadcast state, and we sometimes run out of memory when it creates a savepoint. See the stack trace below. We have assigned 2.2 GB per task manager and configur

Re: Flink upgrade to 1.10: function

2020-04-17 Thread Till Rohrmann
Hi, thanks for reporting these problems. I'm pulling in Timo and Jark, who are working on the SQL component. They might be able to help you with your problem. Cheers, Till On Thu, Apr 16, 2020 at 11:10 AM seeksst wrote: > Hi, All > > > Recently, I tried to upgrade Flink from 1.8.2 to 1.10, but i

Re: Flink Conf "yarn.flink-dist-jar" Question

2020-04-17 Thread tison
Hi Yang, I agree that these two pieces of work would benefit from a single assignee. My concerns are as follows: 1. Both shared libs & remote flink dist/libs are remote ship files. I don't think we have to implement multiple codepaths/configurations. 2. So, for concept clarification, there are (1) an option to

Re: Flink job didn't restart when a task failed

2020-04-17 Thread Till Rohrmann
Keep us posted once you've caught the problem in the act. This would help tremendously in debugging/understanding the problem. Cheers, Till On Wed, Apr 15, 2020 at 8:44 AM Zhu Zhu wrote: > Sorry I made a mistake. Even if it's the case I had guessed, you will not > get a log "Task {} is already in state

Re: Flink Conf "yarn.flink-dist-jar" Question

2020-04-17 Thread Till Rohrmann
Hi Yang, from what I understand, it sounds reasonable to me. Could you sync with Tison on FLINK-14964 on how to proceed? I'm not super deep into these issues, but they seem somewhat related and Tison has already done some implementation work. I'd say it would be awesome if we could include this kind of

Re: AvroParquetWriter issues writing to S3

2020-04-17 Thread Arvid Heise
Hi Diogo, I saw similar issues already. The root cause is always that users are not actually using any Flink-specific stuff, but going to Hadoop's Parquet writer directly. As you can see in your stacktrace, there is not one reference to any Flink class. The solution usually is to use the respective F
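
The excerpt cuts off at "the respective F", presumably the Flink file sink APIs. For reference, writing Avro records as Parquet to S3 via Flink's own machinery typically looks like the sketch below, using flink-parquet's ParquetAvroWriters with a StreamingFileSink; the bucket and path are placeholders, and checkpointing must be enabled for the bulk format to roll files:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    public final class ParquetS3Sink {
        public static void attach(DataStream<GenericRecord> stream, Schema schema) {
            StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(
                    new Path("s3a://my-bucket/output"), // placeholder path
                    ParquetAvroWriters.forGenericRecord(schema))
                .build();
            stream.addSink(sink);
        }
    }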

Re: Schema with TypeInformation or DataType

2020-04-17 Thread tison
Thanks for your input, and sorry that I said Schema doesn't support DataType for registering a field; I was looking at the Flink 1.9 code... Best, tison. Jark Wu wrote on Fri, Apr 17, 2020 at 2:42 PM: > Hi Tison, > > Migration from TypeInformation to DataType is a big effort and will span > many releases
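
For context, in Flink 1.10 the descriptor Schema does accept DataType directly (Schema#field(String, DataType)), which is what the correction above refers to. A minimal sketch with hypothetical field names:

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.descriptors.Schema;

    public class SchemaExample {
        public static Schema build() {
            // 1.10: fields can be declared with DataType; 1.9 only accepted
            // TypeInformation / type strings.
            return new Schema()
                .field("user_id", DataTypes.STRING())
                .field("ts", DataTypes.TIMESTAMP(3));
        }
    }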

Re: How watermark is generated in sql DDL statement

2020-04-17 Thread Benchao Li
Actually, BoundedOutOfOrderWatermarkGenerator is only used in tests; the real WatermarkGenerator is code-generated in WatermarkGeneratorCodeGenerator. lec ssmi wrote on Fri, Apr 17, 2020 at 5:19 PM: > I think you are all right. I checked the source code of > WatermarkAssignerOperator, and I found t

Re: How watermark is generated in sql DDL statement

2020-04-17 Thread lec ssmi
I think you are all right. I checked the source code of WatermarkAssignerOperator, and I found that the WatermarkGenerator passed into WatermarkAssignerOperator is the WatermarkGenerator interface, and that BoundedOutOfOrderWatermarkGenerator is the only implementation class of WatermarkGene

Re: How watermark is generated in sql DDL statement

2020-04-17 Thread Benchao Li
WatermarkAssignerOperator is an inner mechanism for generating watermarks. "Bounded out of orderness" is just one kind of watermark expression, and the most commonly used one. The main logic of WatermarkAssignerOperator is: - keep currentWatermark and lastWatermark - when each element comes
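
A simplified sketch of the emit rule described above; this is illustrative pseudocode, not the actual WatermarkAssignerOperator source:

    abstract class WatermarkEmitSketch {
        long currentWatermark = Long.MIN_VALUE;
        long lastEmittedWatermark = Long.MIN_VALUE;
        final long watermarkInterval; // the configured auto-watermark interval

        WatermarkEmitSketch(long watermarkInterval) {
            this.watermarkInterval = watermarkInterval;
        }

        // Called for each incoming element with the value of the (code-generated)
        // watermark expression, e.g. rowtime - INTERVAL '5' SECOND.
        void onElement(long watermarkFromExpression) {
            if (watermarkFromExpression > currentWatermark) {
                currentWatermark = watermarkFromExpression;
            }
            // Emit only when the watermark advanced beyond the interval.
            if (currentWatermark - lastEmittedWatermark > watermarkInterval) {
                emitWatermark(currentWatermark);
                lastEmittedWatermark = currentWatermark;
            }
        }

        abstract void emitWatermark(long watermark);
    }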

Re: How watermark is generated in sql DDL statement

2020-04-17 Thread lec ssmi
Maybe you are all right. I was more confused. As the cwiki says, Flink could use BoundedOutOfOrderTimestamps, but I have heard about WatermarkAssignerOperator from Blink developers. Benchao Li wrote on Fri, Apr 17, 2020 at 4:33 PM: > Hi lec ssmi, > > It's a good question. In blink plann

Re: Jars replication

2020-04-17 Thread Chesnay Schepler
My apologies, I remembered wrong. The jar endpoints unfortunately all require working against the leading job master. On 17/04/2020 10:18, Andrés Garagiola wrote: Thanks Chesnay, I'm invoking the "/jars" endpoint on both JMs and only one of them answers with the uploaded jar. If I try to send

Re: How watermark is generated in sql DDL statement

2020-04-17 Thread Benchao Li
Hi lec ssmi, It's a good question. In the blink planner, we use code gen to handle the watermark expression. And in `WatermarkAssignerOperator` we calculate the current watermark when each element comes in. If watermark - lastEmittedWatermark > watermark interval, we will emit the new watermark. So it's n

Re: Jars replication

2020-04-17 Thread Chesnay Schepler
Jars are not replicated to all JobManagers; this is currently expected, but generally undesirable for the use case you outlined. IIRC the important part is that the upload goes directly against the leader; the run request can be sent anywhere and it will be redirected internally to the leader

Jars replication

2020-04-17 Thread Andrés Garagiola
Hi, I'm configuring a Flink cluster with high availability based on ZooKeeper and two Job Managers. When I upload a jar using the /jars/upload REST API, I don't get the jar replicated in both JMs. Is this the expected behavior? I want to configure the cluster in such a way that once the jar is u

How watermark is generated in sql DDL statement

2020-04-17 Thread lec ssmi
Hi: In the SQL API, the watermark declaration is made via a DDL statement. But which way is it implemented: *PeriodicWatermark* or *PunctuatedWatermark*? There seems to be no explanation on the official website. Thanks.
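
For reference, a bounded-out-of-orderness watermark declared in DDL looks like the sketch below (Flink 1.10 blink planner; the table name, fields, and 5-second bound are made up, and most connector options are elided, so this DDL will not validate as-is). As the replies above explain, the expression is code-generated and the watermark is emitted periodically, i.e. it behaves like a periodic watermark:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    public class WatermarkDdlExample {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

            // WATERMARK FOR <col> AS <expr> declares the watermark strategy in DDL.
            tEnv.sqlUpdate(
                "CREATE TABLE events ("
                    + " user_id STRING,"
                    + " ts TIMESTAMP(3),"
                    + " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"
                    + ") WITH ('connector.type' = 'kafka')"); // remaining options elided
        }
    }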