[jira] [Commented] (FLINK-6736) Fix UDTF codegen bug when window follow by join( UDTF)
[ https://issues.apache.org/jira/browse/FLINK-6736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028097#comment-16028097 ] sunjincheng commented on FLINK-6736: Actually, we can use a window after a TableFunction was applied when we add new column for `rowtime`. as follows: * Add a new `rowtime` column:(works well) {code} val table = stream.toTable(tEnv,'long, 'int, 'double, 'float, 'bigdec, 'ts, 'date,'pojo, 'string, 'rowtime.rowtime) table.join(udtf2('string)).select('*) {code} But If we using already existing column as `rowtime`, UDTF will not work. * Using already existing column as `rowtime`:( cannot work) {code} val table = stream.toTable(tEnv,'long.rowtime, 'int, 'double, 'float, 'bigdec, 'ts, 'date,'pojo, 'string) table.join(udtf2('string)).select('*) {code} So I think the problem is how to handle the `rowtime` column when we using already existing column. > Fix UDTF codegen bug when window follow by join( UDTF) > -- > > Key: FLINK-6736 > URL: https://issues.apache.org/jira/browse/FLINK-6736 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: sunjincheng >Assignee: sunjincheng > > When we run the tableAPI as follows: > {code} > val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, > 'bigdec, 'date,'pojo, 'string) > val windowedTable = table > .join(udtf2('string) as ('a, 'b)) > .window(Slide over 5.milli every 2.milli on 'long as 'w) > .groupBy('w) > .select('int.count, agg1('pojo, 'bigdec, 'date, 'int), 'w.start, 'w.end) > {code} > We will get the error message: > {code} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. 
> at > org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) > at > org.apache.flink.table.runtime.CRowCorrelateProcessRunner.compile(CRowCorrelateProcessRunner.scala:35) > at > org.apache.flink.table.runtime.CRowCorrelateProcessRunner.open(CRowCorrelateProcessRunner.scala:59) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111) > at > org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:377) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.codehaus.commons.compiler.CompileException: Line 77, Column > 62: Unknown variable or type "in2" > at > org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11523) > at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6292) > at org.codehaus.janino.UnitCompiler.access$12900(UnitCompiler.java:209) > at > org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:5904) > at > org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:5901) > at org.codehaus.janino.Java$Package.accept(Java.java:4074) > at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:5901) > at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6287) > at org.codehaus.janino.UnitCompiler.access$13500(UnitCo
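The table function `udtf2` used in the snippets above is not defined anywhere in this thread. A minimal sketch of what such a function could look like, assuming Flink 1.3's Table API (`org.apache.flink.table.functions.TableFunction`); the reporter's actual implementation may differ:

{code}
import org.apache.flink.table.functions.TableFunction

// Hypothetical stand-in for the `udtf2` referenced above; emits one (word, length)
// row per '#'-separated token, matching the two-column usage
// `udtf2('string) as ('a, 'b)` in the issue description.
class SplitTableFunction extends TableFunction[(String, Int)] {
  def eval(str: String): Unit = {
    str.split("#").foreach(token => collect((token, token.length)))
  }
}

val udtf2 = new SplitTableFunction
{code}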
[jira] [Comment Edited] (FLINK-6736) Fix UDTF codegen bug when window follow by join( UDTF)
[ https://issues.apache.org/jira/browse/FLINK-6736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028097#comment-16028097 ] sunjincheng edited comment on FLINK-6736 at 5/29/17 7:35 AM: - Actually, we can use a window after a TableFunction was applied when we add new column for `rowtime`. as follows: * Add a new `rowtime` column:(works well) {code} val table = stream.toTable(tEnv,'long, 'int, 'double, 'float, 'bigdec, 'ts, 'date,'pojo, 'string,'rowtime.rowtime) table.join(udtf2('string)).select('*) {code} But If we using already existing column as `rowtime`, UDTF will not work. * Using already existing column as `rowtime`:( cannot work) {code} val table = stream.toTable(tEnv,'long.rowtime, 'int, 'double, 'float, 'bigdec, 'ts, 'date,'pojo, 'string) table.join(udtf2('string)).select('*) {code} So I think the problem is how to handle the `rowtime` column when we using already existing column. was (Author: sunjincheng121): Actually, we can use a window after a TableFunction was applied when we add new column for `rowtime`. as follows: * Add a new `rowtime` column:(works well) {code} val table = stream.toTable(tEnv,'long, 'int, 'double, 'float, 'bigdec, 'ts, 'date,'pojo, 'string, 'rowtime.rowtime) table.join(udtf2('string)).select('*) {code} But If we using already existing column as `rowtime`, UDTF will not work. * Using already existing column as `rowtime`:( cannot work) {code} val table = stream.toTable(tEnv,'long.rowtime, 'int, 'double, 'float, 'bigdec, 'ts, 'date,'pojo, 'string) table.join(udtf2('string)).select('*) {code} So I think the problem is how to handle the `rowtime` column when we using already existing column. > Fix UDTF codegen bug when window follow by join( UDTF) > -- > > Key: FLINK-6736 > URL: https://issues.apache.org/jira/browse/FLINK-6736 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: sunjincheng >Assignee: sunjincheng > > When we run the tableAPI as follows: > {code} > val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, > 'bigdec, 'date,'pojo, 'string) > val windowedTable = table > .join(udtf2('string) as ('a, 'b)) > .window(Slide over 5.milli every 2.milli on 'long as 'w) > .groupBy('w) > .select('int.count, agg1('pojo, 'bigdec, 'date, 'int), 'w.start, 'w.end) > {code} > We will get the error message: > {code} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) > at > org.apache.flink.table.runtime.CRowCorrelateProcessRunner.compile(CRowCorrelateProcessRunner.scala:35) > at > org.apache.flink.table.runtime.CRowCorrelateProcessRunner.open(CRowCorrelateProcessRunner.scala:59) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111) > at > org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:377) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:745) > Ca
[jira] [Commented] (FLINK-6695) Activate strict checkstyle in flink-contrib
[ https://issues.apache.org/jira/browse/FLINK-6695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028098#comment-16028098 ] ASF GitHub Bot commented on FLINK-6695: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4004#discussion_r118881130 --- Diff: flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/FiniteRandomSpout.java --- @@ -32,8 +32,8 @@ import java.util.Random; /** - * A Spout implementation that broadcast random numbers across a specified number of output streams, until a specified - * count was reached. + * A Spout implementation that broadcast randoms numbers across a specified number of output streams, until a specified --- End diff -- wow. I really added the `s` to the wrong word... > Activate strict checkstyle in flink-contrib > --- > > Key: FLINK-6695 > URL: https://issues.apache.org/jira/browse/FLINK-6695 > Project: Flink > Issue Type: Sub-task > Components: flink-contrib >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Trivial > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #4004: [FLINK-6695] Activate strict checkstyle in flink-c...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4004#discussion_r118881170 --- Diff: flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java --- @@ -28,7 +28,7 @@ import org.apache.storm.topology.TopologyBuilder; /** - * A simple topology that splits a number stream based the numbers parity, and verifies the result. + * A simple topology that splits a numbers stream based the numbers parity, and verifies the result. --- End diff -- sounds better --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6695) Activate strict checkstyle in flink-contrib
[ https://issues.apache.org/jira/browse/FLINK-6695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028099#comment-16028099 ] ASF GitHub Bot commented on FLINK-6695: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4004#discussion_r118881170 --- Diff: flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java --- @@ -28,7 +28,7 @@ import org.apache.storm.topology.TopologyBuilder; /** - * A simple topology that splits a number stream based the numbers parity, and verifies the result. + * A simple topology that splits a numbers stream based the numbers parity, and verifies the result. --- End diff -- sounds better > Activate strict checkstyle in flink-contrib > --- > > Key: FLINK-6695 > URL: https://issues.apache.org/jira/browse/FLINK-6695 > Project: Flink > Issue Type: Sub-task > Components: flink-contrib >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Trivial > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-6736) Fix UDTF codegen bug when window follow by join( UDTF)
[ https://issues.apache.org/jira/browse/FLINK-6736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028097#comment-16028097 ] sunjincheng edited comment on FLINK-6736 at 5/29/17 7:37 AM: - Actually, we can use a window after a TableFunction was applied when we add new column for `rowtime`. as follows: * Add a new `rowtime` column:(works well) {code} val table = stream.toTable(tEnv,'long, 'int, 'double, 'float, 'bigdec, 'ts, 'date,'pojo, 'string, 'rt.rowtime) table.join(udtf2('string)).select('*) {code} But If we using already existing column as `rowtime`, UDTF will not work. * Using already existing column as `rowtime`:( cannot work) {code} val table = stream.toTable(tEnv,'long.rowtime, 'int, 'double, 'float, 'bigdec, 'ts, 'date,'pojo, 'string) table.join(udtf2('string)).select('*) {code} So I think the problem is how to handle the `rowtime` column when we using already existing column. was (Author: sunjincheng121): Actually, we can use a window after a TableFunction was applied when we add new column for `rowtime`. as follows: * Add a new `rowtime` column:(works well) {code} val table = stream.toTable(tEnv,'long, 'int, 'double, 'float, 'bigdec, 'ts, 'date,'pojo, 'string,'rowtime.rowtime) table.join(udtf2('string)).select('*) {code} But If we using already existing column as `rowtime`, UDTF will not work. * Using already existing column as `rowtime`:( cannot work) {code} val table = stream.toTable(tEnv,'long.rowtime, 'int, 'double, 'float, 'bigdec, 'ts, 'date,'pojo, 'string) table.join(udtf2('string)).select('*) {code} So I think the problem is how to handle the `rowtime` column when we using already existing column. > Fix UDTF codegen bug when window follow by join( UDTF) > -- > > Key: FLINK-6736 > URL: https://issues.apache.org/jira/browse/FLINK-6736 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: sunjincheng >Assignee: sunjincheng > > When we run the tableAPI as follows: > {code} > val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, > 'bigdec, 'date,'pojo, 'string) > val windowedTable = table > .join(udtf2('string) as ('a, 'b)) > .window(Slide over 5.milli every 2.milli on 'long as 'w) > .groupBy('w) > .select('int.count, agg1('pojo, 'bigdec, 'date, 'int), 'w.start, 'w.end) > {code} > We will get the error message: > {code} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) > at > org.apache.flink.table.runtime.CRowCorrelateProcessRunner.compile(CRowCorrelateProcessRunner.scala:35) > at > org.apache.flink.table.runtime.CRowCorrelateProcessRunner.open(CRowCorrelateProcessRunner.scala:59) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111) > at > org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:377) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:745) > Caused by
[GitHub] flink pull request #4004: [FLINK-6695] Activate strict checkstyle in flink-c...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4004#discussion_r118881130 --- Diff: flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/FiniteRandomSpout.java --- @@ -32,8 +32,8 @@ import java.util.Random; /** - * A Spout implementation that broadcast random numbers across a specified number of output streams, until a specified - * count was reached. + * A Spout implementation that broadcast randoms numbers across a specified number of output streams, until a specified --- End diff -- wow. I really added the `s` to the wrong word... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Comment Edited] (FLINK-6736) Fix UDTF codegen bug when window follow by join( UDTF)
[ https://issues.apache.org/jira/browse/FLINK-6736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028097#comment-16028097 ] sunjincheng edited comment on FLINK-6736 at 5/29/17 7:39 AM: - Actually, we can use a window after a TableFunction was applied when we add new column for `rowtime`. as follows: * Add a new `rowtime` column:(works well) {code} val table = stream.toTable(tEnv,'long, 'int, 'double, 'float, 'bigdec, 'ts, 'date,'pojo, 'string, 'rt.rowtime) val result = table .join(udtf2('string)) .window(Slide over 5.milli every 2.milli on 'rt as 'w) .groupBy('w) .select('int.count, 'w.start, 'w.end) {code} But If we using already existing column as `rowtime`, UDTF will not work. * Using already existing column as `rowtime`:( cannot work) {code} val table = stream.toTable(tEnv,'long.rowtime, 'int, 'double, 'float, 'bigdec, 'ts, 'date,'pojo, 'string) val result = table .join(udtf2('string)) .window(Slide over 5.milli every 2.milli on 'long as 'w) .groupBy('w) .select('int.count, 'w.start, 'w.end) {code} So I think the problem is how to handle the `rowtime` column when we using already existing column. was (Author: sunjincheng121): Actually, we can use a window after a TableFunction was applied when we add new column for `rowtime`. as follows: * Add a new `rowtime` column:(works well) {code} val table = stream.toTable(tEnv,'long, 'int, 'double, 'float, 'bigdec, 'ts, 'date,'pojo, 'string, 'rt.rowtime) table.join(udtf2('string)).select('*) {code} But If we using already existing column as `rowtime`, UDTF will not work. * Using already existing column as `rowtime`:( cannot work) {code} val table = stream.toTable(tEnv,'long.rowtime, 'int, 'double, 'float, 'bigdec, 'ts, 'date,'pojo, 'string) table.join(udtf2('string)).select('*) {code} So I think the problem is how to handle the `rowtime` column when we using already existing column. > Fix UDTF codegen bug when window follow by join( UDTF) > -- > > Key: FLINK-6736 > URL: https://issues.apache.org/jira/browse/FLINK-6736 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: sunjincheng >Assignee: sunjincheng > > When we run the tableAPI as follows: > {code} > val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, > 'bigdec, 'date,'pojo, 'string) > val windowedTable = table > .join(udtf2('string) as ('a, 'b)) > .window(Slide over 5.milli every 2.milli on 'long as 'w) > .groupBy('w) > .select('int.count, agg1('pojo, 'bigdec, 'date, 'int), 'w.start, 'w.end) > {code} > We will get the error message: > {code} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) > at > org.apache.flink.table.runtime.CRowCorrelateProcessRunner.compile(CRowCorrelateProcessRunner.scala:35) > at > org.apache.flink.table.runtime.CRowCorrelateProcessRunner.open(CRowCorrelateProcessRunner.scala:59) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111) > at > org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(Str
[jira] [Comment Edited] (FLINK-6736) Fix UDTF codegen bug when window follow by join( UDTF)
[ https://issues.apache.org/jira/browse/FLINK-6736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028097#comment-16028097 ] sunjincheng edited comment on FLINK-6736 at 5/29/17 7:40 AM: - Actually, we can use a window after a TableFunction was applied when we add new column for `rowtime`. as follows: * Add a new `rowtime` column:(works well) {code} val table = stream.toTable(tEnv,'long, 'int, 'double, 'float, 'bigdec, 'ts, 'date,'pojo, 'string, 'rt.rowtime) table .join(udtf2('string)) .window(Slide over 5.milli every 2.milli on 'rt as 'w) .groupBy('w) .select('int.count, 'w.start, 'w.end) {code} But If we using already existing column as `rowtime`, UDTF will not work. * Using already existing column as `rowtime`:( cannot work) {code} val table = stream.toTable(tEnv,'long.rowtime, 'int, 'double, 'float, 'bigdec, 'ts, 'date,'pojo, 'string) table .join(udtf2('string)) .window(Slide over 5.milli every 2.milli on 'long as 'w) .groupBy('w) .select('int.count, 'w.start, 'w.end) {code} So I think the problem is how to handle the `rowtime` column when we using already existing column. was (Author: sunjincheng121): Actually, we can use a window after a TableFunction was applied when we add new column for `rowtime`. as follows: * Add a new `rowtime` column:(works well) {code} val table = stream.toTable(tEnv,'long, 'int, 'double, 'float, 'bigdec, 'ts, 'date,'pojo, 'string, 'rt.rowtime) val result = table .join(udtf2('string)) .window(Slide over 5.milli every 2.milli on 'rt as 'w) .groupBy('w) .select('int.count, 'w.start, 'w.end) {code} But If we using already existing column as `rowtime`, UDTF will not work. * Using already existing column as `rowtime`:( cannot work) {code} val table = stream.toTable(tEnv,'long.rowtime, 'int, 'double, 'float, 'bigdec, 'ts, 'date,'pojo, 'string) val result = table .join(udtf2('string)) .window(Slide over 5.milli every 2.milli on 'long as 'w) .groupBy('w) .select('int.count, 'w.start, 'w.end) {code} So I think the problem is how to handle the `rowtime` column when we using already existing column. > Fix UDTF codegen bug when window follow by join( UDTF) > -- > > Key: FLINK-6736 > URL: https://issues.apache.org/jira/browse/FLINK-6736 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: sunjincheng >Assignee: sunjincheng > > When we run the tableAPI as follows: > {code} > val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, > 'bigdec, 'date,'pojo, 'string) > val windowedTable = table > .join(udtf2('string) as ('a, 'b)) > .window(Slide over 5.milli every 2.milli on 'long as 'w) > .groupBy('w) > .select('int.count, agg1('pojo, 'bigdec, 'date, 'int), 'w.start, 'w.end) > {code} > We will get the error message: > {code} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) > at > org.apache.flink.table.runtime.CRowCorrelateProcessRunner.compile(CRowCorrelateProcessRunner.scala:35) > at > org.apache.flink.table.runtime.CRowCorrelateProcessRunner.open(CRowCorrelateProcessRunner.scala:59) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.op
[GitHub] flink issue #3942: FLINK-6379 Mesos ResourceManager (FLIP-6)
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3942 Thanks for your contribution @EronWright. I will try to review and merge this PR this week. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6379) Implement FLIP-6 Mesos Resource Manager
[ https://issues.apache.org/jira/browse/FLINK-6379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028109#comment-16028109 ] ASF GitHub Bot commented on FLINK-6379: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3942 Thanks for your contribution @EronWright. I will try to review and merge this PR this week. > Implement FLIP-6 Mesos Resource Manager > --- > > Key: FLINK-6379 > URL: https://issues.apache.org/jira/browse/FLINK-6379 > Project: Flink > Issue Type: Sub-task > Components: Mesos >Reporter: Eron Wright >Assignee: Eron Wright > > Given the new ResourceManager of FLIP-6, implement a new > MesosResourceManager. > The minimal effort would be to implement a new resource manager while > continuing to use the various local actors (launch coordinator, task monitor, > etc.) which implement the various FSMs associated with Mesos scheduling. > The Fenzo library would continue to solve the packing problem of matching > resource offers to slot requests. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #4004: [FLINK-6695] Activate strict checkstyle in flink-contrib
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4004 @greghogan fixed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6695) Activate strict checkstyle in flink-contrib
[ https://issues.apache.org/jira/browse/FLINK-6695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028110#comment-16028110 ] ASF GitHub Bot commented on FLINK-6695: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/4004 @greghogan fixed. > Activate strict checkstyle in flink-contrib > --- > > Key: FLINK-6695 > URL: https://issues.apache.org/jira/browse/FLINK-6695 > Project: Flink > Issue Type: Sub-task > Components: flink-contrib >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Trivial > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6754) Savepoints don't respect client timeout config
Gyula Fora created FLINK-6754: - Summary: Savepoints don't respect client timeout config Key: FLINK-6754 URL: https://issues.apache.org/jira/browse/FLINK-6754 Project: Flink Issue Type: Bug Components: Client, Configuration Reporter: Gyula Fora Priority: Trivial Savepoints have a hardcoded timeout: Future<Object> response = jobManager.ask(new TriggerSavepoint(jobId, Option.apply(savepointDirectory)), new FiniteDuration(1, TimeUnit.HOURS)); . . . result = Await.result(response, FiniteDuration.Inf()); -- This message was sent by Atlassian JIRA (v6.3.15#6346)
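A hedged sketch of the direction such a fix could take, written in the same context as the snippet quoted above (`config`, `jobManager`, `jobId`, `savepointDirectory`). It assumes the `AkkaUtils.getClientTimeout` helper that the CLI already uses elsewhere; it is not the actual CliFrontend code:

{code}
import scala.concurrent.Await
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint

// Read the client timeout from the configuration instead of hardcoding one hour,
// and use it for both the ask and the await.
val clientTimeout = AkkaUtils.getClientTimeout(config)

val response = jobManager.ask(TriggerSavepoint(jobId, Option(savepointDirectory)), clientTimeout)
val result = Await.result(response, clientTimeout)
{code}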
[jira] [Closed] (FLINK-1313) Add support for out-of-place aggregations for streaming
[ https://issues.apache.org/jira/browse/FLINK-1313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-1313. - Resolution: Won't Fix > Add support for out-of-place aggregations for streaming > --- > > Key: FLINK-1313 > URL: https://issues.apache.org/jira/browse/FLINK-1313 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Gyula Fora >Assignee: Gyula Fora >Priority: Minor > > There is an ongoing effort to implement a new aggregation api for batch > processing: https://issues.apache.org/jira/browse/FLINK-1293 > The streaming api should implement the same aggregation logic as well to keep > the two apis as close as possible. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6075) Support Limit/Top(Sort) for Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028132#comment-16028132 ] ASF GitHub Bot commented on FLINK-6075: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/4003 @fhueske - Fine for me! I just wanted to finish this as well :) > Support Limit/Top(Sort) for Stream SQL > -- > > Key: FLINK-6075 > URL: https://issues.apache.org/jira/browse/FLINK-6075 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: radu > Labels: features > Attachments: sort.png > > > These will be split in 3 separated JIRA issues. However, the design is the > same only the processing function differs in terms of the output. Hence, the > design is the same for all of them. > Time target: Proc Time > **SQL targeted query examples:** > *Sort example* > Q1)` SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL > '3' HOUR) ORDER BY b` > Comment: window is defined using GROUP BY > Comment: ASC or DESC keywords can be placed to mark the ordering type > *Limit example* > Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL > '1' HOUR AND current_timestamp ORDER BY b LIMIT 10` > Comment: window is defined using time ranges in the WHERE clause > Comment: window is row triggered > *Top example* > Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING > LIMIT 10) FROM stream1` > Comment: limit over the contents of the sliding window > General Comments: > -All these SQL clauses are supported only over windows (bounded collections > of data). > -Each of the 3 operators will be supported with each of the types of > expressing the windows. > **Description** > The 3 operations (limit, top and sort) are similar in behavior as they all > require a sorted collection of the data on which the logic will be applied > (i.e., select a subset of the items or the entire sorted set). These > functions would make sense in the streaming context only in the context of a > window. Without defining a window the functions could never emit as the sort > operation would never trigger. If an SQL query will be provided without > limits an error will be thrown (`SELECT a FROM stream1 TOP 10` -> ERROR). > Although not targeted by this JIRA, in the case of working based on event > time order, the retraction mechanisms of windows and the lateness mechanisms > can be used to deal with out of order events and retraction/updates of > results. > **Functionality example** > We exemplify with the query below for all the 3 types of operators (sorting, > limit and top). Rowtime indicates when the HOP window will trigger – which > can be observed in the fact that outputs are generated only at those moments. > The HOP windows will trigger at every hour (fixed hour) and each event will > contribute/ be duplicated for 2 consecutive hour intervals. Proctime > indicates the processing time when a new event arrives in the system. Events > are of the type (a,b) with the ordering being applied on the b field. 
> `SELECT a FROM stream1 HOP(proctime, INTERVAL '1' HOUR, INTERVAL '2' HOUR) > ORDER BY b (LIMIT 2/ TOP 2 / [ASC/DESC] `) > ||Rowtime|| Proctime|| Stream1|| Limit 2|| Top 2|| Sort > [ASC]|| > | |10:00:00 |(aaa, 11) | | | >| > | |10:05:00|(aab, 7) | | || > |10-11 |11:00:00 | | aab,aaa |aab,aaa | aab,aaa >| > | |11:03:00 |(aac,21) | | || > > |11-12|12:00:00 | | aab,aaa |aab,aaa | aab,aaa,aac| > | |12:10:00 |(abb,12) | | || > > | |12:15:00 |(abb,12) | | || > > |12-13 |13:00:00 | | abb,abb | abb,abb | > abb,abb,aac| > |...| > **Implementation option** > Considering that the SQL operators will be associated with window boundaries, > the functionality will be implemented within the logic of the window as > follows. > * Window assigner – selected based on the type of window used in SQL > (TUMBLING, SLIDING…) > * Evictor/ Trigger – time or count evictor based on the definition of the > window boundaries > * Apply – window function that sorts data and selects the output to trigger > (based on LIMIT/TOP parameters). All data will be sorted at once and result > outputted when the window is triggered > An alternative implementation can be to use a fold window function to sort > the elements as they arrive, one at a time followed by a flatMap to filter > the number of
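As a rough illustration of the "implementation option" described in the issue above (window assigner plus an apply function that sorts the window contents and selects the output), here is a hedged DataStream-level sketch. It assumes (a, b)-style events as in the functionality example and is not code from PR #4003:

{code}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Collect all elements of a HOP (sliding) window, sort them on field b, and emit
// only the first `limit` rows when the window fires.
def sortAndLimit(stream: DataStream[(String, Int)], limit: Int): DataStream[(String, Int)] =
  stream
    .windowAll(SlidingProcessingTimeWindows.of(Time.hours(2), Time.hours(1)))
    .apply { (window: TimeWindow, elements: Iterable[(String, Int)], out: Collector[(String, Int)]) =>
      elements.toSeq.sortBy(_._2).take(limit).foreach(r => out.collect(r))
    }
{code}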
[GitHub] flink issue #4003: [FLINK-6075] - Support Limit/Top(Sort) for Stream SQL
Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/4003 @fhueske - Fine for me! I just wanted to finish this as well :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-6755) Allow triggering Checkpoints through command line client
Gyula Fora created FLINK-6755: - Summary: Allow triggering Checkpoints through command line client Key: FLINK-6755 URL: https://issues.apache.org/jira/browse/FLINK-6755 Project: Flink Issue Type: New Feature Components: Client, State Backends, Checkpointing Affects Versions: 1.3.0 Reporter: Gyula Fora The command line client currently only allows triggering (and canceling with) Savepoints. While this is good if we want to fork or modify the pipelines in a non-checkpoint compatible way, now with incremental checkpoints this becomes wasteful for simple job restarts/pipeline updates. I suggest we add a new command: ./bin/flink checkpoint [checkpointDirectory] and a new flag -c for the cancel command to indicate we want to trigger a checkpoint: ./bin/flink cancel -c [targetDirectory] Otherwise this can work similar to the current savepoint taking logic, we could probably even piggyback on the current messages by adding boolean flag indicating whether it should be a savepoint or a checkpoint. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
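To make the "piggyback on the current messages" idea above concrete, a purely illustrative sketch; the message and field names below are made up for illustration and do not exist in Flink:

{code}
import org.apache.flink.api.common.JobID

// Illustrative only: a single trigger message carrying a flag that distinguishes
// savepoints from checkpoints, as suggested in the issue description. The real
// message in Flink at this time is TriggerSavepoint (savepoints only).
case class TriggerSnapshot(
    jobId: JobID,
    targetDirectory: Option[String],
    isSavepoint: Boolean)
{code}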
[jira] [Updated] (FLINK-6754) Savepoints don't respect client timeout config
[ https://issues.apache.org/jira/browse/FLINK-6754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora updated FLINK-6754: -- Affects Version/s: 1.3.0 1.2.1 > Savepoints don't respect client timeout config > -- > > Key: FLINK-6754 > URL: https://issues.apache.org/jira/browse/FLINK-6754 > Project: Flink > Issue Type: Bug > Components: Client, Configuration >Affects Versions: 1.3.0, 1.2.1 >Reporter: Gyula Fora >Priority: Trivial > > Savepoints have a hardcoded timeout: > Future response = jobManager.ask(new TriggerSavepoint(jobId, > Option.apply(savepointDirectory)), new FiniteDuration(1, TimeUnit.HOURS)); > . > . > . > result = Await.result(response, FiniteDuration.Inf()); -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #4006: [FLINK-6478] [doc] Document how to upgrade state s...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/4006 [FLINK-6478] [doc] Document how to upgrade state serializers This PR adds documentation on how use custom serializers for managed, as well as handling upgrades to the serializers. We could probably also consider restructuring the "Working with State" page to be a navigation to nested subpages that each cover different topics on working with state, e.g. "Programming API" and "State Serialization", as stuffing all this new detail into a single page might be a bit too overwhelming for new users. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-6478 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4006.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4006 commit 46d002c7a81f5f5b2a5648bffbe18e18fda144c5 Author: Tzu-Li (Gordon) Tai Date: 2017-05-29T08:53:11Z [FLINK-6478] [doc] Document how to upgrade state serializers --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6478) Add documentation on how to upgrade serializers for managed state
[ https://issues.apache.org/jira/browse/FLINK-6478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028152#comment-16028152 ] ASF GitHub Bot commented on FLINK-6478: --- GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/4006 [FLINK-6478] [doc] Document how to upgrade state serializers This PR adds documentation on how use custom serializers for managed, as well as handling upgrades to the serializers. We could probably also consider restructuring the "Working with State" page to be a navigation to nested subpages that each cover different topics on working with state, e.g. "Programming API" and "State Serialization", as stuffing all this new detail into a single page might be a bit too overwhelming for new users. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-6478 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4006.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4006 commit 46d002c7a81f5f5b2a5648bffbe18e18fda144c5 Author: Tzu-Li (Gordon) Tai Date: 2017-05-29T08:53:11Z [FLINK-6478] [doc] Document how to upgrade state serializers > Add documentation on how to upgrade serializers for managed state > - > > Key: FLINK-6478 > URL: https://issues.apache.org/jira/browse/FLINK-6478 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.3.0 > > > There needs to be a documentation that explains how to use the new serializer > upgrade APIs in {{TypeSerializer}}, and how the methods work with > checkpoints. This documentation should probably be placed under "Application > development --> Streaming --> Working with State". > Ideally, it should also come with a minimal example for users that perhaps > use serialization frameworks that already have built-in backwards > compatibility (such as Thrift). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink
[ https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028158#comment-16028158 ] ASF GitHub Bot commented on FLINK-2055: --- Github user nragon commented on the issue: https://github.com/apache/flink/pull/2332 From my point of view, my sample works fine (my use case) > Implement Streaming HBaseSink > - > > Key: FLINK-2055 > URL: https://issues.apache.org/jira/browse/FLINK-2055 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 0.9 >Reporter: Robert Metzger >Assignee: Erli Ding > > As per : > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user nragon commented on the issue: https://github.com/apache/flink/pull/2332 From my point of view, my sample works fine (my use case) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
Andrea Spina created FLINK-6756: --- Summary: Provide RichAsyncFunction to Scala API suite Key: FLINK-6756 URL: https://issues.apache.org/jira/browse/FLINK-6756 Project: Flink Issue Type: Improvement Components: DataStream API Reporter: Andrea Spina I can't find any tracking info about adding a RichAsyncFunction to the Scala API suite. I think it'd be nice to have this function in order to access the open/close methods and the RuntimeContext. I was only able to find http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593, so my question is whether there are any blocking issues preventing this feature. [~till.rohrmann] If it's possible and nobody has already done it, I can assign the issue to myself and implement it. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
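A purely hypothetical sketch of what the requested rich variant could look like in the Scala API, mirroring the Java-side RichAsyncFunction (lifecycle methods plus RuntimeContext access). No such class exists in the Scala API at the time of this discussion:

{code}
import org.apache.flink.api.common.functions.AbstractRichFunction
import org.apache.flink.streaming.api.scala.async.AsyncFunction

// Hypothetical: combines the Scala AsyncFunction trait with the rich-function base
// class, so implementations get open()/close() and getRuntimeContext() for free.
abstract class RichAsyncFunction[IN, OUT]
  extends AbstractRichFunction
  with AsyncFunction[IN, OUT]
{code}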
[jira] [Created] (FLINK-6757) Investigate Apache Atlas integration
Till Rohrmann created FLINK-6757: Summary: Investigate Apache Atlas integration Key: FLINK-6757 URL: https://issues.apache.org/jira/browse/FLINK-6757 Project: Flink Issue Type: Wish Components: Streaming Connectors Reporter: Till Rohrmann Users asked for an integration of Apache Flink with Apache Atlas. It might be worthwhile to investigate what is necessary to achieve this task. References: http://atlas.incubator.apache.org/StormAtlasHook.html -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #4005: [FLINK-6699] Add checkstyle plugin to flink-yarn-tests po...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4005 merging. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6699) Activate strict-checkstyle in flink-yarn-tests
[ https://issues.apache.org/jira/browse/FLINK-6699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028162#comment-16028162 ] ASF GitHub Bot commented on FLINK-6699: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/4005 merging. > Activate strict-checkstyle in flink-yarn-tests > -- > > Key: FLINK-6699 > URL: https://issues.apache.org/jira/browse/FLINK-6699 > Project: Flink > Issue Type: Sub-task > Components: Tests, YARN >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (FLINK-6699) Activate strict-checkstyle in flink-yarn-tests
[ https://issues.apache.org/jira/browse/FLINK-6699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-6699. --- Resolution: Fixed Checkstyle plugin was added to pom for 1.4 in 89f0ad90b8551a1b617b7f9179b65fe81831970c. > Activate strict-checkstyle in flink-yarn-tests > -- > > Key: FLINK-6699 > URL: https://issues.apache.org/jira/browse/FLINK-6699 > Project: Flink > Issue Type: Sub-task > Components: Tests, YARN >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6699) Activate strict-checkstyle in flink-yarn-tests
[ https://issues.apache.org/jira/browse/FLINK-6699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028165#comment-16028165 ] ASF GitHub Bot commented on FLINK-6699: --- Github user zentol closed the pull request at: https://github.com/apache/flink/pull/4005 > Activate strict-checkstyle in flink-yarn-tests > -- > > Key: FLINK-6699 > URL: https://issues.apache.org/jira/browse/FLINK-6699 > Project: Flink > Issue Type: Sub-task > Components: Tests, YARN >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #4005: [FLINK-6699] Add checkstyle plugin to flink-yarn-t...
Github user zentol closed the pull request at: https://github.com/apache/flink/pull/4005 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-6758) Loaded configuration values are logged twice
Chesnay Schepler created FLINK-6758: --- Summary: Loaded configuration values are logged twice Key: FLINK-6758 URL: https://issues.apache.org/jira/browse/FLINK-6758 Project: Flink Issue Type: Bug Components: Local Runtime Affects Versions: 1.3.0, 1.4.0 Reporter: Chesnay Schepler Priority: Trivial When starting a Job- or TaskManager i found the following duplicated lines in the logs {code} 2017-05-29 09:28:07,834 INFO org.apache.flink.runtime.taskmanager.TaskManager - Loading configuration from /home/Zento/rc3/dist/flink-1.3.0/conf 2017-05-29 09:28:07,837 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: jobmanager.rpc.address, localhost 2017-05-29 09:28:07,837 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: jobmanager.rpc.port, 6123 2017-05-29 09:28:07,837 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: jobmanager.heap.mb, 1024 2017-05-29 09:28:07,837 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: taskmanager.heap.mb, 1024 2017-05-29 09:28:07,837 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: taskmanager.numberOfTaskSlots, 1 2017-05-29 09:28:07,837 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: taskmanager.memory.preallocate, false 2017-05-29 09:28:07,838 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: parallelism.default, 1 2017-05-29 09:28:07,838 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: jobmanager.web.port, 8081 2017-05-29 09:28:07,847 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: jobmanager.rpc.address, localhost 2017-05-29 09:28:07,847 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: jobmanager.rpc.port, 6123 2017-05-29 09:28:07,847 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: jobmanager.heap.mb, 1024 2017-05-29 09:28:07,848 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: taskmanager.heap.mb, 1024 2017-05-29 09:28:07,848 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: taskmanager.numberOfTaskSlots, 1 2017-05-29 09:28:07,848 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: taskmanager.memory.preallocate, false 2017-05-29 09:28:07,848 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: parallelism.default, 1 2017-05-29 09:28:07,848 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: jobmanager.web.port, 8081 {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrea Spina updated FLINK-6756: Description: I can't find any tracking info about the chance to have RichAsyncFunction in the Scala API suite. I think it'd be nice to have this function in order to access open/close methods and the RuntimeContext. I was able to retrieve http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 only, so my question is if there are some blocking issues avoiding this feature. [~till.rohrmann] If it's possible and nobody already have done it, I can assign the issue to myself in order to implement it. was: I can't find any tracking info about the chance to have a RichAsyncFunction in the Scala API suite. I think it'd be nice to have this function in order to access open/close methods and the RuntimeContext. I was able to retrieve http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 only, so my question is if there are some blocking issues avoiding this feature. [~till.rohrmann] If it's possible and nobody already have done it, I can assign the issue to myself in order to implement it. > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. > I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is if there are some blocking issues avoiding this > feature. [~till.rohrmann] > If it's possible and nobody already have done it, I can assign the issue to > myself in order to implement it. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028170#comment-16028170 ] Till Rohrmann commented on FLINK-6756: -- I think there is nothing forbidding the implementation of a rich variant of the {{AsyncFunction}} for the Scala API. Thus, it would be great if you could tackle the issue [~spi-x-i]. > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. > I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is if there are some blocking issues avoiding this > feature. [~till.rohrmann] > If it's possible and nobody already have done it, I can assign the issue to > myself in order to implement it. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6759) storm-examples cannot be built without cached dependencies
Chesnay Schepler created FLINK-6759: --- Summary: storm-examples cannot be built without cached dependencies Key: FLINK-6759 URL: https://issues.apache.org/jira/browse/FLINK-6759 Project: Flink Issue Type: Bug Components: Build System Affects Versions: 1.4.0 Reporter: Chesnay Schepler The {{flink-storm-examples}} module fails to build if the {{flink-examples-batch}} dependency is not present in the local cache. {code} [ERROR] Failed to execute goal on project flink-storm-examples_2.10: Could not resolve dependencies for project org.apache.flink:flink-storm-examples_2.10:jar:1.4-SNAPSHOT: Failed to collect dependenc ies at org.apache.flink:flink-examples-batch_2.10:jar:1.4-SNAPSHOT: Failed to read artifact descriptor for org.apache.flink:flink-examples-batch_2.10:jar:1.4-SNAPSHOT: Failure to find org.apache.flink :flink-examples_${scala.binary.version}:pom:1.4-SNAPSHOT in https://repository.apache.org/snapshots was cached in the local repository, resolution will not be reattempted until the update interval of apache.snapshots has elapsed or updates are forced -> [Help 1] {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #4006: [FLINK-6478] [doc] Document how to upgrade state s...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4006#discussion_r118901207 --- Diff: docs/dev/stream/state.md --- @@ -429,3 +429,120 @@ public static class CounterSource Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface. +## Custom serialization for managed state + +This section is targeted as a guideline for users who require using custom serialization for their state, covering how --- End diff -- This section is targeted as a guideline for users who require custom serialization for their state "using" doesn't really work in this context. You could say "the use of" instead, but it doesn't add anything. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
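As a point of reference for the registration snippet quoted in the diff above, a compilable variant might look like the following sketch. This is illustrative only and not text from the pull request: {{StringSerializer.INSTANCE}} merely stands in for "your own TypeSerializer implementation", and the surrounding {{RichFlatMapFunction}} (meant to run on a keyed stream) exists only to give {{getRuntimeContext().getListState(...)}} a home.

{code}
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Hypothetical example class; intended for use on a keyed stream.
public class CustomSerializerStateExample extends RichFlatMapFunction<String, String> {

    private transient ListState<String> checkpointedState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Passing a TypeSerializer directly bypasses Flink's type-extraction-based
        // serializer creation, as described in the documentation under review.
        ListStateDescriptor<String> descriptor =
                new ListStateDescriptor<>("state-name", StringSerializer.INSTANCE);

        checkpointedState = getRuntimeContext().getListState(descriptor);
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        checkpointedState.add(value);
        out.collect(value);
    }
}
{code}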
[jira] [Commented] (FLINK-6478) Add documentation on how to upgrade serializers for managed state
[ https://issues.apache.org/jira/browse/FLINK-6478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028185#comment-16028185 ] ASF GitHub Bot commented on FLINK-6478: --- Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4006#discussion_r118904745 --- Diff: docs/dev/stream/state.md --- @@ -429,3 +429,120 @@ public static class CounterSource Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface. +## Custom serialization for managed state + +This section is targeted as a guideline for users who require using custom serialization for their state, covering how +to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using +Flink's own serializers, this section is irrelevant and can be skipped. + +### Using custom serializers + +As demonstrated in the above examples, when registering a managed operator or keyed state, a `StateDescriptor` is required +to specify the state's name, as well as information about the type of the state. The type information is used by Flink's +[type serialization framework](../types_serialization.html) to create appropriate serializers for the state. + +It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states, +simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation: + +{% highlight java %} +ListStateDescriptor> descriptor = +new ListStateDescriptor<>( +"state-name", +new TypeSerializer<> {...}); + +checkpointedState = getRuntimeContext().getListState(descriptor); +{% endhighlight %} + +### Handling serializer upgrades and compatibility + +Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any +specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer +that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility, +and is replaced as the new serializer for the state. + +A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state, +and the written binary format of the state also remains identical. The means to check the new serializer's compatibility +is provided through the following two methods of the `TypeSerializer` interface: + +{% highlight java %} +public abstract TypeSerializerConfigSnapshot snapshotConfiguration(); +public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot); +{% endhighlight %} + +Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a +point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the +checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot +will be used to confront the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`. This --- End diff -- I don't understand the use of "confront" in this context. Perhaps you mean something like "determine the compatibility of" or "verify the compatibility of" ? 
> Add documentation on how to upgrade serializers for managed state > - > > Key: FLINK-6478 > URL: https://issues.apache.org/jira/browse/FLINK-6478 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.3.0 > > > There needs to be a documentation that explains how to use the new serializer > upgrade APIs in {{TypeSerializer}}, and how the methods work with > checkpoints. This documentation should probably be placed under "Application > development --> Streaming --> Working with State". > Ideally, it should also come with a minimal example for users that perhaps > use serialization frameworks that already have built-in backwards > compatibility (such as Thrift). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6478) Add documentation on how to upgrade serializers for managed state
[ https://issues.apache.org/jira/browse/FLINK-6478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028182#comment-16028182 ] ASF GitHub Bot commented on FLINK-6478: --- Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4006#discussion_r118905107 --- Diff: docs/dev/stream/state.md --- @@ -429,3 +429,120 @@ public static class CounterSource Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface. +## Custom serialization for managed state + +This section is targeted as a guideline for users who require using custom serialization for their state, covering how +to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using +Flink's own serializers, this section is irrelevant and can be skipped. + +### Using custom serializers + +As demonstrated in the above examples, when registering a managed operator or keyed state, a `StateDescriptor` is required +to specify the state's name, as well as information about the type of the state. The type information is used by Flink's +[type serialization framework](../types_serialization.html) to create appropriate serializers for the state. + +It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states, +simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation: + +{% highlight java %} +ListStateDescriptor> descriptor = +new ListStateDescriptor<>( +"state-name", +new TypeSerializer<> {...}); + +checkpointedState = getRuntimeContext().getListState(descriptor); +{% endhighlight %} + +### Handling serializer upgrades and compatibility + +Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any +specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer +that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility, +and is replaced as the new serializer for the state. + +A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state, +and the written binary format of the state also remains identical. The means to check the new serializer's compatibility +is provided through the following two methods of the `TypeSerializer` interface: + +{% highlight java %} +public abstract TypeSerializerConfigSnapshot snapshotConfiguration(); +public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot); +{% endhighlight %} + +Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a +point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the +checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot +will be used to confront the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`. This +method serves as a check for whether or not the new serializer is compatible, as well as a hook to possibly reconfigure +the new serializer in the case that it is incompatible. 
+ +Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e. when the +same serializer is used for the state in the restored job, the serializer's will reconfigure themselves to be compatible +with their previous configuration. + +The following subsections illustrate guidelines to implement these two methods when using custom serializers. + + Implementing the `snapshotConfiguration` method + +The serializer's configuration snapshot should capture enough information such that on restore, the information +carried over to the new serializer for the state is sufficient for it to determine whether or not it is compatible. +This could typically contain information about the serializer's parameters or binary format of the serialized data; +generally, anything that allows the new serializer to decide whether or not it can be used to read previous serialized +bytes, and that it writes in the same binary format. + +How the serializer's configuration snapshot is written to and read from checkpoints is fully customizable. The below +is the base cla
[GitHub] flink pull request #4006: [FLINK-6478] [doc] Document how to upgrade state s...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4006#discussion_r118905107 --- Diff: docs/dev/stream/state.md --- @@ -429,3 +429,120 @@ public static class CounterSource Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface. +## Custom serialization for managed state + +This section is targeted as a guideline for users who require using custom serialization for their state, covering how +to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using +Flink's own serializers, this section is irrelevant and can be skipped. + +### Using custom serializers + +As demonstrated in the above examples, when registering a managed operator or keyed state, a `StateDescriptor` is required +to specify the state's name, as well as information about the type of the state. The type information is used by Flink's +[type serialization framework](../types_serialization.html) to create appropriate serializers for the state. + +It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states, +simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation: + +{% highlight java %} +ListStateDescriptor> descriptor = +new ListStateDescriptor<>( +"state-name", +new TypeSerializer<> {...}); + +checkpointedState = getRuntimeContext().getListState(descriptor); +{% endhighlight %} + +### Handling serializer upgrades and compatibility + +Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any +specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer +that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility, +and is replaced as the new serializer for the state. + +A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state, +and the written binary format of the state also remains identical. The means to check the new serializer's compatibility +is provided through the following two methods of the `TypeSerializer` interface: + +{% highlight java %} +public abstract TypeSerializerConfigSnapshot snapshotConfiguration(); +public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot); +{% endhighlight %} + +Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a +point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the +checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot +will be used to confront the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`. This +method serves as a check for whether or not the new serializer is compatible, as well as a hook to possibly reconfigure +the new serializer in the case that it is incompatible. + +Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e. 
when the +same serializer is used for the state in the restored job, the serializer's will reconfigure themselves to be compatible +with their previous configuration. + +The following subsections illustrate guidelines to implement these two methods when using custom serializers. + + Implementing the `snapshotConfiguration` method + +The serializer's configuration snapshot should capture enough information such that on restore, the information +carried over to the new serializer for the state is sufficient for it to determine whether or not it is compatible. +This could typically contain information about the serializer's parameters or binary format of the serialized data; +generally, anything that allows the new serializer to decide whether or not it can be used to read previous serialized +bytes, and that it writes in the same binary format. + +How the serializer's configuration snapshot is written to and read from checkpoints is fully customizable. The below +is the base class for all serializer configuration snapshot implementations, the `TypeSerializerConfigSnapshot`. + +{% highlight java %} +public abstract TypeSerializerConfigSnapshot extends VersionedIOReadableWritable { + public abstract int getVersio
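The diff above describes {{TypeSerializerConfigSnapshot}} as the base class for serializer configuration snapshots, with a version plus custom read/write of whatever the serializer needs to remember. As a rough illustration of that contract, here is a hypothetical snapshot for a serializer whose only configuration is a compression flag; the class and field names are invented, and only the overridden methods follow the contract quoted in the documentation.

{code}
import java.io.IOException;

import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

// Hypothetical snapshot class; not part of Flink or of the documentation under review.
public class MyFormatConfigSnapshot extends TypeSerializerConfigSnapshot {

    private static final int VERSION = 1;

    private boolean compressed;

    // Nullary constructor so the snapshot can be re-instantiated when it is read back.
    public MyFormatConfigSnapshot() {}

    public MyFormatConfigSnapshot(boolean compressed) {
        this.compressed = compressed;
    }

    @Override
    public int getVersion() {
        return VERSION;
    }

    @Override
    public void write(DataOutputView out) throws IOException {
        super.write(out);
        // Persist the one piece of configuration the new serializer needs to see on restore.
        out.writeBoolean(compressed);
    }

    @Override
    public void read(DataInputView in) throws IOException {
        super.read(in);
        this.compressed = in.readBoolean();
    }

    public boolean isCompressed() {
        return compressed;
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof MyFormatConfigSnapshot
                && ((MyFormatConfigSnapshot) obj).compressed == compressed;
    }

    @Override
    public int hashCode() {
        return Boolean.hashCode(compressed);
    }
}
{code}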
[GitHub] flink pull request #4006: [FLINK-6478] [doc] Document how to upgrade state s...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4006#discussion_r118900486 --- Diff: docs/dev/stream/state.md --- @@ -429,3 +429,120 @@ public static class CounterSource Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface. +## Custom serialization for managed state + +This section is targeted as a guideline for users who require using custom serialization for their state, covering how +to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using +Flink's own serializers, this section is irrelevant and can be skipped. + +### Using custom serializers + +As demonstrated in the above examples, when registering a managed operator or keyed state, a `StateDescriptor` is required +to specify the state's name, as well as information about the type of the state. The type information is used by Flink's +[type serialization framework](../types_serialization.html) to create appropriate serializers for the state. + +It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states, +simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation: + +{% highlight java %} +ListStateDescriptor> descriptor = +new ListStateDescriptor<>( +"state-name", +new TypeSerializer<> {...}); + +checkpointedState = getRuntimeContext().getListState(descriptor); +{% endhighlight %} + +### Handling serializer upgrades and compatibility + +Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any +specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer +that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility, +and is replaced as the new serializer for the state. + +A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state, +and the written binary format of the state also remains identical. The means to check the new serializer's compatibility +is provided through the following two methods of the `TypeSerializer` interface: + +{% highlight java %} +public abstract TypeSerializerConfigSnapshot snapshotConfiguration(); +public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot); +{% endhighlight %} + +Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a +point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the +checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot +will be used to confront the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`. This +method serves as a check for whether or not the new serializer is compatible, as well as a hook to possibly reconfigure +the new serializer in the case that it is incompatible. + +Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e. 
when the +same serializer is used for the state in the restored job, the serializer's will reconfigure themselves to be compatible +with their previous configuration. + +The following subsections illustrate guidelines to implement these two methods when using custom serializers. + + Implementing the `snapshotConfiguration` method + +The serializer's configuration snapshot should capture enough information such that on restore, the information +carried over to the new serializer for the state is sufficient for it to determine whether or not it is compatible. +This could typically contain information about the serializer's parameters or binary format of the serialized data; +generally, anything that allows the new serializer to decide whether or not it can be used to read previous serialized +bytes, and that it writes in the same binary format. + +How the serializer's configuration snapshot is written to and read from checkpoints is fully customizable. The below +is the base class for all serializer configuration snapshot implementations, the `TypeSerializerConfigSnapshot`. + +{% highlight java %} +public abstract TypeSerializerConfigSnapshot extends VersionedIOReadableWritable { + public abstract int getVersio
[jira] [Commented] (FLINK-6478) Add documentation on how to upgrade serializers for managed state
[ https://issues.apache.org/jira/browse/FLINK-6478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028181#comment-16028181 ] ASF GitHub Bot commented on FLINK-6478: --- Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4006#discussion_r118900486 --- Diff: docs/dev/stream/state.md --- @@ -429,3 +429,120 @@ public static class CounterSource Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface. +## Custom serialization for managed state + +This section is targeted as a guideline for users who require using custom serialization for their state, covering how +to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using +Flink's own serializers, this section is irrelevant and can be skipped. + +### Using custom serializers + +As demonstrated in the above examples, when registering a managed operator or keyed state, a `StateDescriptor` is required +to specify the state's name, as well as information about the type of the state. The type information is used by Flink's +[type serialization framework](../types_serialization.html) to create appropriate serializers for the state. + +It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states, +simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation: + +{% highlight java %} +ListStateDescriptor> descriptor = +new ListStateDescriptor<>( +"state-name", +new TypeSerializer<> {...}); + +checkpointedState = getRuntimeContext().getListState(descriptor); +{% endhighlight %} + +### Handling serializer upgrades and compatibility + +Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any +specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer +that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility, +and is replaced as the new serializer for the state. + +A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state, +and the written binary format of the state also remains identical. The means to check the new serializer's compatibility +is provided through the following two methods of the `TypeSerializer` interface: + +{% highlight java %} +public abstract TypeSerializerConfigSnapshot snapshotConfiguration(); +public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot); +{% endhighlight %} + +Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a +point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the +checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot +will be used to confront the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`. This +method serves as a check for whether or not the new serializer is compatible, as well as a hook to possibly reconfigure +the new serializer in the case that it is incompatible. 
+ +Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e. when the +same serializer is used for the state in the restored job, the serializer's will reconfigure themselves to be compatible +with their previous configuration. + +The following subsections illustrate guidelines to implement these two methods when using custom serializers. + + Implementing the `snapshotConfiguration` method + +The serializer's configuration snapshot should capture enough information such that on restore, the information +carried over to the new serializer for the state is sufficient for it to determine whether or not it is compatible. +This could typically contain information about the serializer's parameters or binary format of the serialized data; +generally, anything that allows the new serializer to decide whether or not it can be used to read previous serialized +bytes, and that it writes in the same binary format. + +How the serializer's configuration snapshot is written to and read from checkpoints is fully customizable. The below +is the base cla
[GitHub] flink pull request #4006: [FLINK-6478] [doc] Document how to upgrade state s...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4006#discussion_r118903664 --- Diff: docs/dev/stream/state.md --- @@ -429,3 +429,120 @@ public static class CounterSource Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface. +## Custom serialization for managed state + +This section is targeted as a guideline for users who require using custom serialization for their state, covering how +to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using +Flink's own serializers, this section is irrelevant and can be skipped. + +### Using custom serializers + +As demonstrated in the above examples, when registering a managed operator or keyed state, a `StateDescriptor` is required +to specify the state's name, as well as information about the type of the state. The type information is used by Flink's +[type serialization framework](../types_serialization.html) to create appropriate serializers for the state. + +It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states, +simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation: + +{% highlight java %} +ListStateDescriptor> descriptor = +new ListStateDescriptor<>( +"state-name", +new TypeSerializer<> {...}); + +checkpointedState = getRuntimeContext().getListState(descriptor); +{% endhighlight %} + +### Handling serializer upgrades and compatibility + +Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any +specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer +that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility, +and is replaced as the new serializer for the state. + +A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state, +and the written binary format of the state also remains identical. The means to check the new serializer's compatibility +is provided through the following two methods of the `TypeSerializer` interface: + +{% highlight java %} +public abstract TypeSerializerConfigSnapshot snapshotConfiguration(); +public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot); +{% endhighlight %} + +Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a +point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the +checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot +will be used to confront the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`. This +method serves as a check for whether or not the new serializer is compatible, as well as a hook to possibly reconfigure +the new serializer in the case that it is incompatible. + +Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e. 
when the +same serializer is used for the state in the restored job, the serializer's will reconfigure themselves to be compatible +with their previous configuration. + +The following subsections illustrate guidelines to implement these two methods when using custom serializers. + + Implementing the `snapshotConfiguration` method + +The serializer's configuration snapshot should capture enough information such that on restore, the information +carried over to the new serializer for the state is sufficient for it to determine whether or not it is compatible. +This could typically contain information about the serializer's parameters or binary format of the serialized data; +generally, anything that allows the new serializer to decide whether or not it can be used to read previous serialized +bytes, and that it writes in the same binary format. + +How the serializer's configuration snapshot is written to and read from checkpoints is fully customizable. The below +is the base class for all serializer configuration snapshot implementations, the `TypeSerializerConfigSnapshot`. + +{% highlight java %} +public abstract TypeSerializerConfigSnapshot extends VersionedIOReadableWritable { + public abstract int getVersio
[jira] [Commented] (FLINK-6478) Add documentation on how to upgrade serializers for managed state
[ https://issues.apache.org/jira/browse/FLINK-6478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028184#comment-16028184 ] ASF GitHub Bot commented on FLINK-6478: --- Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4006#discussion_r118901207 --- Diff: docs/dev/stream/state.md --- @@ -429,3 +429,120 @@ public static class CounterSource Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface. +## Custom serialization for managed state + +This section is targeted as a guideline for users who require using custom serialization for their state, covering how --- End diff -- This section is targeted as a guideline for users who require custom serialization for their state "using" doesn't really work in this context. You could say "the use of" instead, but it doesn't add anything. > Add documentation on how to upgrade serializers for managed state > - > > Key: FLINK-6478 > URL: https://issues.apache.org/jira/browse/FLINK-6478 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.3.0 > > > There needs to be a documentation that explains how to use the new serializer > upgrade APIs in {{TypeSerializer}}, and how the methods work with > checkpoints. This documentation should probably be placed under "Application > development --> Streaming --> Working with State". > Ideally, it should also come with a minimal example for users that perhaps > use serialization frameworks that already have built-in backwards > compatibility (such as Thrift). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #4006: [FLINK-6478] [doc] Document how to upgrade state s...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4006#discussion_r118904745 --- Diff: docs/dev/stream/state.md --- @@ -429,3 +429,120 @@ public static class CounterSource Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface. +## Custom serialization for managed state + +This section is targeted as a guideline for users who require using custom serialization for their state, covering how +to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using +Flink's own serializers, this section is irrelevant and can be skipped. + +### Using custom serializers + +As demonstrated in the above examples, when registering a managed operator or keyed state, a `StateDescriptor` is required +to specify the state's name, as well as information about the type of the state. The type information is used by Flink's +[type serialization framework](../types_serialization.html) to create appropriate serializers for the state. + +It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states, +simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation: + +{% highlight java %} +ListStateDescriptor> descriptor = +new ListStateDescriptor<>( +"state-name", +new TypeSerializer<> {...}); + +checkpointedState = getRuntimeContext().getListState(descriptor); +{% endhighlight %} + +### Handling serializer upgrades and compatibility + +Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any +specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer +that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility, +and is replaced as the new serializer for the state. + +A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state, +and the written binary format of the state also remains identical. The means to check the new serializer's compatibility +is provided through the following two methods of the `TypeSerializer` interface: + +{% highlight java %} +public abstract TypeSerializerConfigSnapshot snapshotConfiguration(); +public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot); +{% endhighlight %} + +Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a +point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the +checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot +will be used to confront the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`. This --- End diff -- I don't understand the use of "confront" in this context. Perhaps you mean something like "determine the compatibility of" or "verify the compatibility of" ? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6478) Add documentation on how to upgrade serializers for managed state
[ https://issues.apache.org/jira/browse/FLINK-6478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028183#comment-16028183 ] ASF GitHub Bot commented on FLINK-6478: --- Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4006#discussion_r118903664 --- Diff: docs/dev/stream/state.md --- @@ -429,3 +429,120 @@ public static class CounterSource Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface. +## Custom serialization for managed state + +This section is targeted as a guideline for users who require using custom serialization for their state, covering how +to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using +Flink's own serializers, this section is irrelevant and can be skipped. + +### Using custom serializers + +As demonstrated in the above examples, when registering a managed operator or keyed state, a `StateDescriptor` is required +to specify the state's name, as well as information about the type of the state. The type information is used by Flink's +[type serialization framework](../types_serialization.html) to create appropriate serializers for the state. + +It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states, +simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation: + +{% highlight java %} +ListStateDescriptor> descriptor = +new ListStateDescriptor<>( +"state-name", +new TypeSerializer<> {...}); + +checkpointedState = getRuntimeContext().getListState(descriptor); +{% endhighlight %} + +### Handling serializer upgrades and compatibility + +Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any +specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer +that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility, +and is replaced as the new serializer for the state. + +A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state, +and the written binary format of the state also remains identical. The means to check the new serializer's compatibility +is provided through the following two methods of the `TypeSerializer` interface: + +{% highlight java %} +public abstract TypeSerializerConfigSnapshot snapshotConfiguration(); +public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot); +{% endhighlight %} + +Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a +point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the +checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot +will be used to confront the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`. This +method serves as a check for whether or not the new serializer is compatible, as well as a hook to possibly reconfigure +the new serializer in the case that it is incompatible. 
+ +Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e. when the +same serializer is used for the state in the restored job, the serializer's will reconfigure themselves to be compatible +with their previous configuration. + +The following subsections illustrate guidelines to implement these two methods when using custom serializers. + + Implementing the `snapshotConfiguration` method + +The serializer's configuration snapshot should capture enough information such that on restore, the information +carried over to the new serializer for the state is sufficient for it to determine whether or not it is compatible. +This could typically contain information about the serializer's parameters or binary format of the serialized data; +generally, anything that allows the new serializer to decide whether or not it can be used to read previous serialized +bytes, and that it writes in the same binary format. + +How the serializer's configuration snapshot is written to and read from checkpoints is fully customizable. The below +is the base cla
[GitHub] flink pull request #4006: [FLINK-6478] [doc] Document how to upgrade state s...
Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4006#discussion_r118906172 --- Diff: docs/dev/stream/state.md --- @@ -429,3 +429,140 @@ public static class CounterSource Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface. +## Custom serialization for managed state + +This section is targeted as a guideline for users who require using custom serialization for their state, covering how +to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using +Flink's own serializers, this section is irrelevant and can be skipped. + +### Using custom serializers + +As demonstrated in the above examples, when registering a managed operator or keyed state, a `StateDescriptor` is required +to specify the state's name, as well as information about the type of the state. The type information is used by Flink's +[type serialization framework](../types_serialization.html) to create appropriate serializers for the state. + +It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states, +simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation: + +{% highlight java %} +ListStateDescriptor> descriptor = +new ListStateDescriptor<>( +"state-name", +new TypeSerializer<> {...}); + +checkpointedState = getRuntimeContext().getListState(descriptor); +{% endhighlight %} + +### Handling serializer upgrades and compatibility + +Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any +specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer +that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility, +and is replaced as the new serializer for the state. + +A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state, +and the written binary format of the state also remains identical. The means to check the new serializer's compatibility +is provided through the following two methods of the `TypeSerializer` interface: + +{% highlight java %} +public abstract TypeSerializerConfigSnapshot snapshotConfiguration(); +public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot); +{% endhighlight %} + +Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a +point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the +checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot +will be used to confront the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`. This +method serves as a check for whether or not the new serializer is compatible, as well as a hook to possibly reconfigure +the new serializer in the case that it is incompatible. + +Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e. 
when the +same serializer is used for the state in the restored job, the serializer's will reconfigure themselves to be compatible +with their previous configuration. + +The following subsections illustrate guidelines to implement these two methods when using custom serializers. + + Implementing the `snapshotConfiguration` method + +The serializer's configuration snapshot should capture enough information such that on restore, the information +carried over to the new serializer for the state is sufficient for it to determine whether or not it is compatible. +This could typically contain information about the serializer's parameters or binary format of the serialized data; +generally, anything that allows the new serializer to decide whether or not it can be used to read previous serialized +bytes, and that it writes in the same binary format. + +How the serializer's configuration snapshot is written to and read from checkpoints is fully customizable. The below +is the base class for all serializer configuration snapshot implementations, the `TypeSerializerConfigSnapshot`. + +{% highlight java %} +public abstract TypeSerializerConfigSnapshot extends VersionedIOReadableWritable { + public abstract int getVersio
[jira] [Commented] (FLINK-6478) Add documentation on how to upgrade serializers for managed state
[ https://issues.apache.org/jira/browse/FLINK-6478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028188#comment-16028188 ] ASF GitHub Bot commented on FLINK-6478: --- Github user alpinegizmo commented on a diff in the pull request: https://github.com/apache/flink/pull/4006#discussion_r118906172 --- Diff: docs/dev/stream/state.md --- @@ -429,3 +429,140 @@ public static class CounterSource Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface. +## Custom serialization for managed state + +This section is targeted as a guideline for users who require using custom serialization for their state, covering how +to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using +Flink's own serializers, this section is irrelevant and can be skipped. + +### Using custom serializers + +As demonstrated in the above examples, when registering a managed operator or keyed state, a `StateDescriptor` is required +to specify the state's name, as well as information about the type of the state. The type information is used by Flink's +[type serialization framework](../types_serialization.html) to create appropriate serializers for the state. + +It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states, +simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation: + +{% highlight java %} +ListStateDescriptor> descriptor = +new ListStateDescriptor<>( +"state-name", +new TypeSerializer<> {...}); + +checkpointedState = getRuntimeContext().getListState(descriptor); +{% endhighlight %} + +### Handling serializer upgrades and compatibility + +Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any +specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer +that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility, +and is replaced as the new serializer for the state. + +A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state, +and the written binary format of the state also remains identical. The means to check the new serializer's compatibility +is provided through the following two methods of the `TypeSerializer` interface: + +{% highlight java %} +public abstract TypeSerializerConfigSnapshot snapshotConfiguration(); +public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot); +{% endhighlight %} + +Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a +point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the +checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot +will be used to confront the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`. This +method serves as a check for whether or not the new serializer is compatible, as well as a hook to possibly reconfigure +the new serializer in the case that it is incompatible. 
+ +Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e. when the +same serializer is used for the state in the restored job, the serializer's will reconfigure themselves to be compatible +with their previous configuration. + +The following subsections illustrate guidelines to implement these two methods when using custom serializers. + + Implementing the `snapshotConfiguration` method + +The serializer's configuration snapshot should capture enough information such that on restore, the information +carried over to the new serializer for the state is sufficient for it to determine whether or not it is compatible. +This could typically contain information about the serializer's parameters or binary format of the serialized data; +generally, anything that allows the new serializer to decide whether or not it can be used to read previous serialized +bytes, and that it writes in the same binary format. + +How the serializer's configuration snapshot is written to and read from checkpoints is fully customizable. The below +is the base cla
[jira] [Commented] (FLINK-6478) Add documentation on how to upgrade serializers for managed state
[ https://issues.apache.org/jira/browse/FLINK-6478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028194#comment-16028194 ] ASF GitHub Bot commented on FLINK-6478: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4006#discussion_r118906667 --- Diff: docs/dev/stream/state.md --- @@ -429,3 +429,120 @@ public static class CounterSource Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface. +## Custom serialization for managed state + +This section is targeted as a guideline for users who require using custom serialization for their state, covering how +to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using +Flink's own serializers, this section is irrelevant and can be skipped. + +### Using custom serializers + +As demonstrated in the above examples, when registering a managed operator or keyed state, a `StateDescriptor` is required +to specify the state's name, as well as information about the type of the state. The type information is used by Flink's +[type serialization framework](../types_serialization.html) to create appropriate serializers for the state. + +It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states, +simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation: + +{% highlight java %} +ListStateDescriptor> descriptor = +new ListStateDescriptor<>( +"state-name", +new TypeSerializer<> {...}); + +checkpointedState = getRuntimeContext().getListState(descriptor); +{% endhighlight %} + +### Handling serializer upgrades and compatibility + +Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any +specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer +that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility, +and is replaced as the new serializer for the state. + +A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state, +and the written binary format of the state also remains identical. The means to check the new serializer's compatibility +is provided through the following two methods of the `TypeSerializer` interface: + +{% highlight java %} +public abstract TypeSerializerConfigSnapshot snapshotConfiguration(); +public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot); +{% endhighlight %} + +Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a +point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the +checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot +will be used to confront the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`. This --- End diff -- It was just to explain that the checkpointed serializer config snapshot will be provided to the new serializer to verify its compatibility. I'll use "provide" instead. 
> Add documentation on how to upgrade serializers for managed state > - > > Key: FLINK-6478 > URL: https://issues.apache.org/jira/browse/FLINK-6478 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.3.0 > > > There needs to be a documentation that explains how to use the new serializer > upgrade APIs in {{TypeSerializer}}, and how the methods work with > checkpoints. This documentation should probably be placed under "Application > development --> Streaming --> Working with State". > Ideally, it should also come with a minimal example for users that perhaps > use serialization frameworks that already have built-in backwards > compatibility (such as Thrift). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
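To make the reply above concrete: when the restored configuration snapshot is handed to the new serializer, its {{ensureCompatibility}} implementation has to answer with a {{CompatibilityResult}}. The helper below is a hypothetical, stripped-down sketch of that decision, assuming the {{CompatibilityResult.compatible()}} / {{requiresMigration()}} factory methods of this API; in a real serializer the logic would live directly in the {{ensureCompatibility(TypeSerializerConfigSnapshot)}} override.

{code}
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;

// Hypothetical helper class; not part of Flink.
public final class CompatibilityCheckSketch {

    private CompatibilityCheckSketch() {}

    public static <T> CompatibilityResult<T> check(
            TypeSerializerConfigSnapshot restoredSnapshot,
            Class<? extends TypeSerializerConfigSnapshot> expectedSnapshotClass) {

        if (expectedSnapshotClass.isInstance(restoredSnapshot)) {
            // Same configuration as this serializer writes: old bytes can be read
            // and the binary format stays identical, so no migration is needed.
            return CompatibilityResult.compatible();
        }

        // Unknown or different configuration: signal that state migration is required
        // before this serializer can take over.
        return CompatibilityResult.requiresMigration();
    }
}
{code}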
[GitHub] flink pull request #4006: [FLINK-6478] [doc] Document how to upgrade state s...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4006#discussion_r118906667 --- Diff: docs/dev/stream/state.md --- @@ -429,3 +429,120 @@ public static class CounterSource Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface. +## Custom serialization for managed state + +This section is targeted as a guideline for users who require using custom serialization for their state, covering how +to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using +Flink's own serializers, this section is irrelevant and can be skipped. + +### Using custom serializers + +As demonstrated in the above examples, when registering a managed operator or keyed state, a `StateDescriptor` is required +to specify the state's name, as well as information about the type of the state. The type information is used by Flink's +[type serialization framework](../types_serialization.html) to create appropriate serializers for the state. + +It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states, +simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation: + +{% highlight java %} +ListStateDescriptor> descriptor = +new ListStateDescriptor<>( +"state-name", +new TypeSerializer<> {...}); + +checkpointedState = getRuntimeContext().getListState(descriptor); +{% endhighlight %} + +### Handling serializer upgrades and compatibility + +Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any +specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer +that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility, +and is replaced as the new serializer for the state. + +A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state, +and the written binary format of the state also remains identical. The means to check the new serializer's compatibility +is provided through the following two methods of the `TypeSerializer` interface: + +{% highlight java %} +public abstract TypeSerializerConfigSnapshot snapshotConfiguration(); +public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot); +{% endhighlight %} + +Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a +point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the +checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot +will be used to confront the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`. This --- End diff -- It was just to explain that the checkpointed serializer config snapshot will be provided to the new serializer to verify its compatibility. I'll use "provide" instead. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
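As a concrete companion to the documentation excerpt discussed in this review, the following is a minimal sketch of handing a specific `TypeSerializer` to a `StateDescriptor` instead of type information. Flink's own `StringSerializer` stands in for a custom implementation purely for illustration; the class name and the assumption that this runs on a keyed stream are made up for this sketch.
{code}
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Illustrative only: a keyed function that bypasses type extraction by
// creating the descriptor with an explicit serializer instance.
public class CustomSerializerStateExample extends RichFlatMapFunction<String, String> {

    private transient ListState<String> checkpointedState;

    @Override
    public void open(Configuration parameters) {
        ListStateDescriptor<String> descriptor =
            new ListStateDescriptor<>("state-name", StringSerializer.INSTANCE);
        checkpointedState = getRuntimeContext().getListState(descriptor);
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        checkpointedState.add(value);
        out.collect(value);
    }
}
{code}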
[GitHub] flink issue #4006: [FLINK-6478] [doc] Document how to upgrade state serializ...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4006 Thanks a lot for the helpful suggestions @alpinegizmo! I've addressed them. Also, an additional subsection "Managing `TypeSerializer` and `TypeSerializerConfigSnapshot` classes in user code" was added after your review. Really sorry for the race in updating that. Could you also have a look at that subsection? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6478) Add documentation on how to upgrade serializers for managed state
[ https://issues.apache.org/jira/browse/FLINK-6478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028199#comment-16028199 ] ASF GitHub Bot commented on FLINK-6478: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4006 Thanks a lot for the helpful suggestions @alpinegizmo! I've addressed them. Also, an additional subsection "Managing `TypeSerializer` and `TypeSerializerConfigSnapshot` classes in user code" was added after your review. Really sorry for the race in updating that. Could you also have a look at that subsection? > Add documentation on how to upgrade serializers for managed state > - > > Key: FLINK-6478 > URL: https://issues.apache.org/jira/browse/FLINK-6478 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.3.0 > > > There needs to be a documentation that explains how to use the new serializer > upgrade APIs in {{TypeSerializer}}, and how the methods work with > checkpoints. This documentation should probably be placed under "Application > development --> Streaming --> Working with State". > Ideally, it should also come with a minimal example for users that perhaps > use serialization frameworks that already have built-in backwards > compatibility (such as Thrift). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6717) NullPointerException from MessageAcknowledgingSourceBase
[ https://issues.apache.org/jira/browse/FLINK-6717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-6717: Component/s: (was: Streaming) Streaming Connectors > NullPointerException from MessageAcknowledgingSourceBase > > > Key: FLINK-6717 > URL: https://issues.apache.org/jira/browse/FLINK-6717 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.2.1 > Environment: Ubuntu 12.04.4 > java version "1.8.0_111" >Reporter: Yonatan Most >Priority: Trivial > > It seems that if {{close}} is called before {{initializeState}}, then > {{idsForCurrentCheckpoint}} is not initialized. > {code} > java.lang.NullPointerException: null > at > org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.close(MessageAcknowledgingSourceBase.java:170) > ~[flink-dist_2.10-1.2.0.jar:1.2.0] > at > org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase.close(MultipleIdsMessageAcknowledgingSourceBase.java:94) > ~[flink-dist_2.10-1.2.0.jar:1.2.0] > at > org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:179) > ~[blob_e8e6ccdf6cefe7e6370db4b1b2753baf9e977a24:na] > at > org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) > ~[flink-dist_2.10-1.2.0.jar:1.2.0] > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127) > ~[flink-dist_2.10-1.2.0.jar:1.2.0] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442) > [flink-dist_2.10-1.2.0.jar:1.2.0] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343) > [flink-dist_2.10-1.2.0.jar:1.2.0] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) > [flink-dist_2.10-1.2.0.jar:1.2.0] > at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111] > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
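To illustrate the failure mode described in this ticket, here is a purely illustrative sketch (not the actual Flink fix) of a source whose {{close()}} tolerates being called before {{open()}}/{{initializeState()}} has run; all names in it are made up, with {{pendingIds}} standing in for the role of {{idsForCurrentCheckpoint}}.
{code}
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class GuardedSource extends RichSourceFunction<String> {

    // Only set once open()/initializeState() has run.
    private transient List<String> pendingIds;

    @Override
    public void open(Configuration parameters) {
        pendingIds = new ArrayList<>();
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // emit elements and track their ids here ...
    }

    @Override
    public void cancel() {
    }

    @Override
    public void close() throws Exception {
        super.close();
        // Guard against the NPE from this ticket: close() can be called
        // even though the state was never initialized.
        if (pendingIds != null) {
            pendingIds.clear();
        }
    }
}
{code}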
[jira] [Updated] (FLINK-6719) Add details about fault-tolerance of timers to ProcessFunction docs
[ https://issues.apache.org/jira/browse/FLINK-6719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-6719: Component/s: DataStream API > Add details about fault-tolerance of timers to ProcessFunction docs > --- > > Key: FLINK-6719 > URL: https://issues.apache.org/jira/browse/FLINK-6719 > Project: Flink > Issue Type: Improvement > Components: DataStream API, Documentation >Reporter: Tzu-Li (Gordon) Tai > > The fault-tolerance of timers is a frequently asked questions on the mailing > lists. We should add details about the topic in the {{ProcessFunction}} docs. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6726) Allow setting Timers in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-6726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-6726: Component/s: (was: Streaming) DataStream API > Allow setting Timers in ProcessWindowFunction > - > > Key: FLINK-6726 > URL: https://issues.apache.org/jira/browse/FLINK-6726 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Vishnu Viswanath >Assignee: Vishnu Viswanath >Priority: Minor > > Allow registration of timers in ProcessWindowFunction. > {code} > public abstract void registerEventTimeTimer(long time); > public abstract void registerProcessingTimeTimer(long time); > {code} > This is based on one of the use cases that I have, where I need to register an > EventTimeTimer that will clean the elements in the Window State based on some > condition. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
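To make the use case more concrete, here is a purely illustrative sketch of how the proposed methods might be used for window-state cleanup. The proposed calls appear only as comments because they do not exist yet, and their placement on the `Context` is an assumption of this sketch, not part of the actual proposal.
{code}
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class CleanupWindowFunction
        extends ProcessWindowFunction<Long, Long, String, TimeWindow> {

    @Override
    public void process(String key, Context ctx, Iterable<Long> elements, Collector<Long> out) {
        long sum = 0;
        for (Long element : elements) {
            sum += element;
        }
        out.collect(sum);

        // Proposed API from this issue (not available today); a timer like this
        // could later trigger cleanup of per-window state:
        // ctx.registerEventTimeTimer(ctx.window().maxTimestamp() + retentionMillis);
    }
}
{code}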
[jira] [Commented] (FLINK-6759) storm-examples cannot be built without cached dependencies
[ https://issues.apache.org/jira/browse/FLINK-6759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028209#comment-16028209 ] Aljoscha Krettek commented on FLINK-6759: - I would actually be in favour of removing the Storm Compatibility layer. I have never seen anyone using it, it is additional code that we have to maintain and it makes the build run longer. > storm-examples cannot be built without cached dependencies > -- > > Key: FLINK-6759 > URL: https://issues.apache.org/jira/browse/FLINK-6759 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler > > The {{flink-storm-examples}} module fails to build if the > {{flink-examples-batch}} dependency is not present in the local cache. > {code} > [ERROR] Failed to execute goal on project flink-storm-examples_2.10: Could > not resolve dependencies for project > org.apache.flink:flink-storm-examples_2.10:jar:1.4-SNAPSHOT: > Failed to collect dependenc > ies at org.apache.flink:flink-examples-batch_2.10:jar:1.4-SNAPSHOT: Failed to > read artifact descriptor for > org.apache.flink:flink-examples-batch_2.10:jar:1.4-SNAPSHOT: > Failure to find org.apache.flink > :flink-examples_${scala.binary.version}:pom:1.4-SNAPSHOT in > https://repository.apache.org/snapshots was cached in the local repository, > resolution will not be reattempted until the update interval of > apache.snapshots has elapsed or updates are forced -> [Help 1] > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6333) Utilize Bloomfilters in RocksDb
[ https://issues.apache.org/jira/browse/FLINK-6333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028212#comment-16028212 ] Stefan Richter commented on FLINK-6333: --- This issue still exists after some testing and stops us from upgrading: https://github.com/facebook/rocksdb/issues/1964 > Utilize Bloomfilters in RocksDb > --- > > Key: FLINK-6333 > URL: https://issues.apache.org/jira/browse/FLINK-6333 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Ted Yu > > Bloom Filters would speed up RocksDb lookups. > When we upgrade to RocksDb 5.2.1+, we would be able to do: > {code} > new BlockBasedTableConfig() > .setBlockCacheSize(blockCacheSize) > .setBlockSize(blockSize) > .setFilter(new BloomFilter()) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
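For context, a sketch of how such a configuration could be plugged into Flink's RocksDB backend via its existing `OptionsFactory` hook once the RocksDB dependency is upgraded; the cache and block sizes are arbitrary, and whether `setFilter` remains the right call after the upgrade is an assumption of this sketch.
{code}
import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class BloomFilterOptionsFactory implements OptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        // Arbitrary example sizes; the BloomFilter is the part this issue is about.
        return currentOptions.setTableFormatConfig(
            new BlockBasedTableConfig()
                .setBlockCacheSize(256 * 1024 * 1024)
                .setBlockSize(32 * 1024)
                .setFilter(new BloomFilter()));
    }
}
{code}
A backend would then pick this up via something like {{rocksDbStateBackend.setOptions(new BloomFilterOptionsFactory())}}.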
[jira] [Created] (FLINK-6760) Fix OverWindowTest alias test error
sunjincheng created FLINK-6760: -- Summary: Fix OverWindowTest alias test error Key: FLINK-6760 URL: https://issues.apache.org/jira/browse/FLINK-6760 Project: Flink Issue Type: Sub-task Components: Table API & SQL Affects Versions: 1.3.0 Reporter: sunjincheng Assignee: sunjincheng For SQL: {code} val sql = "SELECT c, count(a) OVER (ORDER BY proctime ROWS BETWEEN 2 preceding AND CURRENT ROW) as cnt1 CURRENT ROW from MyTable" {code} The alias `cnt1` did not take effect when we generated the plan string. But we can use the alias in an outer query, for example: {code} val sql = "SELECT cnt1 from (SELECT c, count(a) OVER (ORDER BY proctime ROWS BETWEEN 2 preceding AND CURRENT ROW) as cnt1 CURRENT ROW from MyTable)" {code} So in this JIRA we just fix the test case for the 1.3 release. The alias itself will be improved in another JIRA. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #4007: [FLINK-6760][table]Fix OverWindowTest alias test e...
GitHub user sunjincheng121 opened a pull request: https://github.com/apache/flink/pull/4007 [FLINK-6760][table]Fix OverWindowTest alias test error - [x] General - The pull request references the related JIRA issue ("[FLINK-6760][table]Fix OverWindowTest alias test error") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/sunjincheng121/flink FLINK-6760-PR Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4007.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4007 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6760) Fix OverWindowTest alias test error
[ https://issues.apache.org/jira/browse/FLINK-6760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028243#comment-16028243 ] ASF GitHub Bot commented on FLINK-6760: --- GitHub user sunjincheng121 opened a pull request: https://github.com/apache/flink/pull/4007 [FLINK-6760][table]Fix OverWindowTest alias test error - [x] General - The pull request references the related JIRA issue ("[FLINK-6760][table]Fix OverWindowTest alias test error") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/sunjincheng121/flink FLINK-6760-PR Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4007.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4007 > Fix OverWindowTest alias test error > --- > > Key: FLINK-6760 > URL: https://issues.apache.org/jira/browse/FLINK-6760 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: sunjincheng >Assignee: sunjincheng > > For Sql: > {code} > val sql = "SELECT c, count(a) OVER (ORDER BY proctime ROWS BETWEEN 2 > preceding AND CURRENT ROW) as cnt1 CURRENT ROW from MyTable" > {code} > The alias `cnt1` The alias did not take effect when we generated the plan > string. But we can using the alias in outer layer query, for example: > {code} > val sql = "SELECT cnt1 from (SELECT c, count(a) OVER (ORDER BY proctime ROWS > BETWEEN 2 preceding AND CURRENT ROW) as cnt1 CURRENT ROW from MyTable)" > {code} > So in this JIRA. we just fix the test case for 1.3 release. In another JIRA. > will improve the alias. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6619) Check Table API & SQL support for 1.3.0 RC01 Release
[ https://issues.apache.org/jira/browse/FLINK-6619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-6619: --- Description: In this JIRA, I will do the following tasks for the Flink 1.3.0 RC01 release. * Check that the JAVA and SCALA logical plans are consistent. * Check that the SQL and Table API logical plans are consistent. * Check that UDF, UDTF, and UDAF are working properly in group-windows and over-windows. * Check that all built-in Agg on Batch and Stream are working properly. * Let types such as Timestamp, BigDecimal or Pojo flow through UDF, UDTF, and UDAF (input and output types). I'll create sub-tasks if I find bugs while doing the tasks above. was: In this JIRA. I will do the following tasks for Flink 1.3.0 RC01 Release. * Check that the JAVA and SCALA logical plans are consistent. * Check that the SQL and Table API logical plans are consistent. * Check that UDF, UDTF, and UDAF are working properly in group-windows and over-windows. * Check that all built-in Agg on Batch and Stream are working properly. I'll created some sub-task if i find some bugs when I do the task above. > Check Table API & SQL support for 1.3.0 RC01 Release > > > Key: FLINK-6619 > URL: https://issues.apache.org/jira/browse/FLINK-6619 > Project: Flink > Issue Type: Test > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: sunjincheng >Assignee: sunjincheng > > In this JIRA, I will do the following tasks for the Flink 1.3.0 RC01 release. > * Check that the JAVA and SCALA logical plans are consistent. > * Check that the SQL and Table API logical plans are consistent. > * Check that UDF, UDTF, and UDAF are working properly in group-windows and > over-windows. > * Check that all built-in Agg on Batch and Stream are working properly. > * Let types such as Timestamp, BigDecimal or Pojo flow through UDF, UDTF, and > UDAF (input and output types). > I'll create sub-tasks if I find bugs while doing the tasks above. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6736) Fix UDTF codegen bug when window follow by join( UDTF)
[ https://issues.apache.org/jira/browse/FLINK-6736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028282#comment-16028282 ] Timo Walther commented on FLINK-6736: - I think I found the bug that caused this exception. I will open a PR for it, it is just a one liner. > Fix UDTF codegen bug when window follow by join( UDTF) > -- > > Key: FLINK-6736 > URL: https://issues.apache.org/jira/browse/FLINK-6736 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: sunjincheng >Assignee: sunjincheng > > When we run the tableAPI as follows: > {code} > val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, > 'bigdec, 'date,'pojo, 'string) > val windowedTable = table > .join(udtf2('string) as ('a, 'b)) > .window(Slide over 5.milli every 2.milli on 'long as 'w) > .groupBy('w) > .select('int.count, agg1('pojo, 'bigdec, 'date, 'int), 'w.start, 'w.end) > {code} > We will get the error message: > {code} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. 
> at > org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) > at > org.apache.flink.table.runtime.CRowCorrelateProcessRunner.compile(CRowCorrelateProcessRunner.scala:35) > at > org.apache.flink.table.runtime.CRowCorrelateProcessRunner.open(CRowCorrelateProcessRunner.scala:59) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111) > at > org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:377) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.codehaus.commons.compiler.CompileException: Line 77, Column > 62: Unknown variable or type "in2" > at > org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11523) > at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6292) > at org.codehaus.janino.UnitCompiler.access$12900(UnitCompiler.java:209) > at > org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:5904) > at > org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:5901) > at org.codehaus.janino.Java$Package.accept(Java.java:4074) > at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:5901) > at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6287) > at org.codehaus.janino.UnitCompiler.access$13500(UnitCompiler.java:209) > {code} > The reason is {{val generator = new CodeGenerator(config, false, > inputSchema.physicalTypeInfo)}} `physicalTypeInfo` will remove the > TimeIndicator. > I think we should fix this. What do you think [~fhueske] [~twalthr] , And > hope your suggestions. :) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6761) Limitation for maximum state size per key in RocksDB backend
Stefan Richter created FLINK-6761: - Summary: Limitation for maximum state size per key in RocksDB backend Key: FLINK-6761 URL: https://issues.apache.org/jira/browse/FLINK-6761 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Affects Versions: 1.2.1, 1.3.0 Reporter: Stefan Richter Priority: Critical RocksDB's JNI bridge allows for putting and getting `byte[]` as keys and values. States that internally use RocksDB's merge operator, e.g. `ListState`, can currently merge multiple `byte[]` under one key, which will be internally concatenated to one value in RocksDB. This becomes problematic as soon as the accumulated state size under one key grows larger than `Integer.MAX_VALUE` bytes. Whenever Java code tries to access a state that grew beyond this limit through merging, we will encounter an `ArrayIndexOutOfBoundsException` at best and a segfault at worst. This behaviour is problematic because RocksDB silently stores states that exceed this limitation, but on access (e.g. in checkpointing), the code fails unexpectedly. I think the only proper solution to this is for RocksDB's JNI bridge to build on `(Direct)ByteBuffer` - which can work around the size limitation - as input and output types, instead of simple `byte[]`. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
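Purely as an illustration of the failure mode (not a recommended pattern), the sketch below shows how list state under a single key can silently grow towards that limit; all names and sizes are made up for this sketch.
{code}
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class GrowingStateFunction extends RichFlatMapFunction<String, String> {

    private transient ListState<byte[]> blobs;

    @Override
    public void open(Configuration parameters) {
        blobs = getRuntimeContext().getListState(
            new ListStateDescriptor<>("blobs", BytePrimitiveArraySerializer.INSTANCE));
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        // Every element appends 64 MB that RocksDB merges under the current key;
        // after roughly 32 elements for one key the concatenated value exceeds
        // Integer.MAX_VALUE bytes and later reads hit the limitation above.
        blobs.add(new byte[64 * 1024 * 1024]);
        out.collect(value);
    }
}
{code}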
[jira] [Commented] (FLINK-6736) Fix UDTF codegen bug when window follow by join( UDTF)
[ https://issues.apache.org/jira/browse/FLINK-6736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028291#comment-16028291 ] sunjincheng commented on FLINK-6736: Sounds very good to me!! > Fix UDTF codegen bug when window follow by join( UDTF) > -- > > Key: FLINK-6736 > URL: https://issues.apache.org/jira/browse/FLINK-6736 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: sunjincheng >Assignee: sunjincheng > > When we run the tableAPI as follows: > {code} > val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, > 'bigdec, 'date,'pojo, 'string) > val windowedTable = table > .join(udtf2('string) as ('a, 'b)) > .window(Slide over 5.milli every 2.milli on 'long as 'w) > .groupBy('w) > .select('int.count, agg1('pojo, 'bigdec, 'date, 'int), 'w.start, 'w.end) > {code} > We will get the error message: > {code} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. 
> at > org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) > at > org.apache.flink.table.runtime.CRowCorrelateProcessRunner.compile(CRowCorrelateProcessRunner.scala:35) > at > org.apache.flink.table.runtime.CRowCorrelateProcessRunner.open(CRowCorrelateProcessRunner.scala:59) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111) > at > org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:377) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.codehaus.commons.compiler.CompileException: Line 77, Column > 62: Unknown variable or type "in2" > at > org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11523) > at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6292) > at org.codehaus.janino.UnitCompiler.access$12900(UnitCompiler.java:209) > at > org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:5904) > at > org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:5901) > at org.codehaus.janino.Java$Package.accept(Java.java:4074) > at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:5901) > at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6287) > at org.codehaus.janino.UnitCompiler.access$13500(UnitCompiler.java:209) > {code} > The reason is {{val generator = new CodeGenerator(config, false, > inputSchema.physicalTypeInfo)}} `physicalTypeInfo` will remove the > TimeIndicator. > I think we should fix this. What do you think [~fhueske] [~twalthr] , And > hope your suggestions. :) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6761) Limitation for maximum state size per key in RocksDB backend
[ https://issues.apache.org/jira/browse/FLINK-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter updated FLINK-6761: -- Description: RocksDB`s JNI bridge allows for putting and getting {{byte[]}} as keys and values. States that internally use RocksDB's merge operator, e.g. {{ListState}}, can currently merge multiple {{byte[]}} under one key, which will be internally concatenated to one value in RocksDB. This becomes problematic, as soon as the accumulated state size under one key grows larger than {{Integer.MAX_VALUE}} bytes. Whenever Java code tries to access a state that grew beyond this limit through merging, we will encounter an {{ArrayIndexOutOfBoundsException}} at best and a segfault at worst. This behaviour is problematic, because RocksDB silently stores states that exceed this limitation, but on access (e.g. in checkpointing), the code fails unexpectedly. I think the only proper solution to this is for RocksDB's JNI bridge to build on {{(Direct)ByteBuffer}} - which can go around the size limitation - as input and output types, instead of simple {{byte[]}}. was: RocksDB`s JNI bridge allows for putting and getting `byte[]` as keys and values. States that internally use RocksDB's merge operator, e.g. `ListState`, can currently merge multiple `byte[]` under one key, which will be internally concatenated to one value in RocksDB. This becomes problematic, as soon as the accumulated state size under one key grows larger than `Integer.MAX_VALUE` bytes. Whenever Java code tries to access a state that grew beyond this limit through merging, we will encounter an `ArrayIndexOutOfBoundsException` at best and a segfault at worst. This behaviour is problematic, because RocksDB silently stores states that exceed this limitation, but on access (e.g. in checkpointing), the code fails unexpectedly. I think the only proper solution to this is for RocksDB's JNI bridge to build on `(Direct)ByteBuffer` - which can go around the size limitation - as input and output types, instead of simple `byte[]`. > Limitation for maximum state size per key in RocksDB backend > > > Key: FLINK-6761 > URL: https://issues.apache.org/jira/browse/FLINK-6761 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.0, 1.2.1 >Reporter: Stefan Richter >Priority: Critical > > RocksDB`s JNI bridge allows for putting and getting {{byte[]}} as keys and > values. > States that internally use RocksDB's merge operator, e.g. {{ListState}}, can > currently merge multiple {{byte[]}} under one key, which will be internally > concatenated to one value in RocksDB. > This becomes problematic, as soon as the accumulated state size under one key > grows larger than {{Integer.MAX_VALUE}} bytes. Whenever Java code tries to > access a state that grew beyond this limit through merging, we will encounter > an {{ArrayIndexOutOfBoundsException}} at best and a segfault at worst. > This behaviour is problematic, because RocksDB silently stores states that > exceed this limitation, but on access (e.g. in checkpointing), the code fails > unexpectedly. > I think the only proper solution to this is for RocksDB's JNI bridge to build > on {{(Direct)ByteBuffer}} - which can go around the size limitation - as > input and output types, instead of simple {{byte[]}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #4008: [FLINK-6736] [table] Fix UDTF field access with ti...
GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/4008 [FLINK-6736] [table] Fix UDTF field access with time attribute record A wrong physical index caused the code generation to fail. You can merge this pull request into a Git repository by running: $ git pull https://github.com/twalthr/flink FLINK-6736 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4008.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4008 commit 0ba039f2555c916088fe71743002e395989701ea Author: twalthr Date: 2017-05-29T12:48:14Z [FLINK-6736] [table] Fix UDTF field access with time attribute record --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6736) Fix UDTF codegen bug when window follow by join( UDTF)
[ https://issues.apache.org/jira/browse/FLINK-6736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028294#comment-16028294 ] ASF GitHub Bot commented on FLINK-6736: --- GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/4008 [FLINK-6736] [table] Fix UDTF field access with time attribute record A wrong physical index caused the code generation to fail. You can merge this pull request into a Git repository by running: $ git pull https://github.com/twalthr/flink FLINK-6736 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4008.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4008 commit 0ba039f2555c916088fe71743002e395989701ea Author: twalthr Date: 2017-05-29T12:48:14Z [FLINK-6736] [table] Fix UDTF field access with time attribute record > Fix UDTF codegen bug when window follow by join( UDTF) > -- > > Key: FLINK-6736 > URL: https://issues.apache.org/jira/browse/FLINK-6736 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: sunjincheng >Assignee: sunjincheng > > When we run the tableAPI as follows: > {code} > val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, > 'bigdec, 'date,'pojo, 'string) > val windowedTable = table > .join(udtf2('string) as ('a, 'b)) > .window(Slide over 5.milli every 2.milli on 'long as 'w) > .groupBy('w) > .select('int.count, agg1('pojo, 'bigdec, 'date, 'int), 'w.start, 'w.end) > {code} > We will get the error message: > {code} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. 
> at > org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) > at > org.apache.flink.table.runtime.CRowCorrelateProcessRunner.compile(CRowCorrelateProcessRunner.scala:35) > at > org.apache.flink.table.runtime.CRowCorrelateProcessRunner.open(CRowCorrelateProcessRunner.scala:59) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111) > at > org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:377) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.codehaus.commons.compiler.CompileException: Line 77, Column > 62: Unknown variable or type "in2" > at > org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11523) > at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6292) > at org.codehaus.janino.UnitCompiler.access$12900(UnitCompiler.java:209) > at > org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:5904) > at > org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:5901) > at org.codehaus.janino.Java$Package.accept(Java.java:4074) > at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:5901) > at org.codehaus.janino.UnitCompiler.getType
[GitHub] flink issue #3984: [FLINK-6710] Remove Twitter-InputFormat
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/3984 Maybe we should discuss this on the ML. I know at least one user who used it to quickly prototype a Flink demo application. It may not add much value, but it allows for quickly showing a nice streaming demo. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6710) Remove twitter-inputformat
[ https://issues.apache.org/jira/browse/FLINK-6710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028297#comment-16028297 ] ASF GitHub Bot commented on FLINK-6710: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/3984 Maybe we should discuss this on the ML. I know at least one user that used it for fast prototyping a Flink demo application. Maybe it adds not much value but allows for quickly showing a nice streaming demo. > Remove twitter-inputformat > -- > > Key: FLINK-6710 > URL: https://issues.apache.org/jira/browse/FLINK-6710 > Project: Flink > Issue Type: Improvement > Components: Batch Connectors and Input/Output Formats >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Minor > Fix For: 1.4.0 > > > I propose removing the twitter-inputformat under flink-contrib. > It provides no interesting properties in terms of accessing tweets (since it > just reads them from a file) in contrast to the streaming {{TwitterSource}}, > nor provides any significant functionality that cannot be achieved using the > jackson databind API. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6737) Fix over expression parse String error.
[ https://issues.apache.org/jira/browse/FLINK-6737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028306#comment-16028306 ] Timo Walther commented on FLINK-6737: - I'm also against this change. We should keep the Java and Scala APIs strictly separated. A string in the Scala API should stay a String and not become a String-Expression. > Fix over expression parse String error. > --- > > Key: FLINK-6737 > URL: https://issues.apache.org/jira/browse/FLINK-6737 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: sunjincheng >Assignee: sunjincheng > > When we run the TableAPI as follows: > {code} > val windowedTable = table > .window(Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW > as 'w) > .select('c, "countFun(b)" over 'w as 'mycount, weightAvgFun('a, 'b) > over 'w as 'wAvg) > {code} > We get the error: > {code} > org.apache.flink.table.api.TableException: The over method can only using > with aggregation expression. > at > org.apache.flink.table.api.scala.ImplicitExpressionOperations$class.over(expressionDsl.scala:469) > at > org.apache.flink.table.api.scala.ImplicitExpressionConversions$LiteralStringExpression.over(expressionDsl.scala:756) > {code} > The reason is that the `over` method of `expressionDsl` does not parse the String case. > I think we should fix this before the 1.3 release. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6736) Fix UDTF codegen bug when window follow by join( UDTF)
[ https://issues.apache.org/jira/browse/FLINK-6736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028310#comment-16028310 ] ASF GitHub Bot commented on FLINK-6736: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/4008 @sunjincheng121 does it fix your issue? > Fix UDTF codegen bug when window follow by join( UDTF) > -- > > Key: FLINK-6736 > URL: https://issues.apache.org/jira/browse/FLINK-6736 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: sunjincheng >Assignee: sunjincheng > > When we run the tableAPI as follows: > {code} > val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, > 'bigdec, 'date,'pojo, 'string) > val windowedTable = table > .join(udtf2('string) as ('a, 'b)) > .window(Slide over 5.milli every 2.milli on 'long as 'w) > .groupBy('w) > .select('int.count, agg1('pojo, 'bigdec, 'date, 'int), 'w.start, 'w.end) > {code} > We will get the error message: > {code} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. 
> at > org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) > at > org.apache.flink.table.runtime.CRowCorrelateProcessRunner.compile(CRowCorrelateProcessRunner.scala:35) > at > org.apache.flink.table.runtime.CRowCorrelateProcessRunner.open(CRowCorrelateProcessRunner.scala:59) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111) > at > org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:377) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.codehaus.commons.compiler.CompileException: Line 77, Column > 62: Unknown variable or type "in2" > at > org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11523) > at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6292) > at org.codehaus.janino.UnitCompiler.access$12900(UnitCompiler.java:209) > at > org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:5904) > at > org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:5901) > at org.codehaus.janino.Java$Package.accept(Java.java:4074) > at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:5901) > at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6287) > at org.codehaus.janino.UnitCompiler.access$13500(UnitCompiler.java:209) > {code} > The reason is {{val generator = new CodeGenerator(config, false, > inputSchema.physicalTypeInfo)}} `physicalTypeInfo` will remove the > TimeIndicator. > I think we should fix this. What do you think [~fhueske] [~twalthr] , And > hope your suggestions. :) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #4008: [FLINK-6736] [table] Fix UDTF field access with time attr...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/4008 @sunjincheng121 does it fix your issue? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Assigned] (FLINK-6760) Fix OverWindowTest alias test error
[ https://issues.apache.org/jira/browse/FLINK-6760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng reassigned FLINK-6760: -- Assignee: Timo Walther (was: sunjincheng) > Fix OverWindowTest alias test error > --- > > Key: FLINK-6760 > URL: https://issues.apache.org/jira/browse/FLINK-6760 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: sunjincheng >Assignee: Timo Walther > > For Sql: > {code} > val sql = "SELECT c, count(a) OVER (ORDER BY proctime ROWS BETWEEN 2 > preceding AND CURRENT ROW) as cnt1 CURRENT ROW from MyTable" > {code} > The alias `cnt1` The alias did not take effect when we generated the plan > string. But we can using the alias in outer layer query, for example: > {code} > val sql = "SELECT cnt1 from (SELECT c, count(a) OVER (ORDER BY proctime ROWS > BETWEEN 2 preceding AND CURRENT ROW) as cnt1 CURRENT ROW from MyTable)" > {code} > So in this JIRA. we just fix the test case for 1.3 release. In another JIRA. > will improve the alias. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (FLINK-6760) Fix OverWindowTest alias test error
[ https://issues.apache.org/jira/browse/FLINK-6760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng reassigned FLINK-6760: -- Assignee: sunjincheng (was: Timo Walther) > Fix OverWindowTest alias test error > --- > > Key: FLINK-6760 > URL: https://issues.apache.org/jira/browse/FLINK-6760 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: sunjincheng >Assignee: sunjincheng > > For Sql: > {code} > val sql = "SELECT c, count(a) OVER (ORDER BY proctime ROWS BETWEEN 2 > preceding AND CURRENT ROW) as cnt1 CURRENT ROW from MyTable" > {code} > The alias `cnt1` The alias did not take effect when we generated the plan > string. But we can using the alias in outer layer query, for example: > {code} > val sql = "SELECT cnt1 from (SELECT c, count(a) OVER (ORDER BY proctime ROWS > BETWEEN 2 preceding AND CURRENT ROW) as cnt1 CURRENT ROW from MyTable)" > {code} > So in this JIRA. we just fix the test case for 1.3 release. In another JIRA. > will improve the alias. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (FLINK-6736) Fix UDTF codegen bug when window follow by join( UDTF)
[ https://issues.apache.org/jira/browse/FLINK-6736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng reassigned FLINK-6736: -- Assignee: Timo Walther (was: sunjincheng) > Fix UDTF codegen bug when window follow by join( UDTF) > -- > > Key: FLINK-6736 > URL: https://issues.apache.org/jira/browse/FLINK-6736 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: sunjincheng >Assignee: Timo Walther > > When we run the tableAPI as follows: > {code} > val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, > 'bigdec, 'date,'pojo, 'string) > val windowedTable = table > .join(udtf2('string) as ('a, 'b)) > .window(Slide over 5.milli every 2.milli on 'long as 'w) > .groupBy('w) > .select('int.count, agg1('pojo, 'bigdec, 'date, 'int), 'w.start, 'w.end) > {code} > We will get the error message: > {code} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. 
> at > org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) > at > org.apache.flink.table.runtime.CRowCorrelateProcessRunner.compile(CRowCorrelateProcessRunner.scala:35) > at > org.apache.flink.table.runtime.CRowCorrelateProcessRunner.open(CRowCorrelateProcessRunner.scala:59) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111) > at > org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:377) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.codehaus.commons.compiler.CompileException: Line 77, Column > 62: Unknown variable or type "in2" > at > org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11523) > at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6292) > at org.codehaus.janino.UnitCompiler.access$12900(UnitCompiler.java:209) > at > org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:5904) > at > org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:5901) > at org.codehaus.janino.Java$Package.accept(Java.java:4074) > at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:5901) > at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6287) > at org.codehaus.janino.UnitCompiler.access$13500(UnitCompiler.java:209) > {code} > The reason is {{val generator = new CodeGenerator(config, false, > inputSchema.physicalTypeInfo)}} `physicalTypeInfo` will remove the > TimeIndicator. > I think we should fix this. What do you think [~fhueske] [~twalthr] , And > hope your suggestions. :) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (FLINK-6689) Remote StreamExecutionEnvironment fails to submit jobs against LocalFlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-6689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber closed FLINK-6689. -- Resolution: Won't Fix Fix Version/s: (was: 1.3.0) {{org.apache.flink.streaming.util.TestStreamEnvironment}} works fine for connections to {{org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster}}. It's only strange that the former ended up in {{flink-test-utils}} while the latter is in {{flink-runtime}}, but I guess {{LocalFlinkMiniCluster}} could/should not be moved to {{flink-test-utils}} due to legacy reasons. > Remote StreamExecutionEnvironment fails to submit jobs against > LocalFlinkMiniCluster > > > Key: FLINK-6689 > URL: https://issues.apache.org/jira/browse/FLINK-6689 > Project: Flink > Issue Type: Bug > Components: Client, Job-Submission >Affects Versions: 1.3.0 >Reporter: Nico Kruber > > The following Flink programs fails to execute with the current 1.3 branch > (1.2 works) because the leader session ID being used is wrong: > {code:java} > final String jobManagerAddress = "localhost"; > final int jobManagerPort = ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT; > final Configuration config = new Configuration(); > config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, > jobManagerAddress); > config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, > jobManagerPort); > config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); > final LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(config, > false); > cluster.start(true); > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.createRemoteEnvironment(jobManagerAddress, > jobManagerPort); > env.fromElements(1l).addSink(new DiscardingSink()); > // fails due to leader session id being wrong: > env.execute("test"); > {code} > Output from logs contais: > {code} > ... > 16:24:57,551 INFO org.apache.flink.runtime.webmonitor.JobManagerRetriever >- New leader reachable under > akka.tcp://flink@localhost:6123/user/jobmanager:ff0d56cf-6205-4dd4-a266-03847f4d6944. > 16:24:57,894 INFO > org.apache.flink.streaming.api.environment.RemoteStreamEnvironment - Running > remotely at localhost:6123 > 16:24:58,121 INFO org.apache.flink.client.program.StandaloneClusterClient >- Starting client actor system. > 16:24:58,123 INFO org.apache.flink.runtime.util.LeaderRetrievalUtils >- Trying to select the network interface and address to use by connecting > to the leading JobManager. > 16:24:58,128 INFO org.apache.flink.runtime.util.LeaderRetrievalUtils >- TaskManager will try to connect for 1 milliseconds before falling > back to heuristics > 16:24:58,132 INFO org.apache.flink.runtime.net.ConnectionUtils >- Retrieved new target address localhost/127.0.0.1:6123. > 16:24:58,258 INFO akka.event.slf4j.Slf4jLogger >- Slf4jLogger started > 16:24:58,262 INFO Remoting >- Starting remoting > 16:24:58,375 INFO Remoting >- Remoting started; listening on addresses > :[akka.tcp://fl...@nico-work.fritz.box:43413] > 16:24:58,376 INFO org.apache.flink.client.program.StandaloneClusterClient >- Submitting job with JobID: 9bef4793a4b7f4caaad96bd28211cbb9. Waiting for > job completion. > Submitting job with JobID: 9bef4793a4b7f4caaad96bd28211cbb9. Waiting for job > completion. > 16:24:58,382 INFO org.apache.flink.runtime.client.JobSubmissionClientActor >- Disconnect from JobManager null. 
> 16:24:58,398 INFO org.apache.flink.runtime.client.JobSubmissionClientActor >- Received SubmitJobAndWait(JobGraph(jobId: > 9bef4793a4b7f4caaad96bd28211cbb9)) but there is no connection to a JobManager > yet. > 16:24:58,398 INFO org.apache.flink.runtime.client.JobSubmissionClientActor >- Received job test (9bef4793a4b7f4caaad96bd28211cbb9). > 16:24:58,429 INFO org.apache.flink.runtime.client.JobSubmissionClientActor >- Connect to JobManager > Actor[akka.tcp://flink@localhost:6123/user/jobmanager#1881889998]. > 16:24:58,432 INFO org.apache.flink.runtime.client.JobSubmissionClientActor >- Connected to JobManager at > Actor[akka.tcp://flink@localhost:6123/user/jobmanager#1881889998] with leader > session id ----. > Connected to JobManager at > Actor[akka.tcp://flink@localhost:6123/user/jobmanager#1881889998] with leader > session id ----. > 16:24:58,432 INFO org.apache.flink.runtime.client.JobSubmissionClientActor >- Sending message to JobManager > akka.tcp://flink@localhost:6123/user/jobmanager to submit
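As a companion to the resolution above, a minimal sketch of driving the `LocalFlinkMiniCluster` through `TestStreamEnvironment` instead of a remote environment; the `TestStreamEnvironment` constructor used here (mini cluster plus parallelism) is assumed from flink-test-utils and should be checked against the actual class.
{code}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.util.TestStreamEnvironment;

public class MiniClusterJob {

    public static void main(String[] args) throws Exception {
        LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(new Configuration(), false);
        cluster.start(true);

        // The test environment submits against the given mini cluster directly,
        // so no leader session id handling is needed on the client side.
        TestStreamEnvironment env = new TestStreamEnvironment(cluster, 1);
        env.fromElements(1L).addSink(new DiscardingSink<Long>());
        env.execute("test");

        cluster.stop();
    }
}
{code}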
[GitHub] flink issue #3984: [FLINK-6710] Remove Twitter-InputFormat
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3984 Then let's replace it with a generic JsonInputFormat that supports even more use-cases without 1900 lines of bloat. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
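Sketching what such a generic format could look like, assuming one JSON document per line and Jackson databind on the classpath; `JsonInputFormat` here is a hypothetical class written for this discussion, not an existing Flink API.
{code}
import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.api.common.io.DelimitedInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

public class JsonInputFormat<T> extends DelimitedInputFormat<T> {

    private final Class<T> type;
    private transient ObjectMapper mapper;

    public JsonInputFormat(Path filePath, Class<T> type) {
        setFilePath(filePath);
        this.type = type;
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        mapper = new ObjectMapper();
    }

    @Override
    public T readRecord(T reuse, byte[] bytes, int offset, int numBytes) throws IOException {
        // Each newline-delimited record is parsed as one JSON document of the target type.
        return mapper.readValue(bytes, offset, numBytes, type);
    }
}
{code}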
[GitHub] flink issue #4007: [FLINK-6760][table]Fix OverWindowTest alias test error
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/4007 Thanks @sunjincheng121, I will merge this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6710) Remove twitter-inputformat
[ https://issues.apache.org/jira/browse/FLINK-6710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028319#comment-16028319 ] ASF GitHub Bot commented on FLINK-6710: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3984 Then let's replace it with a generic JsonInputFormat that supports even more use-cases without 1900 lines of bloat. > Remove twitter-inputformat > -- > > Key: FLINK-6710 > URL: https://issues.apache.org/jira/browse/FLINK-6710 > Project: Flink > Issue Type: Improvement > Components: Batch Connectors and Input/Output Formats >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Minor > Fix For: 1.4.0 > > > I propose removing the twitter-inputformat under flink-contrib. > It provides no interesting properties in terms of accessing tweets (since it > just reads them from a file) in contrast to the streaming {{TwitterSource}}, > nor provides any significant functionality that cannot be achieved using the > jackson databind API. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6760) Fix OverWindowTest alias test error
[ https://issues.apache.org/jira/browse/FLINK-6760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028320#comment-16028320 ] ASF GitHub Bot commented on FLINK-6760: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/4007 Thanks @sunjincheng121, I will merge this. > Fix OverWindowTest alias test error > --- > > Key: FLINK-6760 > URL: https://issues.apache.org/jira/browse/FLINK-6760 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: sunjincheng >Assignee: sunjincheng > > For SQL: > {code} > val sql = "SELECT c, count(a) OVER (ORDER BY proctime ROWS BETWEEN 2 > preceding AND CURRENT ROW) as cnt1 from MyTable" > {code} > The alias `cnt1` did not take effect when we generated the plan string. But we > can use the alias in the outer-layer query, for example: > {code} > val sql = "SELECT cnt1 from (SELECT c, count(a) OVER (ORDER BY proctime ROWS > BETWEEN 2 preceding AND CURRENT ROW) as cnt1 from MyTable)" > {code} > So in this JIRA we just fix the test case for the 1.3 release. Another JIRA > will improve the alias. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #4009: [FLINK-6732] Activate strict-checkstyle for flink-...
GitHub user dawidwys opened a pull request: https://github.com/apache/flink/pull/4009 [FLINK-6732] Activate strict-checkstyle for flink-java Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/dawidwys/flink flink-java-checkstyle Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4009.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4009 commit 55e59f49d6487e0c90fa5896cb3090129ad7bd91 Author: Dawid Wysakowicz Date: 2017-05-29T13:15:54Z [FLINK-6732] Removed trailing whitespaces commit b4ec5f0642281e4916efc781729a124f743253cf Author: Dawid Wysakowicz Date: 2017-05-26T13:04:05Z [FLINK-6734] Exclude org.apache.flink.api.java.tuple from checkstyle AvoidStarImport commit 3a849de2d0291a4ba90c6466b1c898bde94e12d0 Author: Dawid Wysakowicz Date: 2017-05-26T13:09:42Z [FLINK-6732] Reorganized imports commit 2eaae31c7fccfc052303c032d8a98a31319966dc Author: Dawid Wysakowicz Date: 2017-05-28T08:50:37Z [FLINK-6732] Fixed empty lines issues commit 0e136e81f1c9036993ab5159fee3fc0addf7291b Author: Dawid Wysakowicz Date: 2017-05-28T10:08:28Z [FLINK-6732] Fixed existing javadocs commit 202aa35aea2501ec75d21a8762c04880dae63f5f Author: Dawid Wysakowicz Date: 2017-05-29T13:23:22Z [FLINK-6732] Added missing javadocs
[jira] [Commented] (FLINK-6732) Activate strict checkstyle for flink-java
[ https://issues.apache.org/jira/browse/FLINK-6732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028327#comment-16028327 ] ASF GitHub Bot commented on FLINK-6732: --- GitHub user dawidwys opened a pull request: https://github.com/apache/flink/pull/4009 [FLINK-6732] Activate strict-checkstyle for flink-java Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/dawidwys/flink flink-java-checkstyle Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4009.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4009 commit 55e59f49d6487e0c90fa5896cb3090129ad7bd91 Author: Dawid Wysakowicz Date: 2017-05-29T13:15:54Z [FLINK-6732] Removed trailing whitespaces commit b4ec5f0642281e4916efc781729a124f743253cf Author: Dawid Wysakowicz Date: 2017-05-26T13:04:05Z [FLINK-6734] Exclude org.apache.flink.api.java.tuple from checkstyle AvoidStarImport commit 3a849de2d0291a4ba90c6466b1c898bde94e12d0 Author: Dawid Wysakowicz Date: 2017-05-26T13:09:42Z [FLINK-6732] Reorganized imports commit 2eaae31c7fccfc052303c032d8a98a31319966dc Author: Dawid Wysakowicz Date: 2017-05-28T08:50:37Z [FLINK-6732] Fixed empty lines issues commit 0e136e81f1c9036993ab5159fee3fc0addf7291b Author: Dawid Wysakowicz Date: 2017-05-28T10:08:28Z [FLINK-6732] Fixed existing javadocs commit 202aa35aea2501ec75d21a8762c04880dae63f5f Author: Dawid Wysakowicz Date: 2017-05-29T13:23:22Z [FLINK-6732] Added missing javadocs > Activate strict checkstyle for flink-java > - > > Key: FLINK-6732 > URL: https://issues.apache.org/jira/browse/FLINK-6732 > Project: Flink > Issue Type: Sub-task > Components: Java API >Reporter: Chesnay Schepler >Assignee: Dawid Wysakowicz > > Long term issue for incrementally introducing the strict checkstyle to > flink-java. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (FLINK-6747) Table API / SQL Docs: Streaming Page
[ https://issues.apache.org/jira/browse/FLINK-6747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther reassigned FLINK-6747: --- Assignee: Timo Walther > Table API / SQL Docs: Streaming Page > > > Key: FLINK-6747 > URL: https://issues.apache.org/jira/browse/FLINK-6747 > Project: Flink > Issue Type: Sub-task > Components: Documentation, Table API & SQL >Affects Versions: 1.3.0 >Reporter: Fabian Hueske >Assignee: Timo Walther > > Update and refine {{./docs/dev/table/streaming.md}} in feature branch > https://github.com/apache/flink/tree/tableDocs -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #4009: [FLINK-6732] Activate strict-checkstyle for flink-java
Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/4009 I've created a set of changes that remove ~3k checkstyle violations in flink-java. I had doubts about changing some `public static final classes` into `private static final classes`, e.g. in `MinAggregationFunction.java`. Is it safe to do so? Can I check that it does not interfere with any part of the system? Or should I just revert it and add appropriate javadoc? @zentol What is your opinion?
[GitHub] flink pull request #4008: [FLINK-6736] [table] Fix UDTF field access with ti...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/4008#discussion_r118935465 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala --- @@ -391,8 +391,8 @@ object RelTimeIndicatorConverterTest { class TableFunc extends TableFunction[String] { val t = new Timestamp(0L) -def eval(time1: Long, time2: Timestamp): Unit = { - collect(time1.toString + time2.after(t)) +def eval(time1: Long, time2: Timestamp, string: String): Unit = { --- End diff -- Does this change cover changes in functionality?
[jira] [Assigned] (FLINK-6750) Table API / SQL Docs: Table Sources & Sinks Page
[ https://issues.apache.org/jira/browse/FLINK-6750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther reassigned FLINK-6750: --- Assignee: Timo Walther > Table API / SQL Docs: Table Sources & Sinks Page > > > Key: FLINK-6750 > URL: https://issues.apache.org/jira/browse/FLINK-6750 > Project: Flink > Issue Type: Sub-task > Components: Documentation, Table API & SQL >Affects Versions: 1.3.0 >Reporter: Fabian Hueske >Assignee: Timo Walther > > Update and refine {{./docs/dev/table/sourceSinks.md}} in feature branch > https://github.com/apache/flink/tree/tableDocs -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5354) Split up Table API documentation into multiple pages
[ https://issues.apache.org/jira/browse/FLINK-5354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028332#comment-16028332 ] Timo Walther commented on FLINK-5354: - [~fhueske] I will work on the streaming page. > Split up Table API documentation into multiple pages > - > > Key: FLINK-5354 > URL: https://issues.apache.org/jira/browse/FLINK-5354 > Project: Flink > Issue Type: Improvement > Components: Documentation, Table API & SQL >Reporter: Timo Walther > > The Table API documentation page is quite large at the moment. We should > split it up into multiple pages: > Here is my suggestion: > - Overview (Datatypes, Config, Registering Tables, Examples) > - TableSources and Sinks > - Table API > - SQL > - Functions -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6736) Fix UDTF codegen bug when window follow by join( UDTF)
[ https://issues.apache.org/jira/browse/FLINK-6736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028333#comment-16028333 ] ASF GitHub Bot commented on FLINK-6736: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/4008#discussion_r118935465 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala --- @@ -391,8 +391,8 @@ object RelTimeIndicatorConverterTest { class TableFunc extends TableFunction[String] { val t = new Timestamp(0L) -def eval(time1: Long, time2: Timestamp): Unit = { - collect(time1.toString + time2.after(t)) +def eval(time1: Long, time2: Timestamp, string: String): Unit = { --- End diff -- Does this change cover changes in functionality? > Fix UDTF codegen bug when window follow by join( UDTF) > -- > > Key: FLINK-6736 > URL: https://issues.apache.org/jira/browse/FLINK-6736 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: sunjincheng >Assignee: Timo Walther > > When we run the tableAPI as follows: > {code} > val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, > 'bigdec, 'date,'pojo, 'string) > val windowedTable = table > .join(udtf2('string) as ('a, 'b)) > .window(Slide over 5.milli every 2.milli on 'long as 'w) > .groupBy('w) > .select('int.count, agg1('pojo, 'bigdec, 'date, 'int), 'w.start, 'w.end) > {code} > We will get the error message: > {code} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. 
> at > org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) > at > org.apache.flink.table.runtime.CRowCorrelateProcessRunner.compile(CRowCorrelateProcessRunner.scala:35) > at > org.apache.flink.table.runtime.CRowCorrelateProcessRunner.open(CRowCorrelateProcessRunner.scala:59) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111) > at > org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:377) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.codehaus.commons.compiler.CompileException: Line 77, Column > 62: Unknown variable or type "in2" > at > org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11523) > at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6292) > at org.codehaus.janino.UnitCompiler.access$12900(UnitCompiler.java:209) > at > org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:5904) > at > org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:5901) > at org.codehaus.janino.Java$Package.accept(Java.java:4074) > at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:5901) > at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6287) > at org.codehaus.janino.UnitCompiler.access$13500(UnitCompiler.java:209) > {code} > T
[jira] [Commented] (FLINK-6732) Activate strict checkstyle for flink-java
[ https://issues.apache.org/jira/browse/FLINK-6732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028330#comment-16028330 ] ASF GitHub Bot commented on FLINK-6732: --- Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/4009 I've created a set of changes that remove ~3k checkstyle violations in flink-java. I had doubts about changing some `public static final classes` into `private static final classes`, e.g. in `MinAggregationFunction.java`. Is it safe to do so? Can I check that it does not interfere with any part of the system? Or should I just revert it and add appropriate javadoc? @zentol What is your opinion? > Activate strict checkstyle for flink-java > - > > Key: FLINK-6732 > URL: https://issues.apache.org/jira/browse/FLINK-6732 > Project: Flink > Issue Type: Sub-task > Components: Java API >Reporter: Chesnay Schepler >Assignee: Dawid Wysakowicz > > Long term issue for incrementally introducing the strict checkstyle to > flink-java. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-5354) Split up Table API documentation into multiple pages
[ https://issues.apache.org/jira/browse/FLINK-5354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028332#comment-16028332 ] Timo Walther edited comment on FLINK-5354 at 5/29/17 1:30 PM: -- [~fhueske] I will work on the streaming page and sources & sinks. was (Author: twalthr): [~fhueske] I will work on the streaming page. > Split up Table API documentation into multiple pages > - > > Key: FLINK-5354 > URL: https://issues.apache.org/jira/browse/FLINK-5354 > Project: Flink > Issue Type: Improvement > Components: Documentation, Table API & SQL >Reporter: Timo Walther > > The Table API documentation page is quite large at the moment. We should > split it up into multiple pages: > Here is my suggestion: > - Overview (Datatypes, Config, Registering Tables, Examples) > - TableSources and Sinks > - Table API > - SQL > - Functions -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrea Spina reassigned FLINK-6756: --- Assignee: Andrea Spina > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina >Assignee: Andrea Spina > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. > I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is if there are any blocking issues preventing this > feature. [~till.rohrmann] > If it's possible and nobody has already done it, I can assign the issue to > myself in order to implement it. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
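Since the ticket only asks whether anything blocks the feature, the following is a purely illustrative sketch of the shape a Scala rich async function could take; only {{AbstractRichFunction}} is an existing class here, while the base class, the callback signature and the example function are assumptions.
{code}
import org.apache.flink.api.common.functions.AbstractRichFunction
import org.apache.flink.configuration.Configuration

// Hypothetical sketch: the rich lifecycle (open/close, getRuntimeContext) comes
// from AbstractRichFunction, the async contract is modelled as a plain callback.
abstract class RichAsyncFunctionSketch[IN, OUT] extends AbstractRichFunction {
  def asyncInvoke(input: IN, complete: Iterable[OUT] => Unit): Unit
}

// A user function could then open a client once in open() and reuse it per element.
class EnrichWithSuffix extends RichAsyncFunctionSketch[Long, String] {
  @transient private var suffix: String = _

  override def open(parameters: Configuration): Unit = {
    suffix = "-enriched" // open a real async client here instead
  }

  override def asyncInvoke(input: Long, complete: Iterable[String] => Unit): Unit = {
    complete(Seq(s"$input$suffix"))
  }

  override def close(): Unit = {
    suffix = null
  }
}
{code}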
[GitHub] flink issue #4009: [FLINK-6732] Activate strict-checkstyle for flink-java
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4009 If a function is tagged as `@Internal` or in a class tagged as such then we should be able to restrict the visibility. Would be good if you could reply in the JIRA for the tuple checkstyle exclusion.
[jira] [Commented] (FLINK-6732) Activate strict checkstyle for flink-java
[ https://issues.apache.org/jira/browse/FLINK-6732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028339#comment-16028339 ] ASF GitHub Bot commented on FLINK-6732: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/4009 If a function is tagged as `@Internal` or in a class tagged as such then we should be able to restrict the visibility. Would be good if you could reply in the JIRA for the tuple checkstyle exclusion. > Activate strict checkstyle for flink-java > - > > Key: FLINK-6732 > URL: https://issues.apache.org/jira/browse/FLINK-6732 > Project: Flink > Issue Type: Sub-task > Components: Java API >Reporter: Chesnay Schepler >Assignee: Dawid Wysakowicz > > Long term issue for incrementally introducing the strict checkstyle to > flink-java. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6736) Fix UDTF codegen bug when window follow by join( UDTF)
[ https://issues.apache.org/jira/browse/FLINK-6736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028342#comment-16028342 ] ASF GitHub Bot commented on FLINK-6736: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4008#discussion_r118938233 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala --- @@ -391,8 +391,8 @@ object RelTimeIndicatorConverterTest { class TableFunc extends TableFunction[String] { val t = new Timestamp(0L) -def eval(time1: Long, time2: Timestamp): Unit = { - collect(time1.toString + time2.after(t)) +def eval(time1: Long, time2: Timestamp, string: String): Unit = { --- End diff -- Yes, it is used in `TimeAttributeITCase`. > Fix UDTF codegen bug when window follow by join( UDTF) > -- > > Key: FLINK-6736 > URL: https://issues.apache.org/jira/browse/FLINK-6736 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: sunjincheng >Assignee: Timo Walther > > When we run the tableAPI as follows: > {code} > val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, > 'bigdec, 'date,'pojo, 'string) > val windowedTable = table > .join(udtf2('string) as ('a, 'b)) > .window(Slide over 5.milli every 2.milli on 'long as 'w) > .groupBy('w) > .select('int.count, agg1('pojo, 'bigdec, 'date, 'int), 'w.start, 'w.end) > {code} > We will get the error message: > {code} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. 
> at > org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) > at > org.apache.flink.table.runtime.CRowCorrelateProcessRunner.compile(CRowCorrelateProcessRunner.scala:35) > at > org.apache.flink.table.runtime.CRowCorrelateProcessRunner.open(CRowCorrelateProcessRunner.scala:59) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111) > at > org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:377) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.codehaus.commons.compiler.CompileException: Line 77, Column > 62: Unknown variable or type "in2" > at > org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11523) > at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6292) > at org.codehaus.janino.UnitCompiler.access$12900(UnitCompiler.java:209) > at > org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:5904) > at > org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:5901) > at org.codehaus.janino.Java$Package.accept(Java.java:4074) > at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:5901) > at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6287) > at org.codehaus.janino.UnitCompiler.access$13500(UnitCompiler.java:209) > {code} > The reason is
[GitHub] flink pull request #4008: [FLINK-6736] [table] Fix UDTF field access with ti...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4008#discussion_r118938233 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala --- @@ -391,8 +391,8 @@ object RelTimeIndicatorConverterTest { class TableFunc extends TableFunction[String] { val t = new Timestamp(0L) -def eval(time1: Long, time2: Timestamp): Unit = { - collect(time1.toString + time2.after(t)) +def eval(time1: Long, time2: Timestamp, string: String): Unit = { --- End diff -- Yes, it is used in `TimeAttributeITCase`.
[jira] [Commented] (FLINK-6736) Fix UDTF codegen bug when window follow by join( UDTF)
[ https://issues.apache.org/jira/browse/FLINK-6736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028343#comment-16028343 ] ASF GitHub Bot commented on FLINK-6736: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/4008 @sunjincheng121 I will add another ITCase just to be sure. > Fix UDTF codegen bug when window follow by join( UDTF) > -- > > Key: FLINK-6736 > URL: https://issues.apache.org/jira/browse/FLINK-6736 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: sunjincheng >Assignee: Timo Walther > > When we run the tableAPI as follows: > {code} > val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, > 'bigdec, 'date,'pojo, 'string) > val windowedTable = table > .join(udtf2('string) as ('a, 'b)) > .window(Slide over 5.milli every 2.milli on 'long as 'w) > .groupBy('w) > .select('int.count, agg1('pojo, 'bigdec, 'date, 'int), 'w.start, 'w.end) > {code} > We will get the error message: > {code} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. 
> at > org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) > at > org.apache.flink.table.runtime.CRowCorrelateProcessRunner.compile(CRowCorrelateProcessRunner.scala:35) > at > org.apache.flink.table.runtime.CRowCorrelateProcessRunner.open(CRowCorrelateProcessRunner.scala:59) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111) > at > org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:377) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.codehaus.commons.compiler.CompileException: Line 77, Column > 62: Unknown variable or type "in2" > at > org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11523) > at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6292) > at org.codehaus.janino.UnitCompiler.access$12900(UnitCompiler.java:209) > at > org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:5904) > at > org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:5901) > at org.codehaus.janino.Java$Package.accept(Java.java:4074) > at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:5901) > at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6287) > at org.codehaus.janino.UnitCompiler.access$13500(UnitCompiler.java:209) > {code} > The reason is {{val generator = new CodeGenerator(config, false, > inputSchema.physicalTypeInfo)}} `physicalTypeInfo` will remove the > TimeIndicator. > I think we should fix this. What do you think [~fhueske] [~twalthr] , And > hope your suggestions. :) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #4008: [FLINK-6736] [table] Fix UDTF field access with time attr...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/4008 @sunjincheng121 I will add another ITCase just to be sure.
[jira] [Commented] (FLINK-6737) Fix over expression parse String error.
[ https://issues.apache.org/jira/browse/FLINK-6737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028344#comment-16028344 ] sunjincheng commented on FLINK-6737: [~twalthr][~fhueske] Agree with you. I have updated the PR so that it only fixes the string reference variable error. Please take a look at the PR. Thanks, SunJincheng > Fix over expression parse String error. > --- > > Key: FLINK-6737 > URL: https://issues.apache.org/jira/browse/FLINK-6737 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: sunjincheng >Assignee: sunjincheng > > When we run the TableAPI as follows: > {code} > val windowedTable = table > .window(Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW as 'w) > .select('c, "countFun(b)" over 'w as 'mycount, weightAvgFun('a, 'b) over 'w as 'wAvg) > {code} > We get the error: > {code} > org.apache.flink.table.api.TableException: The over method can only using with aggregation expression. > at > org.apache.flink.table.api.scala.ImplicitExpressionOperations$class.over(expressionDsl.scala:469) > at > org.apache.flink.table.api.scala.ImplicitExpressionConversions$LiteralStringExpression.over(expressionDsl.scala:756) > {code} > The reason is that the `over` method of `expressionDsl` does not parse the String case. > I think we should fix this before the 1.3 release. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
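The stack trace in the issue points at {{ImplicitExpressionConversions$LiteralStringExpression.over}}, so the natural direction of the fix is to parse the string into a regular {{Expression}} first and then reuse the expression-based aggregation check. A rough sketch only; the actual patch may differ, and {{ExpressionParser}} is assumed to be the same helper that backs the string-based Java Table API.
{code}
import org.apache.flink.table.expressions.{Expression, ExpressionParser}

// Sketch: the string variant first becomes a regular Expression, after which the
// existing expression-based `over` logic (including its aggregation check) applies.
def parseAggString(agg: String): Expression =
  ExpressionParser.parseExpression(agg)

// e.g. parseAggString("countFun(b)") should yield the same expression tree as the
// symbol-based variant, so both forms can share one `over` code path.
{code}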
[jira] [Commented] (FLINK-6760) Fix OverWindowTest alias test error
[ https://issues.apache.org/jira/browse/FLINK-6760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028346#comment-16028346 ] ASF GitHub Bot commented on FLINK-6760: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4007 > Fix OverWindowTest alias test error > --- > > Key: FLINK-6760 > URL: https://issues.apache.org/jira/browse/FLINK-6760 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: sunjincheng >Assignee: sunjincheng > Fix For: 1.4.0 > > > For SQL: > {code} > val sql = "SELECT c, count(a) OVER (ORDER BY proctime ROWS BETWEEN 2 > preceding AND CURRENT ROW) as cnt1 from MyTable" > {code} > The alias `cnt1` did not take effect when we generated the plan string. But we > can use the alias in the outer-layer query, for example: > {code} > val sql = "SELECT cnt1 from (SELECT c, count(a) OVER (ORDER BY proctime ROWS > BETWEEN 2 preceding AND CURRENT ROW) as cnt1 from MyTable)" > {code} > So in this JIRA we just fix the test case for the 1.3 release. Another JIRA > will improve the alias. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #4007: [FLINK-6760][table]Fix OverWindowTest alias test e...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4007
[jira] [Resolved] (FLINK-6760) Fix OverWindowTest alias test error
[ https://issues.apache.org/jira/browse/FLINK-6760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-6760. - Resolution: Fixed Fix Version/s: 1.4.0 Fixed in 1.4.0: 6b69c588df866c7b1694a58a433f7957bee456c6 > Fix OverWindowTest alias test error > --- > > Key: FLINK-6760 > URL: https://issues.apache.org/jira/browse/FLINK-6760 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: sunjincheng >Assignee: sunjincheng > Fix For: 1.4.0 > > > For SQL: > {code} > val sql = "SELECT c, count(a) OVER (ORDER BY proctime ROWS BETWEEN 2 > preceding AND CURRENT ROW) as cnt1 from MyTable" > {code} > The alias `cnt1` did not take effect when we generated the plan string. But we > can use the alias in the outer-layer query, for example: > {code} > val sql = "SELECT cnt1 from (SELECT c, count(a) OVER (ORDER BY proctime ROWS > BETWEEN 2 preceding AND CURRENT ROW) as cnt1 from MyTable)" > {code} > So in this JIRA we just fix the test case for the 1.3 release. Another JIRA > will improve the alias. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6734) Exclude org.apache.flink.api.java.tuple from checkstyle AvoidStarImport
[ https://issues.apache.org/jira/browse/FLINK-6734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028348#comment-16028348 ] Dawid Wysakowicz commented on FLINK-6734: - 6 in flink-java. After a quick search in other modules I did not find any other classes. > Exclude org.apache.flink.api.java.tuple from checkstyle AvoidStarImport > --- > > Key: FLINK-6734 > URL: https://issues.apache.org/jira/browse/FLINK-6734 > Project: Flink > Issue Type: Sub-task > Components: Build System >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz > -- This message was sent by Atlassian JIRA (v6.3.15#6346)