[jira] [Created] (FLINK-5168) Scaladoc annotation link use [[]] instead of {@link}
shijinkui created FLINK-5168: Summary: Scaladoc annotation link use [[]] instead of {@link} Key: FLINK-5168 URL: https://issues.apache.org/jira/browse/FLINK-5168 Project: Flink Issue Type: Improvement Components: Scala API Reporter: shijinkui {@link StreamExecutionEnvironment#readFile(FileInputFormat, String, FileProcessingMode, long)} ==> [[StreamExecutionEnvironment#readFile(FileInputFormat, String, FileProcessingMode, long)]] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2875: [FLINK-5168] Scaladoc annotation link use [[]] ins...
GitHub user shijinkui opened a pull request: https://github.com/apache/flink/pull/2875 [FLINK-5168] Scaladoc annotation link use [[]] instead of {@link} `{@link StreamExecutionEnvironment#readFile(FileInputFormat, String, FileProcessingMode, long)}` ==> `[[StreamExecutionEnvironment#readFile(FileInputFormat, String, FileProcessingMode, long)]]` - [X] General - The pull request references the related JIRA issue ("[FLINK-5168] Scaladoc annotation link use [[]] instead of {@link}") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [X] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [X] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/shijinkui/flink FLINK-5168 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2875.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2875 commit c5deec42aa40efd788fa961d2979447f606a3cab Author: shijinkui Date: 2016-11-26T08:14:13Z [FLINK-5168] Scaladoc annotation link use [[]] instead of {@link} --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5168) Scaladoc annotation link use [[]] instead of {@link}
[ https://issues.apache.org/jira/browse/FLINK-5168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697531#comment-15697531 ] ASF GitHub Bot commented on FLINK-5168: --- GitHub user shijinkui opened a pull request: https://github.com/apache/flink/pull/2875 [FLINK-5168] Scaladoc annotation link use [[]] instead of {@link} `{@link StreamExecutionEnvironment#readFile(FileInputFormat, String, FileProcessingMode, long)}` ==> `[[StreamExecutionEnvironment#readFile(FileInputFormat, String, FileProcessingMode, long)]]` - [X] General - The pull request references the related JIRA issue ("[FLINK-5168] Scaladoc annotation link use [[]] instead of {@link}") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [X] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [X] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/shijinkui/flink FLINK-5168 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2875.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2875 commit c5deec42aa40efd788fa961d2979447f606a3cab Author: shijinkui Date: 2016-11-26T08:14:13Z [FLINK-5168] Scaladoc annotation link use [[]] instead of {@link} > Scaladoc annotation link use [[]] instead of {@link} > > > Key: FLINK-5168 > URL: https://issues.apache.org/jira/browse/FLINK-5168 > Project: Flink > Issue Type: Improvement > Components: Scala API > Reporter: shijinkui > > {@link StreamExecutionEnvironment#readFile(FileInputFormat, String, FileProcessingMode, long)} > ==> > [[StreamExecutionEnvironment#readFile(FileInputFormat, String, FileProcessingMode, long)]]
[jira] [Updated] (FLINK-5166) TextInputFormatTest.testNestedFileRead
[ https://issues.apache.org/jira/browse/FLINK-5166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui updated FLINK-5166: - Description: mvn clean package -P \\!scala-2.11,scala-2.11 -U Failed tests: TextInputFormatTest.testNestedFileRead:140 Test erroneous Tests run: 846, Failures: 1, Errors: 0, Skipped: 0 was: mvn clean package -P \!scala-2.11,scala-2.11 -U Failed tests: TextInputFormatTest.testNestedFileRead:140 Test erroneous Tests run: 846, Failures: 1, Errors: 0, Skipped: 0 > TextInputFormatTest.testNestedFileRead > -- > > Key: FLINK-5166 > URL: https://issues.apache.org/jira/browse/FLINK-5166 > Project: Flink > Issue Type: Bug > Components: Build System > Reporter: shijinkui > > mvn clean package -P \\!scala-2.11,scala-2.11 -U > Failed tests: > TextInputFormatTest.testNestedFileRead:140 Test erroneous > Tests run: 846, Failures: 1, Errors: 0, Skipped: 0
[jira] [Updated] (FLINK-5166) TextInputFormatTest.testNestedFileRead
[ https://issues.apache.org/jira/browse/FLINK-5166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shijinkui updated FLINK-5166: - Description: `mvn clean package -P \!scala-2.11,scala-2.11 -U` Failed tests: TextInputFormatTest.testNestedFileRead:140 Test erroneous Tests run: 846, Failures: 1, Errors: 0, Skipped: 0 was: mvn clean package -P \\!scala-2.11,scala-2.11 -U Failed tests: TextInputFormatTest.testNestedFileRead:140 Test erroneous Tests run: 846, Failures: 1, Errors: 0, Skipped: 0 > TextInputFormatTest.testNestedFileRead > -- > > Key: FLINK-5166 > URL: https://issues.apache.org/jira/browse/FLINK-5166 > Project: Flink > Issue Type: Bug > Components: Build System > Reporter: shijinkui > > `mvn clean package -P \!scala-2.11,scala-2.11 -U` > Failed tests: > TextInputFormatTest.testNestedFileRead:140 Test erroneous > Tests run: 846, Failures: 1, Errors: 0, Skipped: 0
[GitHub] flink issue #2874: [FLINK-5167] StreamExecutionEnvironment set function retu...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2874 The PR changes several `@Public` and `@PublicEvolving` methods. We do not touch `@Public` interfaces for 1.x releases and also try to avoid modifying `@PublicEvolving` interfaces except for very good reasons. We have a Maven plugin that prevents changes that break the binary compatibility of methods and classes annotated with `@Public`. By building the code before opening a pull request, you can detect such issues early. I don't think that we can or should merge this PR.
[jira] [Commented] (FLINK-5167) StreamExecutionEnvironment's set function return `this` instead of void
[ https://issues.apache.org/jira/browse/FLINK-5167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697575#comment-15697575 ] ASF GitHub Bot commented on FLINK-5167: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2874 The PR changes several `@Public` and `@PublicEvolving` methods. We do not touch `@Public` interfaces for 1.x releases and also try to avoid modifying `@PublicEvolving` interfaces except for very good reasons. We have a Maven plugin that prevents changes that break the binary compatibility of methods and classes annotated with `@Public`. By building the code before opening a pull request, you can detect such issues early. I don't think that we can or should merge this PR. > StreamExecutionEnvironment's set function return `this` instead of void > --- > > Key: FLINK-5167 > URL: https://issues.apache.org/jira/browse/FLINK-5167 > Project: Flink > Issue Type: Bug > Components: DataStream API > Reporter: shijinkui > > For example: > public void setNumberOfExecutionRetries(int numberOfExecutionRetries) { > config.setNumberOfExecutionRetries(numberOfExecutionRetries); > } > change to: > public StreamExecutionEnvironment setNumberOfExecutionRetries(int numberOfExecutionRetries) { > config.setNumberOfExecutionRetries(numberOfExecutionRetries); > return this; > }
[jira] [Commented] (FLINK-5166) TextInputFormatTest.testNestedFileRead
[ https://issues.apache.org/jira/browse/FLINK-5166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697607#comment-15697607 ] Chesnay Schepler commented on FLINK-5166: - Can you include the exception that you got? > TextInputFormatTest.testNestedFileRead > -- > > Key: FLINK-5166 > URL: https://issues.apache.org/jira/browse/FLINK-5166 > Project: Flink > Issue Type: Bug > Components: Batch Connectors and Input/Output Formats, Tests > Reporter: shijinkui > > `mvn clean package -P \!scala-2.11,scala-2.11 -U` > Failed tests: > TextInputFormatTest.testNestedFileRead:140 Test erroneous > Tests run: 846, Failures: 1, Errors: 0, Skipped: 0
[jira] [Updated] (FLINK-5166) TextInputFormatTest.testNestedFileRead
[ https://issues.apache.org/jira/browse/FLINK-5166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-5166: Component/s: (was: Build System) Tests Batch Connectors and Input/Output Formats > TextInputFormatTest.testNestedFileRead > -- > > Key: FLINK-5166 > URL: https://issues.apache.org/jira/browse/FLINK-5166 > Project: Flink > Issue Type: Bug > Components: Batch Connectors and Input/Output Formats, Tests > Reporter: shijinkui > > `mvn clean package -P \!scala-2.11,scala-2.11 -U` > Failed tests: > TextInputFormatTest.testNestedFileRead:140 Test erroneous > Tests run: 846, Failures: 1, Errors: 0, Skipped: 0
[GitHub] flink issue #2874: [FLINK-5167] StreamExecutionEnvironment set function retu...
Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/2874 > The PR changes several @Public and @PublicEvolving methods. Hi @fhueske, today while setting some configuration I found that some setter functions can't be chained. So I wanted to make all the relevant setters `return this` so that calls can be chained conveniently. With chained setters, the configuration stays together instead of being scattered everywhere. It's only a usability improvement, nothing important. You can close this. The failed build is noisy.
[jira] [Commented] (FLINK-5167) StreamExecutionEnvironment's set function return `this` instead of void
[ https://issues.apache.org/jira/browse/FLINK-5167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697631#comment-15697631 ] ASF GitHub Bot commented on FLINK-5167: --- Github user shijinkui commented on the issue: https://github.com/apache/flink/pull/2874 > The PR changes several @Public and @PublicEvolving methods. Hi @fhueske, today while setting some configuration I found that some setter functions can't be chained. So I wanted to make all the relevant setters `return this` so that calls can be chained conveniently. With chained setters, the configuration stays together instead of being scattered everywhere. It's only a usability improvement, nothing important. You can close this. The failed build is noisy. > StreamExecutionEnvironment's set function return `this` instead of void > --- > > Key: FLINK-5167 > URL: https://issues.apache.org/jira/browse/FLINK-5167 > Project: Flink > Issue Type: Bug > Components: DataStream API > Reporter: shijinkui > > For example: > public void setNumberOfExecutionRetries(int numberOfExecutionRetries) { > config.setNumberOfExecutionRetries(numberOfExecutionRetries); > } > change to: > public StreamExecutionEnvironment setNumberOfExecutionRetries(int numberOfExecutionRetries) { > config.setNumberOfExecutionRetries(numberOfExecutionRetries); > return this; > }
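The chained-setter pattern proposed in FLINK-5167 can be sketched with a minimal, self-contained Java class. `EnvConfig` below is a hypothetical stand-in for illustration, not Flink's actual `StreamExecutionEnvironment` API:

```java
// Hypothetical configuration class illustrating the fluent-setter
// pattern proposed in FLINK-5167 (not Flink's actual API).
public class EnvConfig {
    private int executionRetries;
    private int parallelism;

    // Returning `this` lets callers chain configuration calls.
    public EnvConfig setNumberOfExecutionRetries(int retries) {
        this.executionRetries = retries;
        return this;
    }

    public EnvConfig setParallelism(int parallelism) {
        this.parallelism = parallelism;
        return this;
    }

    public int getNumberOfExecutionRetries() { return executionRetries; }
    public int getParallelism() { return parallelism; }

    public static void main(String[] args) {
        // All settings stay together in one chained expression.
        EnvConfig cfg = new EnvConfig()
            .setNumberOfExecutionRetries(3)
            .setParallelism(4);
        System.out.println(cfg.getNumberOfExecutionRetries() + " " + cfg.getParallelism());
    }
}
```

Note the trade-off the reviewer raises: changing a `void` setter to return `this` changes the method's binary signature, which is exactly what the compatibility check on `@Public` methods is designed to reject.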
[jira] [Commented] (FLINK-5166) TextInputFormatTest.testNestedFileRead
[ https://issues.apache.org/jira/browse/FLINK-5166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697644#comment-15697644 ] shijinkui commented on FLINK-5166: -- Debugging, I found that there are four tmp files in the `expectedFiles` list but only two files in `paths`, so when `assertTrue(expectedFiles.get(i).equals(paths.get(i)));` is executed, the lookup by index returns null. The tmp files look like: flink/flink-java/tmp/first/TextInputFormatTest3439492861909451525.tmp Strangely, when I delete the tmp files and re-run the test, it passes. I think we should clear the directory before the test. I added two lines of code in the PR. > TextInputFormatTest.testNestedFileRead > -- > > Key: FLINK-5166 > URL: https://issues.apache.org/jira/browse/FLINK-5166 > Project: Flink > Issue Type: Bug > Components: Batch Connectors and Input/Output Formats, Tests > Reporter: shijinkui > > `mvn clean package -P \!scala-2.11,scala-2.11 -U` > Failed tests: > TextInputFormatTest.testNestedFileRead:140 Test erroneous > Tests run: 846, Failures: 1, Errors: 0, Skipped: 0
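The fix described above — clearing stale `.tmp` files before the test creates its own — can be sketched as follows. The class name and helper are illustrative, not the actual lines added in the PR:

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

// Sketch of the fix described above: delete leftover .tmp files from a
// previous run so the test's expected-file list matches what it creates.
// The class and file names here are illustrative, not Flink's test code.
public class TmpDirCleaner {

    /** Deletes all regular files directly inside {@code dir}, if it exists. */
    public static void clearDirectory(File dir) {
        File[] leftovers = dir.listFiles();
        if (leftovers == null) {
            return; // directory does not exist (or is not a directory)
        }
        for (File f : leftovers) {
            if (f.isFile()) {
                f.delete();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        File dir = Files.createTempDirectory("flink-test").toFile();
        // Simulate a stale file left behind by an earlier test run.
        new File(dir, "TextInputFormatTest-stale.tmp").createNewFile();
        clearDirectory(dir);
        System.out.println(dir.listFiles().length); // 0 after cleaning
    }
}
```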
[GitHub] flink pull request #2876: [FLINK-5166] TextInputFormatTest.testNestedFileRea...
GitHub user shijinkui opened a pull request: https://github.com/apache/flink/pull/2876 [FLINK-5166] TextInputFormatTest.testNestedFileRead - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/shijinkui/flink FLINK-5166-1 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2876.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2876 commit 085b0fb94e20877c2eec3d2facd593045e26fb1b Author: shijinkui Date: 2016-11-26T09:42:17Z [FLINK-5166] TextInputFormatTest.testNestedFileRead
[jira] [Commented] (FLINK-5166) TextInputFormatTest.testNestedFileRead
[ https://issues.apache.org/jira/browse/FLINK-5166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697655#comment-15697655 ] ASF GitHub Bot commented on FLINK-5166: --- GitHub user shijinkui opened a pull request: https://github.com/apache/flink/pull/2876 [FLINK-5166] TextInputFormatTest.testNestedFileRead - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/shijinkui/flink FLINK-5166-1 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2876.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2876 commit 085b0fb94e20877c2eec3d2facd593045e26fb1b Author: shijinkui Date: 2016-11-26T09:42:17Z [FLINK-5166] TextInputFormatTest.testNestedFileRead > TextInputFormatTest.testNestedFileRead > -- > > Key: FLINK-5166 > URL: https://issues.apache.org/jira/browse/FLINK-5166 > Project: Flink > Issue Type: Bug > Components: Batch Connectors and Input/Output Formats, Tests > Reporter: shijinkui > > `mvn clean package -P \!scala-2.11,scala-2.11 -U` > Failed tests: > TextInputFormatTest.testNestedFileRead:140 Test erroneous > Tests run: 846, Failures: 1, Errors: 0, Skipped: 0
[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2810#discussion_r89246096 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamProjectableTableSourceScan.scala --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.nodes.datastream + +import java.util + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rex.{RexInputRef, RexNode} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.StreamTableEnvironment +import org.apache.flink.api.table.plan.schema.TableSourceTable +import org.apache.flink.api.table.sources.{ProjectableTableSource, StreamTableSource} +import org.apache.flink.streaming.api.datastream.DataStream + +import scala.collection.JavaConverters._ + +class StreamProjectableTableSourceScan( +cluster: RelOptCluster, +traitSet: RelTraitSet, +input: RelNode, +projects: util.List[_ <: RexNode], +projectionRowType: RelDataType, +table: RelOptTable) + extends StreamScan(cluster, traitSet, table, projectionRowType) { + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { +new StreamProjectableTableSourceScan(cluster, + traitSet, + input, + projects, + projectionRowType, + getTable) + } + + override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { --- End diff -- We would prefer not to override `computeSelfCost` for DataStreamRel.
[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2810#discussion_r89671736 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala --- @@ -131,6 +101,33 @@ class TableSourceITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } + @Test + def testCsvTableSourceWithProjection(): Unit = { + +val csvTable = CommonTestData.getCsvTableSource + +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env, config) + +tEnv.registerTableSource("csvTable", csvTable) + +val results = tEnv + .scan("csvTable") + .select('last, 'id, 'score) + .sortPartition(0, Order.ASCENDING) --- End diff -- Do we have to `sortPartition` here? It seems unnecessary.
[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2810#discussion_r89670471 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamProjectableTableSourceScanRule.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.rules.datastream + +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, StreamProjectableTableSourceScan} +import org.apache.flink.api.table.plan.schema.TableSourceTable +import org.apache.flink.api.table.sources.{ProjectableTableSource, StreamTableSource} + +class StreamProjectableTableSourceScanRule( + operand: RelOptRuleOperand, + description: String) extends RelOptRule(operand, description) { + + override def matches(call: RelOptRuleCall): Boolean = { +// check that table scan supports projections +val project = call.rel(0).asInstanceOf[LogicalProject] +val original = project.getInput.asInstanceOf[RelSubset].getOriginal + +original match { + case scan: TableScan => +val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable]) +dataSetTable match { + case tst: TableSourceTable => +tst.tableSource match { + case s: StreamTableSource[_] => +s.isInstanceOf[ProjectableTableSource[_]] + case _ => +false +} + case _ => +false +} + case _ => +false +} + } + + override def onMatch(call: RelOptRuleCall): Unit = { +val project = call.rel(0).asInstanceOf[LogicalProject] +val scan: TableScan = call.rel(1).asInstanceOf[TableScan] + +val convInput = RelOptRule.convert(project.getInput, DataStreamConvention.INSTANCE) +val traitSet: RelTraitSet = project.getTraitSet.replace(DataStreamConvention.INSTANCE) + +val newRel = new StreamProjectableTableSourceScan( + scan.getCluster, + traitSet, + convInput, + project.getProjects, + project.getRowType, + scan.getTable +) +call.transformTo(newRel) + } + +} + +object StreamProjectableTableSourceScanRule { + val INSTANCE: RelOptRule = new 
StreamProjectableTableSourceScanRule( +operand(classOf[LogicalProject], operand(classOf[TableScan], none())), +"StreamTableSourceProjectRule") --- End diff -- Please rename the description to match the class name.
[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697950#comment-15697950 ] ASF GitHub Bot commented on FLINK-3848: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2810#discussion_r89670435 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchProjectableTableSourceScanRule.scala --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.flink.api.table.plan.nodes.dataset.{BatchProjectableTableSourceScan, DataSetConvention} +import org.apache.flink.api.table.plan.schema.TableSourceTable +import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource} + +class BatchProjectableTableSourceScanRule( + operand: RelOptRuleOperand, + description: String) extends RelOptRule(operand, description) { + + override def matches(call: RelOptRuleCall): Boolean = { +// check that table scan supports projections +val project: LogicalProject = call.rel(0).asInstanceOf[LogicalProject] +val original: RelNode = project.getInput.asInstanceOf[RelSubset].getOriginal + +original match { + case scan: TableScan => +val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable]) +dataSetTable match { + case tst: TableSourceTable => +tst.tableSource match { + case s: BatchTableSource[_] => +s.isInstanceOf[ProjectableTableSource[_]] + case _ => +false +} + case _ => +false +} + case _ => +false +} + } + + override def onMatch(call: RelOptRuleCall): Unit = { +val project = call.rel(0).asInstanceOf[LogicalProject] +val scan: TableScan = call.rel(1).asInstanceOf[TableScan] + +val convInput = RelOptRule.convert(scan, DataSetConvention.INSTANCE) +val traitSet: RelTraitSet = project.getTraitSet.replace(DataSetConvention.INSTANCE) + +val newRel = new BatchProjectableTableSourceScan( + scan.getCluster, + traitSet, + convInput, + project.getProjects, + project.getRowType, + scan.getTable +) +call.transformTo(newRel) + } +} + +object BatchProjectableTableSourceScanRule { + val INSTANCE = new 
BatchProjectableTableSourceScanRule( +operand(classOf[LogicalProject], operand(classOf[TableScan], none())), +"BatchTableSourceProjectRule") --- End diff -- Please rename the description to match the class name. > Add ProjectableTableSource interface and translation rule > - > > Key: FLINK-3848 > URL: https://issues.apache.org/jira/browse/FLINK-3848 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: Fabian Hueske > Assignee: Anton Solovev > > Add a {{ProjectableTableSource}} interface for {{TableSource}} implementations > that support projection push-down. > The interface could look as follows > {code} > trait ProjectableTableSource { > def setProjection(fields: Array[String]): Unit > } > {code} > In addition we need Calcite rules to push a projection into a TableScan that > refers to a {{ProjectableTableSource}}. We might need to tweak the cost model >
[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2810#discussion_r89671128 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala --- @@ -102,11 +107,43 @@ class CsvTableSource( * Do not use it in Table API programs. */ override def getDataStream(streamExecEnv: StreamExecutionEnvironment): DataStream[Row] = { -streamExecEnv.createInput(createCsvInput(), returnType) +streamExecEnv.createInput(createCsvInput(), projectionReturnTypeWithOrder.getOrElse(returnType)) + } + + /** Returns a [[TableSource]] with ability to project fields */ + override def setProjection(fields: Array[Int]): CsvTableSource = { +val mask = new Array[Boolean](fieldNames.length) +fields.foreach(mask(_) = true) +val orderedReturnType = fields.map(fieldTypes(_)) +val indexDiff = fieldNames.length - fields.length +val order = fields.map(f => f - indexDiff).zipWithIndex.sortBy(_._1).map(_._2) +val source = copy --- End diff -- I would like to use the `newFieldNames` and `newFieldTypes` to create a new `CsvTableSource` instead of an identical copy, so that we do not need `orderedReturnType`. What do you think?
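The index-based schema mapping under discussion — building the projected field list directly from the selected indices, as the reviewer suggests, rather than copying and reordering — can be illustrated with a generic standalone sketch (this is not Flink's `CsvTableSource` code; the class and method names are hypothetical):

```java
import java.util.Arrays;

// Generic illustration of projection push-down index mapping: given all
// field names and the selected field indices, build the projected schema
// directly. Mirrors the "newFieldNames/newFieldTypes" idea from the
// review; not Flink's actual implementation.
public class ProjectionUtil {

    /** Returns the selected entries of {@code fields}, in projection order. */
    public static String[] project(String[] fields, int[] selected) {
        String[] projected = new String[selected.length];
        for (int i = 0; i < selected.length; i++) {
            projected[i] = fields[selected[i]];
        }
        return projected;
    }

    public static void main(String[] args) {
        String[] fieldNames = {"first", "last", "id", "score"};
        int[] selected = {1, 2, 3}; // project 'last, 'id, 'score
        System.out.println(Arrays.toString(project(fieldNames, selected)));
        // prints [last, id, score]
    }
}
```

The same index array would be applied to the field-type array, so names and types stay aligned without a separate reordering step.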
[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2810#discussion_r89671556 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchProjectableTableSourceScan.scala --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import java.util + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rex.{RexInputRef, RexNode} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.BatchTableEnvironment +import org.apache.flink.api.table.plan.schema.TableSourceTable +import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource} + +import scala.collection.JavaConverters._ + +class BatchProjectableTableSourceScan( +cluster: RelOptCluster, +traits: RelTraitSet, +input: RelNode, +projects: util.List[_ <: RexNode], +projectionRowType: RelDataType, +table: RelOptTable) + extends BatchScan(cluster, traits, table, projectionRowType) { + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { +new BatchProjectableTableSourceScan(cluster, + traitSet, + input, + projects, + projectionRowType, + getTable) + } + + override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { +super.computeSelfCost(planner, mq).multiplyBy(0.8) + } + + override def translateToPlan( +tableEnv: BatchTableEnvironment, +expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { --- End diff -- Please indent like `BatchTableSourceScan.translateToPlan` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
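For reference, the continuation-parameter indentation the reviewer is asking for would look roughly like this (a sketch; Flink's Scala style indents wrapped parameter lists, and the exact layout of `BatchTableSourceScan.translateToPlan` is assumed here):

```scala
override def translateToPlan(
    tableEnv: BatchTableEnvironment,
    expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
  // method body ...
}
```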
[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2810#discussion_r89671544 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamProjectableTableSourceScan.scala --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.nodes.datastream + +import java.util + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rex.{RexInputRef, RexNode} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.StreamTableEnvironment +import org.apache.flink.api.table.plan.schema.TableSourceTable +import org.apache.flink.api.table.sources.{ProjectableTableSource, StreamTableSource} +import org.apache.flink.streaming.api.datastream.DataStream + +import scala.collection.JavaConverters._ + +class StreamProjectableTableSourceScan( +cluster: RelOptCluster, +traitSet: RelTraitSet, +input: RelNode, +projects: util.List[_ <: RexNode], +projectionRowType: RelDataType, +table: RelOptTable) + extends StreamScan(cluster, traitSet, table, projectionRowType) { + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { +new StreamProjectableTableSourceScan(cluster, + traitSet, + input, + projects, + projectionRowType, + getTable) + } + + override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { +super.computeSelfCost(planner, mq).multiplyBy(0.8) + } + + override def translateToPlan( +tableEnv: StreamTableEnvironment, +expectedType: Option[TypeInformation[Any]]): DataStream[Any] = { --- End diff -- Please indent like `BatchTableSourceScan.translateToPlan` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697938#comment-15697938 ] ASF GitHub Bot commented on FLINK-3848: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2810#discussion_r89671736 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala --- @@ -131,6 +101,33 @@ class TableSourceITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } + @Test + def testCsvTableSourceWithProjection(): Unit = { + +val csvTable = CommonTestData.getCsvTableSource + +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env, config) + +tEnv.registerTableSource("csvTable", csvTable) + +val results = tEnv + .scan("csvTable") + .select('last, 'id, 'score) + .sortPartition(0, Order.ASCENDING) --- End diff -- Do we have to `sortPartition` here? It seems that it is needless. > Add ProjectableTableSource interface and translation rule > - > > Key: FLINK-3848 > URL: https://issues.apache.org/jira/browse/FLINK-3848 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Anton Solovev > > Add a {{ProjectableTableSource}} interface for {{TableSource}} implementation > that support projection push-down. > The interface could look as follows > {code} > def trait ProjectableTableSource { > def setProjection(fields: Array[String]): Unit > } > {code} > In addition we need Calcite rules to push a projection into a TableScan that > refers to a {{ProjectableTableSource}}. We might need to tweak the cost model > as well to push the optimizer in the right direction. > Moreover, the {{CsvTableSource}} could be extended to implement > {{ProjectableTableSource}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
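The interface proposed in the issue description could look as follows once adapted to the index-based projection the PR actually implements. The type parameter and return type are assumptions, since the JIRA text only sketches a `setProjection(fields: Array[String]): Unit` method:

```scala
/** Hedged sketch of a projectable table source, based on the JIRA description. */
trait ProjectableTableSource[T] {
  /** Returns a copy of this TableSource that reads only the given field indexes. */
  def setProjection(fields: Array[Int]): ProjectableTableSource[T]
}
```

Returning a new source rather than mutating in place matches the `CsvTableSource.setProjection` signature under review in this PR.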
[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2810#discussion_r89671180 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamProjectableTableSourceScanRule.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.datastream + +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, StreamProjectableTableSourceScan} +import org.apache.flink.api.table.plan.schema.TableSourceTable +import org.apache.flink.api.table.sources.{ProjectableTableSource, StreamTableSource} + +class StreamProjectableTableSourceScanRule( + operand: RelOptRuleOperand, + description: String) extends RelOptRule(operand, description) { --- End diff -- Please indent this like `BatchTableSourceScan`. 
[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697948#comment-15697948 ] ASF GitHub Bot commented on FLINK-3848: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2810#discussion_r89246193 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamProjectableTableSourceScan.scala --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.nodes.datastream + +import java.util + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rex.{RexInputRef, RexNode} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.StreamTableEnvironment +import org.apache.flink.api.table.plan.schema.TableSourceTable +import org.apache.flink.api.table.sources.{ProjectableTableSource, StreamTableSource} +import org.apache.flink.streaming.api.datastream.DataStream + +import scala.collection.JavaConverters._ + +class StreamProjectableTableSourceScan( +cluster: RelOptCluster, +traitSet: RelTraitSet, +input: RelNode, +projects: util.List[_ <: RexNode], +projectionRowType: RelDataType, +table: RelOptTable) + extends StreamScan(cluster, traitSet, table, projectionRowType) { + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { +new StreamProjectableTableSourceScan(cluster, + traitSet, + input, + projects, + projectionRowType, + getTable) + } + + override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { +super.computeSelfCost(planner, mq).multiplyBy(0.8) + } + + override def translateToPlan( +tableEnv: StreamTableEnvironment, +expectedType: Option[TypeInformation[Any]]): DataStream[Any] = { + +val tableSourceTable = getTable.unwrap(classOf[TableSourceTable]) +val projectableSource = tableSourceTable.tableSource.asInstanceOf[ProjectableTableSource[_]] + +val indexes = projects.asScala.map(_.asInstanceOf[RexInputRef].getIndex).toArray + +val tableSource = projectableSource.setProjection(indexes).asInstanceOf[StreamTableSource[_]] + +val config = tableEnv.getConfig +val inputDataSet = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]] --- End diff -- `inputDataSet` => `inputDataStream`
[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2810#discussion_r89670435 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchProjectableTableSourceScanRule.scala --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.flink.api.table.plan.nodes.dataset.{BatchProjectableTableSourceScan, DataSetConvention} +import org.apache.flink.api.table.plan.schema.TableSourceTable +import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource} + +class BatchProjectableTableSourceScanRule( + operand: RelOptRuleOperand, + description: String) extends RelOptRule(operand, description) { + + override def matches(call: RelOptRuleCall): Boolean = { +// check that table scan supports projections +val project: LogicalProject = call.rel(0).asInstanceOf[LogicalProject] +val original: RelNode = project.getInput.asInstanceOf[RelSubset].getOriginal + +original match { + case scan: TableScan => +val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable]) +dataSetTable match { + case tst: TableSourceTable => +tst.tableSource match { + case s: BatchTableSource[_] => +s.isInstanceOf[ProjectableTableSource[_]] + case _ => +false +} + case _ => +false +} + case _ => +false +} + } + + override def onMatch(call: RelOptRuleCall): Unit = { +val project = call.rel(0).asInstanceOf[LogicalProject] +val scan: TableScan = call.rel(1).asInstanceOf[TableScan] + +val convInput = RelOptRule.convert(scan, DataSetConvention.INSTANCE) +val traitSet: RelTraitSet = project.getTraitSet.replace(DataSetConvention.INSTANCE) + +val newRel = new BatchProjectableTableSourceScan( + scan.getCluster, + traitSet, + convInput, + project.getProjects, + project.getRowType, + scan.getTable +) +call.transformTo(newRel) + } +} + +object BatchProjectableTableSourceScanRule { + val INSTANCE = new 
BatchProjectableTableSourceScanRule( +operand(classOf[LogicalProject], operand(classOf[TableScan], none())), +"BatchTableSourceProjectRule") --- End diff -- Please rename the description to match the class name.
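Applying the requested rename, the rule's companion object would read as follows (a sketch of the quoted code with only the description string changed):

```scala
object BatchProjectableTableSourceScanRule {
  val INSTANCE = new BatchProjectableTableSourceScanRule(
    operand(classOf[LogicalProject], operand(classOf[TableScan], none())),
    "BatchProjectableTableSourceScanRule") // description now matches the class name
}
```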
[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697955#comment-15697955 ] ASF GitHub Bot commented on FLINK-3848: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2810#discussion_r89671437 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchProjectableTableSourceScanRule.scala --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.flink.api.table.plan.nodes.dataset.{BatchProjectableTableSourceScan, DataSetConvention} +import org.apache.flink.api.table.plan.schema.TableSourceTable +import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource} + +class BatchProjectableTableSourceScanRule( + operand: RelOptRuleOperand, + description: String) extends RelOptRule(operand, description) { + + override def matches(call: RelOptRuleCall): Boolean = { +// check that table scan supports projections +val project: LogicalProject = call.rel(0).asInstanceOf[LogicalProject] +val original: RelNode = project.getInput.asInstanceOf[RelSubset].getOriginal + +original match { + case scan: TableScan => +val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable]) +dataSetTable match { + case tst: TableSourceTable => +tst.tableSource match { + case s: BatchTableSource[_] => +s.isInstanceOf[ProjectableTableSource[_]] + case _ => +false +} + case _ => +false +} + case _ => +false +} + } + + override def onMatch(call: RelOptRuleCall): Unit = { +val project = call.rel(0).asInstanceOf[LogicalProject] +val scan: TableScan = call.rel(1).asInstanceOf[TableScan] + +val convInput = RelOptRule.convert(scan, DataSetConvention.INSTANCE) +val traitSet: RelTraitSet = project.getTraitSet.replace(DataSetConvention.INSTANCE) + +val newRel = new BatchProjectableTableSourceScan( + scan.getCluster, + traitSet, + convInput, + project.getProjects, + project.getRowType, + scan.getTable +) --- End diff -- I would like indent the `)` to the previous line. 
[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697943#comment-15697943 ] ASF GitHub Bot commented on FLINK-3848: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2810#discussion_r89671175 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchProjectableTableSourceScanRule.scala --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.flink.api.table.plan.nodes.dataset.{BatchProjectableTableSourceScan, DataSetConvention} +import org.apache.flink.api.table.plan.schema.TableSourceTable +import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource} + +class BatchProjectableTableSourceScanRule( + operand: RelOptRuleOperand, + description: String) extends RelOptRule(operand, description) { --- End diff -- Please indent this like `BatchTableSourceScan`.
[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697939#comment-15697939 ] ASF GitHub Bot commented on FLINK-3848: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2810#discussion_r89671603 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchProjectableTableSourceScanRule.scala --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.flink.api.table.plan.nodes.dataset.{BatchProjectableTableSourceScan, DataSetConvention} +import org.apache.flink.api.table.plan.schema.TableSourceTable +import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource} + +class BatchProjectableTableSourceScanRule( + operand: RelOptRuleOperand, + description: String) extends RelOptRule(operand, description) { + + override def matches(call: RelOptRuleCall): Boolean = { +// check that table scan supports projections +val project: LogicalProject = call.rel(0).asInstanceOf[LogicalProject] +val original: RelNode = project.getInput.asInstanceOf[RelSubset].getOriginal + +original match { + case scan: TableScan => --- End diff -- The rule has make sure the first RelNode is `LogicalProject` and the second is `TableScan`, so I think we do not need to check whether `original` is a `TableScan` again here. > Add ProjectableTableSource interface and translation rule > - > > Key: FLINK-3848 > URL: https://issues.apache.org/jira/browse/FLINK-3848 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Anton Solovev > > Add a {{ProjectableTableSource}} interface for {{TableSource}} implementation > that support projection push-down. > The interface could look as follows > {code} > def trait ProjectableTableSource { > def setProjection(fields: Array[String]): Unit > } > {code} > In addition we need Calcite rules to push a projection into a TableScan that > refers to a {{ProjectableTableSource}}. 
We might need to tweak the cost model > as well to push the optimizer in the right direction. > Moreover, the {{CsvTableSource}} could be extended to implement > {{ProjectableTableSource}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
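The interface proposed in the issue can be sketched without any Flink dependency roughly as follows. `CsvSource` and its members below are hypothetical stand-ins for the real `CsvTableSource`, used only to illustrate the projection push-down contract:

```scala
// Simplified, dependency-free sketch of the proposed interface.
// ProjectableTableSource lets the optimizer push a projection into the
// source so that only the required fields are produced.
trait ProjectableTableSource {
  def setProjection(fields: Array[String]): Unit
}

// Hypothetical CSV-backed source illustrating the contract; the real
// CsvTableSource would parse files instead of holding rows in memory.
class CsvSource(allFields: Array[String], rows: Seq[Map[String, String]])
    extends ProjectableTableSource {

  private var projected: Array[String] = allFields

  override def setProjection(fields: Array[String]): Unit = {
    projected = fields
  }

  // Emit only the projected columns, in projection order.
  def scan(): Seq[Seq[String]] = rows.map(row => projected.toSeq.map(row))
}
```

With `setProjection(Array("city", "name"))`, `scan()` yields only those two columns per row, which is the effect the translation rule is meant to achieve at plan time.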
[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697945#comment-15697945 ] ASF GitHub Bot commented on FLINK-3848: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2810#discussion_r89246096 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamProjectableTableSourceScan.scala --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.nodes.datastream + +import java.util + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rex.{RexInputRef, RexNode} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.StreamTableEnvironment +import org.apache.flink.api.table.plan.schema.TableSourceTable +import org.apache.flink.api.table.sources.{ProjectableTableSource, StreamTableSource} +import org.apache.flink.streaming.api.datastream.DataStream + +import scala.collection.JavaConverters._ + +class StreamProjectableTableSourceScan( +cluster: RelOptCluster, +traitSet: RelTraitSet, +input: RelNode, +projects: util.List[_ <: RexNode], +projectionRowType: RelDataType, +table: RelOptTable) + extends StreamScan(cluster, traitSet, table, projectionRowType) { + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { +new StreamProjectableTableSourceScan(cluster, + traitSet, + input, + projects, + projectionRowType, + getTable) + } + + override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { --- End diff -- We would prefer not to override `computeSelfCost` for `DataStreamRel`.
[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697947#comment-15697947 ] ASF GitHub Bot commented on FLINK-3848: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2810#discussion_r89671556 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchProjectableTableSourceScan.scala --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import java.util + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rex.{RexInputRef, RexNode} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.BatchTableEnvironment +import org.apache.flink.api.table.plan.schema.TableSourceTable +import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource} + +import scala.collection.JavaConverters._ + +class BatchProjectableTableSourceScan( +cluster: RelOptCluster, +traits: RelTraitSet, +input: RelNode, +projects: util.List[_ <: RexNode], +projectionRowType: RelDataType, +table: RelOptTable) + extends BatchScan(cluster, traits, table, projectionRowType) { + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { +new BatchProjectableTableSourceScan(cluster, + traitSet, + input, + projects, + projectionRowType, + getTable) + } + + override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { +super.computeSelfCost(planner, mq).multiplyBy(0.8) + } + + override def translateToPlan( +tableEnv: BatchTableEnvironment, +expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { --- End diff -- Please indent like `BatchTableSourceScan.translateToPlan`
[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697952#comment-15697952 ] ASF GitHub Bot commented on FLINK-3848: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2810#discussion_r89671523 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchProjectableTableSourceScan.scala --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import java.util + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rex.{RexInputRef, RexNode} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.BatchTableEnvironment +import org.apache.flink.api.table.plan.schema.TableSourceTable +import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource} + +import scala.collection.JavaConverters._ + +class BatchProjectableTableSourceScan( +cluster: RelOptCluster, +traits: RelTraitSet, +input: RelNode, +projects: util.List[_ <: RexNode], +projectionRowType: RelDataType, +table: RelOptTable) + extends BatchScan(cluster, traits, table, projectionRowType) { + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { +new BatchProjectableTableSourceScan(cluster, + traitSet, + input, + projects, + projectionRowType, + getTable) + } + + override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { +super.computeSelfCost(planner, mq).multiplyBy(0.8) + } + + override def translateToPlan( +tableEnv: BatchTableEnvironment, +expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + +val tableSourceTable = getTable.unwrap(classOf[TableSourceTable]) +val projectableSource = tableSourceTable.tableSource.asInstanceOf[ProjectableTableSource[_]] + +val indexes = projects.asScala.map(_.asInstanceOf[RexInputRef].getIndex).toArray --- End diff -- This is error-prone: not all projects are `RexInputRef`s (e.g. for `select(a, floor(b), c * 2, 1)`, only `a` is a `RexInputRef`), so an exception would be thrown here. Maybe we can add a check to the rule to make sure all the projects are `RexInputRef`s.
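The guard the reviewer suggests can be sketched without a Calcite dependency as follows; `RexNode`, `RexInputRef`, `RexCall`, and `RexLiteral` below are minimal stand-ins for the real Calcite classes, not their actual definitions:

```scala
// Dependency-free sketch of the suggested guard. These case classes are
// simplified stand-ins for Calcite's RexNode hierarchy.
sealed trait RexNode
final case class RexInputRef(index: Int) extends RexNode                  // plain field reference, e.g. `a`
final case class RexCall(op: String, args: Seq[RexNode]) extends RexNode  // e.g. floor(b) or c * 2
final case class RexLiteral(value: Any) extends RexNode                   // e.g. the constant 1

object ProjectionGuard {
  // matches() should return false unless every project item is an input ref,
  // so that translateToPlan can safely cast and collect the field indexes.
  def allInputRefs(projects: Seq[RexNode]): Boolean =
    projects.forall(_.isInstanceOf[RexInputRef])

  // Safe only after allInputRefs has been checked.
  def fieldIndexes(projects: Seq[RexNode]): Array[Int] =
    projects.collect { case RexInputRef(i) => i }.toArray
}
```

For `select(a, floor(b), c * 2, 1)` the guard returns false, so the rule would simply not fire and the plan would fall back to a plain scan followed by a projection, instead of throwing a `ClassCastException` during translation.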
[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2810#discussion_r89671175 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchProjectableTableSourceScanRule.scala --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.flink.api.table.plan.nodes.dataset.{BatchProjectableTableSourceScan, DataSetConvention} +import org.apache.flink.api.table.plan.schema.TableSourceTable +import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource} + +class BatchProjectableTableSourceScanRule( + operand: RelOptRuleOperand, + description: String) extends RelOptRule(operand, description) { --- End diff -- Please indent this like `BatchTableSourceScan`. 
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2810#discussion_r89671437 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchProjectableTableSourceScanRule.scala --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.rules.dataSet + +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.flink.api.table.plan.nodes.dataset.{BatchProjectableTableSourceScan, DataSetConvention} +import org.apache.flink.api.table.plan.schema.TableSourceTable +import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource} + +class BatchProjectableTableSourceScanRule( + operand: RelOptRuleOperand, + description: String) extends RelOptRule(operand, description) { + + override def matches(call: RelOptRuleCall): Boolean = { +// check that table scan supports projections +val project: LogicalProject = call.rel(0).asInstanceOf[LogicalProject] +val original: RelNode = project.getInput.asInstanceOf[RelSubset].getOriginal + +original match { + case scan: TableScan => +val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable]) +dataSetTable match { + case tst: TableSourceTable => +tst.tableSource match { + case s: BatchTableSource[_] => +s.isInstanceOf[ProjectableTableSource[_]] + case _ => +false +} + case _ => +false +} + case _ => +false +} + } + + override def onMatch(call: RelOptRuleCall): Unit = { +val project = call.rel(0).asInstanceOf[LogicalProject] +val scan: TableScan = call.rel(1).asInstanceOf[TableScan] + +val convInput = RelOptRule.convert(scan, DataSetConvention.INSTANCE) +val traitSet: RelTraitSet = project.getTraitSet.replace(DataSetConvention.INSTANCE) + +val newRel = new BatchProjectableTableSourceScan( + scan.getCluster, + traitSet, + convInput, + project.getProjects, + project.getRowType, + scan.getTable +) --- End diff -- I would prefer to move the `)` up to the previous line.
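The deeply nested `matches()` logic quoted in this diff boils down to one question: does the scanned table wrap a `BatchTableSource` that is also projectable? A dependency-free sketch, with the traits below as minimal stand-ins for the real Flink Table API types:

```scala
// Minimal stand-ins for the Flink Table API types used in matches().
trait TableSource
trait BatchTableSource extends TableSource
trait ProjectableTableSource extends TableSource

object MatchSketch {
  // The three nested matches in the quoted matches() collapse into a single
  // pattern match with a type test: accept only a BatchTableSource that also
  // implements ProjectableTableSource.
  def supportsProjection(source: Option[TableSource]): Boolean =
    source match {
      case Some(s: BatchTableSource) => s.isInstanceOf[ProjectableTableSource]
      case _                         => false
    }
}
```

This kind of flattening keeps each fall-through `false` branch in one place and makes the rule's precondition readable at a glance.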
[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697942#comment-15697942 ] ASF GitHub Bot commented on FLINK-3848: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2810#discussion_r89670471 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamProjectableTableSourceScanRule.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.rules.datastream + +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, StreamProjectableTableSourceScan} +import org.apache.flink.api.table.plan.schema.TableSourceTable +import org.apache.flink.api.table.sources.{ProjectableTableSource, StreamTableSource} + +class StreamProjectableTableSourceScanRule( + operand: RelOptRuleOperand, + description: String) extends RelOptRule(operand, description) { + + override def matches(call: RelOptRuleCall): Boolean = { +// check that table scan supports projections +val project = call.rel(0).asInstanceOf[LogicalProject] +val original = project.getInput.asInstanceOf[RelSubset].getOriginal + +original match { + case scan: TableScan => +val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable]) +dataSetTable match { + case tst: TableSourceTable => +tst.tableSource match { + case s: StreamTableSource[_] => +s.isInstanceOf[ProjectableTableSource[_]] + case _ => +false +} + case _ => +false +} + case _ => +false +} + } + + override def onMatch(call: RelOptRuleCall): Unit = { +val project = call.rel(0).asInstanceOf[LogicalProject] +val scan: TableScan = call.rel(1).asInstanceOf[TableScan] + +val convInput = RelOptRule.convert(project.getInput, DataStreamConvention.INSTANCE) +val traitSet: RelTraitSet = project.getTraitSet.replace(DataStreamConvention.INSTANCE) + +val newRel = new StreamProjectableTableSourceScan( + scan.getCluster, + traitSet, + convInput, + project.getProjects, + project.getRowType, + scan.getTable +) +call.transformTo(newRel) + } + +} + +object StreamProjectableTableSourceScanRule { + val INSTANCE: RelOptRule = new 
StreamProjectableTableSourceScanRule( +operand(classOf[LogicalProject], operand(classOf[TableScan], none())), +"StreamTableSourceProjectRule") --- End diff -- Please rename the description to match the class name.
[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2810#discussion_r89671731 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamProjectableTableSourceScanRule.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules.datastream + +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, StreamProjectableTableSourceScan} +import org.apache.flink.api.table.plan.schema.TableSourceTable +import org.apache.flink.api.table.sources.{ProjectableTableSource, StreamTableSource} + --- End diff -- Please add a short comment describing what this rule is used for.
[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2810#discussion_r89671462 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchProjectableTableSourceScan.scala --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import java.util + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rex.{RexInputRef, RexNode} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.BatchTableEnvironment +import org.apache.flink.api.table.plan.schema.TableSourceTable +import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource} + +import scala.collection.JavaConverters._ + +class BatchProjectableTableSourceScan( +cluster: RelOptCluster, +traits: RelTraitSet, +input: RelNode, +projects: util.List[_ <: RexNode], +projectionRowType: RelDataType, +table: RelOptTable) + extends BatchScan(cluster, traits, table, projectionRowType) { + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { +new BatchProjectableTableSourceScan(cluster, --- End diff -- I would prefer to move `cluster,` to the next line.
[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697937#comment-15697937 ] ASF GitHub Bot commented on FLINK-3848: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2810#discussion_r89671180 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamProjectableTableSourceScanRule.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.rules.datastream + +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, StreamProjectableTableSourceScan} +import org.apache.flink.api.table.plan.schema.TableSourceTable +import org.apache.flink.api.table.sources.{ProjectableTableSource, StreamTableSource} + +class StreamProjectableTableSourceScanRule( + operand: RelOptRuleOperand, + description: String) extends RelOptRule(operand, description) { --- End diff -- Please indent this like `BatchTableSourceScan`.
[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697941#comment-15697941 ]

ASF GitHub Bot commented on FLINK-3848:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2810#discussion_r89671462

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchProjectableTableSourceScan.scala ---
    @@ -0,0 +1,74 @@
    (Apache license header omitted; identical to the one quoted above)
    +
    +package org.apache.flink.api.table.plan.nodes.dataset
    +
    +import java.util
    +
    +import org.apache.calcite.plan._
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.metadata.RelMetadataQuery
    +import org.apache.calcite.rex.{RexInputRef, RexNode}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.DataSet
    +import org.apache.flink.api.table.BatchTableEnvironment
    +import org.apache.flink.api.table.plan.schema.TableSourceTable
    +import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource}
    +
    +import scala.collection.JavaConverters._
    +
    +class BatchProjectableTableSourceScan(
    +cluster: RelOptCluster,
    +traits: RelTraitSet,
    +input: RelNode,
    +projects: util.List[_ <: RexNode],
    +projectionRowType: RelDataType,
    +table: RelOptTable)
    +  extends BatchScan(cluster, traits, table, projectionRowType) {
    +
    +  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
    +new BatchProjectableTableSourceScan(cluster,
    --- End diff --

    I would like to indent `cluster,` onto the next line.
[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697944#comment-15697944 ]

ASF GitHub Bot commented on FLINK-3848:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2810#discussion_r89671537

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamProjectableTableSourceScan.scala ---
    @@ -0,0 +1,74 @@
    (Apache license header omitted; identical to the one quoted above)
    +
    +package org.apache.flink.api.table.plan.nodes.datastream
    +
    +import java.util
    +
    +import org.apache.calcite.plan._
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.metadata.RelMetadataQuery
    +import org.apache.calcite.rex.{RexInputRef, RexNode}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.table.StreamTableEnvironment
    +import org.apache.flink.api.table.plan.schema.TableSourceTable
    +import org.apache.flink.api.table.sources.{ProjectableTableSource, StreamTableSource}
    +import org.apache.flink.streaming.api.datastream.DataStream
    +
    +import scala.collection.JavaConverters._
    +
    +class StreamProjectableTableSourceScan(
    +cluster: RelOptCluster,
    +traitSet: RelTraitSet,
    +input: RelNode,
    +projects: util.List[_ <: RexNode],
    +projectionRowType: RelDataType,
    +table: RelOptTable)
    +  extends StreamScan(cluster, traitSet, table, projectionRowType) {
    +
    +  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
    +new StreamProjectableTableSourceScan(cluster,
    +  traitSet,
    +  input,
    +  projects,
    +  projectionRowType,
    +  getTable)
    +  }
    +
    +  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
    +super.computeSelfCost(planner, mq).multiplyBy(0.8)
    +  }
    +
    +  override def translateToPlan(
    +tableEnv: StreamTableEnvironment,
    +expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
    +
    +val tableSourceTable = getTable.unwrap(classOf[TableSourceTable])
    +val projectableSource = tableSourceTable.tableSource.asInstanceOf[ProjectableTableSource[_]]
    +
    +val indexes = projects.asScala.map(_.asInstanceOf[RexInputRef].getIndex).toArray
    --- End diff --

    The same problem will happen here.
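[Editor's note] The `computeSelfCost(...).multiplyBy(0.8)` line in the hunk above is the cost-model tweak the issue mentions: a cost-based planner keeps the cheapest equivalent plan, so discounting the projected scan is enough to make it beat the plain scan plus a separate projection. A self-contained toy illustration of that choice (the `Plan` class and the concrete cost numbers are invented for the example, not taken from Flink or Calcite):

```scala
// Toy illustration: why a flat discount factor < 1.0 steers the planner
// toward the projected scan. Not Flink/Calcite code.
case class Plan(name: String, cost: Double)

object CostSketch extends App {
  val scanCost    = 100.0 // hypothetical cost of reading the full table
  val projectCost = 10.0  // hypothetical cost of a separate projection node

  val candidates = Seq(
    Plan("scan + separate project", scanCost + projectCost), // 110.0
    Plan("projected scan (x0.8)",   scanCost * 0.8)          // 80.0
  )

  // The planner keeps the cheapest equivalent plan.
  val chosen = candidates.minBy(_.cost)
  println(chosen.name) // the discounted, projected scan wins
}
```

Any factor below 1.0 achieves the same ordering here; 0.8 in the PR is simply a fixed nudge rather than a measured cost.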
[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697946#comment-15697946 ]

ASF GitHub Bot commented on FLINK-3848:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2810#discussion_r89671544

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamProjectableTableSourceScan.scala ---
    @@ -0,0 +1,74 @@
    (License header, imports, and class body as in the previous hunk omitted.)
    +
    +  override def translateToPlan(
    +tableEnv: StreamTableEnvironment,
    +expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
    --- End diff --

    Please indent like `BatchTableSourceScan.translateToPlan`.
[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697954#comment-15697954 ]

ASF GitHub Bot commented on FLINK-3848:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2810#discussion_r89670370

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchProjectableTableSourceScanRule.scala ---
    @@ -0,0 +1,82 @@
    (Apache license header omitted; identical to the one quoted above)
    +
    +package org.apache.flink.api.table.plan.rules.dataSet
    +
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.volcano.RelSubset
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand, RelTraitSet}
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.core.TableScan
    +import org.apache.calcite.rel.logical.LogicalProject
    +import org.apache.flink.api.table.plan.nodes.dataset.{BatchProjectableTableSourceScan, DataSetConvention}
    +import org.apache.flink.api.table.plan.schema.TableSourceTable
    +import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource}
    +
    --- End diff --

    Please add a short comment describing what this rule is used for.
[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2810#discussion_r89246478

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/ProjectableTableSource.scala ---
    @@ -0,0 +1,32 @@
    (Apache license header omitted; identical to the one quoted above)
    +
    +package org.apache.flink.api.table.sources
    +
    +// TODO FLINK-3848
    --- End diff --

    Could we remove this comment?

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2810#discussion_r89671607

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamProjectableTableSourceScanRule.scala ---
    @@ -0,0 +1,83 @@
    (License header and imports as quoted above omitted.)
    +
    +class StreamProjectableTableSourceScanRule(
    +  operand: RelOptRuleOperand,
    +  description: String) extends RelOptRule(operand, description) {
    +
    +  override def matches(call: RelOptRuleCall): Boolean = {
    +// check that table scan supports projections
    +val project = call.rel(0).asInstanceOf[LogicalProject]
    +val original = project.getInput.asInstanceOf[RelSubset].getOriginal
    +
    +original match {
    +  case scan: TableScan =>
    --- End diff --

    The rule's operand pattern already makes sure that the first RelNode is a
    `LogicalProject` and the second is a `TableScan`, so I think we do not need
    to check whether `original` is a `TableScan` again here.
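[Editor's note] The review point above rests on how a Calcite rule declares its operand tree: the pattern of node classes is fixed at construction time, so `matches()` only ever fires on that shape. A hedged sketch of such a declaration (the `INSTANCE` wiring is illustrative and may not match the PR's actual constructor call; `operand` and `none` come from `RelOptRule`'s static helpers, imported in the file above):

```scala
// Illustrative only: pinning the LogicalProject-over-TableScan shape in the
// operand tree, so matches() never needs to re-check the node classes.
object StreamProjectableTableSourceScanRule {
  val INSTANCE: RelOptRule = new StreamProjectableTableSourceScanRule(
    operand(classOf[LogicalProject],        // root must be a LogicalProject ...
      operand(classOf[TableScan], none())), // ... over a TableScan leaf
    "StreamProjectableTableSourceScanRule")
}
```

With this pattern in place, `matches()` can restrict itself to the one thing the operand tree cannot express: whether the scanned table actually wraps a `ProjectableTableSource`.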
[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697951#comment-15697951 ]

ASF GitHub Bot commented on FLINK-3848:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2810#discussion_r89671731

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamProjectableTableSourceScanRule.scala ---
    @@ -0,0 +1,83 @@
    (Apache license header omitted; identical to the one quoted above)
    +
    +package org.apache.flink.api.table.plan.rules.datastream
    +
    +import org.apache.calcite.plan.RelOptRule._
    +import org.apache.calcite.plan.volcano.RelSubset
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand, RelTraitSet}
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.core.TableScan
    +import org.apache.calcite.rel.logical.LogicalProject
    +import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, StreamProjectableTableSourceScan}
    +import org.apache.flink.api.table.plan.schema.TableSourceTable
    +import org.apache.flink.api.table.sources.{ProjectableTableSource, StreamTableSource}
    +
    --- End diff --

    Please add a short comment describing what this rule is used for.
[GitHub] flink pull request #2810: [FLINK-3848] Add ProjectableTableSource interface ...
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2810#discussion_r89671603

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/BatchProjectableTableSourceScanRule.scala ---
    @@ -0,0 +1,82 @@
    (License header and imports as quoted above omitted.)
    +
    +class BatchProjectableTableSourceScanRule(
    +  operand: RelOptRuleOperand,
    +  description: String) extends RelOptRule(operand, description) {
    +
    +  override def matches(call: RelOptRuleCall): Boolean = {
    +// check that table scan supports projections
    +val project: LogicalProject = call.rel(0).asInstanceOf[LogicalProject]
    +val original: RelNode = project.getInput.asInstanceOf[RelSubset].getOriginal
    +
    +original match {
    +  case scan: TableScan =>
    --- End diff --

    The rule's operand pattern already makes sure that the first RelNode is a
    `LogicalProject` and the second is a `TableScan`, so I think we do not need
    to check whether `original` is a `TableScan` again here.
[jira] [Commented] (FLINK-3848) Add ProjectableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15697940#comment-15697940 ] ASF GitHub Bot commented on FLINK-3848: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2810#discussion_r89671607 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamProjectableTableSourceScanRule.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.rules.datastream + +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableScan +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, StreamProjectableTableSourceScan} +import org.apache.flink.api.table.plan.schema.TableSourceTable +import org.apache.flink.api.table.sources.{ProjectableTableSource, StreamTableSource} + +class StreamProjectableTableSourceScanRule( + operand: RelOptRuleOperand, + description: String) extends RelOptRule(operand, description) { + + override def matches(call: RelOptRuleCall): Boolean = { +// check that table scan supports projections +val project = call.rel(0).asInstanceOf[LogicalProject] +val original = project.getInput.asInstanceOf[RelSubset].getOriginal + +original match { + case scan: TableScan => --- End diff -- The rule already ensures that the first RelNode is `LogicalProject` and the second is `TableScan`, so I think we do not need to check whether `original` is a `TableScan` again here. > Add ProjectableTableSource interface and translation rule > - > > Key: FLINK-3848 > URL: https://issues.apache.org/jira/browse/FLINK-3848 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Anton Solovev > > Add a {{ProjectableTableSource}} interface for {{TableSource}} implementations > that support projection push-down. > The interface could look as follows > {code} > trait ProjectableTableSource { > def setProjection(fields: Array[String]): Unit > } > {code} > In addition we need Calcite rules to push a projection into a TableScan that > refers to a {{ProjectableTableSource}}. 
We might need to tweak the cost model > as well to push the optimizer in the right direction. > Moreover, the {{CsvTableSource}} could be extended to implement > {{ProjectableTableSource}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
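wuchong's review point is that a Calcite rule's operand pattern already pre-filters which node shapes reach `matches`, so re-checking the node type inside the rule body is redundant. A toy analogue (hypothetical names, not Calcite's actual API) of how a pattern pre-check makes an in-body `instanceof`-style check unnecessary:

```java
import java.util.List;
import java.util.function.Predicate;

// Toy plan node: just a kind tag and child nodes.
class Node {
    final String kind;
    final List<Node> inputs;
    Node(String kind, List<Node> inputs) { this.kind = kind; this.inputs = inputs; }
}

// Toy rule: the engine only fires it when the root/input patterns match,
// so the rule body can safely assume the shape without further checks.
class Rule {
    private final Predicate<Node> rootPattern;
    private final Predicate<Node> inputPattern;

    Rule(Predicate<Node> rootPattern, Predicate<Node> inputPattern) {
        this.rootPattern = rootPattern;
        this.inputPattern = inputPattern;
    }

    boolean tryFire(Node n) {
        // The pattern check plays the role of Calcite's operand matching.
        return rootPattern.test(n)
            && !n.inputs.isEmpty()
            && inputPattern.test(n.inputs.get(0));
    }
}
```

In Calcite itself the equivalent filtering is declared via the rule's `RelOptRuleOperand` tree, which is why the reviewer suggests dropping the extra `TableScan` match.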
[jira] [Commented] (FLINK-4692) Add tumbling group-windows for batch tables
[ https://issues.apache.org/jira/browse/FLINK-4692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15698090#comment-15698090 ] Jark Wu commented on FLINK-4692: Hi [~fhueske] [~twalthr], I have proposed a [design doc|https://docs.google.com/document/d/1lzpnNUmNzn9yuCGf1RSjHuHAWm-O_v2in7y90muXI2o/edit#] for this issue and made a prototype. Could you have a look at the design? Any feedback is welcome! > Add tumbling group-windows for batch tables > --- > > Key: FLINK-4692 > URL: https://issues.apache.org/jira/browse/FLINK-4692 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Jark Wu > > Add Tumble group-windows for batch tables as described in > [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations]. > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5166) TextInputFormatTest.testNestedFileRead
[ https://issues.apache.org/jira/browse/FLINK-5166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15698373#comment-15698373 ] ASF GitHub Bot commented on FLINK-5166: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/2876 The test should be changed to use more specific folder names including a random component. If the directory we are trying to create already exists we should most definitely not just delete everything there, but instead try a different directory. > TextInputFormatTest.testNestedFileRead > -- > > Key: FLINK-5166 > URL: https://issues.apache.org/jira/browse/FLINK-5166 > Project: Flink > Issue Type: Bug > Components: Batch Connectors and Input/Output Formats, Tests >Reporter: shijinkui > > `mvn clean package -P \!scala-2.11,scala-2.11 -U` > Failed tests: > TextInputFormatTest.testNestedFileRead:140 Test erroneous > Tests run: 846, Failures: 1, Errors: 0, Skipped: 0 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2876: [FLINK-5166] TextInputFormatTest.testNestedFileRead
Github user zentol commented on the issue: https://github.com/apache/flink/pull/2876 The test should be changed to use more specific folder names including a random component. If the directory we are trying to create already exists we should most definitely not just delete everything there, but instead try a different directory. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
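One way to realize zentol's suggestion (a sketch, not the actual fix in the PR) is `java.nio.file.Files.createTempDirectory`, which combines a test-specific prefix with a random component, so each run gets a fresh directory instead of reusing, and potentially wiping, a shared path:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class TempDirExample {
    // Creates a unique directory per invocation: the prefix identifies the
    // test, and the JDK appends a random suffix so an existing directory is
    // never reused or deleted.
    static Path createTestDir() throws IOException {
        return Files.createTempDirectory("TextInputFormatTest-nestedFileRead-");
    }
}
```

Because every call yields a distinct path, there is never a need to delete a pre-existing directory's contents; cleanup only ever touches the directory the test itself created.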
[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2509#discussion_r89680317 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java --- @@ -444,6 +445,134 @@ public void run() { kafkaOffsetHandler.close(); deleteTestTopic(topicName); } + + /** +* This test ensures that when explicitly set to start from earliest record, the consumer +* ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka. +*/ + public void runStartFromEarliestOffsets() throws Exception { + // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50) + final int parallelism = 3; + final int recordsInEachPartition = 50; + + final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.getConfig().disableSysoutLogging(); + env.setParallelism(parallelism); + + Properties readProps = new Properties(); + readProps.putAll(standardProps); + readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored + + // the committed offsets should be ignored + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps); + kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23); + kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31); + kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43); + + readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0); + + kafkaOffsetHandler.close(); + deleteTestTopic(topicName); + } + + /** +* This test ensures that when explicitly set to start from latest record, the consumer +* ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka. 
+*/ + public void runStartFromLatestOffsets() throws Exception { + // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50) + final int parallelism = 3; + final int recordsInEachPartition = 50; + + final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.getConfig().disableSysoutLogging(); + env.setParallelism(parallelism); + + final Properties readProps = new Properties(); + readProps.putAll(standardProps); + readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored + + // the committed offsets should be ignored + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps); + kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23); + kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31); + kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43); + + Thread consumeThread = new Thread(new Runnable() { + @Override + public void run() { + try { + readSequence(env, StartupMode.LATEST, readProps, parallelism, topicName, 30, 50); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + consumeThread.start(); + + Thread.sleep(5000); --- End diff -- Actually, the sleep here isn't waiting for the readSequence call to finish. I'm waiting a bit to make sure that the consume job has fully started. It won't be able to read anything until new latest data is generated afterwards, which is done below by `DataGenerators.generateRandomizedIntegerSequence`. So, what the test is doing is: 1. Write 50 records to each partition. 2. Commit some random offsets. 3. Start a job to read from latest in a separate thread. (should not read any of the previous data, offsets also ignored). The `readSequence` is expected to read 30 more records from each partition 4. 
Make sure the job
[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker
[ https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15698724#comment-15698724 ] ASF GitHub Bot commented on FLINK-4280: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/2509#discussion_r89680317 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java --- @@ -444,6 +445,134 @@ public void run() { kafkaOffsetHandler.close(); deleteTestTopic(topicName); } + + /** +* This test ensures that when explicitly set to start from earliest record, the consumer +* ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka. +*/ + public void runStartFromEarliestOffsets() throws Exception { + // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50) + final int parallelism = 3; + final int recordsInEachPartition = 50; + + final String topicName = writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, parallelism, 1); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.getConfig().disableSysoutLogging(); + env.setParallelism(parallelism); + + Properties readProps = new Properties(); + readProps.putAll(standardProps); + readProps.setProperty("auto.offset.reset", "latest"); // this should be ignored + + // the committed offsets should be ignored + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps); + kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23); + kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31); + kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43); + + readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0); + + kafkaOffsetHandler.close(); + 
deleteTestTopic(topicName); + } + + /** +* This test ensures that when explicitly set to start from latest record, the consumer +* ignores the "auto.offset.reset" behaviour as well as any committed group offsets in Kafka. +*/ + public void runStartFromLatestOffsets() throws Exception { + // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50) + final int parallelism = 3; + final int recordsInEachPartition = 50; + + final String topicName = writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, parallelism, 1); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.getConfig().disableSysoutLogging(); + env.setParallelism(parallelism); + + final Properties readProps = new Properties(); + readProps.putAll(standardProps); + readProps.setProperty("auto.offset.reset", "earliest"); // this should be ignored + + // the committed offsets should be ignored + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps); + kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23); + kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31); + kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43); + + Thread consumeThread = new Thread(new Runnable() { + @Override + public void run() { + try { + readSequence(env, StartupMode.LATEST, readProps, parallelism, topicName, 30, 50); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + consumeThread.start(); + + Thread.sleep(5000); --- End diff -- Actually, the sleep here isn't waiting for the readSequence call to finish. I'm waiting a bit to make sure that the consume job has fully started. It won't be able to read anything until new latest data is generated afterwards, which is done below by `DataGenerators.generateRandomizedIntegerSequence`. So, what the test is doing is: 1. Write 50 records to each partition. 2.
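The behaviour these tests verify — an explicitly configured startup mode overriding both `auto.offset.reset` and any committed group offsets — can be summed up in a small decision sketch (hypothetical helper, not the connector's actual code):

```java
import java.util.OptionalLong;

public class StartupOffsets {
    enum StartupMode { EARLIEST, LATEST, GROUP_OFFSETS }

    // Resolves the partition offset to start reading from. An explicit
    // EARLIEST/LATEST mode wins unconditionally; committed group offsets are
    // only consulted in GROUP_OFFSETS mode, with earliest as the fallback.
    static long resolveStartOffset(StartupMode mode, OptionalLong committed,
                                   long earliest, long latest) {
        switch (mode) {
            case EARLIEST: return earliest;  // committed offset ignored
            case LATEST:   return latest;    // committed offset ignored
            default:       return committed.orElse(earliest);
        }
    }
}
```

This mirrors the test setup: offsets 23/31/43 are committed, yet `StartupMode.EARLIEST` reads all 50 records per partition and `StartupMode.LATEST` reads only records produced after the job starts.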
[jira] [Updated] (FLINK-1707) Add an Affinity Propagation Library Method
[ https://issues.apache.org/jira/browse/FLINK-1707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josep Rubió updated FLINK-1707: --- Description: This issue proposes adding an implementation of the Affinity Propagation algorithm as a Gelly library method and a corresponding example. The algorithm is described in paper [1] and a description of a vertex-centric implementation can be found in [2]. [1]: http://www.psi.toronto.edu/affinitypropagation/FreyDueckScience07.pdf [2]: http://event.cwi.nl/grades2014/00-ching-slides.pdf Design doc: https://docs.google.com/document/d/1QULalzPqMVICi8jRVs3S0n39pell2ZVc7RNemz_SGA4/edit?usp=sharing Example spreadsheet: https://docs.google.com/spreadsheets/d/1CurZCBP6dPb1IYQQIgUHVjQdyLxK0JDGZwlSXCzBcvA/edit?usp=sharing Graph: https://docs.google.com/drawings/d/1PC3S-6AEt2Gp_TGrSfiWzkTcL7vXhHSxvM6b9HglmtA/edit?usp=sharing was: This issue proposes adding an implementation of the Affinity Propagation algorithm as a Gelly library method and a corresponding example. The algorithm is described in paper [1] and a description of a vertex-centric implementation can be found in [2]. 
[1]: http://www.psi.toronto.edu/affinitypropagation/FreyDueckScience07.pdf [2]: http://event.cwi.nl/grades2014/00-ching-slides.pdf Design doc: https://docs.google.com/document/d/1QULalzPqMVICi8jRVs3S0n39pell2ZVc7RNemz_SGA4/edit?usp=sharing Example spreadsheet: https://docs.google.com/spreadsheets/d/1CurZCBP6dPb1IYQQIgUHVjQdyLxK0JDGZwlSXCzBcvA/edit?usp=sharing > Add an Affinity Propagation Library Method > -- > > Key: FLINK-1707 > URL: https://issues.apache.org/jira/browse/FLINK-1707 > Project: Flink > Issue Type: New Feature > Components: Gelly >Reporter: Vasia Kalavri >Assignee: Josep Rubió >Priority: Minor > Labels: requires-design-doc > Attachments: Binary_Affinity_Propagation_in_Flink_design_doc.pdf > > > This issue proposes adding an implementation of the Affinity Propagation > algorithm as a Gelly library method and a corresponding example. > The algorithm is described in paper [1] and a description of a vertex-centric > implementation can be found in [2]. > [1]: http://www.psi.toronto.edu/affinitypropagation/FreyDueckScience07.pdf > [2]: http://event.cwi.nl/grades2014/00-ching-slides.pdf > Design doc: > https://docs.google.com/document/d/1QULalzPqMVICi8jRVs3S0n39pell2ZVc7RNemz_SGA4/edit?usp=sharing > Example spreadsheet: > https://docs.google.com/spreadsheets/d/1CurZCBP6dPb1IYQQIgUHVjQdyLxK0JDGZwlSXCzBcvA/edit?usp=sharing > Graph: > https://docs.google.com/drawings/d/1PC3S-6AEt2Gp_TGrSfiWzkTcL7vXhHSxvM6b9HglmtA/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.4#6332)
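For readers unfamiliar with the algorithm behind the design doc, the dense message-passing updates from Frey & Dueck's paper [1] can be sketched as follows. This is a plain Java illustration of the classic responsibility/availability exchange, not the vertex-centric Gelly implementation this ticket proposes:

```java
// Dense affinity propagation (Frey & Dueck 2007): points exchange
// "responsibility" r(i,k) (how well suited k is as i's exemplar) and
// "availability" a(i,k) (how appropriate it is for i to pick k) messages.
public class AffinityPropagationSketch {

    // s: similarity matrix; s[k][k] holds the "preference" of k to be an
    // exemplar. Returns the exemplar index chosen by each point.
    public static int[] cluster(double[][] s, int iterations, double damping) {
        int n = s.length;
        double[][] r = new double[n][n]; // responsibilities
        double[][] a = new double[n][n]; // availabilities

        for (int it = 0; it < iterations; it++) {
            // r(i,k) <- s(i,k) - max_{k' != k} (a(i,k') + s(i,k'))
            for (int i = 0; i < n; i++) {
                for (int k = 0; k < n; k++) {
                    double max = Double.NEGATIVE_INFINITY;
                    for (int kp = 0; kp < n; kp++) {
                        if (kp != k) max = Math.max(max, a[i][kp] + s[i][kp]);
                    }
                    r[i][k] = damping * r[i][k] + (1 - damping) * (s[i][k] - max);
                }
            }
            // a(i,k) <- min(0, r(k,k) + sum_{i' not in {i,k}} max(0, r(i',k)))
            // a(k,k) <- sum_{i' != k} max(0, r(i',k))
            for (int i = 0; i < n; i++) {
                for (int k = 0; k < n; k++) {
                    double sum = 0;
                    for (int ip = 0; ip < n; ip++) {
                        if (ip != i && ip != k) sum += Math.max(0, r[ip][k]);
                    }
                    double val = (i == k) ? sum : Math.min(0, r[k][k] + sum);
                    a[i][k] = damping * a[i][k] + (1 - damping) * val;
                }
            }
        }
        // Each point picks the k maximizing a(i,k) + r(i,k).
        int[] exemplar = new int[n];
        for (int i = 0; i < n; i++) {
            int best = 0;
            for (int k = 1; k < n; k++) {
                if (a[i][k] + r[i][k] > a[i][best] + r[i][best]) best = k;
            }
            exemplar[i] = best;
        }
        return exemplar;
    }
}
```

The damping factor is essential in practice: undamped synchronous updates oscillate on near-symmetric inputs, which is one of the issues a distributed vertex-centric version (as in [2]) also has to address.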