[jira] [Created] (FLINK-7678) SQL UserDefineTableFunction does not take CompositeType input correctly
Rong Rong created FLINK-7678: Summary: SQL UserDefineTableFunction does not take CompositeType input correctly Key: FLINK-7678 URL: https://issues.apache.org/jira/browse/FLINK-7678 Project: Flink Issue Type: Bug Components: Table API & SQL Affects Versions: 1.3.2 Reporter: Rong Rong UDF is using FlinkTypeFactory to infer operand type while UDTF does not go through the same code path. This results in: {code:console} org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 38 to line 1, column 44: No match found for function signature func() Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 38 to line 1, column 44: No match found for function signature func() {code} Please see the GitHub code for more info: https://github.com/walterddr/flink/blob/bug_report/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/UDTFCompositeTypeTestFailure.scala -- This message was sent by Atlassian JIRA (v6.4.14#64029)
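For context, a minimal sketch of the failing shape of UDTF call, assuming a hypothetical POJO {{Person}} and the registration name {{func}} (the actual reproduction is in the linked test, not this code):
{code:java}
// Hedged sketch, not the linked test: a table function whose eval() takes a
// composite (POJO) operand. Person and the name "func" are illustrative assumptions.
import org.apache.flink.table.functions.TableFunction;

public class CompositeArgUdtf extends TableFunction<String> {

    // Illustrative POJO standing in for any CompositeType input.
    public static class Person {
        public String name;
        public int age;
    }

    public void eval(Person person) {
        collect(person.name);
    }
}
{code}
A query of the form {{SELECT s FROM MyTable, LATERAL TABLE(func(personField)) AS T(s)}} then fails validation with the "No match found for function signature" error, since the UDTF operand types are not inferred through FlinkTypeFactory.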
[jira] [Commented] (FLINK-7373) Using URLClassLoader to load UDF triggers HepPlanner unexpected ClassNotFoundException
[ https://issues.apache.org/jira/browse/FLINK-7373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16206774#comment-16206774 ] Rong Rong commented on FLINK-7373: -- Hi [~fhueske], no we haven't found a great solution so we decided to work around it by not allowing any extra jar which is not already included in the UDF classpath. > Using URLClassLoader to load UDF triggers HepPlanner unexpected > ClassNotFoundException > -- > > Key: FLINK-7373 > URL: https://issues.apache.org/jira/browse/FLINK-7373 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.1 >Reporter: Rong Rong >Assignee: Rong Rong > > Using URLClassLoader to load, say from Artifactory, and instantiate UDF > instances will cause some Rule failed during runHepPlanner or > runVolcanoPlanner. > One example could add an ITCase in: > {code:title=flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala} > @Test > def testUserDefinedFunctionDynamicClassloader() { > val env = ExecutionEnvironment.getExecutionEnvironment > val tableEnv = TableEnvironment.getTableEnvironment(env, config) > val jarFileURI = "file://org/apache/flink/table/udf/HelloWorld.jar" > val udfClassLoader: ClassLoader = new URLClassLoader(List(new > URI(jarFileURI).toURL).toArray) > val clazz = > udfClassLoader.loadClass("org.apache.flink.table.udf.HelloWorld") > val helloWorldUDF: ScalarFunction = > clazz.newInstance().asInstanceOf[ScalarFunction] > tableEnv.registerFunction("helloWorld", helloWorldUDF) > val table = env.fromElements("a", "b", "c").toTable(tableEnv, 'text) > val result = table.select("text.helloWorld()") > val results = result.toDataSet[Row].collect() > val expected = "Hello World" > TestBaseUtils.compareResultAsText(results.asJava, expected) > } > {code} > where > {code:title=HelloWorld.java} > package org.apache.flink.table.udf; > import org.apache.flink.table.functions.ScalarFunction; > public class HelloWorld extends ScalarFunction { > public String eval() { > return "Hello World"; > } > } > {code} > This triggers the following Exception: > {panel:title=Exception} > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) > at > org.apache.flink.table.runtime.FlatMapRunner.compile(FlatMapRunner.scala:31) > at > org.apache.flink.table.runtime.FlatMapRunner.open(FlatMapRunner.scala:45) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.api.common.operators.base.FlatMapOperatorBase.executeOnCollections(FlatMapOperatorBase.java:62) > .. > Caused by: org.codehaus.commons.compiler.CompileException: Line 6, Column 22: > Cannot determine simple type name "org" > at > org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672) > at > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416) > at > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177) > .. > {panel} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-10556) Integration with Apache Hive
[ https://issues.apache.org/jira/browse/FLINK-10556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-10556: -- Description: This is an umbrella JIRA tracking all enhancement and issues related to integrating Flink with Hive ecosystem. This is an outcome of a discussion in the community, and thanks go to everyone that provided feedback and interest. Specifically, we'd like to see the following features and capabilities immediately in Flink: # Metadata interoperability # Data interoperability # Data type compatibility # Hive UDF support # DDL/DML/Query language compatibility For a longer term, we'd also like to add or improve: # Compatible SQL service, client tools, JDBC/ODBC drivers # Better task failure tolerance and task scheduling # Support other user customizations in Hive (storage handlers, serdes, etc). I will provide more details regarding the proposal in a doc shortly. Design doc, if deemed necessary, will be provided in each related sub tasks under this JIRA. Feedback and contributions are greatly welcome! was: This is an umbrella JIRA tracking all enhancement and issues related to integrating Flink with Hive ecosystem. This is an outcome of a discussion in the community, and thanks go to everyone that provided feedback and interest. Specifically, we'd like to see the following features and capabilities immediately in Flink: # Detadata interoperability # Data interoperability # Data type compatibility # Hive UDF support # DDL/DML/Query language compatibility For a longer term, we'd also like to add or improve: # Compatible SQL service, client tools, JDBC/ODBC drivers # Better task failure tolerance and task scheduling # Support other user customizations in Hive (storage handlers, serdes, etc). I will provide more details regarding the proposal in a doc shortly. Design doc, if deemed necessary, will be provided in each related sub tasks under this JIRA. Feedback and contributions are greatly welcome! > Integration with Apache Hive > > > Key: FLINK-10556 > URL: https://issues.apache.org/jira/browse/FLINK-10556 > Project: Flink > Issue Type: New Feature > Components: Batch Connectors and Input/Output Formats, SQL Client, > Table API & SQL >Affects Versions: 1.6.0 >Reporter: Xuefu Zhang >Assignee: Xuefu Zhang >Priority: Major > > This is an umbrella JIRA tracking all enhancement and issues related to > integrating Flink with Hive ecosystem. This is an outcome of a discussion in > the community, and thanks go to everyone that provided feedback and interest. > Specifically, we'd like to see the following features and capabilities > immediately in Flink: > # Metadata interoperability > # Data interoperability > # Data type compatibility > # Hive UDF support > # DDL/DML/Query language compatibility > For a longer term, we'd also like to add or improve: > # Compatible SQL service, client tools, JDBC/ODBC drivers > # Better task failure tolerance and task scheduling > # Support other user customizations in Hive (storage handlers, serdes, etc). > I will provide more details regarding the proposal in a doc shortly. Design > doc, if deemed necessary, will be provided in each related sub tasks under > this JIRA. > Feedback and contributions are greatly welcome! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11006) Update Calcite dependency to 1.18
Rong Rong created FLINK-11006: - Summary: Update Calcite dependency to 1.18 Key: FLINK-11006 URL: https://issues.apache.org/jira/browse/FLINK-11006 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Rong Rong Umbrella task to track all dependencies and tasks that need to be done for upgrading to Calcite 1.18. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8739) Optimize runtime support for distinct filter
[ https://issues.apache.org/jira/browse/FLINK-8739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704949#comment-16704949 ] Rong Rong commented on FLINK-8739: -- No, I haven't started. Please feel free to assign it to yourself. > Optimize runtime support for distinct filter > > > Key: FLINK-8739 > URL: https://issues.apache.org/jira/browse/FLINK-8739 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Rong Rong >Priority: Major > > Possible optimizations: > 1. Decouple distinct map and actual accumulator so that they can separately > be created in codegen. > 2. Reuse same distinct accumulator for filtering, e.g. `SELECT > COUNT(DISTINCT(a)), SUM(DISTINCT(a))` should reuse the same distinct map. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
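To illustrate the optimization sketched in the quoted description, a hedged example (illustrative names; not the eventual implementation) of a shared distinct map that several distinct aggregations could consult instead of each keeping its own copy:
{code:java}
// Hedged sketch of "reuse the same distinct map": one seen-value map with reference
// counts guards several downstream accumulate calls, e.g. COUNT(DISTINCT a) and
// SUM(DISTINCT a) over the same field.
import java.util.HashMap;
import java.util.Map;

public class SharedDistinctFilter<T> {

    // Value -> reference count, so a later retraction could decrement it again.
    private final Map<T, Long> seen = new HashMap<>();

    /** Returns true only the first time a value is observed. */
    public boolean add(T value) {
        return seen.merge(value, 1L, Long::sum) == 1L;
    }
}
{code}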
[jira] [Commented] (FLINK-11010) Flink SQL timestamp is inconsistent with currentProcessingTime()
[ https://issues.apache.org/jira/browse/FLINK-11010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16706398#comment-16706398 ] Rong Rong commented on FLINK-11010: --- Based on the discussion in the mailing list: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Question-about-Timestamp-in-Flink-SQL-td16928.html, I think Flink's internal time service using GMT as the default would require a more sophisticated solution. Currently I think the best solution is to use DATE_FORMAT or other timezone-based SQL operations to convert proctime, and to always treat proctime as GMT in queries like: {{select proctime from tbl}}. I've also linked https://issues.apache.org/jira/browse/FLINK-8353 for reference. > Flink SQL timestamp is inconsistent with currentProcessingTime() > > > Key: FLINK-11010 > URL: https://issues.apache.org/jira/browse/FLINK-11010 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.6.2 >Reporter: lamber-ken >Assignee: lamber-ken >Priority: Major > Labels: pull-request-available > > Flink SQL timestamp is inconsistent with currentProcessingTime(). > > the ProcessingTime is just implemented by invoking System.currentTimeMillis() > but the long value will be automatically wrapped to a Timestamp with the > following statement: > `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));` -- This message was sent by Atlassian JIRA (v7.6.3#76005)
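A hedged sketch of the workaround mentioned in the comment above; the table and column names are illustrative, and the {{DATE_FORMAT}} format-string syntax has varied between Flink releases, so the pattern below should be checked against the docs of the version in use:
{code:java}
// Hedged sketch: treat proctime as GMT and convert it explicitly in the query,
// instead of relying on the implicit local-time-zone wrapping described in the issue.
// The table "tbl" with a proctime attribute is assumed to be registered elsewhere.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class ProctimeConversionSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // Format the GMT-based proctime explicitly; the format pattern is illustrative.
        Table converted = tableEnv.sqlQuery(
            "SELECT DATE_FORMAT(proctime, 'yyyy-MM-dd HH:mm:ss') AS formatted_time FROM tbl");
    }
}
{code}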
[jira] [Created] (FLINK-11088) Improve Kerberos Authentication using Keytab in YARN proxy user mode
Rong Rong created FLINK-11088: - Summary: Improve Kerberos Authentication using Keytab in YARN proxy user mode Key: FLINK-11088 URL: https://issues.apache.org/jira/browse/FLINK-11088 Project: Flink Issue Type: Improvement Components: YARN Reporter: Rong Rong Currently flink-yarn assumes keytab is shipped as application master environment local resource on client side and will be distributed to all the TMs. This does not work for YARN proxy user mode since proxy user or super user does not have access to actual user's keytab but only delegation tokens. We propose to have the keytab file path discovery configurable depending on the launch mode of the YARN client. Reference: https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11088) Improve Kerberos Authentication using Keytab in YARN proxy user mode
[ https://issues.apache.org/jira/browse/FLINK-11088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-11088: -- Component/s: Security > Improve Kerberos Authentication using Keytab in YARN proxy user mode > > > Key: FLINK-11088 > URL: https://issues.apache.org/jira/browse/FLINK-11088 > Project: Flink > Issue Type: Improvement > Components: Security, YARN >Reporter: Rong Rong >Priority: Major > > Currently flink-yarn assumes keytab is shipped as application master > environment local resource on client side and will be distributed to all the > TMs. This does not work for YARN proxy user mode since proxy user or super > user does not have access to actual user's keytab but only delegation tokens. > We propose to have the keytab file path discovery configurable depending on > the launch mode of the YARN client. > Reference: > https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-11088) Improve Kerberos Authentication using Keytab in YARN proxy user mode
[ https://issues.apache.org/jira/browse/FLINK-11088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong reassigned FLINK-11088: - Assignee: Rong Rong > Improve Kerberos Authentication using Keytab in YARN proxy user mode > > > Key: FLINK-11088 > URL: https://issues.apache.org/jira/browse/FLINK-11088 > Project: Flink > Issue Type: Improvement > Components: Security, YARN >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Currently flink-yarn assumes keytab is shipped as application master > environment local resource on client side and will be distributed to all the > TMs. This does not work for YARN proxy user mode since proxy user or super > user does not have access to actual user's keytab but only delegation tokens. > We propose to have the keytab file path discovery configurable depending on > the launch mode of the YARN client. > Reference: > https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11088) Improve Kerberos Authentication using Keytab in YARN proxy user mode
[ https://issues.apache.org/jira/browse/FLINK-11088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16714128#comment-16714128 ] Rong Rong commented on FLINK-11088: --- Initial investigation needs to find a way to distinguish between the two types of authentication methods: keytab and delegation token. However, since delegation tokens normally expire within a week (see: https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/security-kerberos.html#using-kinit-yarn-only), there should be a configurable API to specify how the Kerberos keytab is passed to the YARN application master. The proposal covers several scenarios: 1. Delegation token only - cluster is short-lived, no keytab file. 2. Delegation token on launch - cluster can be long-lived if a keytab file was supplied or a keytab acquisition method is defined. 3. Keytab on launch - cluster is long-lived, keytab is passed as a YARN local resource (current method). Please comment if you think there are any other ways of authenticating a Flink app. > Improve Kerberos Authentication using Keytab in YARN proxy user mode > > > Key: FLINK-11088 > URL: https://issues.apache.org/jira/browse/FLINK-11088 > Project: Flink > Issue Type: Improvement > Components: Security, YARN >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Currently flink-yarn assumes keytab is shipped as application master > environment local resource on client side and will be distributed to all the > TMs. This does not work for YARN proxy user mode since proxy user or super > user does not have access to actual user's keytab but only delegation tokens. > We propose to have the keytab file path discovery configurable depending on > the launch mode of the YARN client. > Reference: > https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11081) Support binding port range for REST server
[ https://issues.apache.org/jira/browse/FLINK-11081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16716173#comment-16716173 ] Rong Rong commented on FLINK-11081: --- +1 to this feature. It is actually very helpful for lots of our use cases. Just two quick higher-level questions: 1) what is the strategy for binding the port range to an actual port number (smallest available? random?); and 2) should we introduce a limit on the maximum range size? > Support binding port range for REST server > -- > > Key: FLINK-11081 > URL: https://issues.apache.org/jira/browse/FLINK-11081 > Project: Flink > Issue Type: Improvement > Components: REST >Affects Versions: 1.7.0, 1.8.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > Currently the {{RestServerEndpoint}} binds to the port specified by > {{RestOptions#PORT}}. {{PORT}} is of type integer. Sometimes, it would be > useful to being able to specify not only a single port but a port range to > pick a port from. Therefore, I propose to add similar to > {{RestOptions#BIND_ADDRESS}} another option {{RestOptions#BIND_PORT}} which > allows to specify a port range for the {{RestServerEndpoint}} to pick a port > from. {{RestOptions#PORT}} would then only be used by the client to connect > to the started {{RestServerEndpoint}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11088) Improve Kerberos Authentication using Keytab in YARN proxy user mode
[ https://issues.apache.org/jira/browse/FLINK-11088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-11088: -- Description: Currently flink-yarn assumes keytab is shipped as application master environment local resource on client side and will be distributed to all the TMs. This does not work for YARN proxy user mode since proxy user or super user does not have access to actual user's keytab but only delegation tokens. We propose to have the keytab file path discovery configurable depending on the launch mode of the YARN client. Reference: [1] https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html [2] https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YarnApplicationSecurity.html#Securing_Long-lived_YARN_Services was: Currently flink-yarn assumes keytab is shipped as application master environment local resource on client side and will be distributed to all the TMs. This does not work for YARN proxy user mode since proxy user or super user does not have access to actual user's keytab but only delegation tokens. We propose to have the keytab file path discovery configurable depending on the launch mode of the YARN client. Reference: https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html > Improve Kerberos Authentication using Keytab in YARN proxy user mode > > > Key: FLINK-11088 > URL: https://issues.apache.org/jira/browse/FLINK-11088 > Project: Flink > Issue Type: Improvement > Components: Security, YARN >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Currently flink-yarn assumes keytab is shipped as application master > environment local resource on client side and will be distributed to all the > TMs. This does not work for YARN proxy user mode since proxy user or super > user does not have access to actual user's keytab but only delegation tokens. > We propose to have the keytab file path discovery configurable depending on > the launch mode of the YARN client. > Reference: > [1] > https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html > [2] > https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YarnApplicationSecurity.html#Securing_Long-lived_YARN_Services -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10474) Don't translate IN to JOIN with VALUES for streaming queries
[ https://issues.apache.org/jira/browse/FLINK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16634867#comment-16634867 ] Rong Rong commented on FLINK-10474: --- Voting for the first approach. I was actually wondering: is there any benefit in translating the IN to a JOIN operator (even on the batch side) if the operand after IN is statically defined? > Don't translate IN to JOIN with VALUES for streaming queries > > > Key: FLINK-10474 > URL: https://issues.apache.org/jira/browse/FLINK-10474 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.6.1, 1.7.0 >Reporter: Fabian Hueske >Assignee: Hequn Cheng >Priority: Major > > IN clauses are translated to JOIN with VALUES if the number of elements in > the IN clause exceeds a certain threshold. This should not be done, because a > streaming join is very heavy and materializes both inputs (which is fine for > the VALUES) input but not for the other. > There are two ways to solve this: > # don't translate IN to a JOIN at all > # translate it to a JOIN but have a special join strategy if one input is > bound and final (non-updating) > Option 1. should be easy to do, option 2. requires much more effort. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10474) Don't translate IN to JOIN with VALUES for streaming queries
[ https://issues.apache.org/jira/browse/FLINK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635666#comment-16635666 ] Rong Rong commented on FLINK-10474: --- Yes. I think having cascaded predicates is definitely not the best idea. However, I was wondering if there's anything better we can do when we already know that the right operand of the IN operation is a static list of LITERALs; maybe we can replace it with a more efficient scalar function? > Don't translate IN to JOIN with VALUES for streaming queries > > > Key: FLINK-10474 > URL: https://issues.apache.org/jira/browse/FLINK-10474 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.6.1, 1.7.0 >Reporter: Fabian Hueske >Assignee: Hequn Cheng >Priority: Major > > IN clauses are translated to JOIN with VALUES if the number of elements in > the IN clause exceeds a certain threshold. This should not be done, because a > streaming join is very heavy and materializes both inputs (which is fine for > the VALUES) input but not for the other. > There are two ways to solve this: > # don't translate IN to a JOIN at all > # translate it to a JOIN but have a special join strategy if one input is > bound and final (non-updating) > Option 1. should be easy to do, option 2. requires much more effort. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
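To make the idea above concrete, a hedged sketch (not an existing Flink feature; all names are illustrative) of what such a scalar set-membership function could look like for a static list of literals:
{code:java}
// Hedged sketch of the "more efficient scalar function" idea: membership in a static
// literal set is checked against a hash set instead of joining with a VALUES input.
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.table.functions.ScalarFunction;

public class InLiteralSet extends ScalarFunction {

    private final Set<String> literals;

    public InLiteralSet(String... literals) {
        this.literals = new HashSet<>(Arrays.asList(literals));
    }

    public boolean eval(String value) {
        return value != null && literals.contains(value);
    }
}
{code}
A predicate such as {{a IN ('x', 'y', ...)}} would then be rewritten into a call to such a function, avoiding the materializing join against the VALUES input.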
[jira] [Commented] (FLINK-8880) Validate configurations for SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16638159#comment-16638159 ] Rong Rong commented on FLINK-8880: -- [~twalthr] yes, absolutely. Please go ahead. > Validate configurations for SQL Client > -- > > Key: FLINK-8880 > URL: https://issues.apache.org/jira/browse/FLINK-8880 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Xingcan Cui >Priority: Major > > Currently, the configuration items for SQL client are stored in maps and > accessed with default values. They should be validated when creating the > client. Also, logger warnings should be shown when using default values. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-8880) Validate configurations for SQL Client
[ https://issues.apache.org/jira/browse/FLINK-8880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong reassigned FLINK-8880: Assignee: (was: Rong Rong) > Validate configurations for SQL Client > -- > > Key: FLINK-8880 > URL: https://issues.apache.org/jira/browse/FLINK-8880 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Xingcan Cui >Priority: Major > > Currently, the configuration items for SQL client are stored in maps and > accessed with default values. They should be validated when creating the > client. Also, logger warnings should be shown when using default values. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-7357) HOP_START() HOP_END() does not work when using HAVING clause with GROUP BY HOP window
Rong Rong created FLINK-7357: Summary: HOP_START() HOP_END() does not work when using HAVING clause with GROUP BY HOP window Key: FLINK-7357 URL: https://issues.apache.org/jira/browse/FLINK-7357 Project: Flink Issue Type: Bug Components: Table API & SQL Affects Versions: 1.3.1 Reporter: Rong Rong The following SQL does not compile: {code:title=invalid_having_hop_start_sql} SELECT c AS k, COUNT(a) AS v, HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowStart, HOP_END(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowEnd FROM T1 GROUP BY HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE), c HAVING SUM(b) > 1 {code} While keeping either the HAVING clause or the HOP_START field individually, the query compiles and runs without issue. More details: https://github.com/apache/flink/compare/master...walterddr:having_does_not_work_with_hop_start_end -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7373) Using URLClassLoader to load UDF triggers HepPlanner unexpected ClassNotFoundException
Rong Rong created FLINK-7373: Summary: Using URLClassLoader to load UDF triggers HepPlanner unexpected ClassNotFoundException Key: FLINK-7373 URL: https://issues.apache.org/jira/browse/FLINK-7373 Project: Flink Issue Type: Bug Components: Table API & SQL Affects Versions: 1.3.1 Reporter: Rong Rong Using URLClassLoader to load, say from Artifactory, and instantiate UDF instances will cause some Rule failed during runHepPlanner or runVolcanoPlanner. One example could add an ITCase in: {code:title=flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala} @Test def testUserDefinedFunctionDynamicClassloader() { val env = ExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env, config) val jarFileURI = "file://org/apache/flink/table/udf/HelloWorld.jar" val udfClassLoader: ClassLoader = new URLClassLoader(List(new URI(jarFileURI).toURL).toArray) val clazz = udfClassLoader.loadClass("org.apache.flink.table.udf.HelloWorld") val helloWorldUDF: ScalarFunction = clazz.newInstance().asInstanceOf[ScalarFunction] tableEnv.registerFunction("helloWorld", helloWorldUDF) val table = env.fromElements("a", "b", "c").toTable(tableEnv, 'text) val result = table.select("text.helloWorld()") val results = result.toDataSet[Row].collect() val expected = "Hello World!" TestBaseUtils.compareResultAsText(results.asJava, expected) } {code} where {code:title=HelloWorld.java} package org.apache.flink.table.udf; import org.apache.flink.table.functions.ScalarFunction; public class HelloWorld extends ScalarFunction { public String eval(String o) { if (o == null) { return "Hello World"; } else { return "Hellow World " + o.toString(); } } } {code} This triggers the following Exception: {panel:title=Exception} org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) at org.apache.flink.table.runtime.FlatMapRunner.compile(FlatMapRunner.scala:31) at org.apache.flink.table.runtime.FlatMapRunner.open(FlatMapRunner.scala:45) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.api.common.operators.base.FlatMapOperatorBase.executeOnCollections(FlatMapOperatorBase.java:62) .. Caused by: org.codehaus.commons.compiler.CompileException: Line 6, Column 22: Cannot determine simple type name "org" at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177) .. {panel} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7373) Using URLClassLoader to load UDF triggers HepPlanner unexpected ClassNotFoundException
[ https://issues.apache.org/jira/browse/FLINK-7373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-7373: - Description: Using URLClassLoader to load, say from Artifactory, and instantiate UDF instances will cause some Rule failed during runHepPlanner or runVolcanoPlanner. One example could add an ITCase in: {code:title=flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala} @Test def testUserDefinedFunctionDynamicClassloader() { val env = ExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env, config) val jarFileURI = "file://org/apache/flink/table/udf/HelloWorld.jar" val udfClassLoader: ClassLoader = new URLClassLoader(List(new URI(jarFileURI).toURL).toArray) val clazz = udfClassLoader.loadClass("org.apache.flink.table.udf.HelloWorld") val helloWorldUDF: ScalarFunction = clazz.newInstance().asInstanceOf[ScalarFunction] tableEnv.registerFunction("helloWorld", helloWorldUDF) val table = env.fromElements("a", "b", "c").toTable(tableEnv, 'text) val result = table.select("text.helloWorld()") val results = result.toDataSet[Row].collect() val expected = "Hello World!" TestBaseUtils.compareResultAsText(results.asJava, expected) } {code} where {code:title=HelloWorld.java} package org.apache.flink.table.udf; import org.apache.flink.table.functions.ScalarFunction; public class HelloWorld extends ScalarFunction { public String eval() { return "Hello World"; } } {code} This triggers the following Exception: {panel:title=Exception} org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) at org.apache.flink.table.runtime.FlatMapRunner.compile(FlatMapRunner.scala:31) at org.apache.flink.table.runtime.FlatMapRunner.open(FlatMapRunner.scala:45) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.api.common.operators.base.FlatMapOperatorBase.executeOnCollections(FlatMapOperatorBase.java:62) .. Caused by: org.codehaus.commons.compiler.CompileException: Line 6, Column 22: Cannot determine simple type name "org" at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177) .. {panel} was: Using URLClassLoader to load, say from Artifactory, and instantiate UDF instances will cause some Rule failed during runHepPlanner or runVolcanoPlanner. 
One example could add an ITCase in: {code:title=flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala} @Test def testUserDefinedFunctionDynamicClassloader() { val env = ExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env, config) val jarFileURI = "file://org/apache/flink/table/udf/HelloWorld.jar" val udfClassLoader: ClassLoader = new URLClassLoader(List(new URI(jarFileURI).toURL).toArray) val clazz = udfClassLoader.loadClass("org.apache.flink.table.udf.HelloWorld") val helloWorldUDF: ScalarFunction = clazz.newInstance().asInstanceOf[ScalarFunction] tableEnv.registerFunction("helloWorld", helloWorldUDF) val table = env.fromElements("a", "b", "c").toTable(tableEnv, 'text) val result = table.select("text.helloWorld()") val results = result.toDataSet[Row].collect() val expected = "Hello World!" TestBaseUtils.compareResultAsText(results.asJava, expected) } {code} where {code:title=HelloWorld.java} package org.apache.flink.table.udf; import org.apache.flink.table.functions.ScalarFunction; public class HelloWorld extends ScalarFunction { public String eval(String o) { if (o == null) { return "Hello World"; } else { return "Hellow World " + o.toString(); } } } {code} This triggers the following Exception: {panel:title=Exception} org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) at org.apache.flink.table.runtime.FlatMapRunner.compile(FlatMapRunner.scala:31) at org.apache.flink.table.runtime.FlatMapRunner.open(FlatMapRunner.scala:45) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.api.common.operators.base.FlatMapOperatorBase.executeOnCollections(FlatMapOperatorBase.java:62) .. Caused by: org.codehaus.commons.compiler.CompileException: Line 6, Column 22:
[jira] [Updated] (FLINK-7373) Using URLClassLoader to load UDF triggers HepPlanner unexpected ClassNotFoundException
[ https://issues.apache.org/jira/browse/FLINK-7373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-7373: - Description: Using URLClassLoader to load, say from Artifactory, and instantiate UDF instances will cause some Rule failed during runHepPlanner or runVolcanoPlanner. One example could add an ITCase in: {code:title=flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala} @Test def testUserDefinedFunctionDynamicClassloader() { val env = ExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env, config) val jarFileURI = "file://org/apache/flink/table/udf/HelloWorld.jar" val udfClassLoader: ClassLoader = new URLClassLoader(List(new URI(jarFileURI).toURL).toArray) val clazz = udfClassLoader.loadClass("org.apache.flink.table.udf.HelloWorld") val helloWorldUDF: ScalarFunction = clazz.newInstance().asInstanceOf[ScalarFunction] tableEnv.registerFunction("helloWorld", helloWorldUDF) val table = env.fromElements("a", "b", "c").toTable(tableEnv, 'text) val result = table.select("text.helloWorld()") val results = result.toDataSet[Row].collect() val expected = "Hello World" TestBaseUtils.compareResultAsText(results.asJava, expected) } {code} where {code:title=HelloWorld.java} package org.apache.flink.table.udf; import org.apache.flink.table.functions.ScalarFunction; public class HelloWorld extends ScalarFunction { public String eval() { return "Hello World"; } } {code} This triggers the following Exception: {panel:title=Exception} org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) at org.apache.flink.table.runtime.FlatMapRunner.compile(FlatMapRunner.scala:31) at org.apache.flink.table.runtime.FlatMapRunner.open(FlatMapRunner.scala:45) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.api.common.operators.base.FlatMapOperatorBase.executeOnCollections(FlatMapOperatorBase.java:62) .. Caused by: org.codehaus.commons.compiler.CompileException: Line 6, Column 22: Cannot determine simple type name "org" at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177) .. {panel} was: Using URLClassLoader to load, say from Artifactory, and instantiate UDF instances will cause some Rule failed during runHepPlanner or runVolcanoPlanner. 
One example could add an ITCase in: {code:title=flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala} @Test def testUserDefinedFunctionDynamicClassloader() { val env = ExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env, config) val jarFileURI = "file://org/apache/flink/table/udf/HelloWorld.jar" val udfClassLoader: ClassLoader = new URLClassLoader(List(new URI(jarFileURI).toURL).toArray) val clazz = udfClassLoader.loadClass("org.apache.flink.table.udf.HelloWorld") val helloWorldUDF: ScalarFunction = clazz.newInstance().asInstanceOf[ScalarFunction] tableEnv.registerFunction("helloWorld", helloWorldUDF) val table = env.fromElements("a", "b", "c").toTable(tableEnv, 'text) val result = table.select("text.helloWorld()") val results = result.toDataSet[Row].collect() val expected = "Hello World!" TestBaseUtils.compareResultAsText(results.asJava, expected) } {code} where {code:title=HelloWorld.java} package org.apache.flink.table.udf; import org.apache.flink.table.functions.ScalarFunction; public class HelloWorld extends ScalarFunction { public String eval() { return "Hello World"; } } {code} This triggers the following Exception: {panel:title=Exception} org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) at org.apache.flink.table.runtime.FlatMapRunner.compile(FlatMapRunner.scala:31) at org.apache.flink.table.runtime.FlatMapRunner.open(FlatMapRunner.scala:45) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.api.common.operators.base.FlatMapOperatorBase.executeOnCollections(FlatMapOperatorBase.java:62) .. Caused by: org.codehaus.commons.compiler.CompileException: Line 6, Column 22: Cannot determine simple type name "org" at org.codehaus.janino.UnitCompiler.compileEr
[jira] [Assigned] (FLINK-7373) Using URLClassLoader to load UDF triggers HepPlanner unexpected ClassNotFoundException
[ https://issues.apache.org/jira/browse/FLINK-7373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong reassigned FLINK-7373: Assignee: Rong Rong > Using URLClassLoader to load UDF triggers HepPlanner unexpected > ClassNotFoundException > -- > > Key: FLINK-7373 > URL: https://issues.apache.org/jira/browse/FLINK-7373 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.1 >Reporter: Rong Rong >Assignee: Rong Rong > > Using URLClassLoader to load, say from Artifactory, and instantiate UDF > instances will cause some Rule failed during runHepPlanner or > runVolcanoPlanner. > One example could add an ITCase in: > {code:title=flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala} > @Test > def testUserDefinedFunctionDynamicClassloader() { > val env = ExecutionEnvironment.getExecutionEnvironment > val tableEnv = TableEnvironment.getTableEnvironment(env, config) > val jarFileURI = "file://org/apache/flink/table/udf/HelloWorld.jar" > val udfClassLoader: ClassLoader = new URLClassLoader(List(new > URI(jarFileURI).toURL).toArray) > val clazz = > udfClassLoader.loadClass("org.apache.flink.table.udf.HelloWorld") > val helloWorldUDF: ScalarFunction = > clazz.newInstance().asInstanceOf[ScalarFunction] > tableEnv.registerFunction("helloWorld", helloWorldUDF) > val table = env.fromElements("a", "b", "c").toTable(tableEnv, 'text) > val result = table.select("text.helloWorld()") > val results = result.toDataSet[Row].collect() > val expected = "Hello World" > TestBaseUtils.compareResultAsText(results.asJava, expected) > } > {code} > where > {code:title=HelloWorld.java} > package org.apache.flink.table.udf; > import org.apache.flink.table.functions.ScalarFunction; > public class HelloWorld extends ScalarFunction { > public String eval() { > return "Hello World"; > } > } > {code} > This triggers the following Exception: > {panel:title=Exception} > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) > at > org.apache.flink.table.runtime.FlatMapRunner.compile(FlatMapRunner.scala:31) > at > org.apache.flink.table.runtime.FlatMapRunner.open(FlatMapRunner.scala:45) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.api.common.operators.base.FlatMapOperatorBase.executeOnCollections(FlatMapOperatorBase.java:62) > .. > Caused by: org.codehaus.commons.compiler.CompileException: Line 6, Column 22: > Cannot determine simple type name "org" > at > org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672) > at > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416) > at > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177) > .. > {panel} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7357) HOP_START() HOP_END() does not work when using HAVING clause with GROUP BY HOP window
[ https://issues.apache.org/jira/browse/FLINK-7357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16120483#comment-16120483 ] Rong Rong commented on FLINK-7357: -- Seems like the logical plan generated from HAVING inserts another `LogicalFilter` between the outer project and the inner project, so the `WindowStartEndPropertiesRule` did not get fired. Working on a fix now; [~wheat9], can I assign this to myself? > HOP_START() HOP_END() does not work when using HAVING clause with GROUP BY > HOP window > - > > Key: FLINK-7357 > URL: https://issues.apache.org/jira/browse/FLINK-7357 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.1 >Reporter: Rong Rong >Assignee: Haohui Mai > > The following SQL does not compile: > {code:title=invalid_having_hop_start_sql} > SELECT > c AS k, > COUNT(a) AS v, > HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS > windowStart, > HOP_END(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowEnd > FROM > T1 > GROUP BY > HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE), > c > HAVING > SUM(b) > 1 > {code} > While individually keeping HAVING clause or HOP_START field compiles and runs > without issue. > more details: > https://github.com/apache/flink/compare/master...walterddr:having_does_not_work_with_hop_start_end -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7357) HOP_START() HOP_END() does not work when using HAVING clause with GROUP BY HOP window
[ https://issues.apache.org/jira/browse/FLINK-7357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong reassigned FLINK-7357: Assignee: Rong Rong (was: Haohui Mai) > HOP_START() HOP_END() does not work when using HAVING clause with GROUP BY > HOP window > - > > Key: FLINK-7357 > URL: https://issues.apache.org/jira/browse/FLINK-7357 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.1 >Reporter: Rong Rong >Assignee: Rong Rong > > The following SQL does not compile: > {code:title=invalid_having_hop_start_sql} > SELECT > c AS k, > COUNT(a) AS v, > HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS > windowStart, > HOP_END(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowEnd > FROM > T1 > GROUP BY > HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE), > c > HAVING > SUM(b) > 1 > {code} > While individually keeping HAVING clause or HOP_START field compiles and runs > without issue. > more details: > https://github.com/apache/flink/compare/master...walterddr:having_does_not_work_with_hop_start_end -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-12399) FilterableTableSource does not use filters on job run
[ https://issues.apache.org/jira/browse/FLINK-12399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16921546#comment-16921546 ] Rong Rong commented on FLINK-12399: --- Hi [~fhueske], would you please kindly take a look at the approach to address this issue? The problem has caused some trouble for us and has also come up in multiple threads on the mailing list. It would be nice to address this before the next release. Much appreciated. > FilterableTableSource does not use filters on job run > - > > Key: FLINK-12399 > URL: https://issues.apache.org/jira/browse/FLINK-12399 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.8.0 >Reporter: Josh Bradt >Assignee: Rong Rong >Priority: Major > Labels: pull-request-available > Attachments: flink-filter-bug.tar.gz > > Time Spent: 0.5h > Remaining Estimate: 0h > > As discussed [on the mailing > list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Filter-push-down-not-working-for-a-custom-BatchTableSource-tp27654.html], > there appears to be a bug where a job that uses a custom > FilterableTableSource does not keep the filters that were pushed down into > the table source. More specifically, the table source does receive filters > via applyPredicates, and a new table source with those filters is returned, > but the final job graph appears to use the original table source, which does > not contain any filters. > I attached a minimal example program to this ticket. The custom table source > is as follows: > {code:java} > public class CustomTableSource implements BatchTableSource, > FilterableTableSource { > private static final Logger LOG = > LoggerFactory.getLogger(CustomTableSource.class); > private final Filter[] filters; > private final FilterConverter converter = new FilterConverter(); > public CustomTableSource() { > this(null); > } > private CustomTableSource(Filter[] filters) { > this.filters = filters; > } > @Override > public DataSet getDataSet(ExecutionEnvironment execEnv) { > if (filters == null) { >LOG.info(" No filters defined "); > } else { > LOG.info(" Found filters "); > for (Filter filter : filters) { > LOG.info("FILTER: {}", filter); > } > } > return execEnv.fromCollection(allModels()); > } > @Override > public TableSource applyPredicate(List predicates) { > LOG.info("Applying predicates"); > List acceptedFilters = new ArrayList<>(); > for (final Expression predicate : predicates) { > converter.convert(predicate).ifPresent(acceptedFilters::add); > } > return new CustomTableSource(acceptedFilters.toArray(new Filter[0])); > } > @Override > public boolean isFilterPushedDown() { > return filters != null; > } > @Override > public TypeInformation getReturnType() { > return TypeInformation.of(Model.class); > } > @Override > public TableSchema getTableSchema() { > return TableSchema.fromTypeInfo(getReturnType()); > } > private List allModels() { > List models = new ArrayList<>(); > models.add(new Model(1, 2, 3, 4)); > models.add(new Model(10, 11, 12, 13)); > models.add(new Model(20, 21, 22, 23)); > return models; > } > } > {code} > > When run, it logs > {noformat} > 15:24:54,888 INFO com.klaviyo.filterbug.CustomTableSource >- Applying predicates > 15:24:54,901 INFO com.klaviyo.filterbug.CustomTableSource >- Applying predicates > 15:24:54,910 INFO com.klaviyo.filterbug.CustomTableSource >- Applying predicates > 15:24:54,977 INFO com.klaviyo.filterbug.CustomTableSource >- No filters defined {noformat} > which appears to indicate that although filters are
getting pushed down, the > final job does not use them. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Comment Edited] (FLINK-12399) FilterableTableSource does not use filters on job run
[ https://issues.apache.org/jira/browse/FLINK-12399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833107#comment-16833107 ] Rong Rong edited comment on FLINK-12399 at 9/4/19 4:28 PM: --- Hi [~josh.bradt]. I think I found the root cause of this issue. Apparently you have to override the method {{explainSource}} in order to let calcite know that the new created TableSource with filter pushedDown is different from the original created CustomeTableSource (where you have not applyPredicates). I think this might be related to the #4 changelog point https://github.com/apache/flink/pull/8324 when I try upgrading to CALCITE 1.19.0 I also encounter some weird issues where calcite tries to find the correct tablesource from the digest strings. I will assigned to myself and start looking into this issue. Please let me know if adding the override resolves your issue at this moment. was (Author: walterddr): Hi [~josh.bradt]. I think I found the root cause of this issue. Apparently you have to override the method {{explainSource}} in order to let calcite know that the new created TableSource with filter pushedDown is different from the original created CustomeTableSource (where you have not applyPredicates). I think this might be related to the #4 changelog point https://github.com/apache/flink/pull/8324: when I try upgrading to CALCITE 1.19.0 I also encounter some weird issues where calcite tries to find the correct tablesource from the digest strings. I will assigned to myself and start looking into this issue. Please let me know if adding the override resolves your issue at this moment. > FilterableTableSource does not use filters on job run > - > > Key: FLINK-12399 > URL: https://issues.apache.org/jira/browse/FLINK-12399 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.8.0 >Reporter: Josh Bradt >Assignee: Rong Rong >Priority: Major > Labels: pull-request-available > Attachments: flink-filter-bug.tar.gz > > Time Spent: 0.5h > Remaining Estimate: 0h > > As discussed [on the mailing > list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Filter-push-down-not-working-for-a-custom-BatchTableSource-tp27654.html], > there appears to be a bug where a job that uses a custom > FilterableTableSource does not keep the filters that were pushed down into > the table source. More specifically, the table source does receive filters > via applyPredicates, and a new table source with those filters is returned, > but the final job graph appears to use the original table source, which does > not contain any filters. > I attached a minimal example program to this ticket. 
The custom table source > is as follows: > {code:java} > public class CustomTableSource implements BatchTableSource, > FilterableTableSource { > private static final Logger LOG = > LoggerFactory.getLogger(CustomTableSource.class); > private final Filter[] filters; > private final FilterConverter converter = new FilterConverter(); > public CustomTableSource() { > this(null); > } > private CustomTableSource(Filter[] filters) { > this.filters = filters; > } > @Override > public DataSet getDataSet(ExecutionEnvironment execEnv) { > if (filters == null) { >LOG.info(" No filters defined "); > } else { > LOG.info(" Found filters "); > for (Filter filter : filters) { > LOG.info("FILTER: {}", filter); > } > } > return execEnv.fromCollection(allModels()); > } > @Override > public TableSource applyPredicate(List predicates) { > LOG.info("Applying predicates"); > List acceptedFilters = new ArrayList<>(); > for (final Expression predicate : predicates) { > converter.convert(predicate).ifPresent(acceptedFilters::add); > } > return new CustomTableSource(acceptedFilters.toArray(new Filter[0])); > } > @Override > public boolean isFilterPushedDown() { > return filters != null; > } > @Override > public TypeInformation getReturnType() { > return TypeInformation.of(Model.class); > } > @Override > public TableSchema getTableSchema() { > return TableSchema.fromTypeInfo(getReturnType()); > } > private List allModels() { > List models = new ArrayList<>(); > models.add(new Model(1, 2, 3, 4)); > models.add(new Model(10, 11, 12, 13)); > models.add(new Model(20, 21, 22, 23)); > return models; > } > } > {code} > > When run, it logs > {noformat} > 15:24:54,888 INFO com.klaviyo.filterbug.Cu
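A hedged sketch of the workaround described in the comment above: inside the {{CustomTableSource}} from the report, override {{explainSource()}} so that the instance with pushed-down filters produces a different digest than the original. The exact string is illustrative; only the fact that it differs matters.
{code:java}
// Hedged sketch, to be added to the CustomTableSource shown in the report.
// Overriding TableSource#explainSource() makes the filtered and unfiltered instances
// distinguishable when the planner compares plan digests.
@Override
public String explainSource() {
    return "CustomTableSource(filters: "
        + (filters == null ? "none" : java.util.Arrays.toString(filters)) + ")";
}
{code}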
[jira] [Commented] (FLINK-11936) Remove AuxiliaryConverter pull-in from calcite and fix auxiliary converter issue.
[ https://issues.apache.org/jira/browse/FLINK-11936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16932569#comment-16932569 ] Rong Rong commented on FLINK-11936: --- [~danny0405] yes. I think that would be great. > Remove AuxiliaryConverter pull-in from calcite and fix auxiliary converter > issue. > - > > Key: FLINK-11936 > URL: https://issues.apache.org/jira/browse/FLINK-11936 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > AuxiliaryConverter was pulled in FLINK-6409. Since CALCITE-1761 has been > fixed, we should sync back with the calcite version. > After a quick glance, I think it is not so simple to just delete the class so > I opened a follow up Jira on this issue. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-10232) Add a SQL DDL
[ https://issues.apache.org/jira/browse/FLINK-10232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16841511#comment-16841511 ] Rong Rong commented on FLINK-10232: --- Thanks Danny for bringing this up. I think we can definitely collaborate on this together. Based on the [email thread|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-SQL-DDL-Design-tp25006.html], we've pretty much agreed on going forward with the MVP design for the {{CREATE TABLE}} functionality first. I will follow up with [~suez1224] offline to see what his opinion is on the status of FLINK-6962. I think it is appropriate to follow up on this from multiple angles. 1. [~danny0405] can you share more details on the *flink-sql-parser* you mentioned and how you envision supporting DDL on the batch side first? AFAIK the flink-table-planner currently supports both the DataSet and DataStream sides at the same time, and I think flink-table-planner-blink is a single module. It would be nice to lay out the structure/dependency vision for discussion of backward compatibility, migration, and plans. 2. I will follow up with the discussion thread shortly to bring back some of the points regarding the post-MVP discussion where there are some discrepancies with the design doc, such as attribute / source, sink. 3. I think we can also follow up with some of the orthogonal implementations for the rest of the discussion, such as {{CREATE FUNCTION}}. Moving this piece by piece is a good idea IMO. What do you guys think? [~danny0405][~ykt836][~suez1224][~twalthr] > Add a SQL DDL > - > > Key: FLINK-10232 > URL: https://issues.apache.org/jira/browse/FLINK-10232 > Project: Flink > Issue Type: New Feature > Components: Table SQL / API >Reporter: Timo Walther >Assignee: Danny Chan >Priority: Major > > This is an umbrella issue for all efforts related to supporting a SQL Data > Definition Language (DDL) in Flink's Table & SQL API. > Such a DDL includes creating, deleting, replacing: > - tables > - views > - functions > - types > - libraries > - catalogs > If possible, the parsing/validating/logical part should be done using > Calcite. Related issues are CALCITE-707, CALCITE-2045, CALCITE-2046, > CALCITE-2214, and others. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11921) Upgrade Calcite dependency to 1.19
[ https://issues.apache.org/jira/browse/FLINK-11921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16852147#comment-16852147 ] Rong Rong commented on FLINK-11921: --- Hi [~twalthr], [~danny0405] and I briefly discussed that we will skip 1.19 and go directly to 1.20. I will rename this to "upgrade to 1.20" if that's all good with everyone (since the dependencies and todos are still valid). > Upgrade Calcite dependency to 1.19 > -- > > Key: FLINK-11921 > URL: https://issues.apache.org/jira/browse/FLINK-11921 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Timo Walther >Assignee: Rong Rong >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Umbrella issue for all tasks related to the next Calcite upgrade. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11921) Upgrade Calcite dependency to 1.20
[ https://issues.apache.org/jira/browse/FLINK-11921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-11921: -- Summary: Upgrade Calcite dependency to 1.20 (was: Upgrade Calcite dependency to 1.19) > Upgrade Calcite dependency to 1.20 > -- > > Key: FLINK-11921 > URL: https://issues.apache.org/jira/browse/FLINK-11921 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Timo Walther >Assignee: Rong Rong >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Umbrella issue for all tasks related to the next Calcite upgrade. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11921) Upgrade Calcite dependency to 1.20
[ https://issues.apache.org/jira/browse/FLINK-11921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-11921: -- Description: Umbrella issue for all tasks related to the next Calcite upgrade to 1.20.x release We will skip 1.19.x since the change required is minor. was:Umbrella issue for all tasks related to the next Calcite upgrade. > Upgrade Calcite dependency to 1.20 > -- > > Key: FLINK-11921 > URL: https://issues.apache.org/jira/browse/FLINK-11921 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Timo Walther >Assignee: Rong Rong >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Umbrella issue for all tasks related to the next Calcite upgrade to 1.20.x > release > We will skip 1.19.x since the change required is minor. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11921) Upgrade Calcite dependency to 1.20
[ https://issues.apache.org/jira/browse/FLINK-11921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16858835#comment-16858835 ] Rong Rong commented on FLINK-11921: --- Changed the title as discussed: we will skip 1.19 and go directly with 1.20. > Upgrade Calcite dependency to 1.20 > -- > > Key: FLINK-11921 > URL: https://issues.apache.org/jira/browse/FLINK-11921 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Timo Walther >Assignee: Rong Rong >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Umbrella issue for all tasks related to the next Calcite upgrade to 1.20.x > release > We will skip 1.19.x since the change required is minor. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12872) WindowOperator may fail with UnsupportedOperationException when merging windows
[ https://issues.apache.org/jira/browse/FLINK-12872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16866178#comment-16866178 ] Rong Rong commented on FLINK-12872: --- This might be a [related reading|https://stackoverflow.com/questions/2978598/will-system-currenttimemillis-always-return-a-value-previous-calls] > WindowOperator may fail with UnsupportedOperationException when merging > windows > --- > > Key: FLINK-12872 > URL: https://issues.apache.org/jira/browse/FLINK-12872 > Project: Flink > Issue Type: Bug > Components: API / DataStream >Affects Versions: 1.6.4, 1.7.2, 1.8.0 >Reporter: Piotr Nowojski >Priority: Major > > [Reported > |http://mail-archives.apache.org/mod_mbox/flink-user/201906.mbox/%3CCALDWsfhbP6D9+pnTzYuGaP0V4nReKJ4s9VsG_Xe1hZJq4O=z...@mail.gmail.com%3E] > by a user. > {noformat} > I have a job that uses processing time session window with inactivity gap of > 60ms where I intermittently run into the following exception. I'm trying to > figure out what happened here. Haven't been able to reproduce this scenario. > Any thoughts? > java.lang.UnsupportedOperationException: The end timestamp of a > processing-time window cannot become earlier than the current processing time > by merging. Current processing time: 1560493731808 window: > TimeWindow{start=1560493731654, end=1560493731778} > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:325) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311) > at > org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:745) > {noformat} > This is happening probably because {{System.currentTimeMillis()}} is not a > monotonic function and {{WindowOperator}} accesses it at least twice: once > when it creates a window and second time during performing the above > mentioned check (that has failed). However I would guess there are more > places like this, not only in {{WindowOperator}}. > The fix could be either to make sure that processing time is monotonic, or to > access it only once per operator per record or to drop processing time in > favour of ingestion time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
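The description above points at the likely root cause: the operator reads {{System.currentTimeMillis()}} more than once while handling a record, and the wall clock can step backwards between reads. Below is a minimal, hypothetical sketch (not Flink's actual fix; the class and method names are made up) of the "access it only once per operator per record" idea combined with a monotonic guard.

{code:java}
// Hypothetical helper: read the processing-time clock once per record and never
// let it go backwards, so that window creation and the later merge check observe
// the same timestamp.
public final class PerRecordProcessingClock {

    private long cachedTime = Long.MIN_VALUE;

    /** Call once at the beginning of processElement(). */
    public void advance() {
        // Guard against System.currentTimeMillis() stepping backwards.
        cachedTime = Math.max(cachedTime, System.currentTimeMillis());
    }

    /** Every decision made while handling the current record reads this value. */
    public long currentProcessingTime() {
        return cachedTime;
    }
}
{code}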
[jira] [Commented] (FLINK-16522) Use type hints to declare the signature of the methods
[ https://issues.apache.org/jira/browse/FLINK-16522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17072854#comment-17072854 ] Rong Rong commented on FLINK-16522: --- Is this Jira referring to type checking as well ([https://realpython.com/python-type-checking/]), or just introducing type hints ([https://www.python.org/dev/peps/pep-0484/])? With type checking/hints it might also help improve the API documentation (not sure if there's any documentation auto-generation on the Python side, but at least the function itself would contain the type hints when looking it up). IMO this is a great idea. +1 to have this implemented. > Use type hints to declare the signature of the methods > -- > > Key: FLINK-16522 > URL: https://issues.apache.org/jira/browse/FLINK-16522 > Project: Flink > Issue Type: Improvement > Components: API / Python >Reporter: Huang Xingbo >Assignee: Huang Xingbo >Priority: Major > Fix For: 1.11.0 > > > Type Hints was introduced in Python 3.5 and it would be great if we can > declare the signature of the methods using type hints. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-16522) Use type hints to declare the signature of the methods
[ https://issues.apache.org/jira/browse/FLINK-16522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-16522: -- Description: [Type Hints|https://www.python.org/dev/peps/pep-0484/] was introduced in Python 3.5 and it would be great if we can declare the signature of the methods using type hints and introduce type check in the python APIs (was: Type Hints was introduced in Python 3.5 and it would be great if we can declare the signature of the methods using type hints.) > Use type hints to declare the signature of the methods > -- > > Key: FLINK-16522 > URL: https://issues.apache.org/jira/browse/FLINK-16522 > Project: Flink > Issue Type: Improvement > Components: API / Python >Reporter: Huang Xingbo >Assignee: Huang Xingbo >Priority: Major > Fix For: 1.11.0 > > > [Type Hints|https://www.python.org/dev/peps/pep-0484/] was introduced in > Python 3.5 and it would be great if we can declare the signature of the > methods using type hints and introduce type check in the python APIs -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-16522) Use type hints to declare the signature of the methods
[ https://issues.apache.org/jira/browse/FLINK-16522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-16522: -- Description: [Type Hints|https://www.python.org/dev/peps/pep-0484/] was introduced in Python 3.5 and it would be great if we can declare the signature of the methods using type hints and introduce [type check|https://realpython.com/python-type-checking/] in the python APIs (was: [Type Hints|https://www.python.org/dev/peps/pep-0484/] was introduced in Python 3.5 and it would be great if we can declare the signature of the methods using type hints and introduce type check in the python APIs) > Use type hints to declare the signature of the methods > -- > > Key: FLINK-16522 > URL: https://issues.apache.org/jira/browse/FLINK-16522 > Project: Flink > Issue Type: Improvement > Components: API / Python >Reporter: Huang Xingbo >Assignee: Huang Xingbo >Priority: Major > Fix For: 1.11.0 > > > [Type Hints|https://www.python.org/dev/peps/pep-0484/] was introduced in > Python 3.5 and it would be great if we can declare the signature of the > methods using type hints and introduce [type > check|https://realpython.com/python-type-checking/] in the python APIs -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16522) Use type hints to declare the signature of the methods
[ https://issues.apache.org/jira/browse/FLINK-16522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17076873#comment-17076873 ] Rong Rong commented on FLINK-16522: --- Thanks for the clarification. huge +1. looking forward to this :) > Use type hints to declare the signature of the methods > -- > > Key: FLINK-16522 > URL: https://issues.apache.org/jira/browse/FLINK-16522 > Project: Flink > Issue Type: Improvement > Components: API / Python >Reporter: Huang Xingbo >Assignee: Huang Xingbo >Priority: Major > Fix For: 1.11.0 > > > [Type Hints|https://www.python.org/dev/peps/pep-0484/] was introduced in > Python 3.5 and it would be great if we can declare the signature of the > methods using type hints and introduce [type > check|https://realpython.com/python-type-checking/] in the python APIs -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Issue Comment Deleted] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-9630: - Comment: was deleted (was: Hi All, Is this a duplicate of FLINK-8497 ?) > Kafka09PartitionDiscoverer cause connection leak on > TopicAuthorizationException > --- > > Key: FLINK-9630 > URL: https://issues.apache.org/jira/browse/FLINK-9630 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.4.2, 1.5.0 > Environment: Linux 2.6, java 8, Kafka broker 0.10.x >Reporter: Youjun Yuan >Priority: Major > Labels: pull-request-available > > when the Kafka topic got deleted, during task starting process, > Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in > getAllPartitionsForTopics(), and it get no chance to close the > kafkaConsumer, hence resulting TCP connection leak (to Kafka broker). > > *this issue can bring down the whole Flink cluster*, because, in a default > setup (fixedDelay with INT.MAX restart attempt), job manager will randomly > schedule the job to any TaskManager that has free slot, and each attemp will > cause the TaskManager to leak a TCP connection, eventually almost every > TaskManager will run out of file handle, hence no taskmanger could make > snapshot, or accept new job. Effectly stops the whole cluster. > > The leak happens when StreamTask.invoke() calls openAllOperators(), then > FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(), > when kafkaConsumer.partitionsFor(topic) in > KafkaPartitionDiscoverer.getAllPartitionsForTopics() hit a > *TopicAuthorizationException,* no one catches this. > Though StreamTask.open catches Exception and invoks the dispose() method of > each operator, which eventaully invoke FlinkKakfaConsumerBase.cancel(), > however it does not close the kakfaConsumer in partitionDiscoverer, not even > invoke the partitionDiscoverer.wakeup(), because the discoveryLoopThread was > null. > > below is the code of FlinkKakfaConsumerBase.cancel() for your convenience > public void cancel() { > // set ourselves as not running; > // this would let the main discovery loop escape as soon as possible > running = false; > if (discoveryLoopThread != null) { > if (partitionDiscoverer != null) > { // we cannot close the discoverer here, as it is error-prone to > concurrent access; // only wakeup the discoverer, the discovery > loop will clean itself up after it escapes > partitionDiscoverer.wakeup(); } > // the discovery loop may currently be sleeping in-between > // consecutive discoveries; interrupt to shutdown faster > discoveryLoopThread.interrupt(); > } > // abort the fetcher, if there is one > if (kafkaFetcher != null) > { kafkaFetcher.cancel(); } > } > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16997755#comment-16997755 ] Rong Rong commented on FLINK-9630: -- Hi All, Is this a duplicate of FLINK-8497 ? > Kafka09PartitionDiscoverer cause connection leak on > TopicAuthorizationException > --- > > Key: FLINK-9630 > URL: https://issues.apache.org/jira/browse/FLINK-9630 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.4.2, 1.5.0 > Environment: Linux 2.6, java 8, Kafka broker 0.10.x >Reporter: Youjun Yuan >Priority: Major > Labels: pull-request-available > > when the Kafka topic got deleted, during task starting process, > Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in > getAllPartitionsForTopics(), and it get no chance to close the > kafkaConsumer, hence resulting TCP connection leak (to Kafka broker). > > *this issue can bring down the whole Flink cluster*, because, in a default > setup (fixedDelay with INT.MAX restart attempt), job manager will randomly > schedule the job to any TaskManager that has free slot, and each attemp will > cause the TaskManager to leak a TCP connection, eventually almost every > TaskManager will run out of file handle, hence no taskmanger could make > snapshot, or accept new job. Effectly stops the whole cluster. > > The leak happens when StreamTask.invoke() calls openAllOperators(), then > FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(), > when kafkaConsumer.partitionsFor(topic) in > KafkaPartitionDiscoverer.getAllPartitionsForTopics() hit a > *TopicAuthorizationException,* no one catches this. > Though StreamTask.open catches Exception and invoks the dispose() method of > each operator, which eventaully invoke FlinkKakfaConsumerBase.cancel(), > however it does not close the kakfaConsumer in partitionDiscoverer, not even > invoke the partitionDiscoverer.wakeup(), because the discoveryLoopThread was > null. > > below is the code of FlinkKakfaConsumerBase.cancel() for your convenience > public void cancel() { > // set ourselves as not running; > // this would let the main discovery loop escape as soon as possible > running = false; > if (discoveryLoopThread != null) { > if (partitionDiscoverer != null) > { // we cannot close the discoverer here, as it is error-prone to > concurrent access; // only wakeup the discoverer, the discovery > loop will clean itself up after it escapes > partitionDiscoverer.wakeup(); } > // the discovery loop may currently be sleeping in-between > // consecutive discoveries; interrupt to shutdown faster > discoveryLoopThread.interrupt(); > } > // abort the fetcher, if there is one > if (kafkaFetcher != null) > { kafkaFetcher.cancel(); } > } > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16997757#comment-16997757 ] Rong Rong commented on FLINK-9630: -- Hi All, is this somehow connected with FLINK-8497? We are investigating some similar issues with the partition discovery. Also, based on the bug report, this doesn't seem to affect Kafka 0.11 and up, correct? -- Rong > Kafka09PartitionDiscoverer cause connection leak on > TopicAuthorizationException > --- > > Key: FLINK-9630 > URL: https://issues.apache.org/jira/browse/FLINK-9630 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.4.2, 1.5.0 > Environment: Linux 2.6, java 8, Kafka broker 0.10.x >Reporter: Youjun Yuan >Priority: Major > Labels: pull-request-available > > when the Kafka topic got deleted, during task starting process, > Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in > getAllPartitionsForTopics(), and it get no chance to close the > kafkaConsumer, hence resulting TCP connection leak (to Kafka broker). > > *this issue can bring down the whole Flink cluster*, because, in a default > setup (fixedDelay with INT.MAX restart attempt), job manager will randomly > schedule the job to any TaskManager that has free slot, and each attemp will > cause the TaskManager to leak a TCP connection, eventually almost every > TaskManager will run out of file handle, hence no taskmanger could make > snapshot, or accept new job. Effectly stops the whole cluster. > > The leak happens when StreamTask.invoke() calls openAllOperators(), then > FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(), > when kafkaConsumer.partitionsFor(topic) in > KafkaPartitionDiscoverer.getAllPartitionsForTopics() hit a > *TopicAuthorizationException,* no one catches this. > Though StreamTask.open catches Exception and invoks the dispose() method of > each operator, which eventaully invoke FlinkKakfaConsumerBase.cancel(), > however it does not close the kakfaConsumer in partitionDiscoverer, not even > invoke the partitionDiscoverer.wakeup(), because the discoveryLoopThread was > null. > > below is the code of FlinkKakfaConsumerBase.cancel() for your convenience > public void cancel() { > // set ourselves as not running; > // this would let the main discovery loop escape as soon as possible > running = false; > if (discoveryLoopThread != null) { > if (partitionDiscoverer != null) > { // we cannot close the discoverer here, as it is error-prone to > concurrent access; // only wakeup the discoverer, the discovery > loop will clean itself up after it escapes > partitionDiscoverer.wakeup(); } > // the discovery loop may currently be sleeping in-between > // consecutive discoveries; interrupt to shutdown faster > discoveryLoopThread.interrupt(); > } > // abort the fetcher, if there is one > if (kafkaFetcher != null) > { kafkaFetcher.cancel(); } > } > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
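Regardless of whether FLINK-8497 covers the same root cause, the leak described above comes down to the discovery consumer not being closed when {{partitionsFor()}} throws. A simplified, hypothetical sketch of the defensive pattern (not the actual Flink patch; the helper name is made up):

{code:java}
import java.util.List;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

// Hypothetical sketch: if the metadata lookup fails (e.g. with
// TopicAuthorizationException, which is a RuntimeException), close the consumer
// before rethrowing so the TCP connection to the broker is not leaked.
public final class SafePartitionLookup {

    public static List<PartitionInfo> partitionsFor(
            KafkaConsumer<?, ?> consumer, String topic) {
        try {
            return consumer.partitionsFor(topic);
        } catch (RuntimeException e) {
            consumer.close();
            throw e;
        }
    }
}
{code}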
[jira] [Assigned] (FLINK-15451) TaskManagerProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure failed on azure
[ https://issues.apache.org/jira/browse/FLINK-15451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong reassigned FLINK-15451: - Assignee: Rong Rong > TaskManagerProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure > failed on azure > -- > > Key: FLINK-15451 > URL: https://issues.apache.org/jira/browse/FLINK-15451 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.9.1 >Reporter: Congxian Qiu(klion26) >Assignee: Rong Rong >Priority: Major > > 2019-12-31T02:43:39.4766254Z [ERROR] Tests run: 2, Failures: 0, Errors: 1, > Skipped: 0, Time elapsed: 42.801 s <<< FAILURE! - in > org.apache.flink.test.recovery.TaskManagerProcessFailureBatchRecoveryITCase > 2019-12-31T02:43:39.4768373Z [ERROR] > testTaskManagerProcessFailure[0](org.apache.flink.test.recovery.TaskManagerProcessFailureBatchRecoveryITCase) > Time elapsed: 2.699 s <<< ERROR! 2019-12-31T02:43:39.4768834Z > java.net.BindException: Address already in use 2019-12-31T02:43:39.4769096Z > > > [https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_apis/build/builds/3995/logs/15] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-15451) TaskManagerProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure failed on azure
[ https://issues.apache.org/jira/browse/FLINK-15451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong reassigned FLINK-15451: - Assignee: (was: Rong Rong) > TaskManagerProcessFailureBatchRecoveryITCase.testTaskManagerProcessFailure > failed on azure > -- > > Key: FLINK-15451 > URL: https://issues.apache.org/jira/browse/FLINK-15451 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.9.1 >Reporter: Congxian Qiu(klion26) >Priority: Major > > 2019-12-31T02:43:39.4766254Z [ERROR] Tests run: 2, Failures: 0, Errors: 1, > Skipped: 0, Time elapsed: 42.801 s <<< FAILURE! - in > org.apache.flink.test.recovery.TaskManagerProcessFailureBatchRecoveryITCase > 2019-12-31T02:43:39.4768373Z [ERROR] > testTaskManagerProcessFailure[0](org.apache.flink.test.recovery.TaskManagerProcessFailureBatchRecoveryITCase) > Time elapsed: 2.699 s <<< ERROR! 2019-12-31T02:43:39.4768834Z > java.net.BindException: Address already in use 2019-12-31T02:43:39.4769096Z > > > [https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_apis/build/builds/3995/logs/15] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-11120) The bug of timestampadd handles time
[ https://issues.apache.org/jira/browse/FLINK-11120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007605#comment-17007605 ] Rong Rong commented on FLINK-11120: --- Hi [~x1q1j1] yes it will be in 1.10 release and it has already been backported to 1.9.x as well. Thanks [~jark] for fixing the JIRA status :-) > The bug of timestampadd handles time > - > > Key: FLINK-11120 > URL: https://issues.apache.org/jira/browse/FLINK-11120 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Forward Xu >Assignee: Forward Xu >Priority: Major > Labels: pull-request-available > Fix For: 1.9.2, 1.10.0 > > Time Spent: 20m > Remaining Estimate: 0h > > The error occur when {{timestampadd(MINUTE, 1, time '01:00:00')}} is executed: > java.lang.ClassCastException: java.lang.Integer cannot be cast to > java.lang.Long > at org.apache.calcite.rex.RexBuilder.clean(RexBuilder.java:1520) > at org.apache.calcite.rex.RexBuilder.makeLiteral(RexBuilder.java:1318) > at > org.apache.flink.table.codegen.ExpressionReducer.reduce(ExpressionReducer.scala:135) > at > org.apache.calcite.rel.rules.ReduceExpressionsRule.reduceExpressionsInternal(ReduceExpressionsRule.java:620) > at > org.apache.calcite.rel.rules.ReduceExpressionsRule.reduceExpressions(ReduceExpressionsRule.java:540) > at > org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:288) > I think it should meet the following conditions: > ||expression||Expect the result|| > |timestampadd(MINUTE, -1, time '00:00:00')|23:59:00| > |timestampadd(MINUTE, 1, time '00:00:00')|00:01:00| > |timestampadd(MINUTE, 1, time '23:59:59')|00:00:59| > |timestampadd(SECOND, 1, time '23:59:59')|00:00:00| > |timestampadd(HOUR, 1, time '23:59:59')|00:59:59| > This problem seems to be a bug in calcite. I have submitted isuse to calcite. > The following is the link. > CALCITE-2699 -- This message was sent by Atlassian Jira (v8.3.4#803005)
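The expected results in the table of the description above all follow from treating TIMESTAMPADD on a TIME value as arithmetic on the milliseconds-of-day, taken modulo one day. A tiny standalone illustration of that semantics (plain Java, unrelated to the actual Calcite/Flink fix):

{code:java}
// Plain-Java illustration of the wrap-around semantics the expected-result
// table implies: adding an interval to a TIME value wraps around at midnight.
public final class TimeOfDayAdd {

    private static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;

    static long add(long timeOfDayMillis, long deltaMillis) {
        return Math.floorMod(timeOfDayMillis + deltaMillis, MILLIS_PER_DAY);
    }

    public static void main(String[] args) {
        System.out.println(add(0L, -60_000L));         // 86340000 -> 23:59:00
        System.out.println(add(86_399_000L, 1_000L));  // 0        -> 00:00:00
    }
}
{code}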
[jira] [Created] (FLINK-15475) Add isOutputTypeUsed() API to Transformation
Rong Rong created FLINK-15475: - Summary: Add isOutputTypeUsed() API to Transformation Key: FLINK-15475 URL: https://issues.apache.org/jira/browse/FLINK-15475 Project: Flink Issue Type: Improvement Components: API / Core, API / DataSet, API / DataStream Reporter: Rong Rong Assignee: Rong Rong Currently there's no way to "peek" into a Transformation and see if OutputType has been used or not. The only way is to invoke the {{setOutputType}} API and wrap around it with a try-catch block. It would be nice if we have a `isTypeUsed()` or `isOutputTypeUsed()` API to check whether a particular transformation has a definitive output type set / used or not. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15475) Add isOutputTypeUsed() API to Transformation
[ https://issues.apache.org/jira/browse/FLINK-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-15475: -- Description: Currently there's no way to "peek" into a `Transformation` object and see if `typeUsed` has been set or not. The only way is to invoke the `setOutputType` API and wrap around it with a try-catch block. It would be nice if we have a `isTypeUsed()` or `isOutputTypeUsed()` API to check whether a particular transformation has a definitive output type set / used or not. was: Currently there's no way to "peek" into a Transformation and see if OutputType has been used or not. The only way is to invoke the {{setOutputType}} API and wrap around it with a try-catch block. It would be nice if we have a `isTypeUsed()` or `isOutputTypeUsed()` API to check whether a particular transformation has a definitive output type set / used or not. > Add isOutputTypeUsed() API to Transformation > > > Key: FLINK-15475 > URL: https://issues.apache.org/jira/browse/FLINK-15475 > Project: Flink > Issue Type: Improvement > Components: API / Core, API / DataSet, API / DataStream >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Minor > > Currently there's no way to "peek" into a `Transformation` object and see if > `typeUsed` has been set or not. The only way is to invoke the `setOutputType` > API and wrap around it with a try-catch block. > It would be nice if we have a `isTypeUsed()` or `isOutputTypeUsed()` API to > check whether a particular transformation has a definitive output type set / > used or not. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15475) Add isOutputTypeUsed() API to Transformation
[ https://issues.apache.org/jira/browse/FLINK-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-15475: -- Description: Currently there's no way to "peek" into a `Transformation` object and see if `typeUsed` has been set or not. The only way is to invoke the `setOutputType` API and wrap around it with a try-catch block. See: It would be nice if we have a `isTypeUsed()` or `isOutputTypeUsed()` API to check whether a particular transformation has a definitive output type set / used or not. was: Currently there's no way to "peek" into a `Transformation` object and see if `typeUsed` has been set or not. The only way is to invoke the `setOutputType` API and wrap around it with a try-catch block. It would be nice if we have a `isTypeUsed()` or `isOutputTypeUsed()` API to check whether a particular transformation has a definitive output type set / used or not. > Add isOutputTypeUsed() API to Transformation > > > Key: FLINK-15475 > URL: https://issues.apache.org/jira/browse/FLINK-15475 > Project: Flink > Issue Type: Improvement > Components: API / Core, API / DataSet, API / DataStream >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Minor > > Currently there's no way to "peek" into a `Transformation` object and see if > `typeUsed` has been set or not. The only way is to invoke the `setOutputType` > API and wrap around it with a try-catch block. See: > It would be nice if we have a `isTypeUsed()` or `isOutputTypeUsed()` API to > check whether a particular transformation has a definitive output type set / > used or not. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-15475) Add isOutputTypeUsed() API to Transformation
[ https://issues.apache.org/jira/browse/FLINK-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-15475: -- Description: Currently there's no way to "peek" into a `Transformation` object and see if `typeUsed` has been set or not. The only way is to invoke the `setOutputType` API and wrap around it with a try-catch block similar to: {code:java} try { (SingleOutputStreamOperator)dataStream .returns(myOutputType); } catch (ValidationException ex) { // ... handle exception when type has been used. } {code} It would be nice if we have a `isTypeUsed()` or `isOutputTypeUsed()` API to check whether a particular transformation has a definitive output type set / used or not. was: Currently there's no way to "peek" into a `Transformation` object and see if `typeUsed` has been set or not. The only way is to invoke the `setOutputType` API and wrap around it with a try-catch block. See: It would be nice if we have a `isTypeUsed()` or `isOutputTypeUsed()` API to check whether a particular transformation has a definitive output type set / used or not. > Add isOutputTypeUsed() API to Transformation > > > Key: FLINK-15475 > URL: https://issues.apache.org/jira/browse/FLINK-15475 > Project: Flink > Issue Type: Improvement > Components: API / Core, API / DataSet, API / DataStream >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Minor > > Currently there's no way to "peek" into a `Transformation` object and see if > `typeUsed` has been set or not. The only way is to invoke the `setOutputType` > API and wrap around it with a try-catch block similar to: > {code:java} > try { > (SingleOutputStreamOperator)dataStream > .returns(myOutputType); > } catch (ValidationException ex) { > // ... handle exception when type has been used. > } > {code} > It would be nice if we have a `isTypeUsed()` or `isOutputTypeUsed()` API to > check whether a particular transformation has a definitive output type set / > used or not. -- This message was sent by Atlassian Jira (v8.3.4#803005)
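A short sketch of how the proposed API could be used; note that {{isOutputTypeUsed()}} is exactly what this ticket asks for and does not exist at the time of writing, so the snippet below is hypothetical.

{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;

// Hypothetical usage of the proposed isOutputTypeUsed() API: peek at the
// transformation instead of wrapping setOutputType() in a try-catch around
// ValidationException.
public final class OutputTypeUtil {

    public static <T> void setOutputTypeIfUnused(
            Transformation<T> transformation, TypeInformation<T> outputType) {
        if (!transformation.isOutputTypeUsed()) { // proposed API, not yet in Flink
            transformation.setOutputType(outputType);
        }
    }
}
{code}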
[jira] [Closed] (FLINK-17386) Exception in HadoopSecurityContextFactory.createContext while no shaded-hadoop-lib provided.
[ https://issues.apache.org/jira/browse/FLINK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong closed FLINK-17386. - Fix Version/s: 1.11.0 Resolution: Fixed > Exception in HadoopSecurityContextFactory.createContext while no > shaded-hadoop-lib provided. > > > Key: FLINK-17386 > URL: https://issues.apache.org/jira/browse/FLINK-17386 > Project: Flink > Issue Type: Bug >Reporter: Wenlong Lyu >Assignee: Rong Rong >Priority: Major > Labels: pull-request-available > Fix For: 1.11.0 > > > java.io.IOException: Process execution failed due error. Error > output:java.lang.NoClassDefFoundError: Could not initialize class > org.apache.hadoop.security.UserGroupInformation\n\tat > org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory.createContext(HadoopSecurityContextFactory.java:59)\n\tat > > org.apache.flink.runtime.security.SecurityUtils.installContext(SecurityUtils.java:92)\n\tat > > org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:60)\n\tat > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:964)\n\n\tat > com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:144)\n\tat > > com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:126)\n\tat > > com.alibaba.flink.vvr.VVRCompileTest.runSingleJobCompileCheck(VVRCompileTest.java:173)\n\tat > > com.alibaba.flink.vvr.VVRCompileTest.lambda$runJobsCompileCheck$0(VVRCompileTest.java:101)\n\tat > > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat > java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)\n\tat > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)\n\tat > java.lang.Thread.run(Thread.java:834) > I think it is because exception throw in the static code block of > UserInformation, we should catch Throwable instead of Exception in > HadoopSecurityContextFactory#createContext? > [~rongr] what do you think? -- This message was sent by Atlassian Jira (v8.3.4#803005)
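The reasoning in the description is that {{UserGroupInformation}}'s static initializer can fail with a {{NoClassDefFoundError}}, which is an {{Error}} and therefore escapes a {{catch (Exception)}} block. A minimal standalone illustration of why catching {{Throwable}} lets the factory fall back gracefully (simplified stand-in classes, not Flink's actual code):

{code:java}
// Simplified illustration: a failing static initializer surfaces as an Error
// such as NoClassDefFoundError, so only catch (Throwable) lets createContext()
// fall back to a no-op security context instead of failing the client.
public class FallbackOnThrowable {

    interface SecurityContext {}

    static class NoOpSecurityContext implements SecurityContext {}

    static class HadoopSecurityContext implements SecurityContext {
        HadoopSecurityContext() {
            // Stand-in for touching UserGroupInformation without Hadoop on the classpath.
            throw new NoClassDefFoundError("org.apache.hadoop.security.UserGroupInformation");
        }
    }

    static SecurityContext createContext() {
        try {
            return new HadoopSecurityContext();
        } catch (Throwable t) { // catch (Exception) would let the Error propagate
            return new NoOpSecurityContext();
        }
    }

    public static void main(String[] args) {
        System.out.println(createContext().getClass().getSimpleName()); // NoOpSecurityContext
    }
}
{code}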
[jira] [Commented] (FLINK-17386) Exception in HadoopSecurityContextFactory.createContext while no shaded-hadoop-lib provided.
[ https://issues.apache.org/jira/browse/FLINK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17109703#comment-17109703 ] Rong Rong commented on FLINK-17386: --- closed via: 1892bedeea9fa118b6e3bcb572f63c2e7f6d83e3 > Exception in HadoopSecurityContextFactory.createContext while no > shaded-hadoop-lib provided. > > > Key: FLINK-17386 > URL: https://issues.apache.org/jira/browse/FLINK-17386 > Project: Flink > Issue Type: Bug >Reporter: Wenlong Lyu >Assignee: Rong Rong >Priority: Major > Labels: pull-request-available > > java.io.IOException: Process execution failed due error. Error > output:java.lang.NoClassDefFoundError: Could not initialize class > org.apache.hadoop.security.UserGroupInformation\n\tat > org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory.createContext(HadoopSecurityContextFactory.java:59)\n\tat > > org.apache.flink.runtime.security.SecurityUtils.installContext(SecurityUtils.java:92)\n\tat > > org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:60)\n\tat > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:964)\n\n\tat > com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:144)\n\tat > > com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:126)\n\tat > > com.alibaba.flink.vvr.VVRCompileTest.runSingleJobCompileCheck(VVRCompileTest.java:173)\n\tat > > com.alibaba.flink.vvr.VVRCompileTest.lambda$runJobsCompileCheck$0(VVRCompileTest.java:101)\n\tat > > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat > java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)\n\tat > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)\n\tat > java.lang.Thread.run(Thread.java:834) > I think it is because exception throw in the static code block of > UserInformation, we should catch Throwable instead of Exception in > HadoopSecurityContextFactory#createContext? > [~rongr] what do you think? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17386) Exception in HadoopSecurityContextFactory.createContext while no shaded-hadoop-lib provided.
[ https://issues.apache.org/jira/browse/FLINK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-17386: -- Component/s: Deployment / YARN > Exception in HadoopSecurityContextFactory.createContext while no > shaded-hadoop-lib provided. > > > Key: FLINK-17386 > URL: https://issues.apache.org/jira/browse/FLINK-17386 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.11.0 >Reporter: Wenlong Lyu >Assignee: Rong Rong >Priority: Major > Labels: pull-request-available > Fix For: 1.11.0 > > > java.io.IOException: Process execution failed due error. Error > output:java.lang.NoClassDefFoundError: Could not initialize class > org.apache.hadoop.security.UserGroupInformation\n\tat > org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory.createContext(HadoopSecurityContextFactory.java:59)\n\tat > > org.apache.flink.runtime.security.SecurityUtils.installContext(SecurityUtils.java:92)\n\tat > > org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:60)\n\tat > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:964)\n\n\tat > com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:144)\n\tat > > com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:126)\n\tat > > com.alibaba.flink.vvr.VVRCompileTest.runSingleJobCompileCheck(VVRCompileTest.java:173)\n\tat > > com.alibaba.flink.vvr.VVRCompileTest.lambda$runJobsCompileCheck$0(VVRCompileTest.java:101)\n\tat > > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat > java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)\n\tat > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)\n\tat > java.lang.Thread.run(Thread.java:834) > I think it is because exception throw in the static code block of > UserInformation, we should catch Throwable instead of Exception in > HadoopSecurityContextFactory#createContext? > [~rongr] what do you think? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17386) Exception in HadoopSecurityContextFactory.createContext while no shaded-hadoop-lib provided.
[ https://issues.apache.org/jira/browse/FLINK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-17386: -- Affects Version/s: 1.11.0 > Exception in HadoopSecurityContextFactory.createContext while no > shaded-hadoop-lib provided. > > > Key: FLINK-17386 > URL: https://issues.apache.org/jira/browse/FLINK-17386 > Project: Flink > Issue Type: Bug >Affects Versions: 1.11.0 >Reporter: Wenlong Lyu >Assignee: Rong Rong >Priority: Major > Labels: pull-request-available > Fix For: 1.11.0 > > > java.io.IOException: Process execution failed due error. Error > output:java.lang.NoClassDefFoundError: Could not initialize class > org.apache.hadoop.security.UserGroupInformation\n\tat > org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory.createContext(HadoopSecurityContextFactory.java:59)\n\tat > > org.apache.flink.runtime.security.SecurityUtils.installContext(SecurityUtils.java:92)\n\tat > > org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:60)\n\tat > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:964)\n\n\tat > com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:144)\n\tat > > com.alibaba.flink.vvr.util.AutoClosableProcess$AutoClosableProcessBuilder.runBlocking(AutoClosableProcess.java:126)\n\tat > > com.alibaba.flink.vvr.VVRCompileTest.runSingleJobCompileCheck(VVRCompileTest.java:173)\n\tat > > com.alibaba.flink.vvr.VVRCompileTest.lambda$runJobsCompileCheck$0(VVRCompileTest.java:101)\n\tat > > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat > java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)\n\tat > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)\n\tat > java.lang.Thread.run(Thread.java:834) > I think it is because exception throw in the static code block of > UserInformation, we should catch Throwable instead of Exception in > HadoopSecurityContextFactory#createContext? > [~rongr] what do you think? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-7922) leastRestrictive in FlinkTypeFactory does not resolve composite type correctly
Rong Rong created FLINK-7922: Summary: leastRestrictive in FlinkTypeFactory does not resolve composite type correctly Key: FLINK-7922 URL: https://issues.apache.org/jira/browse/FLINK-7922 Project: Flink Issue Type: Bug Reporter: Rong Rong FlinkTypeFactory does not override the following function correctly: `leastRestrictive(types: util.List[RelDataType]): RelDataType = { //... }` to deal with situations like ``` CASE WHEN THEN ELSE NULL END ``` will trigger runtime exception -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7922) leastRestrictive in FlinkTypeFactory does not resolve composite type correctly
[ https://issues.apache.org/jira/browse/FLINK-7922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-7922: - Component/s: Table API & SQL > leastRestrictive in FlinkTypeFactory does not resolve composite type correctly > -- > > Key: FLINK-7922 > URL: https://issues.apache.org/jira/browse/FLINK-7922 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong >Assignee: Rong Rong > > FlinkTypeFactory does not override the following function correctly: > {code:scala} > def leastRestrictive(types: util.List[RelDataType]): RelDataType = { > //... > } > {code} > to deal with situations like > {code:sql} > CASE > WHEN THEN > > ELSE > NULL > END > {code} > will trigger runtime exception. > See following test sample for more details: > https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7922) leastRestrictive in FlinkTypeFactory does not resolve composite type correctly
[ https://issues.apache.org/jira/browse/FLINK-7922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-7922: - Description: FlinkTypeFactory does not override the following function correctly: {code:java} def leastRestrictive(types: util.List[RelDataType]): RelDataType = { //... } {code} to deal with situations like {code:sql} CASE WHEN THEN ELSE NULL END {code} will trigger runtime exception. See following test sample for more details: https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170 was: FlinkTypeFactory does not override the following function correctly: {code:scala} def leastRestrictive(types: util.List[RelDataType]): RelDataType = { //... } {code} to deal with situations like {code:sql} CASE WHEN THEN ELSE NULL END {code} will trigger runtime exception. See following test sample for more details: https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170 > leastRestrictive in FlinkTypeFactory does not resolve composite type correctly > -- > > Key: FLINK-7922 > URL: https://issues.apache.org/jira/browse/FLINK-7922 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong >Assignee: Rong Rong > > FlinkTypeFactory does not override the following function correctly: > {code:java} > def leastRestrictive(types: util.List[RelDataType]): RelDataType = { > //... > } > {code} > to deal with situations like > {code:sql} > CASE > WHEN THEN > > ELSE > NULL > END > {code} > will trigger runtime exception. > See following test sample for more details: > https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7922) leastRestrictive in FlinkTypeFactory does not resolve composite type correctly
[ https://issues.apache.org/jira/browse/FLINK-7922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-7922: - Affects Version/s: 1.4.0 > leastRestrictive in FlinkTypeFactory does not resolve composite type correctly > -- > > Key: FLINK-7922 > URL: https://issues.apache.org/jira/browse/FLINK-7922 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong >Assignee: Rong Rong > > FlinkTypeFactory does not override the following function correctly: > {code:scala} > def leastRestrictive(types: util.List[RelDataType]): RelDataType = { > //... > } > {code} > to deal with situations like > {code:sql} > CASE > WHEN THEN > > ELSE > NULL > END > {code} > will trigger runtime exception. > See following test sample for more details: > https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7922) leastRestrictive in FlinkTypeFactory does not resolve composite type correctly
[ https://issues.apache.org/jira/browse/FLINK-7922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong reassigned FLINK-7922: Assignee: Rong Rong > leastRestrictive in FlinkTypeFactory does not resolve composite type correctly > -- > > Key: FLINK-7922 > URL: https://issues.apache.org/jira/browse/FLINK-7922 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong >Assignee: Rong Rong > > FlinkTypeFactory does not override the following function correctly: > {code:scala} > def leastRestrictive(types: util.List[RelDataType]): RelDataType = { > //... > } > {code} > to deal with situations like > {code:sql} > CASE > WHEN THEN > > ELSE > NULL > END > {code} > will trigger runtime exception. > See following test sample for more details: > https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7922) leastRestrictive in FlinkTypeFactory does not resolve composite type correctly
[ https://issues.apache.org/jira/browse/FLINK-7922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-7922: - Description: FlinkTypeFactory does not override the following function correctly: {code:scala} def leastRestrictive(types: util.List[RelDataType]): RelDataType = { //... } {code} to deal with situations like {code:sql} CASE WHEN THEN ELSE NULL END {code} will trigger runtime exception. See following test sample for more details: https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170 was: FlinkTypeFactory does not override the following function correctly: `leastRestrictive(types: util.List[RelDataType]): RelDataType = { //... }` to deal with situations like ``` CASE WHEN THEN ELSE NULL END ``` will trigger runtime exception > leastRestrictive in FlinkTypeFactory does not resolve composite type correctly > -- > > Key: FLINK-7922 > URL: https://issues.apache.org/jira/browse/FLINK-7922 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong > > FlinkTypeFactory does not override the following function correctly: > {code:scala} > def leastRestrictive(types: util.List[RelDataType]): RelDataType = { > //... > } > {code} > to deal with situations like > {code:sql} > CASE > WHEN THEN > > ELSE > NULL > END > {code} > will trigger runtime exception. > See following test sample for more details: > https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7922) leastRestrictive in FlinkTypeFactory does not resolve composite type correctly
[ https://issues.apache.org/jira/browse/FLINK-7922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-7922: - Description: FlinkTypeFactory does not override the following function correctly: {code:java} def leastRestrictive(types: util.List[RelDataType]): RelDataType = { //... } {code} dealing with SQL such as: {code:sql} CASE WHEN THEN ELSE NULL END {code} will trigger runtime exception. See following test sample for more details: https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170 was: FlinkTypeFactory does not override the following function correctly: {code:java} def leastRestrictive(types: util.List[RelDataType]): RelDataType = { //... } {code} to deal with situations like {code:sql} CASE WHEN THEN ELSE NULL END {code} will trigger runtime exception. See following test sample for more details: https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170 > leastRestrictive in FlinkTypeFactory does not resolve composite type correctly > -- > > Key: FLINK-7922 > URL: https://issues.apache.org/jira/browse/FLINK-7922 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong >Assignee: Rong Rong > > FlinkTypeFactory does not override the following function correctly: > {code:java} > def leastRestrictive(types: util.List[RelDataType]): RelDataType = { > //... > } > {code} > dealing with SQL such as: > {code:sql} > CASE > WHEN THEN > > ELSE > NULL > END > {code} > will trigger runtime exception. > See following test sample for more details: > https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7923) SQL parser exception when accessing subfields of a Composite element in an Object Array type column
Rong Rong created FLINK-7923: Summary: SQL parser exception when accessing subfields of a Composite element in an Object Array type column Key: FLINK-7923 URL: https://issues.apache.org/jira/browse/FLINK-7923 Project: Flink Issue Type: Bug Affects Versions: 1.4.0 Reporter: Rong Rong Access type such as: {code:SQL} SELECT a[1].f0 FROM MyTable {code} will cause problem. See following test sample for more details: https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7923) SQL parser exception when accessing subfields of a Composite element in an Object Array type column
[ https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-7923: - Component/s: Table API & SQL > SQL parser exception when accessing subfields of a Composite element in an > Object Array type column > --- > > Key: FLINK-7923 > URL: https://issues.apache.org/jira/browse/FLINK-7923 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong > > Access type such as: > {code:SQL} > SELECT > a[1].f0 > FROM > MyTable > {code} > will cause problem. > See following test sample for more details: > https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7934) Upgrade Calcite dependency to 1.15
Rong Rong created FLINK-7934: Summary: Upgrade Calcite dependency to 1.15 Key: FLINK-7934 URL: https://issues.apache.org/jira/browse/FLINK-7934 Project: Flink Issue Type: Bug Reporter: Rong Rong Umbrella issue for all related issues for Apache Calcite 1.15 release. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7923) SQL parser exception when accessing subfields of a Composite element in an Object Array type column
[ https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220895#comment-16220895 ] Rong Rong commented on FLINK-7923: -- Done, created umbrella task for Calcite 1.15 release. > SQL parser exception when accessing subfields of a Composite element in an > Object Array type column > --- > > Key: FLINK-7923 > URL: https://issues.apache.org/jira/browse/FLINK-7923 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Rong Rong > > Access type such as: > {code:SQL} > SELECT > a[1].f0 > FROM > MyTable > {code} > will cause problem. > See following test sample for more details: > https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8038) Support MAP literal
Rong Rong created FLINK-8038: Summary: Support MAP literal Key: FLINK-8038 URL: https://issues.apache.org/jira/browse/FLINK-8038 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: Rong Rong Assignee: Rong Rong Similar to https://issues.apache.org/jira/browse/FLINK-4554 We want to support Map literals which is supported by Calcite: https://calcite.apache.org/docs/reference.html#value-constructors {code:sql} SELECT MAP['key1', f0, 'key2', f1] AS stringKeyedMap, MAP['key', 'value'] AS literalMap, MAP[f0, f1] AS fieldMap FROM table {code} This should enable users to construct MapTypeInfo, one of the CompositeType. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8038) Support MAP value constructor
[ https://issues.apache.org/jira/browse/FLINK-8038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-8038: - Summary: Support MAP value constructor (was: Support MAP literal) > Support MAP value constructor > - > > Key: FLINK-8038 > URL: https://issues.apache.org/jira/browse/FLINK-8038 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong > > Similar to https://issues.apache.org/jira/browse/FLINK-4554 > We want to support Map literals which is supported by Calcite: > https://calcite.apache.org/docs/reference.html#value-constructors > {code:sql} > SELECT > MAP['key1', f0, 'key2', f1] AS stringKeyedMap, > MAP['key', 'value'] AS literalMap, > MAP[f0, f1] AS fieldMap > FROM > table > {code} > This should enable users to construct MapTypeInfo, one of the CompositeType. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8038) Support MAP value constructor
[ https://issues.apache.org/jira/browse/FLINK-8038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-8038: - Description: Similar to https://issues.apache.org/jira/browse/FLINK-4554 We want to support Map value constructor which is supported by Calcite: https://calcite.apache.org/docs/reference.html#value-constructors {code:sql} SELECT MAP['key1', f0, 'key2', f1] AS stringKeyedMap, MAP['key', 'value'] AS literalMap, MAP[f0, f1] AS fieldMap FROM table {code} This should enable users to construct MapTypeInfo, one of the CompositeType. was: Similar to https://issues.apache.org/jira/browse/FLINK-4554 We want to support Map literals which is supported by Calcite: https://calcite.apache.org/docs/reference.html#value-constructors {code:sql} SELECT MAP['key1', f0, 'key2', f1] AS stringKeyedMap, MAP['key', 'value'] AS literalMap, MAP[f0, f1] AS fieldMap FROM table {code} This should enable users to construct MapTypeInfo, one of the CompositeType. > Support MAP value constructor > - > > Key: FLINK-8038 > URL: https://issues.apache.org/jira/browse/FLINK-8038 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong > > Similar to https://issues.apache.org/jira/browse/FLINK-4554 > We want to support Map value constructor which is supported by Calcite: > https://calcite.apache.org/docs/reference.html#value-constructors > {code:sql} > SELECT > MAP['key1', f0, 'key2', f1] AS stringKeyedMap, > MAP['key', 'value'] AS literalMap, > MAP[f0, f1] AS fieldMap > FROM > table > {code} > This should enable users to construct MapTypeInfo, one of the CompositeType. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8003) Support Calcite's ROW value constructor in Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-8003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-8003: - Description: We want to use the row value constructor and CAST to create a named row, or even nested named row, e.g. {code:sql} CREATE TYPE myrowtype AS (f1 INTEGER, f2 VARCHAR(10)) SELECT CAST(ROW(intField, "test") AS myrowtype) AS myRow FROM myTable; {code} So if converted to JSON, the output will be {code:java} {"myRow":{"f0": ${intField}, "f1":"test"}} {code}. was: We want to use the row value constructor and CAST to create a named row, or even nested named row, e.g. {code:sql} CREATE TYPE myrowtype AS (f1 INTEGER, f2 VARCHAR(10)) SELECT CAST(ROW(intField, "test") AS myrowtype) AS myRow FROM myTable; {code} So if converted to JSON, the output will be {code:json} {"myRow":{"f0": ${intField}, "f1":"test"}} {code}. > Support Calcite's ROW value constructor in Flink SQL > > > Key: FLINK-8003 > URL: https://issues.apache.org/jira/browse/FLINK-8003 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shuyi Chen >Assignee: Shuyi Chen > > We want to use the row value constructor and CAST to create a named row, or > even nested named row, e.g. > {code:sql} > CREATE TYPE myrowtype AS (f1 INTEGER, f2 VARCHAR(10)) > SELECT CAST(ROW(intField, "test") AS myrowtype) AS myRow FROM myTable; > {code} > So if converted to JSON, the output will be > {code:java} > {"myRow":{"f0": ${intField}, "f1":"test"}} > {code}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8003) Support Calcite's ROW value constructor in Flink SQL
[ https://issues.apache.org/jira/browse/FLINK-8003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-8003: - Description: We want to use the row value constructor and CAST to create a named row, or even nested named row, e.g. {code:sql} CREATE TYPE myrowtype AS (f1 INTEGER, f2 VARCHAR(10)) SELECT CAST(ROW(intField, "test") AS myrowtype) AS myRow FROM myTable; {code} So if converted to JSON, the output will be {code:json} {"myRow":{"f0": ${intField}, "f1":"test"}} {code}. was: We want to use the row value constructor and CAST to create a named row, or even nested named row, e.g. {code:java} CREATE TYPE myrowtype AS (f1 INTEGER, f2 VARCHAR(10)) SELECT CAST(ROW(intField, "test") AS myrowtype) AS myRow FROM myTable; {code} So if converted to JSON, the output will be {"myRow":{"f0": ${intField}, "f1":"test"}}. > Support Calcite's ROW value constructor in Flink SQL > > > Key: FLINK-8003 > URL: https://issues.apache.org/jira/browse/FLINK-8003 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shuyi Chen >Assignee: Shuyi Chen > > We want to use the row value constructor and CAST to create a named row, or > even nested named row, e.g. > {code:sql} > CREATE TYPE myrowtype AS (f1 INTEGER, f2 VARCHAR(10)) > SELECT CAST(ROW(intField, "test") AS myrowtype) AS myRow FROM myTable; > {code} > So if converted to JSON, the output will be > {code:json} > {"myRow":{"f0": ${intField}, "f1":"test"}} > {code}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8104) Fix Row value constructor
Rong Rong created FLINK-8104: Summary: Fix Row value constructor Key: FLINK-8104 URL: https://issues.apache.org/jira/browse/FLINK-8104 Project: Flink Issue Type: Sub-task Components: Table API & SQL Reporter: Rong Rong Support Row value constructor which is currently broken. See {code:java} // flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala @Test def testValueConstructorFunctions(): Unit = { // TODO we need a special code path that flattens ROW types // testSqlApi("ROW('hello world', 12)", "hello world") // test base only returns field 0 // testSqlApi("('hello world', 12)", "hello world") // test base only returns field 0 // ... } {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-8104) Fix Row value constructor
[ https://issues.apache.org/jira/browse/FLINK-8104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong reassigned FLINK-8104: Assignee: Rong Rong > Fix Row value constructor > - > > Key: FLINK-8104 > URL: https://issues.apache.org/jira/browse/FLINK-8104 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong > > Support Row value constructor which is currently broken. > See > {code:java} > // > flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala > @Test > def testValueConstructorFunctions(): Unit = { > // TODO we need a special code path that flattens ROW types > // testSqlApi("ROW('hello world', 12)", "hello world") // test base only > returns field 0 > // testSqlApi("('hello world', 12)", "hello world") // test base only > returns field 0 > // ... > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
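A hypothetical sketch (not from the ticket) of how the commented-out assertions above might look once ROW results are flattened by the test base; the expected strings are assumptions about the flattened output, not confirmed behaviour:
{code:scala}
// inside SqlExpressionTest.scala, once ROW types are flattened by the test base
@Test
def testValueConstructorFunctions(): Unit = {
  // assumed expected output: both fields of the constructed row
  testSqlApi("ROW('hello world', 12)", "hello world,12")
  testSqlApi("('hello world', 12)", "hello world,12")
}
{code}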
[jira] [Created] (FLINK-8151) [Table] removing map value equality check
Rong Rong created FLINK-8151: Summary: [Table] removing map value equality check Key: FLINK-8151 URL: https://issues.apache.org/jira/browse/FLINK-8151 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Rong Rong Following up on FLINK-8038: the equality check is not working, as Map values are not compared element-wise. We suggest removing it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-8151) [Table] removing map value equality check
[ https://issues.apache.org/jira/browse/FLINK-8151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong reassigned FLINK-8151: Assignee: Rong Rong > [Table] removing map value equality check > - > > Key: FLINK-8151 > URL: https://issues.apache.org/jira/browse/FLINK-8151 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong > > Following up with FLINK-8038. The equality check is not working as Map does > not support element-wise equality. Suggest to remove it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8151) [Table] Clean up Map equality check
[ https://issues.apache.org/jira/browse/FLINK-8151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-8151: - Summary: [Table] Clean up Map equality check (was: [Table] removing map value equality check) > [Table] Clean up Map equality check > --- > > Key: FLINK-8151 > URL: https://issues.apache.org/jira/browse/FLINK-8151 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong > > Following up with FLINK-8038. The equality check is not working as Map does > not support element-wise equality. Suggest to remove it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8151) [Table] Clean up Map equality check
[ https://issues.apache.org/jira/browse/FLINK-8151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-8151: - Description: Following up with FLINK-8038. The equality check currently is broken. Plan to support element-wise equality check by always using the base class: "java.util.Map.equals" method. was:Following up with FLINK-8038. The equality check is not working as Map does not support element-wise equality. Suggest to remove it. > [Table] Clean up Map equality check > --- > > Key: FLINK-8151 > URL: https://issues.apache.org/jira/browse/FLINK-8151 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong > > Following up with FLINK-8038. The equality check currently is broken. Plan to > support element-wise equality check by always using the base class: > "java.util.Map.equals" method. > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8151) [Table] Map equality check to use entrySet equality
[ https://issues.apache.org/jira/browse/FLINK-8151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-8151: - Summary: [Table] Map equality check to use entrySet equality (was: [Table] Clean up Map equality check) > [Table] Map equality check to use entrySet equality > --- > > Key: FLINK-8151 > URL: https://issues.apache.org/jira/browse/FLINK-8151 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong > > Following up with FLINK-8038. The equality check currently is broken. Plan to > support element-wise equality check by always using the base class: > "java.util.Map.equals" method. > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
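As background for the rename above, a small self-contained Scala sketch (not from the ticket) of the entrySet-based semantics that {{java.util.Map.equals}} already provides, which is the behaviour the generated equality check is being aligned with:
{code:scala}
import java.util.{HashMap => JHashMap, LinkedHashMap => JLinkedHashMap}

// java.util.Map.equals is specified in terms of entrySet equality, so two maps
// with the same entries compare equal regardless of implementation class or
// insertion order
val m1 = new JHashMap[String, Integer]()
m1.put("a", 1)
m1.put("b", 2)

val m2 = new JLinkedHashMap[String, Integer]()
m2.put("b", 2)
m2.put("a", 1)

assert(m1.equals(m2))
assert(m1.entrySet().equals(m2.entrySet()))
{code}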
[jira] [Commented] (FLINK-8985) End-to-end test: CLI
[ https://issues.apache.org/jira/browse/FLINK-8985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16539256#comment-16539256 ] Rong Rong commented on FLINK-8985: -- Thanks [~florianschmidt] for the exhaustive listing of all CLI commands. Currently I have only implemented tests for some of the key code paths and components. I found that there are situations where we need specific combinations of the option flags in order to trigger different code paths, which makes it even messier. I was actually thinking of something similar to FLINK-8986: having some sort of auto-generated collection of CLI option flags for e2e testing. [~till.rohrmann] [~Zentol] what do you guys think? > End-to-end test: CLI > > > Key: FLINK-8985 > URL: https://issues.apache.org/jira/browse/FLINK-8985 > Project: Flink > Issue Type: Sub-task > Components: Client, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Rong Rong >Priority: Major > > We should add an end-to-end test which verifies that all client commands are > working correctly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type
[ https://issues.apache.org/jira/browse/FLINK-9294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541084#comment-16541084 ] Rong Rong commented on FLINK-9294: -- hmm. nvm. I think I messed up with scala/java implicit conversion. I will ignore Scala class for now. > Improve type inference for UDFs with composite parameter or result type > > > Key: FLINK-9294 > URL: https://issues.apache.org/jira/browse/FLINK-9294 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Most of the UDF function signatures that includes composite types such as > *{{MAP}}*, *{{ARRAY}}*, etc would require user to override > *{{getParameterType}}* or *{{getResultType}}* method explicitly. > It should be able to resolve the composite type based on the function > signature, such as: > {code:java} > public String[] eval(Map mapArg) { /* ... */ } > {code} > The function catalog search should do either of the following: > - Automatically resolve that: > 1. *{{ObjectArrayTypeInfo}}* to be the result type. > 2. *{{MapTypeInfo}}* to be > the parameter type. > - Improved function mapping to find and locate function with such signatures. > During compilation, should do the following: > - Consistent resolution for: (Scala.Map / java.util.Map) and (Scala.Seq / > Java array) > - Automatically ingest type cast function (see FLINK-9430) to match the > correct type, or automatically generate the counter part of the corresponding > Scala / Java implementation of the eval function. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8985) End-to-end test: CLI
[ https://issues.apache.org/jira/browse/FLINK-8985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16547269#comment-16547269 ] Rong Rong commented on FLINK-8985: -- Yeah, I agree. I will put more thought into this in a follow-up JIRA if that's ok with you. In the meantime, could you kindly take a look at the PR? I created some very basic tests that verify a periodically running streaming job; there are some test cases I deliberately removed because they are already covered in other E2E suites (thanks to [~tzulitai]'s comment). > End-to-end test: CLI > > > Key: FLINK-8985 > URL: https://issues.apache.org/jira/browse/FLINK-8985 > Project: Flink > Issue Type: Sub-task > Components: Client, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Rong Rong >Priority: Major > > We should add an end-to-end test which verifies that all client commands are > working correctly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type
[ https://issues.apache.org/jira/browse/FLINK-9294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-9294: - Description: Most of the UDF function signatures that includes composite types such as *{{MAP}}*, *{{ARRAY}}*, etc would require user to override *{{getParameterType}}* or *{{getResultType}}* method explicitly. It should be able to resolve the composite type based on the function signature, such as: {code:java} public String[] eval(Map mapArg) { /* ... */ } {code} The function catalog search should do either of the following: - Automatically resolve that: 1. *{{ObjectArrayTypeInfo}}* to be the result type. 2. *{{MapTypeInfo}}* to be the parameter type. - Improved function mapping to find and locate function with such signatures. During compilation, should do the following: - Consistent resolution for: (Scala.Map / java.util.Map) and (Scala.Seq / Java array) - Automatically ingest type cast function (see FLINK-9430) to match the correct type, or automatically generate the counter part of the corresponding Scala / Java implementation of the eval function. was: Most of the UDF function signatures that includes composite types such as *{{MAP}}*, *{{ARRAY}}*, etc would require user to override *{{getParameterType}}* or *{{getResultType}}* method explicitly. It should be able to resolve the composite type based on the function signature, such as: {code:java} public String[] eval(Map mapArg) { /* ... */ } {code} The function catalog search should do either of the following: - Automatically resolve that: 1. *{{ObjectArrayTypeInfo}}* to be the result type. 2. *{{MapTypeInfo}}* to be the parameter type. - Improved function mapping to find and locate function with such signatures. During compilation, should do the following: - Consistent resolution for: (Scala.Map / java.util.Map) and (Scala.Seq / Java array) - Automatically ingest type cast function (see FLINK-9430) to match the correct type, or automatically generate the counter part of the corresponding Scala / Java implementation of the eval function. > Improve type inference for UDFs with composite parameter or result type > > > Key: FLINK-9294 > URL: https://issues.apache.org/jira/browse/FLINK-9294 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Most of the UDF function signatures that includes composite types such as > *{{MAP}}*, *{{ARRAY}}*, etc would require user to override > *{{getParameterType}}* or *{{getResultType}}* method explicitly. > It should be able to resolve the composite type based on the function > signature, such as: > {code:java} > public String[] eval(Map mapArg) { /* ... */ } > {code} > The function catalog search should do either of the following: > - Automatically resolve that: > 1. *{{ObjectArrayTypeInfo}}* to be the result type. > 2. *{{MapTypeInfo}}* to be the > parameter type. > - Improved function mapping to find and locate function with such signatures. > During compilation, should do the following: > - Consistent resolution for: (Scala.Map / java.util.Map) and (Scala.Seq / > Java array) > - Automatically ingest type cast function (see FLINK-9430) to match the > correct type, or automatically generate the counter part of the corresponding > Scala / Java implementation of the eval function. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
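To make the status quo concrete, a hypothetical Scala UDF (names and types are assumptions, not from the ticket) showing the explicit {{getParameterTypes}} override that is currently needed for a MAP argument and that this ticket aims to make unnecessary:
{code:scala}
import java.util.{Map => JMap}

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.MapTypeInfo
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.ScalarFunction

// hypothetical UDF: without the override below, the MAP operand type is not
// inferred from the eval signature and the function lookup fails
object MapSizeFunc extends ScalarFunction {
  def eval(m: JMap[String, Integer]): Int = m.size()

  override def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] =
    Array(new MapTypeInfo(Types.STRING, Types.INT))
}
{code}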
[jira] [Updated] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type
[ https://issues.apache.org/jira/browse/FLINK-9294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-9294: - Description: Most of the UDF function signatures that includes composite types such as *{{MAP}}*, *{{ARRAY}}*, etc would require user to override *{{getParameterType}}* or *{{getResultType}}* method explicitly. It should be able to resolve the composite type based on the function signature, such as: {code:java} public String[] eval(Map mapArg) { /* ... */ } {code} The function catalog search should do either of the following: [Update] since we have backward compatibility issue with resolving to a different type, we will not go with the modify type option. - -Automatically resolve that:- -1. *{{ObjectArrayTypeInfo}}* to be the result type.- -2. *{{MapTypeInfo}}* to be the parameter type.- - Improved function mapping to find and locate function with such signatures ** Change the following method to relax the check:*UserDefinedFunctionUtils.{{*getUserDefinedMethod*}}* ** *{{UserDefinedFunctionUtils.}}{{getEvalMethodSignature}}* - During compilation, should do the following: - Consistent resolution for: (Scala.Map / java.util.Map) and (Scala.Seq / Java array) - Automatically ingest type cast function (see FLINK-9430) to match the correct type, or automatically generate the counter part of the corresponding Scala / Java implementation of the eval function. was: Most of the UDF function signatures that includes composite types such as *{{MAP}}*, *{{ARRAY}}*, etc would require user to override *{{getParameterType}}* or *{{getResultType}}* method explicitly. It should be able to resolve the composite type based on the function signature, such as: {code:java} public String[] eval(Map mapArg) { /* ... */ } {code} The function catalog search should do either of the following: - Automatically resolve that: 1. *{{ObjectArrayTypeInfo}}* to be the result type. 2. *{{MapTypeInfo}}* to be the parameter type. - Improved function mapping to find and locate function with such signatures. During compilation, should do the following: - Consistent resolution for: (Scala.Map / java.util.Map) and (Scala.Seq / Java array) - Automatically ingest type cast function (see FLINK-9430) to match the correct type, or automatically generate the counter part of the corresponding Scala / Java implementation of the eval function. > Improve type inference for UDFs with composite parameter or result type > > > Key: FLINK-9294 > URL: https://issues.apache.org/jira/browse/FLINK-9294 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Most of the UDF function signatures that includes composite types such as > *{{MAP}}*, *{{ARRAY}}*, etc would require user to override > *{{getParameterType}}* or *{{getResultType}}* method explicitly. > It should be able to resolve the composite type based on the function > signature, such as: > {code:java} > public String[] eval(Map mapArg) { /* ... */ } > {code} > The function catalog search should do either of the following: > [Update] since we have backward compatibility issue with resolving to a > different type, we will not go with the modify type option. > - -Automatically resolve that:- > -1. *{{ObjectArrayTypeInfo}}* to be the result type.- > -2. 
*{{MapTypeInfo}}* to be the > parameter type.- > - Improved function mapping to find and locate function with such signatures > ** Change the following method to relax the > check:*UserDefinedFunctionUtils.{{*getUserDefinedMethod*}}* > ** *{{UserDefinedFunctionUtils.}}{{getEvalMethodSignature}}* > - > During compilation, should do the following: > - Consistent resolution for: (Scala.Map / java.util.Map) and (Scala.Seq / > Java array) > - Automatically ingest type cast function (see FLINK-9430) to match the > correct type, or automatically generate the counter part of the corresponding > Scala / Java implementation of the eval function. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type
[ https://issues.apache.org/jira/browse/FLINK-9294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-9294: - Description: Most of the UDF function signatures that includes composite types such as *{{MAP}}*, *{{ARRAY}}*, etc would require user to override *{{getParameterType}}* or *{{getResultType}}* method explicitly. It should be able to resolve the composite type based on the function signature, such as: {code:java} public String[] eval(Map mapArg) { /* ... */ } {code} The function catalog search should do either of the following: [Update] since we have backward compatibility issue with resolving to a different type, we will not go with the modify type option. - -Automatically resolve that:- -1. *{{ObjectArrayTypeInfo}}* to be the result type.- -2. *{{MapTypeInfo}}* to be the parameter type.- - Improved function mapping to find and locate function with such signatures [Update] This ticket should only cover Map and Row type, and does not cover Array, since Array is actually resolved by eval method signature correctly. This ticket should consolidate some discrepancy between how TableFunction, AggregateFunction and ScalarFunction resolves types. which at this moment goes through different code path. The rest of the optimization should go to follow up tickets in FLINK-9484 was: Most of the UDF function signatures that includes composite types such as *{{MAP}}*, *{{ARRAY}}*, etc would require user to override *{{getParameterType}}* or *{{getResultType}}* method explicitly. It should be able to resolve the composite type based on the function signature, such as: {code:java} public String[] eval(Map mapArg) { /* ... */ } {code} The function catalog search should do either of the following: [Update] since we have backward compatibility issue with resolving to a different type, we will not go with the modify type option. - -Automatically resolve that:- -1. *{{ObjectArrayTypeInfo}}* to be the result type.- -2. *{{MapTypeInfo}}* to be the parameter type.- - Improved function mapping to find and locate function with such signatures ** Change the following method to relax the check:*UserDefinedFunctionUtils.{{*getUserDefinedMethod*}}* ** *{{UserDefinedFunctionUtils.}}{{getEvalMethodSignature}}* - During compilation, should do the following: - Consistent resolution for: (Scala.Map / java.util.Map) and (Scala.Seq / Java array) - Automatically ingest type cast function (see FLINK-9430) to match the correct type, or automatically generate the counter part of the corresponding Scala / Java implementation of the eval function. > Improve type inference for UDFs with composite parameter or result type > > > Key: FLINK-9294 > URL: https://issues.apache.org/jira/browse/FLINK-9294 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Most of the UDF function signatures that includes composite types such as > *{{MAP}}*, *{{ARRAY}}*, etc would require user to override > *{{getParameterType}}* or *{{getResultType}}* method explicitly. > It should be able to resolve the composite type based on the function > signature, such as: > {code:java} > public String[] eval(Map mapArg) { /* ... */ } > {code} > The function catalog search should do either of the following: > [Update] since we have backward compatibility issue with resolving to a > different type, we will not go with the modify type option. > - -Automatically resolve that:- > -1. *{{ObjectArrayTypeInfo}}* to be the result type.- > -2. 
*{{MapTypeInfo}}* to be the > parameter type.- > - Improved function mapping to find and locate function with such signatures > [Update] > This ticket should only cover Map and Row type, and does not cover Array, > since Array is actually resolved by eval method signature correctly. > This ticket should consolidate some discrepancy between how TableFunction, > AggregateFunction and ScalarFunction resolves types. which at this moment > goes through different code path. > The rest of the optimization should go to follow up tickets in FLINK-9484 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type
[ https://issues.apache.org/jira/browse/FLINK-9294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-9294: - Description: Most of the UDF function signatures that includes composite types such as *{{MAP}}*, *{{ARRAY}}*, etc would require user to override *{{getParameterType}}* or *{{getResultType}}* method explicitly. It should be able to resolve the composite type based on the function signature, such as: {code:java} public String[] eval(Map mapArg) { /* ... */ } {code} The function catalog search should do either of the following: [Update] since we have backward compatibility issue with resolving to a different type, we will not go with the modify type option. - -Automatically resolve that:- -1. *{{ObjectArrayTypeInfo}}* to be the result type.- -2. *{{MapTypeInfo}}* to be the parameter type.- - Improved function mapping to find and locate function with such signatures [Update] This ticket should only cover Map and Row type, and does not cover Array, since Array is actually resolved by eval method signature correctly. This ticket should consolidate some discrepancy between how TableFunction, AggregateFunction and ScalarFunction resolves types. which at this moment goes through different code path. The rest of the optimization should go to follow up tickets in FLINK-9484 was: Most of the UDF function signatures that includes composite types such as *{{MAP}}*, *{{ARRAY}}*, etc would require user to override *{{getParameterType}}* or *{{getResultType}}* method explicitly. It should be able to resolve the composite type based on the function signature, such as: {code:java} public String[] eval(Map mapArg) { /* ... */ } {code} The function catalog search should do either of the following: [Update] since we have backward compatibility issue with resolving to a different type, we will not go with the modify type option. - -Automatically resolve that:- -1. *{{ObjectArrayTypeInfo}}* to be the result type.- -2. *{{MapTypeInfo}}* to be the parameter type.- - Improved function mapping to find and locate function with such signatures [Update] This ticket should only cover Map and Row type, and does not cover Array, since Array is actually resolved by eval method signature correctly. This ticket should consolidate some discrepancy between how TableFunction, AggregateFunction and ScalarFunction resolves types. which at this moment goes through different code path. The rest of the optimization should go to follow up tickets in FLINK-9484 > Improve type inference for UDFs with composite parameter or result type > > > Key: FLINK-9294 > URL: https://issues.apache.org/jira/browse/FLINK-9294 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Most of the UDF function signatures that includes composite types such as > *{{MAP}}*, *{{ARRAY}}*, etc would require user to override > *{{getParameterType}}* or *{{getResultType}}* method explicitly. > It should be able to resolve the composite type based on the function > signature, such as: > {code:java} > public String[] eval(Map mapArg) { /* ... */ } > {code} > The function catalog search should do either of the following: > [Update] > since we have backward compatibility issue with resolving to a different > type, we will not go with the modify type option. > - -Automatically resolve that:- > -1. *{{ObjectArrayTypeInfo}}* to be the result type.- > -2. 
*{{MapTypeInfo}}* to be the > parameter type.- > - Improved function mapping to find and locate function with such signatures > > [Update] > This ticket should only cover Map and Row type, and does not cover Array, > since Array is actually resolved by eval method signature correctly. > This ticket should consolidate some discrepancy between how TableFunction, > AggregateFunction and ScalarFunction resolves types. which at this moment > goes through different code path. > The rest of the optimization should go to follow up tickets in FLINK-9484 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type
[ https://issues.apache.org/jira/browse/FLINK-9294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-9294: - Description: Most of the UDF function signatures that includes composite types such as *{{MAP}}*, *{{ARRAY}}*, etc would require user to override *{{getParameterType}}* or *{{getResultType}}* method explicitly. It should be able to resolve the composite type based on the function signature, such as: {code:java} public String[] eval(Map mapArg) { /* ... */ } {code} The function catalog search should do either of the following: [Update] since we have backward compatibility issue with resolving to a different type, we will not go with the modify type option. - -Automatically resolve that:- -1. *{{ObjectArrayTypeInfo}}* to be the result type.- -2. *{{MapTypeInfo}}* to be the parameter type.- - Improved function mapping to find and locate function with such signatures [Update] This ticket should only cover Map and Row type, It does not cover * ObjectArrayType, since Array is actually resolved by eval method signature correctly. * Pojo types, Pojo will be addressed separately. This ticket should consolidate some discrepancy between how TableFunction, AggregateFunction and ScalarFunction resolves types. which at this moment goes through different code path. The rest of the optimization should go to follow up tickets in FLINK-9484 was: Most of the UDF function signatures that includes composite types such as *{{MAP}}*, *{{ARRAY}}*, etc would require user to override *{{getParameterType}}* or *{{getResultType}}* method explicitly. It should be able to resolve the composite type based on the function signature, such as: {code:java} public String[] eval(Map mapArg) { /* ... */ } {code} The function catalog search should do either of the following: [Update] since we have backward compatibility issue with resolving to a different type, we will not go with the modify type option. - -Automatically resolve that:- -1. *{{ObjectArrayTypeInfo}}* to be the result type.- -2. *{{MapTypeInfo}}* to be the parameter type.- - Improved function mapping to find and locate function with such signatures [Update] This ticket should only cover Map and Row type, and does not cover Array, since Array is actually resolved by eval method signature correctly. This ticket should consolidate some discrepancy between how TableFunction, AggregateFunction and ScalarFunction resolves types. which at this moment goes through different code path. The rest of the optimization should go to follow up tickets in FLINK-9484 > Improve type inference for UDFs with composite parameter or result type > > > Key: FLINK-9294 > URL: https://issues.apache.org/jira/browse/FLINK-9294 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Most of the UDF function signatures that includes composite types such as > *{{MAP}}*, *{{ARRAY}}*, etc would require user to override > *{{getParameterType}}* or *{{getResultType}}* method explicitly. > It should be able to resolve the composite type based on the function > signature, such as: > {code:java} > public String[] eval(Map mapArg) { /* ... */ } > {code} > The function catalog search should do either of the following: > [Update] > since we have backward compatibility issue with resolving to a different > type, we will not go with the modify type option. > - -Automatically resolve that:- > -1. *{{ObjectArrayTypeInfo}}* to be the result type.- > -2. 
*{{MapTypeInfo}}* to be the > parameter type.- > - Improved function mapping to find and locate function with such signatures > > [Update] > This ticket should only cover Map and Row type, It does not cover > * ObjectArrayType, since Array is actually resolved by eval method signature > correctly. > * Pojo types, Pojo will be addressed separately. > This ticket should consolidate some discrepancy between how TableFunction, > AggregateFunction and ScalarFunction resolves types. which at this moment > goes through different code path. > The rest of the optimization should go to follow up tickets in FLINK-9484 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9134) Update Calcite dependency to 1.17
[ https://issues.apache.org/jira/browse/FLINK-9134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16551884#comment-16551884 ] Rong Rong commented on FLINK-9134: -- Calcite 1.17 has been [released|https://mail-archives.apache.org/mod_mbox/calcite-dev/201807.mbox/%3CCAHFToO04LF6opte0%3DNYvb3q9145jmxq%2BFHAdv7mBUv844W66xg%40mail.gmail.com%3E]. > Update Calcite dependency to 1.17 > - > > Key: FLINK-9134 > URL: https://issues.apache.org/jira/browse/FLINK-9134 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Shuyi Chen >Priority: Major > > This is an umbrella issue for tasks that need to be performed when upgrading > to Calcite 1.17 once it is released. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type
[ https://issues.apache.org/jira/browse/FLINK-9294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-9294: - Description: Most of the UDF function signatures that includes composite types such as *{{MAP}}*, *{{ARRAY}}*, etc would require user to override *{{getParameterType}}* or *{{getResultType}}* method explicitly. It should be able to resolve the composite type based on the function signature, such as: {code:java} public String[] eval(Map mapArg) { /* ... */ } {code} The function catalog search should do either of the following: [Update] since we have backward compatibility issue with resolving to a different type, we will not go with the modify type option. - -Automatically resolve that:- -1. *{{ObjectArrayTypeInfo}}* to be the result type.- -2. *{{MapTypeInfo}}* to be the parameter type.- - Improved function mapping to find and locate function with such signatures [Update] This ticket should only cover *Map* and *Row* type, It does not cover * ObjectArrayType, since Array is actually resolved by eval method signature correctly. * Pojo types, Pojo will be addressed separately. This ticket should consolidate some discrepancy between how TableFunction, AggregateFunction and ScalarFunction resolves types. which at this moment goes through different code path. The rest of the optimization should go to follow up tickets in FLINK-9484 was: Most of the UDF function signatures that includes composite types such as *{{MAP}}*, *{{ARRAY}}*, etc would require user to override *{{getParameterType}}* or *{{getResultType}}* method explicitly. It should be able to resolve the composite type based on the function signature, such as: {code:java} public String[] eval(Map mapArg) { /* ... */ } {code} The function catalog search should do either of the following: [Update] since we have backward compatibility issue with resolving to a different type, we will not go with the modify type option. - -Automatically resolve that:- -1. *{{ObjectArrayTypeInfo}}* to be the result type.- -2. *{{MapTypeInfo}}* to be the parameter type.- - Improved function mapping to find and locate function with such signatures [Update] This ticket should only cover Map and Row type, It does not cover * ObjectArrayType, since Array is actually resolved by eval method signature correctly. * Pojo types, Pojo will be addressed separately. This ticket should consolidate some discrepancy between how TableFunction, AggregateFunction and ScalarFunction resolves types. which at this moment goes through different code path. The rest of the optimization should go to follow up tickets in FLINK-9484 > Improve type inference for UDFs with composite parameter or result type > > > Key: FLINK-9294 > URL: https://issues.apache.org/jira/browse/FLINK-9294 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Most of the UDF function signatures that includes composite types such as > *{{MAP}}*, *{{ARRAY}}*, etc would require user to override > *{{getParameterType}}* or *{{getResultType}}* method explicitly. > It should be able to resolve the composite type based on the function > signature, such as: > {code:java} > public String[] eval(Map mapArg) { /* ... */ } > {code} > The function catalog search should do either of the following: > [Update] > since we have backward compatibility issue with resolving to a different > type, we will not go with the modify type option. > - -Automatically resolve that:- > -1. 
*{{ObjectArrayTypeInfo}}* to be the result type.- > -2. *{{MapTypeInfo}}* to be the > parameter type.- > - Improved function mapping to find and locate function with such signatures > > [Update] > This ticket should only cover *Map* and *Row* type, It does not cover > * ObjectArrayType, since Array is actually resolved by eval method signature > correctly. > * Pojo types, Pojo will be addressed separately. > This ticket should consolidate some discrepancy between how TableFunction, > AggregateFunction and ScalarFunction resolves types. which at this moment > goes through different code path. > The rest of the optimization should go to follow up tickets in FLINK-9484 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8691) Update table API to support distinct operator on data stream
[ https://issues.apache.org/jira/browse/FLINK-8691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-8691: - Issue Type: Improvement (was: Sub-task) Parent: (was: FLINK-8688) > Update table API to support distinct operator on data stream > > > Key: FLINK-8691 > URL: https://issues.apache.org/jira/browse/FLINK-8691 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Rong Rong >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-5315) Support distinct aggregations in table api
[ https://issues.apache.org/jira/browse/FLINK-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong reassigned FLINK-5315: Assignee: Rong Rong > Support distinct aggregations in table api > -- > > Key: FLINK-5315 > URL: https://issues.apache.org/jira/browse/FLINK-5315 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kurt Young >Assignee: Rong Rong >Priority: Major > > Such as > {code} > t.select("count(distinct a), sum(b)") > {code} > or > {code} > t.select('a.count.distinct), 'b.sum) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-5315) Support distinct aggregations in table api
[ https://issues.apache.org/jira/browse/FLINK-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-5315: - Issue Type: Sub-task (was: New Feature) Parent: FLINK-8688 > Support distinct aggregations in table api > -- > > Key: FLINK-5315 > URL: https://issues.apache.org/jira/browse/FLINK-5315 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kurt Young >Priority: Major > > Such as > {code} > t.select("count(distinct a), sum(b)") > {code} > or > {code} > t.select('a.count.distinct), 'b.sum) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-5315) Support distinct aggregations in table api
[ https://issues.apache.org/jira/browse/FLINK-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554326#comment-16554326 ] Rong Rong commented on FLINK-5315: -- Since FLINK-8689 and FLINK-8690 have been checked in, I think we can enable this. I am in favor of this syntax: {code:java} t.select("count(distinct a), sum(b)") {code} since doing a *.distinct* after a *.count* can be confused with taking the distinct values of the count aggregate results. > Support distinct aggregations in table api > -- > > Key: FLINK-5315 > URL: https://issues.apache.org/jira/browse/FLINK-5315 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kurt Young >Assignee: Rong Rong >Priority: Major > > Such as > {code} > t.select("count(distinct a), sum(b)") > {code} > or > {code} > t.select('a.count.distinct), 'b.sum) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type
[ https://issues.apache.org/jira/browse/FLINK-9294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554866#comment-16554866 ] Rong Rong commented on FLINK-9294: -- Had an initial diff for discussion on what needs to be done here: [https://github.com/walterddr/flink/compare/f60aa6e72ca860bb30730472c0c7db8fef500e5c...b8b9c62c2f0226517aee661fb3d8332bdf7dedb8] Seems like with some recent relaxation in the UDF lookup, all of the above mentioned problems are fixed and can be used correctly. The only remaining problem is: Too much relaxation causes incorrect runtime exception. I can go ahead and close this task by commit the test changes and continue the work in FLINK-9501. Any thoughts and concerns? > Improve type inference for UDFs with composite parameter or result type > > > Key: FLINK-9294 > URL: https://issues.apache.org/jira/browse/FLINK-9294 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Most of the UDF function signatures that includes composite types such as > *{{MAP}}*, *{{ARRAY}}*, etc would require user to override > *{{getParameterType}}* or *{{getResultType}}* method explicitly. > It should be able to resolve the composite type based on the function > signature, such as: > {code:java} > public String[] eval(Map mapArg) { /* ... */ } > {code} > The function catalog search should do either of the following: > [Update] > since we have backward compatibility issue with resolving to a different > type, we will not go with the modify type option. > - -Automatically resolve that:- > -1. *{{ObjectArrayTypeInfo}}* to be the result type.- > -2. *{{MapTypeInfo}}* to be the > parameter type.- > - Improved function mapping to find and locate function with such signatures > > [Update] > This ticket should only cover *Map* and *Row* type, It does not cover > * ObjectArrayType, since Array is actually resolved by eval method signature > correctly. > * Pojo types, Pojo will be addressed separately. > This ticket should consolidate some discrepancy between how TableFunction, > AggregateFunction and ScalarFunction resolves types. which at this moment > goes through different code path. > The rest of the optimization should go to follow up tickets in FLINK-9484 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-5315) Support distinct aggregations in table api
[ https://issues.apache.org/jira/browse/FLINK-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555898#comment-16555898 ] Rong Rong edited comment on FLINK-5315 at 7/25/18 4:16 PM: --- Oops. sorry that was what I meant [~twalthr] to use {{'a.distinct.count}} for expressions such as count, sum etc. My point was doing {{'a.count.distinct}} doesn't make sense. [~fhueske] [~hequn8128] Regrading UDAGG. I am actually more favoring the syntax of {{udagg('a.distinct)}}. To my own understanding {{*.distinct}} seems more of an expression towards how the operand is suppose to be treated. {{myUdagg.distinct}} seems confusing to me since we are not applying the distinct towards the aggregate function. Ideally something like {{'a.distinct.myUdagg}} might be better if it were supported. was (Author: walterddr): Oops. sorry that was what I meant [~twalthr] to use {{'a.distinct.count}} for expressions such as count, sum etc. My point was doing {{'a.count.distinct}} doesn't make sense. [~fhueske] [~hequn8128] Regrading UDAGG. I am actually more favoring the syntax of {{udagg('a.distinct)}}. To my own understanding {{*.distinct}} seems more of an expression towards how the operand is suppose to be treated. {{myUdagg.distinct}} seems confusing to me since we are not applying the distinct towards the aggregate function. something like {{'a.distinct.myUdagg}} might be better if it can be supported. > Support distinct aggregations in table api > -- > > Key: FLINK-5315 > URL: https://issues.apache.org/jira/browse/FLINK-5315 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kurt Young >Assignee: Rong Rong >Priority: Major > > Such as > {code} > t.select("count(distinct a), sum(b)") > {code} > or > {code} > t.select('a.count.distinct), 'b.sum) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-5315) Support distinct aggregations in table api
[ https://issues.apache.org/jira/browse/FLINK-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555898#comment-16555898 ] Rong Rong commented on FLINK-5315: -- Oops. sorry that was what I meant [~twalthr] to use {{'a.distinct.count}} for expressions such as count, sum etc. My point was doing {{'a.count.distinct}} doesn't make sense. [~fhueske] [~hequn8128] Regrading UDAGG. I am actually more favoring the syntax of {{udagg('a.distinct)}}. To my own understanding {{*.distinct}} seems more of an expression towards how the operand is suppose to be treated. {{myUdagg.distinct}} seems confusing to me since we are not applying the distinct towards the aggregate function. something like {{'a.distinct.myUdagg}} might be better if it can be supported. > Support distinct aggregations in table api > -- > > Key: FLINK-5315 > URL: https://issues.apache.org/jira/browse/FLINK-5315 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kurt Young >Assignee: Rong Rong >Priority: Major > > Such as > {code} > t.select("count(distinct a), sum(b)") > {code} > or > {code} > t.select('a.count.distinct), 'b.sum) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-5315) Support distinct aggregations in table api
[ https://issues.apache.org/jira/browse/FLINK-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16559889#comment-16559889 ] Rong Rong commented on FLINK-5315: -- Yes [~hequn8128]. Currently I am transferring the distinct modifier for the parameters towards the aggregate function. However, I think {{a.count.distinct}} is confusing, since {{a.count}} returns a valid expression, and {{.distinct}} is as well a valid expression, but chaining 2 aggregates together is not a valid expression though. So it is not immediately clear to end users what this means. But the point seems to be interesting when considering multiple parameters like [~fhueske] mentioned: {{multiParamAgg('a.distinct, 'b.distinct)}} seems to duplicate the unnecessary {{distinct}} twice which can be a burden to user to specify multiple times. So on the UDAGG side, I think the proposal to use `udagg.distinct('a, 'b)` seems to be a good idea. Thoughts? > Support distinct aggregations in table api > -- > > Key: FLINK-5315 > URL: https://issues.apache.org/jira/browse/FLINK-5315 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kurt Young >Assignee: Rong Rong >Priority: Major > > Such as > {code} > t.select("count(distinct a), sum(b)") > {code} > or > {code} > t.select('a.count.distinct), 'b.sum) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-5315) Support distinct aggregations in table api
[ https://issues.apache.org/jira/browse/FLINK-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16559889#comment-16559889 ] Rong Rong edited comment on FLINK-5315 at 7/27/18 5:05 PM: --- Yes [~hequn8128]. Currently I am transferring the distinct modifier for the parameters towards the aggregate function. However, I think {{a.count.distinct}} is confusing, since {{a.count}} returns a valid expression, and {{.distinct}} is as well a valid expression, but chaining 2 aggregates together is not a valid expression though. So it is not immediately clear to end users what this means. But the point seems to be interesting when considering multiple parameters like [~fhueske] mentioned: {{multiParamAgg('a.distinct, 'b.distinct)}} seems to duplicate the unnecessary {{distinct}} twice which can be a burden to user to specify multiple times. So on the UDAGG side, I think the proposal to use {{udagg.distinct('a, 'b)}} seems to be a good idea. Thoughts? was (Author: walterddr): Yes [~hequn8128]. Currently I am transferring the distinct modifier for the parameters towards the aggregate function. However, I think {{a.count.distinct}} is confusing, since {{a.count}} returns a valid expression, and {{.distinct}} is as well a valid expression, but chaining 2 aggregates together is not a valid expression though. So it is not immediately clear to end users what this means. But the point seems to be interesting when considering multiple parameters like [~fhueske] mentioned: {{multiParamAgg('a.distinct, 'b.distinct)}} seems to duplicate the unnecessary {{distinct}} twice which can be a burden to user to specify multiple times. So on the UDAGG side, I think the proposal to use `udagg.distinct('a, 'b)` seems to be a good idea. Thoughts? > Support distinct aggregations in table api > -- > > Key: FLINK-5315 > URL: https://issues.apache.org/jira/browse/FLINK-5315 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kurt Young >Assignee: Rong Rong >Priority: Major > > Such as > {code} > t.select("count(distinct a), sum(b)") > {code} > or > {code} > t.select('a.count.distinct), 'b.sum) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-5315) Support distinct aggregations in table api
[ https://issues.apache.org/jira/browse/FLINK-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16560803#comment-16560803 ] Rong Rong commented on FLINK-5315: -- Hmm. Yes that's true we cannot have {{a.distinct}} by itself if {{a}} is column expression. But if {{a}} is a table, for example: {{select(*).distinct}}, then this is very much valid. Yes but I agree, the best way to express this would've been {{ agg.distinct('arg1[, 'arg2]...) }} regardless whether this is an expression or function since it is a modifier for the method. I think we can definitely do that for function. I am ok with the {{'col.agg.distinct}} approach actually despite a little confusion. what do you guys think [~fhueske][~twalthr]? > Support distinct aggregations in table api > -- > > Key: FLINK-5315 > URL: https://issues.apache.org/jira/browse/FLINK-5315 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kurt Young >Assignee: Rong Rong >Priority: Major > > Such as > {code} > t.select("count(distinct a), sum(b)") > {code} > or > {code} > t.select('a.count.distinct), 'b.sum) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-5315) Support distinct aggregations in table api
[ https://issues.apache.org/jira/browse/FLINK-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16560803#comment-16560803 ] Rong Rong edited comment on FLINK-5315 at 7/28/18 9:38 PM: --- Hmm. Yes that's true we cannot have {{a.distinct}} by itself if {{a}} is column expression. But if {{a}} is a table, for example: {{select ( * ).distinct}}, then this is very much valid. (oops, (*) is not correct) Yes but I agree, the best way to express this would've been {{ agg.distinct('arg1[, 'arg2]...) }} regardless whether this is an expression or function since it is a modifier for the method. I think we can definitely do that for function. I am ok with the {{'col.agg.distinct}} approach actually despite a little confusion. what do you guys think [~fhueske][~twalthr]? was (Author: walterddr): Hmm. Yes that's true we cannot have {{a.distinct}} by itself if {{a}} is column expression. But if {{a}} is a table, for example: {{select(*).distinct}}, then this is very much valid. Yes but I agree, the best way to express this would've been {{ agg.distinct('arg1[, 'arg2]...) }} regardless whether this is an expression or function since it is a modifier for the method. I think we can definitely do that for function. I am ok with the {{'col.agg.distinct}} approach actually despite a little confusion. what do you guys think [~fhueske][~twalthr]? > Support distinct aggregations in table api > -- > > Key: FLINK-5315 > URL: https://issues.apache.org/jira/browse/FLINK-5315 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kurt Young >Assignee: Rong Rong >Priority: Major > > Such as > {code} > t.select("count(distinct a), sum(b)") > {code} > or > {code} > t.select('a.count.distinct), 'b.sum) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-5315) Support distinct aggregations in table api
[ https://issues.apache.org/jira/browse/FLINK-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16563766#comment-16563766 ] Rong Rong commented on FLINK-5315: -- Actually I agree with [~hequn8128] that these are two kinds of distincts. However I think the point [~fhueske] made here is that having {{`b.count.distinct}} by itself, it is not clear which kind of distinct it is referring to (syntactically). However, I think [~hequn8128] made one very good point. If I understand correctly, {{distinct}} operation has to operate on a whole table and it doesn't make sense to operator on a single column. e.g. {{table.select('a.distinct, 'b)}} will never be valid. I even think the actual correct representation of distinct in this situation would be {{select.distinct('a, 'b)}}. What do you guys think? > Support distinct aggregations in table api > -- > > Key: FLINK-5315 > URL: https://issues.apache.org/jira/browse/FLINK-5315 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kurt Young >Assignee: Rong Rong >Priority: Major > > Such as > {code} > t.select("count(distinct a), sum(b)") > {code} > or > {code} > t.select('a.count.distinct), 'b.sum) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10010) Deprecate unused BaseAlignedWindowAssigner related components
Rong Rong created FLINK-10010: - Summary: Deprecate unused BaseAlignedWindowAssigner related components Key: FLINK-10010 URL: https://issues.apache.org/jira/browse/FLINK-10010 Project: Flink Issue Type: Bug Components: DataStream API Reporter: Rong Rong Assignee: Rong Rong {{BaseAlignedWindowAssigner}} should be marked as deprecated and {{SlidingAlignedProcessingTimeWindows}} should be removed from the Flink Repo. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10019) Fix Composite getResultType of UDF cannot be chained with other operators
Rong Rong created FLINK-10019: - Summary: Fix Composite getResultType of UDF cannot be chained with other operators Key: FLINK-10019 URL: https://issues.apache.org/jira/browse/FLINK-10019 Project: Flink Issue Type: Sub-task Reporter: Rong Rong Assignee: Rong Rong If explicitly return a CompositeType in {{udf.getResultType}}, will result in some failures in chained operators. For example: consider a simple UDF, {code:scala} object Func extends ScalarFunction { def eval(row: Row): Row = { row } override def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = Array(Types.ROW(Types.INT)) override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = Types.ROW(Types.INT) } {code} This should work perfectly since it's just a simple pass through, however {code:scala} @Test def testRowType(): Unit = { val data = List( Row.of(Row.of(12.asInstanceOf[Integer]), "1") ) val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.fromCollection(data)(Types.ROW(Types.ROW(Types.INT), Types.STRING)) val tEnv = TableEnvironment.getTableEnvironment(env) val table = stream.toTable(tEnv, 'a, 'b) tEnv.registerFunction("func", Func20) tEnv.registerTable("t", table) // This works perfectly val result1 = tEnv.sqlQuery("SELECT func(a) FROM t").toAppendStream[Row] result1.addSink(new StreamITCase.StringSink[Row]) // This throws exception val result2 = tEnv.sqlQuery("SELECT func(a) as myRow FROM t").toAppendStream[Row] result2.addSink(new StreamITCase.StringSink[Row]) env.execute() } {code} Exception code: {code:java} java.lang.IndexOutOfBoundsException: index (1) must be less than size (1) at com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:310) at com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:293) at com.google.common.collect.SingletonImmutableList.get(SingletonImmutableList.java:41) at org.apache.calcite.sql.type.InferTypes$2.inferOperandTypes(InferTypes.java:83) at org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1777) at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:459) at org.apache.calcite.sql.validate.SqlValidatorImpl.expandStar(SqlValidatorImpl.java:349) ... {code} This is due to the fact that Calcite inferOperandTypes does not expect to infer a struct RelDataType. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10019) Fix Composite getResultType of UDF cannot be chained with other operators
[ https://issues.apache.org/jira/browse/FLINK-10019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-10019: -- Description: If explicitly return a CompositeType in {{udf.getResultType}}, will result in some failures in chained operators. For example: consider a simple UDF, {code:scala} object Func extends ScalarFunction { def eval(row: Row): Row = { row } override def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = Array(Types.ROW(Types.INT)) override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = Types.ROW(Types.INT) } {code} This should work perfectly since it's just a simple pass through, however {code:scala} @Test def testRowType(): Unit = { val data = List( Row.of(Row.of(12.asInstanceOf[Integer]), "1") ) val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.fromCollection(data)(Types.ROW(Types.ROW(Types.INT), Types.STRING)) val tEnv = TableEnvironment.getTableEnvironment(env) val table = stream.toTable(tEnv, 'a, 'b) tEnv.registerFunction("func", Func) tEnv.registerTable("t", table) // This works perfectly val result1 = tEnv.sqlQuery("SELECT func(a) FROM t").toAppendStream[Row] result1.addSink(new StreamITCase.StringSink[Row]) // This throws exception val result2 = tEnv.sqlQuery("SELECT func(a) as myRow FROM t").toAppendStream[Row] result2.addSink(new StreamITCase.StringSink[Row]) env.execute() } {code} Exception code: {code:java} java.lang.IndexOutOfBoundsException: index (1) must be less than size (1) at com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:310) at com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:293) at com.google.common.collect.SingletonImmutableList.get(SingletonImmutableList.java:41) at org.apache.calcite.sql.type.InferTypes$2.inferOperandTypes(InferTypes.java:83) at org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1777) at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:459) at org.apache.calcite.sql.validate.SqlValidatorImpl.expandStar(SqlValidatorImpl.java:349) ... {code} This is due to the fact that Calcite inferOperandTypes does not expect to infer a struct RelDataType. was: If explicitly return a CompositeType in {{udf.getResultType}}, will result in some failures in chained operators. 
For example: consider a simple UDF,

{code:scala}
object Func extends ScalarFunction {
  def eval(row: Row): Row = {
    row
  }

  override def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] =
    Array(Types.ROW(Types.INT))

  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
    Types.ROW(Types.INT)
}
{code}

This should work perfectly since it's just a simple pass through, however

{code:scala}
@Test
def testRowType(): Unit = {
  val data = List(
    Row.of(Row.of(12.asInstanceOf[Integer]), "1")
  )
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream = env.fromCollection(data)(Types.ROW(Types.ROW(Types.INT), Types.STRING))
  val tEnv = TableEnvironment.getTableEnvironment(env)
  val table = stream.toTable(tEnv, 'a, 'b)

  tEnv.registerFunction("func", Func20)
  tEnv.registerTable("t", table)

  // This works perfectly
  val result1 = tEnv.sqlQuery("SELECT func(a) FROM t").toAppendStream[Row]
  result1.addSink(new StreamITCase.StringSink[Row])

  // This throws exception
  val result2 = tEnv.sqlQuery("SELECT func(a) as myRow FROM t").toAppendStream[Row]
  result2.addSink(new StreamITCase.StringSink[Row])

  env.execute()
}
{code}

Exception code:

{code:java}
java.lang.IndexOutOfBoundsException: index (1) must be less than size (1)
  at com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:310)
  at com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:293)
  at com.google.common.collect.SingletonImmutableList.get(SingletonImmutableList.java:41)
  at org.apache.calcite.sql.type.InferTypes$2.inferOperandTypes(InferTypes.java:83)
  at org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1777)
  at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:459)
  at org.apache.calcite.sql.validate.SqlValidatorImpl.expandStar(SqlValidatorImpl.java:349)
  ...
{code}

This is due to the fact that Calcite inferOperandTypes does not expect to infer a struct RelDataType.

> Fix Composite getResultType of UDF cannot be chained with other operators
> -
>
> Key: FLINK-10019
> URL: ht
[jira] [Updated] (FLINK-5315) Support distinct aggregations in table api
[ https://issues.apache.org/jira/browse/FLINK-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-5315: - Description:

Support distinct aggregations in Table API in the following format:

For Expressions:
{code:java}
'a.count.distinct // Expressions distinct modifier
{code}

For User-defined Function:
{code:java}
singleArgUdaggFunc.distinct('a) // FunctionCall distinct modifier
multiArgUdaggFunc.distinct('a, 'b) // FunctionCall distinct modifier
{code}

was: Such as
{code}
t.select("count(distinct a), sum(b)")
{code}
or
{code}
t.select('a.count.distinct), 'b.sum)
{code}

> Support distinct aggregations in table api
> --
>
> Key: FLINK-5315
> URL: https://issues.apache.org/jira/browse/FLINK-5315
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Rong Rong
>Priority: Major
>
> Support distinct aggregations in Table API in the following format:
> For Expressions:
> {code:java}
> 'a.count.distinct // Expressions distinct modifier
> {code}
> For User-defined Function:
> {code:java}
> singleArgUdaggFunc.distinct('a) // FunctionCall distinct modifier
> multiArgUdaggFunc.distinct('a, 'b) // FunctionCall distinct modifier
> {code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-5315) Support distinct aggregations in table api
[ https://issues.apache.org/jira/browse/FLINK-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong updated FLINK-5315: - Description:

Support distinct aggregations in Table API in the following format:

For Expressions:
{code:scala}
'a.count.distinct // Expressions distinct modifier
{code}

For User-defined Function:
{code:scala}
singleArgUdaggFunc.distinct('a) // FunctionCall distinct modifier
multiArgUdaggFunc.distinct('a, 'b) // FunctionCall distinct modifier
{code}

was: Support distinct aggregations in Table API in the following format:

For Expressions:
{code:java}
'a.count.distinct // Expressions distinct modifier
{code}

For User-defined Function:
{code:java}
singleArgUdaggFunc.distinct('a) // FunctionCall distinct modifier
multiArgUdaggFunc.distinct('a, 'b) // FunctionCall distinct modifier
{code}

> Support distinct aggregations in table api
> --
>
> Key: FLINK-5315
> URL: https://issues.apache.org/jira/browse/FLINK-5315
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Rong Rong
>Priority: Major
>
> Support distinct aggregations in Table API in the following format:
> For Expressions:
> {code:scala}
> 'a.count.distinct // Expressions distinct modifier
> {code}
> For User-defined Function:
> {code:scala}
> singleArgUdaggFunc.distinct('a) // FunctionCall distinct modifier
> multiArgUdaggFunc.distinct('a, 'b) // FunctionCall distinct modifier
> {code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
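To make the proposal concrete, here is an illustrative sketch of how the two modifiers could appear in a full Table API query. The table name "Orders", the fields 'user and 'amount, and the UDAGG instance myUdagg are hypothetical, and the .distinct syntax is the one proposed above, not an existing API at the time of this issue.

{code:scala}
import org.apache.flink.table.api.scala._

// Illustrative only: "Orders", 'user, 'amount and myUdagg are hypothetical names.
val orders = tEnv.scan("Orders")

// Expression-style distinct modifier on a built-in aggregation.
val distinctCount = orders
  .groupBy('user)
  .select('user, 'amount.count.distinct)

// FunctionCall-style distinct modifier on a user-defined aggregate function.
val distinctUdagg = orders
  .groupBy('user)
  .select('user, myUdagg.distinct('amount))
{code}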